我們已經介紹了 python 的幾種線程同步工具。 Python 線程同步(一) — 競爭條件與線程鎖python 線程同步(二) — 條件對象python 線程同步(三) — 信號量
本文介紹的線程同步工具相比上面已經介紹過的三類工具來說,更加簡單實用。
事件的使用是線程間通信的最簡單機制之一 — 一個線程發出事件信號,另一個線程等待並響應該信號。 python threading 包中提供的事件對象 Event 就是用來做這件事的。 當事件對象中的標志位由 True 變為 False,所有等待在該事件上的線程都將被喚醒。 因此,python 中的事件對象 Event 提供了以下方法供調用:
is_set()
返回事件標志是否為 True。
set()
將事件內部標志位設置為 True,接著喚醒所有等待在該事件上的線程。
clear()
清除標志,將事件標志重置為 False,此後若干個線程又可以重新阻塞在該事件對象上。
wait(timeout=None)
阻塞線程直到內部變量為true。如果調用時內部標志為true,將立即返回。否則將阻塞線程,直到調用 set() 方法將標志設置為true或者發生可選的超時。 如果是因為超時返回,則會返回 False,否則會返回 True。
下面的例子展示了所有5個線程均阻塞在一個事件對象上,直到3秒後,主線程調用 set 方法觸發事件信號,可以看到所有 5 個線程均立即開始執行。
import logging
from threading import Thread, Event
from time import sleep
class EventThread(Thread):
def __init__(self, event, id):
super().__init__()
self._event = event
self._id = id
def run(self):
logging.info('%r start running' % self)
self._event.wait()
logging.info('%r continue running after event' % self)
def __repr__(self):
return 'EventThread(%s)' % self._id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
event = Event()
for i in range(5):
thread = EventThread(event, i)
thread.start()
logging.info('main start sleeping')
sleep(3)
logging.info('main set event')
event.set()
打印出了:
2019-05-14 09:15:50,626 - INFO: EventThread(0) start running 2019-05-14 09:15:50,626 - INFO: EventThread(1) start running 2019-05-14 09:15:50,626 - INFO: EventThread(2) start running 2019-05-14 09:15:50,626 - INFO: EventThread(3) start running 2019-05-14 09:15:50,626 - INFO: EventThread(4) start running 2019-05-14 09:15:50,626 - INFO: main start sleeping 2019-05-14 09:15:53,639 - INFO: main set event 2019-05-14 09:15:53,645 - INFO: EventThread(1) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(0) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(2) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(4) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(3) continue running after event
柵欄類是另一個簡單的同步原語,此前我們已經介紹過 Linux 與 Java 中的柵欄。 java 線程同步工具類
柵欄對象用於讓多個線程互相等待。 他維護了一個內部的計數器,值由構造方法默認傳入,每當有一個線程調用 wait 方法,則該值原子地減 1,直到減到 0,則讓所有阻塞 wait 在該柵欄對象上的線程繼續執行。
Barrier(parties, action=None, timeout=None)
wait(timeout=None)
柵欄對象中最重要的方法就是 wait 方法了。 線程阻塞等待,直到構造方法傳入的 parties 個線程均阻塞等待在 wait 方法或超時,如果該方法傳入的超時時間為 None,則使用構造方法傳入的默認超時。 一旦超時發生,柵欄將立即進入破損狀態,此時其他仍阻塞等待該柵欄的線程將收到 wait 方法拋出的 BrokenBarrierError 異常。 如果試圖在已破損的柵欄對象上調用 wait 方法,也會立即拋出 BrokenBarrierError 異常。 返回一個數字,值為 0 到 parties - 1,解釋器保證了所有等待在同一個柵欄上的線程中,每一個的返回值都不同,以便讓你可以依賴 wait 方法的返回值來做一些處理。 如果創建柵欄對象時在構造函數中提供了 action 參數,它將在其中一個線程釋放前被調用。如果此調用引發了異常,柵欄對象將進入破損狀態。
reset()
重置柵欄為默認的初始態。 如果柵欄中仍有線程等待釋放,這些線程將會收到 BrokenBarrierError 異常。 除非非常必要,否則並不建議使用該方法,很多時候與其重用一個狀態未知的柵欄,不如新建一個。
abort()
使柵欄進入破損態。 這將導致所有已經調用和未來調用的 wait() 方法中引發 BrokenBarrierError 異常。
柵欄的使用雖然簡單,但卻十分實用,在實際環境中,我們通常需要並發調用很多業務方的接口,並收集他們的返回,然後在所有接口均返回後再進行下一步處理。 但並不是所有接口的調用都是必須的,因此對於該場景,一個必要的優化方式是一旦收集到必要接口的返回,立即中斷其他接口的調用,並開始這之後的操作。 上述需求如果使用柵欄來解決會顯得非常簡單而優雅,雖然 Python 中我們並不能在線程外終止線程,但我們可以通過柵欄的 abort 方法讓那些尚未執行結束的線程一旦執行結束即拋出異常,從而讓我們不需要去關注他們。 下面的例子模擬了上面描述的過程。
import logging
import random
from threading import Thread, Barrier
from time import sleep, time
class InterfaceThread(Thread):
def __init__(self, majorbarrier, minorbarrier, id, major):
super().__init__()
self._majorbarrier = majorbarrier
self._minorbarrier = minorbarrier
self._id = id
self._major = major
def run(self):
nsec = random.uniform(0, 4)
logging.info('%r start running sleep %s' % (self, nsec))
sleep(nsec)
logging.info('%r after sleeping' % self)
if self._major:
try:
result = self._majorbarrier.wait()
if result == 0:
self._minorbarrier.abort()
except:
logging.error('%s waitting on majorbarrier aborted' % self)
return
else:
try:
self._minorbarrier.wait()
except:
logging.warning('%s watting on minorbarrier aborted' % self)
return
logging.info('%r continue running after barrier' % self)
def __repr__(self):
return 'InterfaceThread(%s【major: %s】)' % (self._id, self._major)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
start = time()
majorbarrier = Barrier(4)
minorbarrier = Barrier(3)
threads = list()
for i in range(6):
threads.append(InterfaceThread(majorbarrier, minorbarrier, i, bool(i >= 3)))
for thread in threads:
thread.start()
result = majorbarrier.wait()
if result == 0:
minorbarrier.abort()
logging.info('run by %s' % (time() - start))
上面的例子中創建了兩個柵欄對象,分別用來同步必要接口調用與非必要接口調用,我們通過隨機 sleep 0 到 4 秒來模擬接口調用。 一旦必要柵欄的 wait 方法返回 0,則意味著必要接口已全部返回,此時可以通過調用非必要柵欄的 abort 方法來破壞非必要柵欄,同時程序繼續執行,從而實現整體運行時間的最大限度縮短。
打印出了:
2019-05-14 14:00:05,045 - INFO: InterfaceThread(0【major: False】) start running sleep 1.3645551759667334 2019-05-14 14:00:05,050 - INFO: InterfaceThread(1【major: False】) start running sleep 3.5451267021153607 2019-05-14 14:00:05,050 - INFO: InterfaceThread(2【major: False】) start running sleep 3.0433784558963644 2019-05-14 14:00:05,052 - INFO: InterfaceThread(3【major: True】) start running sleep 2.0092681547999875 2019-05-14 14:00:05,053 - INFO: InterfaceThread(4【major: True】) start running sleep 2.266415383907653 2019-05-14 14:00:05,053 - INFO: InterfaceThread(5【major: True】) start running sleep 0.6692143957122372 2019-05-14 14:00:05,728 - INFO: InterfaceThread(5【major: True】) after sleeping 2019-05-14 14:00:06,416 - INFO: InterfaceThread(0【major: False】) after sleeping 2019-05-14 14:00:07,077 - INFO: InterfaceThread(3【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: run by 2.284111976623535 2019-05-14 14:00:07,329 - INFO: InterfaceThread(5【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: InterfaceThread(3【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - WARNING: InterfaceThread(0【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,109 - INFO: InterfaceThread(2【major: False】) after sleeping 2019-05-14 14:00:08,110 - WARNING: InterfaceThread(2【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,613 - INFO: InterfaceThread(1【major: False】) after sleeping 2019-05-14 14:00:08,613 - WARNING: InterfaceThread(1【major: False】) watting on minorbarrier aborted
可以看到,並發調用六個線程,按照 sleep 時間,應該在 3.5451267 秒以上。 而實際上,由於重要線程均以完成,主線程只用 2.284111976623535 秒便已返回。 這樣,我們就實現了接口性能的大幅提升,但線程 1、2 由於 sleep 時間過長,沒有能夠在主線程返回前返回。