我們使用多線程的目的通常是並發的運行單獨的操作,但有時候也需要在兩個或多個線程中同步操作。在Python中,線程同步有多種方式,包括Event、Condition和Barrier
調用者可以用set() 和clear() 方法控制這個標志,其他線程可以使用wait() 暫停,直到這個標志被設置
import logging
import time
from threading import Thread, Event
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s-%(asctime)s-%(message)s')
def wait_for_start_work(e_start_work: Event, name: str):
logging.debug(f'{
name} 等待工作...')
e_start_work.wait() # 會一直阻塞直到事件被set
while e_start_work.is_set():
logging.debug(f'{
name} 正在工作...')
time.sleep(1)
else:
logging.debug(f'{
name} 下班喽!!!!')
def wait_for_start_work_timeout(e_start_work: Event, name: str):
logging.debug(f'{
name} 等待工作...')
event_is_set = e_start_work.wait(timeout=1) # 等待最多1秒
if not event_is_set:
logging.debug(f'{
name} 還不通知工作,我自己先干了。。')
else:
logging.debug(f'{
name} 正在工作...')
if __name__ == '__main__':
e = Event()
for i in range(2):
t = Thread(target=wait_for_start_work, args=(e, f'員工{
i}'))
t.start()
t_xiaowang = Thread(target=wait_for_start_work_timeout, args=(e, f'負責的小王'))
t_xiaowang.start()
time.sleep(2)
logging.info('領導說: 開始工作了!!!')
e.set()
time.sleep(2)
logging.info('領導說: 下班了!!!')
e.clear()
結果
DEBUG-2022-06-24 10:14:07,076-員工0 等待工作...
DEBUG-2022-06-24 10:14:07,076-員工1 等待工作...
DEBUG-2022-06-24 10:14:07,077-負責的小王 等待工作...
DEBUG-2022-06-24 10:14:08,091-負責的小王 還不通知工作,我自己先干了。。
INFO-2022-06-24 10:14:09,077-領導說: 開始工作了!!!
DEBUG-2022-06-24 10:14:09,077-員工0 正在工作...
DEBUG-2022-06-24 10:14:09,077-員工1 正在工作...
DEBUG-2022-06-24 10:14:10,086-員工0 正在工作...
DEBUG-2022-06-24 10:14:10,086-員工1 正在工作...
INFO-2022-06-24 10:14:11,085-領導說: 下班了!!!
DEBUG-2022-06-24 10:14:11,101-員工0 下班喽!!!!
DEBUG-2022-06-24 10:14:11,101-員工1 下班喽!!!!
Condition使用了一個共享資源鎖,允許多個線程等待資源的更新。
import logging
import time
from threading import Thread, Condition
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s-%(asctime)s-%(message)s')
def wait_for_start_work(c_start_work: Condition, name: str):
logging.debug(f'{
name} 等待工作...')
with c_start_work:
c_start_work.wait() # 一直等待直到收到通知
logging.debug(f'{
name} 正在工作...')
def wait_for_start_work_timeout(c_start_work: Condition, name: str):
logging.debug(f'{
name} 等待工作...')
with c_start_work:
c_start_work.wait(timeout=1) # 等待最多1秒
logging.debug(f'{
name} 還不通知工作,我自己先干了。。')
if __name__ == '__main__':
c = Condition()
for i in range(2):
t = Thread(target=wait_for_start_work, args=(c, f'員工{
i}'))
t.start()
t_xiaowang = Thread(target=wait_for_start_work_timeout, args=(c, f'負責的小王'))
t_xiaowang.start()
time.sleep(2)
with c: # 這裡使用with來獲得與Condition關聯的鎖,也可以顯示的使用acquire()和release()方法
logging.info('領導說: 開始工作了!!!')
c.notify_all() # 通知wait狀態的所有線程
# c.notify(2) # 通知2個wait狀態的線程
結果
DEBUG-2022-06-24 10:39:00,615-員工0 等待工作...
DEBUG-2022-06-24 10:39:00,616-員工1 等待工作...
DEBUG-2022-06-24 10:39:00,616-負責的小王 等待工作...
DEBUG-2022-06-24 10:39:01,630-負責的小王 還不通知工作,我自己先干了。。
INFO-2022-06-24 10:39:02,628-領導說: 開始工作了!!!
DEBUG-2022-06-24 10:39:02,628-員工0 正在工作...
DEBUG-2022-06-24 10:39:02,629-員工1 正在工作...
Barrier會建立一個集合點,當指定數量的線程到達此點後才會放行
import logging
import time
import threading
from threading import Thread, Barrier
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s-%(asctime)s-%(message)s')
def wait_for_start_work(b_start_work: Barrier):
logging.debug('{} 等待其他人到齊。。'.format(threading.current_thread().name))
b_start_work.wait() # 會一直阻塞直到被放行
logging.debug('{} 正在工作...'.format(threading.current_thread().name))
if __name__ == '__main__':
b = Barrier(3) # 三個線程的集合點
for i in range(6):
t = Thread(target=wait_for_start_work, name=f'員工{
i}',args=(b, ))
t.start()
time.sleep(1)
結果
DEBUG-2022-06-24 11:01:18,710-員工0 等待其他人到齊。。
DEBUG-2022-06-24 11:01:19,726-員工1 等待其他人到齊。。
DEBUG-2022-06-24 11:01:20,738-員工2 等待其他人到齊。。
DEBUG-2022-06-24 11:01:20,739-員工2 正在工作...
DEBUG-2022-06-24 11:01:20,739-員工1 正在工作...
DEBUG-2022-06-24 11:01:20,739-員工0 正在工作...
DEBUG-2022-06-24 11:01:21,755-員工3 等待其他人到齊。。
DEBUG-2022-06-24 11:01:22,768-員工4 等待其他人到齊。。
DEBUG-2022-06-24 11:01:23,768-員工5 等待其他人到齊。。
DEBUG-2022-06-24 11:01:23,768-員工5 正在工作...
DEBUG-2022-06-24 11:01:23,768-員工3 正在工作...
DEBUG-2022-06-24 11:01:23,768-員工4 正在工作...