We've already introduced it python Several thread synchronization tools . Python Thread synchronization ( One ) — Race conditions and thread locks python Thread synchronization ( Two ) — Conditions of the object python Thread synchronization ( 3、 ... and ) — Semaphore
The thread synchronization tools introduced in this article are compared with the three types of tools already introduced above , More simple and practical .
The use of events is one of the simplest mechanisms for communication between threads — A thread signals Events , Another thread waits and responds to the signal . python threading The event object provided in the package Event Is used to do this . When the flag bit in the event object is set by True Turn into False, All threads waiting on this event will be awakened . therefore ,python Event object in Event The following methods are provided for calling :
is_set()
Returns whether the event flag is True.
set()
Set the event internal flag bit to True, Then wake up all the threads waiting on this event .
clear()
Clear the sign , Reset the event flag to False, After that, several threads can block the event object again .
wait(timeout=None)
Block the thread until the internal variable is true. If the internal flag is true, Will return immediately . Otherwise, the thread will be blocked , Until the call set() Method to set the flag to true Or an optional timeout . If it is because of timeout, return , Will return False, Otherwise it will return to True.
The following example shows all 5 Threads are blocked on an event object , until 3 Seconds later , Main thread call set Method triggers an event signal , You can see everything 5 All threads begin execution immediately .
import logging
from threading import Thread, Event
from time import sleep
class EventThread(Thread):
def __init__(self, event, id):
super().__init__()
self._event = event
self._id = id
def run(self):
logging.info('%r start running' % self)
self._event.wait()
logging.info('%r continue running after event' % self)
def __repr__(self):
return 'EventThread(%s)' % self._id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
event = Event()
for i in range(5):
thread = EventThread(event, i)
thread.start()
logging.info('main start sleeping')
sleep(3)
logging.info('main set event')
event.set()
Printed out :
2019-05-14 09:15:50,626 - INFO: EventThread(0) start running 2019-05-14 09:15:50,626 - INFO: EventThread(1) start running 2019-05-14 09:15:50,626 - INFO: EventThread(2) start running 2019-05-14 09:15:50,626 - INFO: EventThread(3) start running 2019-05-14 09:15:50,626 - INFO: EventThread(4) start running 2019-05-14 09:15:50,626 - INFO: main start sleeping 2019-05-14 09:15:53,639 - INFO: main set event 2019-05-14 09:15:53,645 - INFO: EventThread(1) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(0) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(2) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(4) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(3) continue running after event
The fence class is another simple synchronization primitive , We have already introduced Linux And Java Fence in . java Thread synchronization tool class
The fence object is used to make multiple threads wait for each other . He maintains an internal counter , The value is passed in by default by the constructor , Whenever a thread calls wait Method , Then the value is atomically subtracted 1, Until it's reduced to 0, Then let all blocking wait The thread on the fence object continues to execute .
Barrier(parties, action=None, timeout=None)
wait(timeout=None)
The most important method in fence objects is wait The method . Thread blocking waiting , Until the constructor passes in parties Threads are blocked waiting for wait Method or timeout , If the timeout passed in by this method is None, The default timeout passed in by the constructor is used . Once the timeout occurs , The fence will immediately go into a broken state , At this point, other threads that are still blocking waiting for the fence will receive wait Method BrokenBarrierError abnormal . If you try to call... On a broken fence object wait Method , It will also be thrown immediately BrokenBarrierError abnormal . Return a number , The value is 0 To parties - 1, The interpreter guarantees that all threads waiting on the same fence , Each has a different return value , So that you can rely on wait Method to do some processing . If... Is provided in the constructor when creating the fence object action Parameters , It will be called before one of the threads is released . If this call throws an exception , The fence object will enter a broken state .
reset()
Reset the fence to the default initial state . If there are still threads waiting to be released in the fence , These threads will receive BrokenBarrierError abnormal . Unless it is necessary , Otherwise, this method is not recommended , Many times, instead of reusing a fence with an unknown state , Why not create a new one .
abort()
Put the fence in a broken state . This will result in all already invoked and future invoked wait() Method BrokenBarrierError abnormal .
The use of the fence is simple , But it's very practical , In the real world , We usually need to call the interfaces of many business parties concurrently , And collect their returns , After all interfaces are returned, proceed to the next step . But not all interface calls are necessary , So for this scenario , A necessary optimization is once the necessary interface returns are collected , Immediately interrupt calls to other interfaces , And start the operation after that . The above requirements will be very simple and elegant if the fence is used to solve them , although Python We cannot terminate threads out of the process , But we can get through the fence abort Method to make those threads that have not finished execution throw an exception once the execution ends , So that we don't need to pay attention to them . The following example simulates the process described above .
import logging
import random
from threading import Thread, Barrier
from time import sleep, time
class InterfaceThread(Thread):
def __init__(self, majorbarrier, minorbarrier, id, major):
super().__init__()
self._majorbarrier = majorbarrier
self._minorbarrier = minorbarrier
self._id = id
self._major = major
def run(self):
nsec = random.uniform(0, 4)
logging.info('%r start running sleep %s' % (self, nsec))
sleep(nsec)
logging.info('%r after sleeping' % self)
if self._major:
try:
result = self._majorbarrier.wait()
if result == 0:
self._minorbarrier.abort()
except:
logging.error('%s waitting on majorbarrier aborted' % self)
return
else:
try:
self._minorbarrier.wait()
except:
logging.warning('%s watting on minorbarrier aborted' % self)
return
logging.info('%r continue running after barrier' % self)
def __repr__(self):
return 'InterfaceThread(%s【major: %s】)' % (self._id, self._major)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
start = time()
majorbarrier = Barrier(4)
minorbarrier = Barrier(3)
threads = list()
for i in range(6):
threads.append(InterfaceThread(majorbarrier, minorbarrier, i, bool(i >= 3)))
for thread in threads:
thread.start()
result = majorbarrier.wait()
if result == 0:
minorbarrier.abort()
logging.info('run by %s' % (time() - start))
In the above example, two fence objects are created , It is used to synchronize necessary interface calls and unnecessary interface calls , We pass the random sleep 0 To 4 Seconds to simulate interface calls . Once the fence is necessary wait Method returns 0, It means that all the necessary interfaces have returned , At this point, you can call the... Of the unnecessary fence abort Methods to destroy unnecessary fences , At the same time, the program continues to execute , So as to minimize the overall running time .
Printed out :
2019-05-14 14:00:05,045 - INFO: InterfaceThread(0【major: False】) start running sleep 1.3645551759667334 2019-05-14 14:00:05,050 - INFO: InterfaceThread(1【major: False】) start running sleep 3.5451267021153607 2019-05-14 14:00:05,050 - INFO: InterfaceThread(2【major: False】) start running sleep 3.0433784558963644 2019-05-14 14:00:05,052 - INFO: InterfaceThread(3【major: True】) start running sleep 2.0092681547999875 2019-05-14 14:00:05,053 - INFO: InterfaceThread(4【major: True】) start running sleep 2.266415383907653 2019-05-14 14:00:05,053 - INFO: InterfaceThread(5【major: True】) start running sleep 0.6692143957122372 2019-05-14 14:00:05,728 - INFO: InterfaceThread(5【major: True】) after sleeping 2019-05-14 14:00:06,416 - INFO: InterfaceThread(0【major: False】) after sleeping 2019-05-14 14:00:07,077 - INFO: InterfaceThread(3【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: run by 2.284111976623535 2019-05-14 14:00:07,329 - INFO: InterfaceThread(5【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: InterfaceThread(3【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - WARNING: InterfaceThread(0【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,109 - INFO: InterfaceThread(2【major: False】) after sleeping 2019-05-14 14:00:08,110 - WARNING: InterfaceThread(2【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,613 - INFO: InterfaceThread(1【major: False】) after sleeping 2019-05-14 14:00:08,613 - WARNING: InterfaceThread(1【major: False】) watting on minorbarrier aborted
You can see , Call six threads concurrently , according to sleep Time , belong 3.5451267 More than seconds . But in fact , Because all the important threads are completed in , The main thread uses only 2.284111976623535 Seconds has been returned . such , We have greatly improved the interface performance , But threads 1、2 because sleep drawn-out , Failed to return before the main thread returns .
The whole project includes: op
ShowMeAI日報系列全新升級!覆蓋AI人工智能 工具&a