最近通過的PEP-0492為 Python 3.5 在處理協程時增加了一些特殊的語法。新功能中很大一部分在3.5 之前的版本就已經有了,不過之前的語法並不算最好的,因為生成器和協程的概念本身就有點混在一起。PEP-0492 通過使用 async 關鍵字顯示的對生成器和協程做了區分。
本文旨在說明這些新的機制在底層是如何工作的。如果你只是對怎麼使用這些功能感興趣,那我建議你可以忽略這篇文章,而是去看一下內置的 asyncio 模塊的文檔。如果你對底層的概念感興趣,關心這些底層功能如何能構建你自己的 asyncio 模塊,那你會發現本文會有有意思。
本文中我們會完全放棄任何異步 I/O 方法,而只限於使用多協程的交互。下面是兩個很小的函數:
def coro1(): print("C1: Start") print("C1: Stop") def coro2(): print("C2: Start") print("C2: a") print("C2: b") print("C2: c") print("C2: Stop")
我們從兩個最簡單的函數開始,coro1和coro2。我們可以按順序來執行這兩個函數:
coro1() coro2()
我們得到期望的輸出結果:
C1: Start C1: Stop C2: Start C2: a C2: b C2: c C2: Stop
不過,基於某些原因,我們可能會期望這些代碼交互運行。普通的函數做不到這點,所以我們把這些函數轉換成攜程:
async def coro1(): print("C1: Start") print("C1: Stop") async def coro2(): print("C2: Start") print("C2: a") print("C2: b") print("C2: c") print("C2: Stop")
通過新的 async 關鍵字的魔法,這些函數不再是函數了,現在它們變成了協程(更准確的說是本地協程函數)。普通函數被調用的時候,函數體會被執行,但是在調用協程函數的時候,函數體並不會被執行,你得到的是一個協程對象:
c1 = coro1() c2 = coro2() print(c1, c2)
輸出:
<coroutine object coro1 at 0x10ea60990> <coroutine object coro2 at 0x10ea60a40>
(解釋器還會打印一些運行時的警告信息,先忽略掉)。
那麼,為什麼要有一個協程對象?代碼到底如何執行?執行協程的一種方式是使用 await 表達式(使用新的 await 關鍵字)。你可能會想,可以這樣來做:
await c1
不過,你肯定會失望了。await 表達式只有在本地協程函數裡才是有效的。你必須這樣做:
async def main(): await c1
接下來問題來了,main 函數又是如何開始執行的呢?
關鍵之處是協程確實是與 Python 的生成器非常相似,也都有一個 send 方法。我們可以通過調用 send 方法來啟動一個協程的執行。
c1.send(None)
這樣我們的第一個協程終於可以執行完成了,不過我們也得到了一個討厭的 StopIteration 異常:
C1: Start C1: Stop Traceback (most recent call last): File "test3.py", line 16, in c1.send(None) StopIteration
StopIteration 異常是一種標記生成器(或者像這裡的協程)執行結束的機制。雖然這是一個異常,但是確實是我們期望的!我們可以用適當的 try-catch 代碼將其包起來,這樣就可以避免錯誤提示。接下來我們讓我們的第二個協程也執行起來:
try: c1.send(None) except StopIteration: pass try: c2.send(None) except StopIteration: pass
現在我們得到了全部的輸出,不過有點讓人失望的是這跟最初的輸出結果沒有啥區別。因此我們增加了不少代碼,不過還沒有做到交替執行。協程與線程相似的地方是多個線程之間也可以交替執行,不過與線程不同之處在於協程之間的切換是顯式的,而線程是隱式的(大多數情況下是更好的方式)。所以我們需要加入顯式切換的代碼。
通常生成器的 send 方法會一直運行,直到通過 yield 關鍵字放棄執行,也許你認為我們的 coro1 可以改成這個樣子:
async def coro1(): print("C1: Start") yield print("C1: Stop")
但是我們不能在協程裡使用 yield。作為替換,我們可以使用新的 await 表達式來暫停協程的執行,直到 awaitable 執行結束。於是我們需要的代碼類似於 await _something_;問題是這裡 _something_ 是什麼呢?我們必須 await 某個東西,而不是空!這個 PEP 解釋了什麼是可以 await 的(awaitable)。其中一種是另一個本地協程,不過這個對我們了解底層細節沒有啥幫助。另一種是通過特定 CPython API 定義的對象,不過我們暫時還不打算引入擴展模塊,而只限於使用純 Python。除此之外,還剩下兩種選擇:基於生成器的協程對象,或者一個特殊的類似 Future 的對象。
接下來,我們會選擇基於生成器的協程對象。基本上一個 Python 的生成器(例如:某個有yield表達式的函數)可以通過 types.coroutine 裝飾被標記成一個協程。所以,這是一個最簡單的例子:
@types.coroutine def switch(): yield
這定義了一個基於生成器的協程函數。要得到基於生成器的協程對象,只需要執行這個函數。我們可以把我們的 coro1 協程修改成下面這樣:
async def coro1(): print("C1: Start") await switch() print("C1: Stop")
通過上面的修改,我們期望 coro1 和 coro2 可以交錯執行。到目前為止,輸出是這樣的:
C1: Start C2: Start C2: a C2: b C2: c C2: Stop
我沒看到正如期望的,在第一條打印語句之後,coro1 停止執行,coro2 接著執行。實際上,我們可以通過下面的代碼查看協程對象是如何暫停執行的:
print("c1 suspended at: {}:{}".format(c1.gi_frame.f_code.co_filename, c1.gi_frame.f_lineno))
這可以打印 await 表達式所在的行。(注意:打印的是最外層的 await,所以這裡只是起示例作用,通常情況下用處不大)。
現在的問題是,如何讓 coro1 繼續執行完呢?我們可以再調用一次 send,代碼如下:
try: c1.send(None) except StopIteration: pass try: c2.send(None) except StopIteration: pass try: c1.send(None) except StopIteration: pass
得到的輸出跟預期一樣:
C1: Start C2: Start C2: a C2: b C2: c C2: Stop C1: Stop
目前,我們通過為不同的協程顯式調用 send來讓它們都執行結束。通常情況下這種方式不是很好。我們希望的是有一個函數來控制所有的協程的運行,直到全部協程都執行完成。換句話說,我們期望連續不斷的調用 send,驅動不同的協程去執行,直到send拋出 StopIteration 異常。
為此我們新建一個函數,這個函數傳入一個協程列表,函數執行這些協程直到全部結束。我們現在要做的就是調用這個函數。
def run(coros): coros = list(coros) while coros: # Duplicate list for iteration so we can remove from original list. for coro in list(coros): try: coro.send(None) except StopIteration: coros.remove(coro)
這段代碼每次從協程列表裡取一個協程執行,如果捕獲到 StopIteration 異常,就把這個協程從隊列裡去掉。
接下來我們把手工調用 send 的代碼去掉,代碼如下:
c1 = coro1() c2 = coro2() run([c1, c2])
綜上所述,在 Python 3.5,我們現在可以通過新的 await 和 async 功能很輕松的執行協程。本文的相關代碼可以在 github 上找到。