Thread safety is a concept in multi thread or multi process programming , In a program that runs in parallel with multiple threads that share data , Thread safe code can ensure that all threads can execute normally and correctly through synchronization mechanism , There will be no data pollution and other accidents .
Thread safety is mainly caused by thread switching , Like a room ( process ) There is 10 Granulated sugar ( resources ), In addition to that 3 I'm a little man (1 Main threads 、2 Child threads ), Be a villain A Ate 3 When he was forced to rest by the system after a sugar, he thought that there was still 7 Granulated sugar , And being a villain B Work and eat 3 Granulated sugar , So being a villain A When you go back to work, you think there's still sugar left 7 star , But actually only 4 It's gone .
In the above example, the thread A And thread B The data of is out of sync , That's thread safety , It can lead to very serious accidents , Let's use the following example to illustrate .
Here's a number num The initial value is 0, We turn on 2 Thread thread :
Threads 1 Yes num Ten million times +1 The operation of
Threads 2 Yes num Ten million times -1 The operation of
The results can be staggering ,num In the end, it's not what we expected 0:
import threading
num = 0
def add():
global num
for i in range(10_000_000):
num += 1
def sub():
global num
for i in range(10_000_000):
num -= 1
''' add() and sub() Two threads are working on num To operate Look at the results , It can be inferred that : 1、 Simultaneous operation is not a program crash (num yes int type ,int The type itself is thread safe ) 2、 But the operation result will be in an unknown state , This indicates that this process is thread unsafe . Because the order in which threads run is unknown . '''
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection
# num result : 669214
# num result : -1849179
# num result : -525674
This is a very good case , To solve this problem, we must pass lock To ensure the timing of thread switching .
What we need to pay attention to is , stay Python In the basic data type list、tuple、dict Itself is Thread safe , So if there are multiple threads on this 3 When you operate a container , We don't have to think about thread safety .( Very good design )
The lock is Python It provides us with a means to control thread switching by ourselves , The use of locks can make the switching of threads orderly .
Once the threads switch in order , Access to data between threads 、 The modification becomes controllable , So to ensure thread safety , You have to use a lock .
threading modular Provided in 5 The most common lock , Here's a breakdown by function :
Synchronization lock :lock( Only one can be released at a time )
Recursive lock :rlock( Only one can be released at a time )
Conditional lock :condition( You can release any one at a time )
Event lock :event( All at once )
Semaphore lock :semaphore( You can release a specific one at a time )
Basic introduction
Lock There are many names for locks , Such as : Synchronization lock 、 The mutex
What do they mean ? As shown below :
Mutually exclusive It refers to a resource There can only be one visitor at a time Visit it , It is unique and exclusive , however Mutual exclusion cannot limit Visitors' response to resources Order of access , That is, access is A disorderly
Synchronization is based on mutual exclusion ( In most cases ), Through other mechanisms, visitors can access resources orderly
Synchronization is already mutually exclusive , Is a more complex implementation of mutex , Because it realizes the characteristics of orderly access on the basis of mutual exclusion
Here is threading Module and synchronization lock provide related methods :
Usage mode
Synchronization lock can only release one thread at a time , A locked thread will not hand over the execution right at runtime , Only when the thread is unlocked will the execution right be transferred to other threads through system scheduling .
As shown below , Use synchronization lock to solve the top problem :
import threading
num = 0
def add():
lock.acquire() # Synchronization lock
global num
for i in range(10_000_000):
num += 1
lock.release()
def sub():
lock.acquire() # Synchronization lock Only one at a time
global num
for i in range(10_000_000):
num -= 1
lock.release()
if __name__ == "__main__":
lock = threading.Lock() # Create a synchronization lock
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection
# num result : 0
# num result : 0
# num result : 0
So this code is It becomes completely serial , For this kind of computing intensive I/O In terms of business , It's not as fast as serializing single thread execution directly , So this is just an example , Can't outline the real purpose of the lock .
For synchronous locks , once acquire() Must correspond to once release(), It can't be used repeatedly acquire() And then reuse it many times release() The operation of , This will cause deadlock and block the program , It's not moving at all , As shown below :
import threading
num = 0
def add():
lock.acquire() # locked
lock.acquire() # Deadlock
# Don't execute
global num
for i in range(10_000_000):
num += 1
lock.release()
lock.release()
def sub():
lock.acquire() # locked
lock.acquire() # Deadlock
# Don't execute
global num
for i in range(10_000_000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
lock = threading.Lock() # establish lock
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
with sentence ( Automatic locking 、 Unlock )
because threading.Lock() Object enter__() And __exit() Method , So we can use with The sentence goes on Context management forms Lock and unlock operation :
import threading
num = 0
def add():
with lock: # with sentence lock Usage of , Automatic locking 、 Unlock
# Automatic locking
global num
for i in range(10_000_000):
num += 1
# Auto unlock
def sub():
with lock:
# Automatic locking
global num
for i in range(10_000_000):
num -= 1
# Auto unlock
if __name__ == "__main__":
lock = threading.Lock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection
# num result : 0
# num result : 0
# num result : 0
Basic introduction
Recursive lock is an upgraded version of synchronous lock , On the basis of synchronous lock It can be used repeatedly for many times acquire() after Reuse it many times release() The operation of , But be sure to pay attention to Locking times and unlocking times must be consistent , Otherwise, it will also cause deadlock .
Here is threading Module and recursive lock provide related methods :
Usage mode
Here's a simple use of recursive locks , In the following operation, if synchronization lock is used, deadlock will occur , But recursive locks don't :
import threading
num = 0
def add():
lock.acquire() # Lock for the first time
lock.acquire() # Second lock , This kind of writing Recursive lock Normal operation ; The synchronization lock will deadlock ;
global num
for i in range(10_000_000):
num += 1
lock.release() # incomprehension The meaning of recursive lock ???
lock.release()
def sub():
lock.acquire()
lock.acquire()
global num
for i in range(10_000_000):
num -= 1
lock.release()
lock.release()
if __name__ == "__main__":
lock = threading.RLock() # establish Recursive lock object
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection
# num result : 0
# num result : 0
# num result : 0
with sentence
because threading.RLock() Object enter__() And __exit() Method , So we can use with Statement to lock and unlock in the form of context management :
import threading
num = 0
def add():
with lock: # Recursive lock and synchronous lock You can use with sentence , Perform automatic locking 、 Unlock
# Automatic locking
global num
for i in range(10_000_000):
num += 1
# Auto unlock
def sub():
with lock:
# Automatic locking
global num
for i in range(10_000_000):
num -= 1
# Auto unlock
if __name__ == "__main__":
lock = threading.RLock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# Results three times collection
# num result : 0
# num result : 0
# num result : 0
Basic introduction
Conditional lock It is based on recursive lock and can Pause The function of thread running . And we can use wait() And notify() Come on Control the number of threads executed .
Be careful : Conditional locks are free Set up A release How many? Threads .
Here is threading Module and conditional lock provide related methods :
Usage mode
The following case will start 10 Child threads , And will immediately 10 The child thread is set to wait .
Then we can send one or more notifications , To restore the waiting child thread to continue running :
import threading
currentRunThreadNumber = 0 # Number of threads currently running
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name
condLock.acquire() # locked
print("start and wait run thread : %s" % thName)
condLock.wait() # Pause thread running 、 Waiting to wake up
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
condLock.release() # Unlock
if __name__ == "__main__":
condLock = threading.Condition() # Create a conditional lock
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:")) # Enter the release quantity manually
# Circulation release
condLock.acquire()
condLock.notify(notifyNumber) # release
condLock.release()
print("main thread run end")
# Start... First 10 Child threads , Then all of these sub threads will be in the waiting state
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# start and wait run thread : Thread-4
# start and wait run thread : Thread-5
# start and wait run thread : Thread-6
# start and wait run thread : Thread-7
# start and wait run thread : Thread-8
# start and wait run thread : Thread-9
# start and wait run thread : Thread-10
# Bulk notification , Release a specific number of child threads to continue running
# Please enter the number of threads that need to be notified to run:5 # release 5 individual
# carry on run thread : Thread-4
# carry on run thread : Thread-3
# carry on run thread : Thread-1
# carry on run thread : Thread-2
# carry on run thread : Thread-5
# Please enter the number of threads that need to be notified to run:5 # release 5 individual
# carry on run thread : Thread-8
# carry on run thread : Thread-10
# carry on run thread : Thread-6
# carry on run thread : Thread-9
# carry on run thread : Thread-7
# Please enter the number of threads that need to be notified to run:1
# main thread run end
with sentence
because threading.Condition() Object enter__() And __exit() Method , So we can use with Statement to lock and unlock in the form of context management :
import threading
currentRunThreadNumber = 0
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name # Returns the current thread name
with condLock: # with usage
print("start and wait run thread : %s" % thName)
condLock.wait() # Pause thread running 、 Waiting to wake up
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
if __name__ == "__main__":
condLock = threading.Condition()
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:"))
with condLock: # with usage
condLock.notify(notifyNumber) # release
print("main thread run end")
Basic introduction
Event lock It is based on conditional locking , It differs from conditional locking in that One time can only Let go of all , You cannot release any number of child threads to continue running .
We can think of event locks as traffic lights , When the light is red, all child threads are suspended , And enter “ wait for ” state , When the light is green, all child threads are restored “ function ”.
Here is threading Module and event lock provide related methods :
Usage mode
Event locks cannot be used with Statement to use , Only in the normal way .
As shown below , Let's simulate the operation of threads and traffic lights , Stop at the red light , pass at a green light :
import threading
maxSubThreadNumber = 3
def task():
thName = threading.currentThread().name
print("start and wait run thread : %s" % thName)
eventLock.wait() # Suspend operation , Waiting for the green light
print("green light, %s carry on run" % thName)
print("red light, %s stop run" % thName)
eventLock.wait() # Suspend operation , Waiting for the green light
print("green light, %s carry on run" % thName)
print("sub thread %s run end" % thName)
if __name__ == "__main__":
eventLock = threading.Event() # Event lock
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task) # Create thread
subThreadIns.start()
# Alternating green 、 A red light
eventLock.set() # Set it to green
eventLock.clear() # Set to red light
eventLock.set() # Set it to green
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# green light, Thread-1 carry on run
# red light, Thread-1 stop run
# green light, Thread-1 carry on run
# sub thread Thread-1 run end
# green light, Thread-3 carry on run
# red light, Thread-3 stop run
# green light, Thread-3 carry on run
# sub thread Thread-3 run end
# green light, Thread-2 carry on run
# red light, Thread-2 stop run
# green light, Thread-2 carry on run
# sub thread Thread-2 run end
Basic introduction
Semaphore lock It is also done according to the condition lock , It differs from conditional lock and event lock as follows :
Here is threading Module and semaphore lock provide related methods :
Usage mode
Here is an example of how to use , You can think of it as a limited section of road , Only the same number of threads can be released at a time :
import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
semaLock.acquire()
print("run sub thread %s" % thName)
time.sleep(3)
semaLock.release()
if __name__ == "__main__":
# We can only release each time 2 individual
semaLock = threading.Semaphore(2) # Semaphore lock You can limit the number of threads released each time
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
# run sub thread Thread-1
# run sub thread Thread-2
# run sub thread Thread-3
# run sub thread Thread-4
# run sub thread Thread-6
# run sub thread Thread-5
with sentence
because threading.Semaphore() Object enter__() And __exit() Method , So we can use with Statement to lock and unlock in the form of context management :
import threading
import time
maxSubThreadNumber = 6
def task():
thName = threading.currentThread().name
with semaLock: # with Semaphore lock semaphore
print("run sub thread %s" % thName)
time.sleep(3)
if __name__ == "__main__":
semaLock = threading.Semaphore(2)
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
above 5 All kinds of locks are Based on synchronous lock To do the , You can find the answers to all of these from the source code .
First of all to see RLock Recursive lock , The implementation of recursive lock is very simple , Its interior will maintain A counter , When the counter is not 0 The thread cannot be I/O Switching between operation and time polling mechanism . But when the counter is 0 It won't be like this when you're young :
def __init__(self):
self._block = _allocate_lock()
self._owner = None
self._count = 0 # Counter
and Condition Conditional lock In fact, there are Two locks Of , A bottom lock ( Synchronization lock ) An advanced lock ( Recursive lock ).
There are two ways to unlock the low level lock , Use wait() Method will temporarily unlock the underlying lock and add an advanced lock , Only when receiving from another thread **notfiy()** After that, the senior lock will be unlocked and the lower lock will be locked again , In other words, the bottom layer of conditional lock is realized according to the continuous switching between synchronous lock and recursive lock :
def __init__(self, lock=None):
if lock is None:
lock = RLock() # You can see that the interior of conditional lock is based on recursive lock , Recursive lock is based on synchronous lock
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()
Event Event lock The internal is based on conditional locking :
class Event:
def __init__(self):
self._cond = Condition(Lock()) # A conditional lock is instantiated .
self._flag = False
def _reset_internal_locks(self): # private!
called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
isSet = is_set
Semaphore Semaphore lock is also based on conditional lock :
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock()) # You can see , Here is an example of a conditional lock
self._value = value
demand : An empty list , Two threads add value in turn ( A plus even number , One plus odd number ), Finally, let the value in the list be 1 - 100 , And it's in order .
import threading
lst = []
def even():
""" Add even numbers """
with condLock: # with Automatic locking 、 Unlock
for i in range(2, 101, 2):
# Judge whether the length of the current list is in 2 If you can get along with
# If it can be used up, it means that odd numbers need to be added
# Otherwise, even numbers are added
if len(lst) % 2 != 0:
# Add even numbers
lst.append(i) # Add values first
condLock.notify() # Tell another thread , You can add odd numbers , But there will be no immediate surrender of executive power
condLock.wait() # Hand over the power of execution , And wait for another thread to add an even number
else:
# Add odd numbers
condLock.wait() # Hand over the power of execution , Wait for another thread to give an even number
lst.append(i)
condLock.notify()
condLock.notify()
def odd():
""" Add odd numbers """
with condLock: # Automatic locking 、 Unlock
for i in range(1, 101, 2):
if len(lst) % 2 == 0:
lst.append(i)
condLock.notify() #
condLock.wait()
condLock.notify()
if __name__ == "__main__":
condLock = threading.Condition() # Conditional lock
addEvenTask = threading.Thread(target=even)
addOddTask = threading.Thread(target=odd)
addEvenTask.start()
addOddTask.start()
addEvenTask.join() # join Method is used to synchronize threads
addOddTask.join()
print(lst)
Yes 2 A task thread to play Li Bai and Du Fu , How to make them answer one by one ? The text is as follows :
Du Fu : Lao Li , Come and have a drink !
Li Bai : Old du , No, I can't !
Du Fu : Lao Li , Another pot ?
Du Fu :… Lao Li ?
Li Bai : Whoosh, whoosh, whoosh … fell asleep
The code is as follows :
import threading
def libai():
event.wait()
print(" Li Bai : Old du , No, I can't !")
event.set()
event.clear()
event.wait()
print(" Li Bai : Whoosh, whoosh, whoosh ... fell asleep ..")
def dufu():
print(" Du Fu : Lao Li , Come and have a drink !")
event.set()
event.clear()
event.wait()
print(" Du Fu : Lao Li , Another pot ?")
print(" Du Fu :... Lao Li ?")
event.set()
if __name__ == '__main__':
event = threading.Event() # Event lock
t1 = threading.Thread(target=libai)
t2 = threading.Thread(target=dufu)
t1.start()
t2.start()
t1.join()
t2.join()
Now there is a group of studen
Data sources https://www.lixin