程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python 線程同步(二) -- 條件對象

編輯:Python

1. 引言

上一篇文章中,我們介紹了線程同步與 Python 中的鎖機制。 Python 線程同步(一) — 競爭條件與線程鎖

但鎖機制只能解決最為簡單和通用的線程同步場景,本文我們就來詳細介紹更為復雜的場景下需要使用哪些新的線程同步工具 — 條件對象。

2. 簡介

我們此前解析過 Java 中的條件對象的源碼。 鎖的等待與喚醒 — ConditionObject 源碼解析

理解了 java 中的條件對象的執行原理,我們就會發現 python 中的條件對象與 java 中的條件對象實際上完全是一個東西。 有這樣一個場景,訂單的狀態在不斷變更,線程1關心訂單支付成功狀態並在此後做一些事,線程2關心訂單發起退款狀態並在此後做一些事,而業務線程則在業務執行過程中不斷變更訂單狀態,而當訂單一創建,我們需要讓線程1、線程2阻塞等待,而只有到了預期狀態被成功更新,才喚醒,而狀態本身是一個競爭條件,其變更與查詢都需要加鎖。 看上去上面的場景非常復雜,但使用條件對象去處理就會非常方便。

3. 條件對象的執行流程

條件對象總是保存有一個鎖的引用,創建條件對象時可以作為參數傳入,必須是 threading.Lock 或者 threading.RLock,如果沒有傳入,則會創建默認的 threading.RLock。 條件對象也有著加鎖與解鎖方法,條件對象只負責調用對象鎖成員的對應方法。 加鎖後,一旦調用 wait 方法,則自動釋放鎖並阻塞等待,此時,另一個等待鎖的線程開始執行,直到該線程調用 notify 或 notify_all 方法並釋放鎖,等待著的線程才能繼續執行。

4. 加鎖與解鎖

acquire(*args) release()

如上文所述,加鎖與解鎖實際上是直接調用條件對象所持有的鎖實例的對應方法。

5. 等待

5.1. wait

wait(timeout=None)

阻塞等待直到被喚醒或超時。 必須在線程獲取到鎖之後再調用該方法,否則會拋出 RuntimeError。 這個方法釋放鎖,然後阻塞,直到在另外一個線程中調用同一個條件變量的 notify() 或 notify_all() 喚醒它,或者直到可選的超時發生。 如果條件對象持有的是 RLock,那麼他不會調用 release 方法釋放鎖,而是調用 RLock 的內部接口,一次性釋放。 從 python3.2 開始,這個方法總是返回 None。

5.2. wait_for

wait_for(predicate, timeout=None)

等待,直到條件計算為真或超時。 predicate 應該是一個可調用對象而且它的返回值可被解釋為一個布爾值。 與 wait 方法一樣,wait_for 方法也支持傳入一個 timeout 參數。 其實現方法大致相當於:

while not predicate():
cv.wait()

6. 喚醒

6.1. notify

notify(n=1)

喚醒等待在這個條件對象的線程,傳入參數為喚醒線程數量,默認為 1。 如果調用線程在沒有獲得鎖的情況下調用這個方法,會引發 RuntimeError 異常。 需要注意的是,被喚醒的線程實際上不會返回它調用的 wait() ,直到它可以重新獲得鎖,而 notify 方法並不會釋放鎖。

6.2. notify_all

notify_all()

喚醒所有正在等待這個條件對象的線程。 相當於:

cv.notify(threading.active_count())

7. 上下文管理協議與示例

條件對象也同樣支持 python 上下文管理協議,下面我們通過條件對象及上下文管理協議實現我們開始時所設想的對訂單狀態的監聽程序:

import logging
import random
from threading import Thread, Condition
class Queue():
def __init__(self):
self.runtimes = 0
self.front = -1
self.rear = -1
self.queue = []
def enqueue(self, ele): # 入隊操作
self.queue.append(ele)
self.rear = self.rear + 1
def dequeue(self): # 出隊操作
if self.isempty():
return None
node = self.queue.pop(0)
self.front = self.front + 1
return node
def isempty(self):
return self.front == self.rear
class CareStatusThread(Thread):
def __init__(self, carestatus, conobj, queue, threshold):
super().__init__()
self.orderids = []
self.conobj = conobj
self.queue = queue
self.notifystatus = carestatus
self.threshold = threshold
def run(self):
while True:
with self.conobj:
logging.info('%r start running' % self)
if self.queue.isempty():
logging.info('%r queue is empty' % self)
self.conobj.wait()
firstorder = None
orderids = list()
while True:
order = self.queue.dequeue()
if order is None or order == firstorder:
break
if order['status'] != self.notifystatus:
if firstorder is None:
firstorder = order
self.queue.enqueue(order)
continue
orderids.append(order['id'])
if len(orderids) > 0:
self.orderids.extend(orderids)
logging.info('%r orders%s add in list' % (self, orderids))
logging.info('%r run over' % self)
if self.queue.runtimes == self.threshold:
return
self.conobj.wait()
def __repr__(self):
return 'CareStatusThread(%s)' % self.notifystatus
class ProducerThread(Thread):
def __init__(self, orderlist, conobj, queue, threshold):
super().__init__()
self.orderlist = orderlist
self.conobj = conobj
self.queue = queue
self.threshold = threshold
def run(self):
for _ in range(self.threshold):
with self.conobj:
times = int(random.uniform(1, 5))
for _ in range(times):
index = int(random.uniform(0, len(self.orderlist)))
order = self.orderlist[index]
fromstatus = order['status']
order['status'] += 1
self.queue.enqueue(order)
logging.info('%r change order %s from %s to %s' % (self, order['id'], fromstatus, order['status']))
self.queue.runtimes += 1
logging.info('%r run over' % self)
self.conobj.notify_all()
def __repr__(self):
return 'ProducerThread'
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
conobj = Condition()
queue = Queue()
orderid = 10001
threshold = 10
orderlist = [{'id': orderid + i, 'status': 0} for i in range(50)]
producer = ProducerThread(orderlist, conobj, queue, threshold)
afterpay = CareStatusThread(1, conobj, queue, threshold)
afterrefund = CareStatusThread(2, conobj, queue, threshold)
afterpay.start()
afterrefund.start()
producer.start()
producer.join()
afterpay.join()
afterrefund.join()
logging.info('%r orderids: %s' % (afterpay, afterpay.orderids))
logging.info('%r orderids: %s' % (afterrefund, afterrefund.orderids))

7.1. 執行結果

打印出了:

2019-05-11 10:01:43,340 - INFO: CareStatusThread(1) start running 2019-05-11 10:01:43,342 - INFO: CareStatusThread(1) queue is empty 2019-05-11 10:01:43,343 - INFO: CareStatusThread(2) start running 2019-05-11 10:01:43,344 - INFO: CareStatusThread(2) queue is empty 2019-05-11 10:01:43,346 - INFO: ProducerThread change order 10020 from 0 to 1 2019-05-11 10:01:43,347 - INFO: ProducerThread change order 10041 from 0 to 1 2019-05-11 10:01:43,348 - INFO: ProducerThread run over 2019-05-11 10:01:43,348 - INFO: ProducerThread change order 10029 from 0 to 1 2019-05-11 10:01:43,349 - INFO: ProducerThread run over 2019-05-11 10:01:43,350 - INFO: CareStatusThread(1) orders[10020, 10041, 10029] add in list 2019-05-11 10:01:43,350 - INFO: CareStatusThread(1) run over 2019-05-11 10:01:43,351 - INFO: ProducerThread change order 10010 from 0 to 1 2019-05-11 10:01:43,351 - INFO: ProducerThread change order 10020 from 1 to 2 2019-05-11 10:01:43,352 - INFO: ProducerThread run over 2019-05-11 10:01:43,352 - INFO: ProducerThread change order 10003 from 0 to 1 2019-05-11 10:01:43,353 - INFO: ProducerThread run over 2019-05-11 10:01:43,353 - INFO: ProducerThread change order 10005 from 0 to 1 2019-05-11 10:01:43,354 - INFO: ProducerThread change order 10010 from 1 to 2 2019-05-11 10:01:43,354 - INFO: ProducerThread change order 10032 from 0 to 1 2019-05-11 10:01:43,354 - INFO: ProducerThread run over 2019-05-11 10:01:43,355 - INFO: ProducerThread change order 10025 from 0 to 1 2019-05-11 10:01:43,355 - INFO: ProducerThread change order 10034 from 0 to 1 2019-05-11 10:01:43,356 - INFO: ProducerThread change order 10036 from 0 to 1 2019-05-11 10:01:43,356 - INFO: ProducerThread change order 10033 from 0 to 1 2019-05-11 10:01:43,357 - INFO: ProducerThread run over 2019-05-11 10:01:43,357 - INFO: ProducerThread change order 10011 from 0 to 1 2019-05-11 10:01:43,357 - INFO: ProducerThread change order 10012 from 0 to 1 2019-05-11 10:01:43,358 - INFO: ProducerThread run over 2019-05-11 10:01:43,358 - INFO: ProducerThread change order 10045 from 0 to 1 2019-05-11 10:01:43,359 - INFO: ProducerThread change order 10036 from 1 to 2 2019-05-11 10:01:43,359 - INFO: ProducerThread run over 2019-05-11 10:01:43,360 - INFO: ProducerThread change order 10035 from 0 to 1 2019-05-11 10:01:43,360 - INFO: ProducerThread change order 10013 from 0 to 1 2019-05-11 10:01:43,361 - INFO: ProducerThread change order 10014 from 0 to 1 2019-05-11 10:01:43,361 - INFO: ProducerThread change order 10039 from 0 to 1 2019-05-11 10:01:43,361 - INFO: ProducerThread run over 2019-05-11 10:01:43,362 - INFO: ProducerThread change order 10039 from 1 to 2 2019-05-11 10:01:43,362 - INFO: ProducerThread run over 2019-05-11 10:01:43,363 - INFO: CareStatusThread(2) orders[10010, 10020, 10010, 10036, 10036, 10039, 10039] add in list 2019-05-11 10:01:43,364 - INFO: CareStatusThread(2) run over 2019-05-11 10:01:43,364 - INFO: CareStatusThread(1) start running 2019-05-11 10:01:43,365 - INFO: CareStatusThread(1) orders[10005, 10032, 10025, 10034, 10033, 10011, 10012, 10045, 10035, 10013, 10014] add in list 2019-05-11 10:01:43,365 - INFO: CareStatusThread(1) run over 2019-05-11 10:01:43,366 - INFO: CareStatusThread(1) orderids: [10020, 10041, 10029, 10005, 10032, 10025, 10034, 10033, 10011, 10012, 10045, 10035, 10013, 10014] 2019-05-11 10:01:43,366 - INFO: CareStatusThread(2) orderids: [10010, 10020, 10010, 10036, 10036, 10039, 10039]

7.2. 解析

上面代碼中,我們創建了三個線程:

  1. 生產者 ProducerThread 負責隨機改變若干個訂單的狀態,並將被改變訂單加入到訂單隊列中
  2. CareStatusThread(1) 監聽訂單從 0 狀態到 1 狀態,並加入到自己的訂單列表中
  3. CareStatusThread(2) 監聽訂單從 1 狀態到 2 狀態,並加入到自己的訂單列表中

我們看到 ProducerThread 執行變更狀態後,通過 notify_all 喚醒狀態監聽線程後解鎖,而 CareStatusThread 則緊接著執行相應的業務邏輯消費隊列。 這是一個典型的生產者-消費者模型,最終我們看到兩個消費者線程分別收集到了自己所關心的一系列訂單id。

8. 參考資料

https://docs.python.org/zh-cn/3.6/library/threading.html。


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved