最近我在學習 Python 的運行模型。我對 Python 的一些內部機制很是好奇,比如 Python 是怎麼實現類似 YIELDVALUE、YIELDFROM 這樣的操作碼的;對於 遞推式構造列表(List Comprehensions)、生成器表達式(generator expressions)以及其他一些有趣的 Python 特性是怎麼編譯的;從字節碼的層面來看,當異常拋出的時候都發生了什麼事情。翻閱 CPython 的代碼對於解答這些問題當然是很有幫助的,但我仍然覺得以這樣的方式來做的話對於理解字節碼的執行和堆棧的變化還是缺少點什麼。GDB 是個好選擇,但是我懶,而且只想使用一些比較高階的接口寫點 Python 代碼來完成這件事。
所以呢,我的目標就是創建一個字節碼級別的追蹤 API,類似 sys.setrace 所提供的那樣,但相對而言會有更好的粒度。這充分鍛煉了我編寫 Python 實現的 C 代碼的編碼能力。我們所需要的有如下幾項,在這篇文章中所用的 Python 版本為 3.5。
一個新的 Cpython 操作碼
新操作碼:DEBUG_OP
這個新的操作碼 DEBUG_OP 是我第一次嘗試寫 CPython 實現的 C 代碼,我將盡可能的讓它保持簡單。 我們想要達成的目的是,當我們的操作碼被執行的時候我能有一種方式來調用一些 Python 代碼。同時,我們也想能夠追蹤一些與執行上下文有關的數據。我們的操作碼會把這些信息當作參數傳遞給我們的回調函數。通過操作碼能辨識出的有用信息如下:
所以呢,我們的操作碼需要做的事情是:
聽起來挺簡單的,現在開始動手吧!聲明:下面所有的解釋說明和代碼是經過了大量段錯誤調試之後總結得到的結論。首先要做的是給操作碼定義一個名字和相應的值,因此我們需要在 Include/opcode.h中添加代碼。
/** My own comments begin by '**' **/ /** From: Includes/opcode.h **/ /* Instruction opcodes for compiled code */ /** We just have to define our opcode with a free value 0 was the first one I found **/ #define DEBUG_OP 0 #define POP_TOP 1 #define ROT_TWO 2 #define ROT_THREE 3
這部分工作就完成了,現在我們去編寫操作碼真正干活的代碼。
實現 DEBUG_OP
在考慮如何實現DEBUG_OP之前我們需要了解的是 DEBUG_OP 提供的接口將長什麼樣。 擁有一個可以調用其他代碼的新操作碼是相當酷眩的,但是究竟它將調用哪些代碼捏?這個操作碼如何找到回調函數的捏?我選擇了一種最簡單的方法:在幀的全局區域寫死函數名。那麼問題就變成了,我該怎麼從字典中找到一個固定的 C 字符串?為了回答這個問題我們來看看在 Python 的 main loop 中使用到的和上下文管理相關的標識符 enter 和 exit。
我們可以看到這兩標識符被使用在操作碼 SETUP_WITH 中:
/** From: Python/ceval.c **/ TARGET(SETUP_WITH) { _Py_IDENTIFIER(__exit__); _Py_IDENTIFIER(__enter__); PyObject *mgr = TOP(); PyObject *exit = special_lookup(mgr, &PyId___exit__), *enter; PyObject *res;
現在,看一眼宏 _Py_IDENTIFIER 定義
/** From: Include/object.h **/ /********************* String Literals ****************************************/ /* This structure helps managing static strings. The basic usage goes like this: Instead of doing r = PyObject_CallMethod(o, "foo", "args", ...); do _Py_IDENTIFIER(foo); ... r = _PyObject_CallMethodId(o, &PyId_foo, "args", ...); PyId_foo is a static variable, either on block level or file level. On first usage, the string "foo" is interned, and the structures are linked. On interpreter shutdown, all strings are released (through _PyUnicode_ClearStaticStrings). Alternatively, _Py_static_string allows to choose the variable name. _PyUnicode_FromId returns a borrowed reference to the interned string. _PyObject_{Get,Set,Has}AttrId are __getattr__ versions using _Py_Identifier*. */ typedef struct _Py_Identifier { struct _Py_Identifier *next; const char* string; PyObject *object; } _Py_Identifier; #define _Py_static_string_init(value) { 0, value, 0 } #define _Py_static_string(varname, value) static _Py_Identifier varname = _Py_static_string_init(value) #define _Py_IDENTIFIER(varname) _Py_static_string(PyId_##varname, #varname)
嗯,注釋部分已經說明得很清楚了。通過一番查找,我們發現了可以用來從字典找固定字符串的函數 _PyDict_GetItemId,所以我們操作碼的查找部分的代碼就是長這樣滴。
/** Our callback function will be named op_target **/ PyObject *target = NULL; _Py_IDENTIFIER(op_target); target = _PyDict_GetItemId(f->f_globals, &PyId_op_target); if (target == NULL && _PyErr_OCCURRED()) { if (!PyErr_ExceptionMatches(PyExc_KeyError)) goto error; PyErr_Clear(); DISPATCH(); }
為了方便理解,對這一段代碼做一些說明:
下一步就是收集我們想要的堆棧信息。
/** This code create a list with all the values on the current stack **/ PyObject *value = PyList_New(0); for (i = 1 ; i <= STACK_LEVEL(); i++) { tmp = PEEK(i); if (tmp == NULL) { tmp = Py_None; } PyList_Append(value, tmp); }
最後一步就是調用我們的回調函數!我們用 call_function 來搞定這件事,我們通過研究操作碼 CALL_FUNCTION 的實現來學習怎麼使用 call_function 。
/** From: Python/ceval.c **/ TARGET(CALL_FUNCTION) { PyObject **sp, *res; /** stack_pointer is a local of the main loop. It's the pointer to the stacktop of our frame **/ sp = stack_pointer; res = call_function(&sp, oparg); /** call_function handles the args it consummed on the stack for us **/ stack_pointer = sp; PUSH(res); /** Standard exception handling **/ if (res == NULL) goto error; DISPATCH(); }
有了上面這些信息,我們終於可以搗鼓出一個操作碼DEBUG_OP的草稿了:
TARGET(DEBUG_OP) { PyObject *value = NULL; PyObject *target = NULL; PyObject *res = NULL; PyObject **sp = NULL; PyObject *tmp; int i; _Py_IDENTIFIER(op_target); target = _PyDict_GetItemId(f->f_globals, &PyId_op_target); if (target == NULL && _PyErr_OCCURRED()) { if (!PyErr_ExceptionMatches(PyExc_KeyError)) goto error; PyErr_Clear(); DISPATCH(); } value = PyList_New(0); Py_INCREF(target); for (i = 1 ; i <= STACK_LEVEL(); i++) { tmp = PEEK(i); if (tmp == NULL) tmp = Py_None; PyList_Append(value, tmp); } PUSH(target); PUSH(value); Py_INCREF(f); PUSH(f); sp = stack_pointer; res = call_function(&sp, 2); stack_pointer = sp; if (res == NULL) goto error; Py_DECREF(res); DISPATCH(); }
在編寫 CPython 實現的 C 代碼方面我確實沒有什麼經驗,有可能我漏掉了些細節。如果您有什麼建議還請您糾正,我期待您的反饋。
編譯它,成了!
一切看起來很順利,但是當我們嘗試去使用我們定義的操作碼 DEBUG_OP 的時候卻失敗了。自從 2008 年之後,Python 使用預先寫好的 goto(你也可以從 這裡獲取更多的訊息)。故,我們需要更新下 goto jump table,我們在 Python/opcode_targets.h 中做如下修改。
/** From: Python/opcode_targets.h **/ /** Easy change since DEBUG_OP is the opcode number 1 **/ static void *opcode_targets[256] = { //&&_unknown_opcode, &&TARGET_DEBUG_OP, &&TARGET_POP_TOP, /** ... **/
這就完事了,我們現在就有了一個可以工作的新操作碼。唯一的問題就是這貨雖然存在,但是沒有被人調用過。接下來,我們將DEBUG_OP注入到函數的字節碼中。
在 Python 字節碼中注入操作碼 DEBUG_OP
有很多方式可以在 Python 字節碼中注入新的操作碼:
為了創造出一個新操作碼,有了上面的那一堆 C 代碼就夠了。現在讓我們回到原點,開始理解奇怪甚至神奇的 Python!
我們將要做的事兒有:
和 code object 有關的小貼士
如果你從沒聽說過 code object,這裡有一個簡單的介紹網路上也有一些相關的文檔可供查閱,可以直接 Ctrl+F 查找 code object
還有一件事情需要注意的是在這篇文章所指的環境中 code object 是不可變的:
Python 3.4.2 (default, Oct 8 2014, 10:45:20) [GCC 4.9.1] on linux Type "help", "copyright", "credits" or "license" for more information. >>> x = lambda y : 2 >>> x.__code__ <code object <lambda> at 0x7f481fd88390, file "<stdin>", line 1> >>> x.__code__.co_name '<lambda>' >>> x.__code__.co_name = 'truc' Traceback (most recent call last): File "<stdin>", line 1, in <module> AttributeError: readonly attribute >>> x.__code__.co_consts = ('truc',) Traceback (most recent call last): File "<stdin>", line 1, in <module> AttributeError: readonly attribute
但是不用擔心,我們將會找到方法繞過這個問題的。
使用的工具
為了修改字節碼我們需要一些工具:
用 dis.Bytecode 反編譯 code object 能告訴我們一些有關操作碼、參數和上下文的信息。
# Python3.4 >>> import dis >>> f = lambda x: x + 3 >>> for i in dis.Bytecode(f.__code__): print (i) ... Instruction(opname='LOAD_FAST', opcode=124, arg=0, argval='x', argrepr='x', offset=0, starts_line=1, is_jump_target=False) Instruction(opname='LOAD_CONST', opcode=100, arg=1, argval=3, argrepr='3', offset=3, starts_line=None, is_jump_target=False) Instruction(opname='BINARY_ADD', opcode=23, arg=None, argval=None, argrepr='', offset=6, starts_line=None, is_jump_target=False) Instruction(opname='RETURN_VALUE', opcode=83, arg=None, argval=None, argrepr='', offset=7, starts_line=None, is_jump_target=False)
為了能夠修改 code object,我定義了一個很小的類用來復制 code object,同時能夠按我們的需求修改相應的值,然後重新生成一個新的 code object。
class MutableCodeObject(object): args_name = ("co_argcount", "co_kwonlyargcount", "co_nlocals", "co_stacksize", "co_flags", "co_code", "co_consts", "co_names", "co_varnames", "co_filename", "co_name", "co_firstlineno", "co_lnotab", "co_freevars", "co_cellvars") def __init__(self, initial_code): self.initial_code = initial_code for attr_name in self.args_name: attr = getattr(self.initial_code, attr_name) if isinstance(attr, tuple): attr = list(attr) setattr(self, attr_name, attr) def get_code(self): args = [] for attr_name in self.args_name: attr = getattr(self, attr_name) if isinstance(attr, list): attr = tuple(attr) args.append(attr) return self.initial_code.__class__(*args)
這個類用起來很方便,解決了上面提到的 code object 不可變的問題。
>>> x = lambda y : 2 >>> m = MutableCodeObject(x.__code__) >>> m <new_code.MutableCodeObject object at 0x7f3f0ea546a0> >>> m.co_consts [None, 2] >>> m.co_consts[1] = '3' >>> m.co_name = 'truc' >>> m.get_code() <code object truc at 0x7f3f0ea2bc90, file "<stdin>", line 1>
測試我們的新操作碼
我們現在擁有了注入 DEBUG_OP 的所有工具,讓我們來驗證下我們的實現是否可用。我們將我們的操作碼注入到一個最簡單的函數中:
from new_code import MutableCodeObject def op_target(*args): print("WOOT") print("op_target called with args <{0}>".format(args)) def nop(): pass new_nop_code = MutableCodeObject(nop.__code__) new_nop_code.co_code = b"\x00" + new_nop_code.co_code[0:3] + b"\x00" + new_nop_code.co_code[-1:] new_nop_code.co_stacksize += 3 nop.__code__ = new_nop_code.get_code() import dis dis.dis(nop) nop() # Don't forget that ./python is our custom Python implementing DEBUG_OP hakril@computer ~/python/CPython3.5 % ./python proof.py 8 0 <0> 1 LOAD_CONST 0 (None) 4 <0> 5 RETURN_VALUE WOOT op_target called with args <([], <frame object at 0x7fde9eaebdb0>)> WOOT op_target called with args <([None], <frame object at 0x7fde9eaebdb0>)>
看起來它成功了!有一行代碼需要說明一下 new_nop_code.co_stacksize += 3
現在我們可以將我們的操作碼注入到每一個 Python 函數中了!
重寫字節碼
正如我們在上面的例子中所看到的那樣,重寫 Pyhton 的字節碼似乎 so easy。為了在每一個操作碼之間注入我們的操作碼,我們需要獲取每一個操作碼的偏移量,然後將我們的操作碼注入到這些位置上(把我們操作碼注入到參數上是有壞處大大滴)。這些偏移量也很容易獲取,使用 dis.Bytecode,就像這樣。
def add_debug_op_everywhere(code_obj): # We get every instruction offset in the code object offsets = [instr.offset for instr in dis.Bytecode(code_obj)] # And insert a DEBUG_OP at every offset return insert_op_debug_list(code_obj, offsets) def insert_op_debug_list(code, offsets): # We insert the DEBUG_OP one by one for nb, off in enumerate(sorted(offsets)): # Need to ajust the offsets by the number of opcodes already inserted before # That's why we sort our offsets! code = insert_op_debug(code, off + nb) return code # Last problem: what does insert_op_debug looks like?
基於上面的例子,有人可能會想我們的 insert_op_debug 會在指定的偏移量增加一個"\x00",這尼瑪是個坑啊!我們第一個 DEBUG_OP 注入的例子中被注入的函數是沒有任何的分支的,為了能夠實現完美一個函數注入函數 insert_op_debug 我們需要考慮到存在分支操作碼的情況。
Python 的分支一共有兩種:
(1) 絕對分支:看起來是類似這樣子的 Instruction_Pointer = argument(instruction)
(2)相對分支:看起來是類似這樣子的 Instruction_Pointer += argument(instruction)
相對分支總是向前的
我們希望這些分支在我們插入操作碼之後仍然能夠正常工作,為此我們需要修改一些指令參數。以下是其邏輯流程:
(1) 對於每一個在插入偏移量之前的相對分支而言
如果目標地址是嚴格大於我們的插入偏移量的話,將指令參數增加 1
如果相等,則不需要增加 1 就能夠在跳轉操作和目標地址之間執行我們的操作碼DEBUG_OP
如果小於,插入我們的操作碼的話並不會影響到跳轉操作和目標地址之間的距離
(2) 對於 code object 中的每一個絕對分支而言
如果目標地址是嚴格大於我們的插入偏移量的話,將指令參數增加 1
如果相等,那麼不需要任何修改,理由和相對分支部分是一樣的
如果小於,插入我們的操作碼的話並不會影響到跳轉操作和目標地址之間的距離
下面是實現:
# Helper def bytecode_to_string(bytecode): if bytecode.arg is not None: return struct.pack("<Bh", bytecode.opcode, bytecode.arg) return struct.pack("<B", bytecode.opcode) # Dummy class for bytecode_to_string class DummyInstr: def __init__(self, opcode, arg): self.opcode = opcode self.arg = arg def insert_op_debug(code, offset): opcode_jump_rel = ['FOR_ITER', 'JUMP_FORWARD', 'SETUP_LOOP', 'SETUP_WITH', 'SETUP_EXCEPT', 'SETUP_FINALLY'] opcode_jump_abs = ['POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE', 'JUMP_ABSOLUTE'] res_codestring = b"" inserted = False for instr in dis.Bytecode(code): if instr.offset == offset: res_codestring += b"\x00" inserted = True if instr.opname in opcode_jump_rel and not inserted: #relative jump are always forward if offset < instr.offset + 3 + instr.arg: # inserted beetwen jump and dest: add 1 to dest (3 for size) #If equal: jump on DEBUG_OP to get info before exec instr res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1)) continue if instr.opname in opcode_jump_abs: if instr.arg > offset: res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1)) continue res_codestring += bytecode_to_string(instr) # replace_bytecode just replaces the original code co_code return replace_bytecode(code, res_codestring)
讓我們看一下效果如何:
>>> def lol(x): ... for i in range(10): ... if x == i: ... break >>> dis.dis(lol) 101 0 SETUP_LOOP 36 (to 39) 3 LOAD_GLOBAL 0 (range) 6 LOAD_CONST 1 (10) 9 CALL_FUNCTION 1 (1 positional, 0 keyword pair) 12 GET_ITER >> 13 FOR_ITER 22 (to 38) 16 STORE_FAST 1 (i) 102 19 LOAD_FAST 0 (x) 22 LOAD_FAST 1 (i) 25 COMPARE_OP 2 (==) 28 POP_JUMP_IF_FALSE 13 103 31 BREAK_LOOP 32 JUMP_ABSOLUTE 13 35 JUMP_ABSOLUTE 13 >> 38 POP_BLOCK >> 39 LOAD_CONST 0 (None) 42 RETURN_VALUE >>> lol.__code__ = transform_code(lol.__code__, add_debug_op_everywhere, add_stacksize=3) >>> dis.dis(lol) 101 0 <0> 1 SETUP_LOOP 50 (to 54) 4 <0> 5 LOAD_GLOBAL 0 (range) 8 <0> 9 LOAD_CONST 1 (10) 12 <0> 13 CALL_FUNCTION 1 (1 positional, 0 keyword pair) 16 <0> 17 GET_ITER >> 18 <0> 102 19 FOR_ITER 30 (to 52) 22 <0> 23 STORE_FAST 1 (i) 26 <0> 27 LOAD_FAST 0 (x) 30 <0> 103 31 LOAD_FAST 1 (i) 34 <0> 35 COMPARE_OP 2 (==) 38 <0> 39 POP_JUMP_IF_FALSE 18 42 <0> 43 BREAK_LOOP 44 <0> 45 JUMP_ABSOLUTE 18 48 <0> 49 JUMP_ABSOLUTE 18 >> 52 <0> 53 POP_BLOCK >> 54 <0> 55 LOAD_CONST 0 (None) 58 <0> 59 RETURN_VALUE # Setup the simplest handler EVER >>> def op_target(stack, frame): ... print (stack) # GO >>> lol(2) [] [] [<class 'range'>] [10, <class 'range'>] [range(0, 10)] [<range_iterator object at 0x7f1349afab80>] [0, <range_iterator object at 0x7f1349afab80>] [<range_iterator object at 0x7f1349afab80>] [2, <range_iterator object at 0x7f1349afab80>] [0, 2, <range_iterator object at 0x7f1349afab80>] [False, <range_iterator object at 0x7f1349afab80>] [<range_iterator object at 0x7f1349afab80>] [1, <range_iterator object at 0x7f1349afab80>] [<range_iterator object at 0x7f1349afab80>] [2, <range_iterator object at 0x7f1349afab80>] [1, 2, <range_iterator object at 0x7f1349afab80>] [False, <range_iterator object at 0x7f1349afab80>] [<range_iterator object at 0x7f1349afab80>] [2, <range_iterator object at 0x7f1349afab80>] [<range_iterator object at 0x7f1349afab80>] [2, <range_iterator object at 0x7f1349afab80>] [2, 2, <range_iterator object at 0x7f1349afab80>] [True, <range_iterator object at 0x7f1349afab80>] [<range_iterator object at 0x7f1349afab80>] [] [None]
甚好!現在我們知道了如何獲取堆棧信息和 Python 中每一個操作對應的幀信息。上面結果所展示的結果目前而言並不是很實用。在最後一部分中讓我們對注入做進一步的封裝。
增加 Python 封裝
正如您所見到的,所有的底層接口都是好用的。我們最後要做的一件事是讓 op_target 更加方便使用(這部分相對而言比較空泛一些,畢竟在我看來這不是整個項目中最有趣的部分)。
首先我們來看一下幀的參數所能提供的信息,如下所示:
經過我們的處理我們可以得知 DEBUG_OP 之後要被執行的操作碼,這對我們聚合數據並展示是相當有用的。
新建一個用於追蹤函數內部機制的類:
一旦我們知道下一個操作,我們就可以分析它並修改它的參數。舉例來說我們可以增加一個 auto-follow-called-functions 的特性。
def op_target(l, f, exc=None): if op_target.callback is not None: op_target.callback(l, f, exc) class Trace: def __init__(self, func): self.func = func def call(self, *args, **kwargs): self.add_func_to_trace(self.func) # Activate Trace callback for the func call op_target.callback = self.callback try: res = self.func(*args, **kwargs) except Exception as e: res = e op_target.callback = None return res def add_func_to_trace(self, f): # Is it code? is it already transformed? if not hasattr(f ,"op_debug") and hasattr(f, "__code__"): f.__code__ = transform_code(f.__code__, transform=add_everywhere, add_stacksize=ADD_STACK) f.__globals__['op_target'] = op_target f.op_debug = True def do_auto_follow(self, stack, frame): # Nothing fancy: FrameAnalyser is just the wrapper that gives the next executed instruction next_instr = FrameAnalyser(frame).next_instr() if "CALL" in next_instr.opname: arg = next_instr.arg f_index = (arg & 0xff) + (2 * (arg >> 8)) called_func = stack[f_index] # If call target is not traced yet: do it if not hasattr(called_func, "op_debug"): self.add_func_to_trace(called_func)
現在我們實現一個 Trace 的子類,在這個子類中增加 callback 和 doreport 這兩個方法。callback 方法將在每一個操作之後被調用。doreport 方法將我們收集到的信息打印出來。
這是一個偽函數追蹤器實現:
class DummyTrace(Trace): def __init__(self, func): self.func = func self.data = collections.OrderedDict() self.last_frame = None self.known_frame = [] self.report = [] def callback(self, stack, frame, exc): if frame not in self.known_frame: self.known_frame.append(frame) self.report.append(" === Entering New Frame {0} ({1}) ===".format(frame.f_code.co_name, id(frame))) self.last_frame = frame if frame != self.last_frame: self.report.append(" === Returning to Frame {0} {1}===".format(frame.f_code.co_name, id(frame))) self.last_frame = frame self.report.append(str(stack)) instr = FrameAnalyser(frame).next_instr() offset = str(instr.offset).rjust(8) opname = str(instr.opname).ljust(20) arg = str(instr.arg).ljust(10) self.report.append("{0} {1} {2} {3}".format(offset, opname, arg, instr.argval)) self.do_auto_follow(stack, frame) def do_report(self): print("\n".join(self.report))
這裡有一些實現的例子和使用方法。格式有些不方便觀看,畢竟我並不擅長於搞這種對用戶友好的報告的事兒。
遞推式構造列表(List Comprehensions)的追蹤示例。
總結
這個小項目是一個了解 Python 底層的良好途徑,包括解釋器的 main loop,Python 實現的 C 代碼編程、Python 字節碼。通過這個小工具我們可以看到 Python 一些有趣構造函數的字節碼行為,例如生成器、上下文管理和遞推式構造列表。
這裡是這個小項目的完整代碼。更進一步的,我們還可以做的是修改我們所追蹤的函數的堆棧。我雖然不確定這個是否有用,但是可以肯定是這一過程是相當有趣的。