gevent是Python的一個用於網絡IO的函數庫,其中應用到了 coroutine(協同程序) 的思想。首先來了解下目前網絡框架的幾種基本的網絡I/O模型:
阻塞式單線程:這是最基本的I/O模型,只有在處理完一個請求之後才會處理下一個請求。它的缺點是效能差,如果有請求阻塞住,會讓服務無法繼續接受請求。但是這種模型編寫代碼相對簡單,在應對訪問量不大的情況時是非常適合的。
阻塞式多線程:針對於單線程接受請求量有限的缺點,一個很自然的想法就是給每一個請求開一個線程去處理。這樣做的好處是能夠接受更多的請求,缺點是在線程產生到一定數量之後,進程之間需要大量進行切換上下文的操作,會占用CPU大量的時間,不過這樣處理的話編寫代碼的難道稍高於單進程的情況。
非阻塞式事件驅動:為了解決多線程的問題,有一種做法是利用一個循環來檢查是否有網絡IO的事件發生,以便決定如何來進行處理(reactor設計模式)。這樣的做的好處是進一步降低了CPU的資源消耗。缺點是這樣做會讓程序難以編寫,因為請求接受後的處理過程由reactor來決定,使得程序的執行流程難以把握。當接受到一個請求後如果涉及到阻塞的操作,這個請求的處理就會停下來去接受另一個請求,程序執行的流程不會像線性程序那樣直觀。twisted框架就是應用這種IO模型的典型例子。
非阻塞式Coroutine:這個模式是為了解決事件驅動模型執行流程不直觀的問題,它在本質上也是事件驅動的,加入了Coroutine的概念,我要學習的gevent就是應用這種IO模型的函數庫。
接下來說說Coroutine(協程)這個概念,coroutine可以理解為一個輕量級的線程,為了解決了多線程上下文切換的損耗,提供了一個軟件的協程切換。並且相對於事件驅動,能夠將程序的執行過程由編寫程序的人更好的控制。下面的圖展現了協程的執行過程:
在了解了關於gevent的基本概念之後,接下來了就開始安裝gevent。
?
apt-get
install
libevent-dev
apt-get
install
python-all-dev
pip
install
gevent
現在基本的概念了解後,接下來就可以開始了解相關的代碼了
-----------------------------------------------------------------------------
在上一篇裡了解了gevent應用的IO模型概念之後,接下來開始真正了解gevent的使用。
在gevent裡面最多應用到的就是greenlet,一個輕量級的協程實現。在任何時間點,只有一個greenlet處於運行狀態。Greenlet與multiprocessing 和 threading這兩個庫提供的真正的並行結構的區別在於這兩個庫會真正的切換進程,POSIX線程是由操作系統來負責調度,並且它們是真正並行的。
應對並發的主要思路就是將一個大的任務分解成一個子任務的集合並且能夠讓它並行或者異步地執行,而不是一次執行一個或者同步執行。在兩個子任務中的切換被稱為上下文切換。
gevent裡面的上下文切換是非常平滑的。在下面的例子程序中,我們可以看到兩個上下文通過調用 gevent.sleep()來互相切換。
?import
gevent
def
foo():
print
(
'Running in foo'
)
gevent.sleep(
0
)
print
(
'Explicit context switch to foo again'
)
def
bar():
print
(
'Explicit context to bar'
)
gevent.sleep(
0
)
print
(
'Implicit context switch back to bar'
)
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
這段程序的執行結果如下:
?Running
in
foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
從這個執行結果可以看出這個程序的執行過程,在這裡的兩個函數是交替執行的。
gevent的真正威力是在處理網絡和帶有IO阻塞的功能時能夠這些任務協調地運行。gevent來實現了這些具體的細節來保證在需要的時候greenlet上下文進行切換。在這裡用一個例子來說明。
?import
time
import
gevent
from
gevent
import
select
start
=
time.time()
tic
=
lambda
:
'at %1.1f seconds'
%
(time.time()
-
start)
def
gr1():
# Busy waits for a second, but we don't want to stick around...
print
(
'Started Polling: '
, tic())
select.select([], [], [],
2
)
print
(
'Ended Polling: '
, tic())
def
gr2():
# Busy waits for a second, but we don't want to stick around...
print
(
'Started Polling: '
, tic())
select.select([], [], [],
2
)
print
(
'Ended Polling: '
, tic())
def
gr3():
print
(
"Hey lets do some stuff while the greenlets poll, at"
, tic())
gevent.sleep(
1
)
gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])
在上面的例子裡,select() 通常是一個阻塞的調用。
程序的執行結果如下:
?Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets
do
some stuff
while
the greenlets poll, at at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
接下來一個例子中可以看到gevent是安排各個任務的執行的。
?import
gevent
import
random
def
task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(random.randint(
0
,
2
)
*
0.001
)
print
(
'Task'
, pid,
'done'
)
def
synchronous():
for
i
in
range
(
1
,
10
):
task(i)
def
asynchronous():
threads
=
[gevent.spawn(task, i)
for
i
in
xrange
(
10
)]
gevent.joinall(threads)
print
(
'Synchronous:'
)
synchronous()
print
(
'Asynchronous:'
)
asynchronous()
執行結果如下:
?root@master:~
# python two.py
Synchronous:
(
'Task'
, 1,
'done'
)
(
'Task'
, 2,
'done'
)
(
'Task'
, 3,
'done'
)
(
'Task'
, 4,
'done'
)
(
'Task'
, 5,
'done'
)
(
'Task'
, 6,
'done'
)
(
'Task'
, 7,
'done'
)
(
'Task'
, 8,
'done'
)
(
'Task'
, 9,
'done'
)
Asynchronous:
(
'Task'
, 0,
'done'
)
(
'Task'
, 9,
'done'
)
(
'Task'
, 7,
'done'
)
(
'Task'
, 3,
'done'
)
(
'Task'
, 6,
'done'
)
(
'Task'
, 5,
'done'
)
(
'Task'
, 4,
'done'
)
(
'Task'
, 1,
'done'
)
(
'Task'
, 2,
'done'
)
(
'Task'
, 8,
'done'
)
在同步的情況下,任務是按順序執行的,在執行各個任務的時候會阻塞主線程。
而gevent.spawn 的重要功能就是封裝了greenlet裡面的函數。初始化的greenlet放在了threads這個list裡面,被傳遞給了 gevent.joinall 這個函數,它會阻塞當前的程序來執行所有的greenlet。
在異步執行的情況下,所有任務的執行順序是完全隨機的。每一個greenlet的都不會阻塞其他greenlet的執行。
在有時候需要異步地從服務器獲取數據,gevent可以通過判斷從服務器的數據載入情況來處理請求。
?import
gevent.monkey
gevent.monkey.patch_socket()
import
gevent
import
urllib2
import
simplejson as json
def
fetch(pid):
response
=
urllib2.urlopen(
'http://json-time.appspot.com/time.json'
)
result
=
response.read()
json_result
=
json.loads(result)
datetime
=
json_result[
'datetime'
]
print
'Process '
, pid, datetime
return
json_result[
'datetime'
]
def
synchronous():
for
i
in
range
(
1
,
10
):
fetch(i)
def
asynchronous():
threads
=
[]
for
i
in
range
(
1
,
10
):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads)
print
'Synchronous:'
synchronous()
print
'Asynchronous:'
asynchronous()
就像之前說的,greenlet是確定的。給每個greenlet相同的配置和相同的輸入,得到的輸出是相同的。我們可以用python 的多進程池和gevent池來作比較。下面的例子可以說明這個特點:
?import
time
def
echo(i):
time.sleep(
0.001
)
return
i
# Non Deterministic Process Pool
from
multiprocessing.pool
import
Pool
p
=
Pool(
10
)
run1
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
run2
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
run3
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
run4
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
print
( run1
=
=
run2
=
=
run3
=
=
run4 )
# Deterministic Gevent Pool
from
gevent.pool
import
Pool
p
=
Pool(
10
)
run1
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
run2
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
run3
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
run4
=
[a
for
a
in
p.imap_unordered(echo,
xrange
(
10
))]
print
( run1
=
=
run2
=
=
run3
=
=
run4 )
下面是執行結果:
?False
True
從上面的例子可以看出,執行同一個函數,產生的greenlet是相同的,而產生的process是不同的。
在處理並發編程的時候會碰到一些問題,比如競爭資源的問題。最簡單的情況,當有兩個線程或進程訪問同一資源並且修改這個資源的時候,就會引發資源競爭的問題。那麼這個資源最終的值就會取決於那個線程或進程是最後執行的。這是個問題,總之,在處理全局的程序不確定行為的時候,需要盡量避免資源競爭的問題
最好的方法就是在任何時候盡量避免使用全局的狀態。全局狀態是經常會坑你的!
在gevent裡面封裝了一些初始化greenlet的方法,下面是幾個最常用的例子:
?import
gevent
from
gevent
import
Greenlet
def
foo(message, n):
"""
Each thread will be passed the message, and n arguments
in its initialization.
"""
gevent.sleep(n)
print
(message)
# Initialize a new Greenlet instance running the named function
# foo
thread1
=
Greenlet.spawn(foo,
"Hello"
,
1
)
# Wrapper for creating and runing a new Greenlet from the named
# function foo, with the passed arguments
thread2
=
gevent.spawn(foo,
"I live!"
,
2
)
# Lambda expressions
thread3
=
gevent.spawn(
lambda
x: (x
+
1
),
2
)
threads
=
[thread1, thread2, thread3]
# Block until all threads complete.
gevent.joinall(threads)
在上面的程序裡使用 spawn 方法來產生greenlet。還有一種初始化greenlet的方法,就是創建Greenlet的子類,並且重寫 _run 方法。
?import
gevent
from
gevent
import
Greenlet
class
MyGreenlet(Greenlet):
def
__init__(
self
, message, n):
Greenlet.__init__(
self
)
self
.message
=
message
self
.n
=
n
def
_run(
self
):
print
(
self
.message)
gevent.sleep(
self
.n)
g
=
MyGreenlet(
"Hi there!"
,
3
)
g.start()
g.join()
就像其他的代碼一樣,greenlet在執行的時候也會出錯。Greenlet有可能會無法拋出異常,停止失敗,或者消耗了太多的系統資源。
greenlet的內部狀態通常是一個依賴時間的參數。greenlet有一些標記來讓你能夠監控greenlet的狀態。
import
gevent
def
win():
return
'You win!'
def
fail():
raise
Exception(
'You fail at failing.'
)
winner
=
gevent.spawn(win)
loser
=
gevent.spawn(fail)
print
(winner.started)
# True
print
(loser.started)
# True
# Exceptions raised in the Greenlet, stay inside the Greenlet.
try
:
gevent.joinall([winner, loser])
except
Exception as e:
print
(
'This will never be reached'
)
print
(winner.value)
# 'You win!'
print
(loser.value)
# None
print
(winner.ready())
# True
print
(loser.ready())
# True
print
(winner.successful())
# True
print
(loser.successful())
# False
# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.
print
(loser.exception)
# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
這段代碼的執行結果如下:
?True
True
You win!
None
True
True
True
False
You fail at failing.
在主程序收到一個SIGQUIT 之後會阻塞程序的執行讓Greenlet無法繼續執行。這會導致僵屍進程的產生,需要在操作系統中將這些僵屍進程清除掉。
?import
gevent
import
signal
def
run_forever():
gevent.sleep(
1000
)
if
__name__
=
=
'__main__'
:
gevent.signal(signal.SIGQUIT, gevent.shutdown)
thread
=
gevent.spawn(run_forever)
thread.join()
gevent提供了對與代碼運行時的時間限制功能,也就是超時功能。
?import
gevent
from
gevent
import
Timeout
seconds
=
10
timeout
=
Timeout(seconds)
timeout.start()
def
wait():
gevent.sleep(
10
)
try
:
gevent.spawn(wait).join()
except
Timeout:
print
'Could not complete'
也可以通過用with 上下文的方法來實現超時的功能:
?import
gevent
from
gevent
import
Timeout
time_to_wait
=
5
# seconds
class
TooLong(Exception):
pass
with Timeout(time_to_wait, TooLong):
gevent.sleep(
10
)
gevent還提供了一些超時的參數以應對不同的狀況:
?import
gevent
from
gevent
import
Timeout
def
wait():
gevent.sleep(
2
)
timer
=
Timeout(
1
).start()
thread1
=
gevent.spawn(wait)
try
:
thread1.join(timeout
=
timer)
except
Timeout:
print
(
'Thread 1 timed out'
)
# --
timer
=
Timeout.start_new(
1
)
thread2
=
gevent.spawn(wait)
try
:
thread2.get(timeout
=
timer)
except
Timeout:
print
(
'Thread 2 timed out'
)
# --
try
:
gevent.with_timeout(
1
, wait)
except
Timeout:
print
(
'Thread 3 timed out'
)
運行結果如下:
?Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
現在這是gevent裡面的一個難點。下面一個例子裡可能看到 monkey.patch_socket() 能夠在運行時裡面修改基礎庫socket:
?import
socket
print
( socket.socket )
print
"After monkey patch"
from
gevent
import
monkey
monkey.patch_socket()
print
( socket.socket )
import
select
print
select.select
monkey.patch_select()
print
"After monkey patch"
print
( select.select )
運行結果如下:
?class
'socket.socket'
After monkey patch
class
'gevent.socket.socket'
built-
in
function
select
After monkey patch
function
select
at 0x1924de8
Python的運行時裡面允許能夠大部分的對象都是可以修改的,包括模塊,類和方法。這通常是一個壞主意,然而在極端的情況下,當有一個庫需要加入一些Python基本的功能的時候,monkey patch就能派上用場了。在上面的例子裡,gevent能夠改變基礎庫裡的一些使用IO阻塞模型的庫比如socket,ssl,threading等等並且把它們改成協程的執行方式。