[Figure: serial, parallel, and concurrent execution]
Note: coroutines are just one way of getting work done.
The relationship between processes, threads, and coroutines
A thread is the smallest unit of program execution; a process is the smallest unit of resource allocation by the operating system.
A process consists of one or more threads; threads are different execution paths of the code within a process.
Processes are independent of each other, but the threads in one process share that process's memory space (including the code segment, data segment, heap, etc.) and some process-level resources (such as open files and signals). Threads in one process are not visible to other processes.
Scheduling and switching: a thread context switch is much faster than a process context switch.
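To make the shared-memory point concrete, here is a minimal sketch in which several threads append to one list that lives in the process's memory. The names run_workers and worker are made up for this illustration.

```python
from threading import Thread


def run_workers(count):
    """Start `count` threads that all write into the same list."""
    shared = []  # one object in the process's memory, visible to every thread

    def worker(tag):
        # Each thread appends to the same list object
        shared.append(tag)

    threads = [Thread(target=worker, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Sort because the order in which threads run is not guaranteed
    return sorted(shared)


if __name__ == "__main__":
    print(run_workers(3))
```

Every thread sees the same list, so all the appended values end up in it. Separate processes would each work on their own copy instead.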
Synchronous and asynchronous describe the message communication mechanism: whether the caller waits for the result or is notified later. Asynchronous programming is therefore most common in I/O-bound work such as network communication.
The Python standard library provides two modules: _thread and threading. _thread is the low-level module; threading is a higher-level module that wraps _thread. In most cases we only need the higher-level threading module.
Threads can be created in two ways: by passing a function to Thread, or by subclassing Thread and overriding run(). The first way passes a target function:
from threading import Thread
from time import sleep

def func1(name):
    print(f'Thread {name} start')
    for i in range(3):
        print(f'Thread: {name}. {i}')
        sleep(1)
    print(f'Thread {name} end')

if __name__ == '__main__':
    print('Main thread: start')
    # Create the threads
    t1 = Thread(target=func1, args=('1',))
    t2 = Thread(target=func1, args=('2',))
    # Start the threads
    t1.start()
    t2.start()
    print('Main thread: end')
The second way subclasses Thread:
from threading import Thread
from time import sleep

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    # Override run(); the method name must not be changed
    def run(self):
        print(f'Thread {self.name} start')
        for i in range(3):
            print(f'Thread: {self.name}. {i}')
            sleep(1)
        print(f'Thread {self.name} end')

if __name__ == '__main__':
    print('Main thread: start')
    # Create the threads
    t1 = MyThread('1')
    t2 = MyThread('2')
    # Start the threads
    t1.start()
    t2.start()
    print('Main thread: end')
In both cases, a thread is started by calling its start() method.
In the code above, the main thread does not wait for the child threads to finish. We can use the join() method to make the main thread wait for them.
from threading import Thread
from time import sleep

def func1(name):
    print(f'Thread {name} start')
    for i in range(3):
        print(f'Thread: {name}. {i}')
        sleep(1)
    print(f'Thread {name} end')

if __name__ == '__main__':
    print('Main thread: start')
    # Create the threads
    t1 = Thread(target=func1, args=('1',))
    t2 = Thread(target=func1, args=('2',))
    # Start the threads
    t1.start()
    t2.start()
    # The main thread waits for the child threads to finish
    t1.join()
    t2.join()
    print('Main thread: end')
The defining feature of a daemon thread is its life cycle: when the main thread dies, the daemon thread dies with it. In Python, a thread is marked as a daemon through its daemon attribute (the older setDaemon(True|False) method is deprecated since Python 3.10).
The role of a daemon thread is to provide background services for other threads. A classic example of a daemon thread is the garbage collector (GC) in the JVM.
Look at the following code :
from threading import Thread
from time import sleep

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    # Override run(); the method name must not be changed
    def run(self):
        print(f'Thread {self.name} start')
        for i in range(3):
            print(f'Thread: {self.name}. {i}')
            sleep(1)
        print(f'Thread {self.name} end')

if __name__ == '__main__':
    print('Main thread: start')
    # Create the threads
    t1 = MyThread('1')
    t2 = MyThread('2')
    # Mark t1 as a daemon thread
    t1.daemon = True  # when the main thread dies, t1 dies too
    # Start the threads
    t1.start()
    t2.start()
    print('Main thread: end')
Result:
We set t1 as a daemon thread, so in theory t1 should stop running once the main thread prints its end message. In practice it keeps running. The main thread has finished executing its code, but it has not actually died: it is still waiting for the non-daemon thread t2. Only after t2 finishes does the main thread exit and terminate t1 (the daemon thread).
So if we set both threads as daemon threads, will we get the result we want?
from threading import Thread
from time import sleep

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    # Override run(); the method name must not be changed
    def run(self):
        print(f'Thread {self.name} start')
        for i in range(3):
            print(f'Thread: {self.name}. {i}')
            sleep(1)
        print(f'Thread {self.name} end')

if __name__ == '__main__':
    print('Main thread: start')
    # Create the threads
    t1 = MyThread('1')
    t2 = MyThread('2')
    # Mark both threads as daemon threads
    t1.daemon = True  # when the main thread dies, t1 dies too
    t2.daemon = True  # when the main thread dies, t2 dies too
    # Start the threads
    t1.start()
    t2.start()
    print('Main thread: end')
The result is as we wish
Python code is executed under the control of the Python virtual machine (also called the interpreter main loop; this describes CPython). Python was designed so that only one thread executes in the interpreter main loop at any moment. Access to the Python virtual machine is controlled by the global interpreter lock (GIL), and it is this lock that ensures only one thread runs at a time.
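One way to observe the effect of the GIL is to time a CPU-bound function run twice in a row against the same function run in two threads. This is only a rough sketch: the helper names are made up, the timings vary from machine to machine, and builds of CPython without the GIL (the optional free-threaded builds from 3.13 on) behave differently.

```python
import time
from threading import Thread


def count_down(n):
    # Pure CPU work: no I/O, so the thread rarely gives up the GIL voluntarily
    while n > 0:
        n -= 1


def timed_sequential(n):
    start = time.perf_counter()
    count_down(n)
    count_down(n)
    return time.perf_counter() - start


def timed_threaded(n):
    start = time.perf_counter()
    threads = [Thread(target=count_down, args=(n,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 2_000_000
    print(f"sequential: {timed_sequential(n):.3f}s")
    print(f"two threads: {timed_threaded(n):.3f}s")
```

On a standard CPython build the two timings are usually close to each other, because the two threads take turns holding the GIL instead of running in parallel.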
With multithreading, several threads may access the same object, and some of them may also want to modify it. This is where we need thread synchronization. Thread synchronization is essentially a waiting mechanism: threads that need to access the object at the same time enter the object's wait pool and form a queue, and the next thread uses the object only after the previous one has finished.
Simulation scenario: Lao Wang and his wife arrive at two different ATMs at the same time to withdraw money. The account holds only 100 yuan and each wants to withdraw 80. What happens if the threads are not synchronized? Let's write a simulation to find out.
from threading import Thread
from time import sleep

class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name

# Simulate a withdrawal
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        super().__init__()
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0

    def run(self):
        # Give up if the balance is insufficient
        if self.account.money < self.drawingNum:
            return
        sleep(1)  # The check passed; block here to expose the conflict
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        print(f'Account: {self.account.name}, balance: {self.account.money}')
        print(f'Account: {self.account.name}, total withdrawn: {self.expenseTotal}')

if __name__ == '__main__':
    a1 = Account(100, 'Lao Wang')
    draw1 = Drawing(80, a1)  # One thread withdraws money
    draw2 = Drawing(80, a1)  # Another thread withdraws money
    draw1.start()
    draw2.start()
You can see that the account balance has become negative. This is what happens when thread synchronization is not used.
We can achieve thread synchronization through a locking mechanism.
The mutex
Note: a mutex is a lock that multiple threads compete for. The thread that grabs the lock runs first, and threads that fail to grab it must wait. Once the mutex has been used and released, the waiting threads compete for it again. The threading module defines Lock; calling Lock() returns a mutex object.
from threading import Thread, Lock
from time import sleep

class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name

# Simulate a withdrawal
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        super().__init__()
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0

    def run(self):
        # Take the lock; the with statement always releases it,
        # even on the early return below
        with lock1:
            # Give up if the balance is insufficient
            if self.account.money < self.drawingNum:
                print('Insufficient account balance')
                return
            sleep(1)  # The check passed; block here to expose the conflict
            self.account.money -= self.drawingNum
            self.expenseTotal += self.drawingNum
        print(f'Account: {self.account.name}, balance: {self.account.money}')
        print(f'Account: {self.account.name}, total withdrawn: {self.expenseTotal}')

if __name__ == '__main__':
    a1 = Account(100, 'Lao Wang')
    lock1 = Lock()
    draw1 = Drawing(80, a1)  # One thread withdraws money
    draw2 = Drawing(80, a1)  # Another thread withdraws money
    draw1.start()
    draw2.start()
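A variation on the withdrawal example, as a sketch: here the lock is stored on the account itself rather than in a global variable, and the withdrawal is a method that reports whether it succeeded. SafeAccount, withdraw, and simulate are hypothetical names introduced for this illustration only.

```python
from threading import Lock, Thread


class SafeAccount:
    """Account whose balance check and update happen under one lock."""

    def __init__(self, money):
        self.money = money
        self._lock = Lock()

    def withdraw(self, amount):
        # Check and update form one critical section, so no other thread
        # can change the balance between the two steps
        with self._lock:
            if self.money < amount:
                return False  # the lock is released automatically here
            self.money -= amount
            return True


def simulate(balance, amount, n_threads):
    account = SafeAccount(balance)
    results = []

    def draw():
        results.append(account.withdraw(amount))

    threads = [Thread(target=draw) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return account.money, results


if __name__ == "__main__":
    money, results = simulate(100, 80, 2)
    print(money, results)
```

Keeping the lock next to the data it protects means callers cannot forget to take it, and the balance can never go negative.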
In multithreaded programs, a large share of deadlock problems is caused by a thread acquiring multiple locks at the same time.
For example: two people want to cook, and each needs both the "pot" and the "kitchen knife" to stir-fry.
from threading import Thread, Lock
from time import sleep

# This program deadlocks on purpose: fun1 holds the knife and waits for
# the pot, while fun2 holds the pot and waits for the knife
def fun1():
    lock1.acquire()
    print('fun1 gets the kitchen knife')
    sleep(2)
    lock2.acquire()
    print('fun1 gets the pot')
    lock2.release()
    print('fun1 releases the pot')
    lock1.release()
    print('fun1 releases the kitchen knife')

def fun2():
    lock2.acquire()
    print('fun2 gets the pot')
    lock1.acquire()
    print('fun2 gets the kitchen knife')
    lock1.release()
    print('fun2 releases the kitchen knife')
    lock2.release()
    print('fun2 releases the pot')

if __name__ == '__main__':
    lock1 = Lock()  # the kitchen knife
    lock2 = Lock()  # the pot
    t1 = Thread(target=fun1)
    t2 = Thread(target=fun2)
    t1.start()
    t2.start()
The deadlock arises because a synchronized block needs to hold multiple locks at the same time. The idea for solving it is simple: within the same code block, do not hold two locks at the same time.
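When a task really does need both resources at once, a complementary technique (not the one described above) is consistent lock ordering: every thread acquires the locks in the same order, so no thread can hold one lock while waiting for a thread that holds the other. A minimal sketch, with the names cook and run_kitchen invented for this example:

```python
from threading import Lock, Thread

knife = Lock()
pot = Lock()


def cook(name, log):
    # Every thread takes the locks in the same order: knife first, then pot.
    # A thread that holds the knife can always get the pot eventually.
    with knife:
        with pot:
            log.append(f"{name} cooked")


def run_kitchen(names):
    log = []
    threads = [Thread(target=cook, args=(name, log)) for name in names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(log)


if __name__ == "__main__":
    print(run_kitchen(["fun1", "fun2"]))
```

Unlike the deadlocking version, both threads finish, because neither ever waits for the knife while holding the pot.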
With a mutex, only one thread can access a resource at a time. What if we want N threads (a specified number) to access a resource at the same time? For that we can use a semaphore, which controls how many threads access a resource simultaneously. A semaphore is similar to a lock: a lock lets only one thread through at a time, while a semaphore lets several threads through at the same time.
# A room that only two people may enter at a time
from threading import Semaphore, Thread
from time import sleep

def home(name, se):
    se.acquire()
    print(f'{name} enters the room')
    sleep(2)
    print(f'***{name} leaves the room')
    se.release()

if __name__ == '__main__':
    se = Semaphore(2)  # Semaphore object
    for i in range(7):
        t = Thread(target=home, args=(f'tom {i}', se))
        t.start()
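To check that the semaphore really caps the number of threads in the room, the sketch below records the highest number of visitors inside at once. The counters and the simulate function are additions for this illustration; it uses BoundedSemaphore, a variant in the same module that raises an error if released too many times.

```python
from threading import BoundedSemaphore, Lock, Thread
from time import sleep


def simulate(visitors, capacity):
    """Return the largest number of threads that were in the room at once."""
    room = BoundedSemaphore(capacity)
    state_lock = Lock()  # protects the two counters below
    inside = 0
    peak = 0

    def visit():
        nonlocal inside, peak
        with room:  # blocks while `capacity` threads are already inside
            with state_lock:
                inside += 1
                peak = max(peak, inside)
            sleep(0.05)  # stay in the room for a moment
            with state_lock:
                inside -= 1

    threads = [Thread(target=visit) for _ in range(visitors)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


if __name__ == "__main__":
    print(simulate(7, 2))
```

The recorded peak never exceeds the capacity passed to the semaphore, no matter how many visitors there are.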
How it works: an Event object holds a flag that threads can set, which lets threads wait for something to happen. Initially the flag is false. A thread that waits on an Event whose flag is false blocks until the flag becomes true. When a thread sets the flag of an Event to true, all threads waiting on that Event are woken up. A thread that waits on an Event whose flag is already true does not block and simply carries on.
Event() creates an event object whose flag defaults to False. An Event object has four main methods:
event.wait(timeout=None): blocks the calling thread until the flag is true. If timeout is given, the thread stops blocking after the timeout and continues.
event.set(): sets the flag to True and wakes all threads blocked in wait().
event.clear(): sets the flag to False, so threads that call wait() afterwards will block.
event.is_set(): returns whether the flag is True.
Now let's simulate a scenario with a program: several friends wait for a signal before they start eating hot pot.
import threading
import time

def chihuoguo(name):
    print(f'{name} has started')
    print(f'Buddy {name} is ready to eat')
    time.sleep(1)
    event.wait()  # block until the main thread sets the event
    print(f'{name} got the call')
    print(f'Buddy {name} starts eating!')

if __name__ == '__main__':
    event = threading.Event()
    thread1 = threading.Thread(target=chihuoguo, args=('tom',))
    thread2 = threading.Thread(target=chihuoguo, args=('cherry',))
    # Start the threads
    thread1.start()
    thread2.start()
    # Show a progress bar, then set the event
    for i in range(10):
        time.sleep(1)
        print('>' * (i + 1) + '-' * (9 - i))
    print('--->>> The main thread tells the friends to start eating')
    event.set()
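The timeout parameter of wait() is easy to overlook. As a small sketch (the demo function is made up for this example), wait() returns False when it gives up after the timeout and True when the flag is set:

```python
import threading


def demo():
    event = threading.Event()
    results = []

    def waiter(timeout):
        # wait() returns the flag's value: False if it timed out
        results.append(event.wait(timeout=timeout))

    # Nobody sets the event, so this wait gives up after 0.1 seconds
    t1 = threading.Thread(target=waiter, args=(0.1,))
    t1.start()
    t1.join()

    # Once the flag is set, wait() returns True without blocking
    event.set()
    t2 = threading.Thread(target=waiter, args=(0.1,))
    t2.start()
    t2.join()
    return results


if __name__ == "__main__":
    print(demo())
```

Checking the return value lets a waiting thread tell "the event happened" apart from "I got tired of waiting".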
In a multithreaded environment we often need several threads to run concurrently and cooperate. For this we need to understand an important model of concurrent cooperation: the producer/consumer model.
The buffer is the core of this model. Setting up a buffer brings 3 benefits.
Perhaps the safest way to send data from one thread to another is to use Queue from the queue library. Create a Queue object shared by multiple threads; the threads use put() and get() to add elements to the queue or remove them from it. A Queue object already contains the necessary locks, so you can use it to share data safely among multiple threads.
from queue import Queue
from time import sleep
import random
from threading import Thread

def producer():
    num = 1
    while True:
        if queue.qsize() < 5:
            print(f'Producing big steamed bun number {num}')
            queue.put(f'Big steamed bun: number {num}')
            num += 1
            sleep(random.randint(1, 4))
        else:
            print('The bun basket is full, waiting for someone to take one')
            sleep(1)

def consumer():
    while True:
        if queue.qsize() > 0:
            print(f'Got a steamed bun: {queue.get()}')
            sleep(random.randint(1, 5))
        else:
            print("Hurry up, I'm starving...")
            sleep(1)

if __name__ == '__main__':
    queue = Queue()
    t1 = Thread(target=producer)
    t2 = Thread(target=consumer)
    t1.start()
    t2.start()
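The example above polls qsize() and sleeps. An alternative sketch lets the queue do the waiting: with Queue(maxsize=5), put() blocks while the basket is full and get() blocks while it is empty. The sentinel value and the run function are assumptions added here so that the program can terminate.

```python
from queue import Queue
from threading import Thread

SENTINEL = None  # special item meaning "no more buns"


def producer(q, count):
    for num in range(1, count + 1):
        q.put(f"bun {num}")  # blocks while the queue already holds 5 items
    q.put(SENTINEL)


def consumer(q, eaten):
    while True:
        item = q.get()  # blocks while the queue is empty
        if item is SENTINEL:
            break
        eaten.append(item)


def run(count):
    q = Queue(maxsize=5)  # the bounded buffer between the two threads
    eaten = []
    t1 = Thread(target=producer, args=(q, count))
    t2 = Thread(target=consumer, args=(q, eaten))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return eaten


if __name__ == "__main__":
    print(run(8))
```

With one producer and one consumer, the items come out in the order they were put in, and neither thread wastes time polling.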
Advantages of processes:
After creating a process, call start() to launch it.
from multiprocessing import Process
import os
from time import sleep

def fun(name):
    print(f'Current process ID: {os.getpid()}')
    print(f'Parent process ID: {os.getppid()}')
    print(f'Process: {name}, start')
    sleep(3)
    print(f'Process: {name} end')

# Creating a process by subclassing
class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'Current process ID: {os.getpid()}')
        print(f'Parent process ID: {os.getppid()}')
        print(f'Process: {self.name}, start')
        sleep(3)
        print(f'Process: {self.name} end')

# On Windows, each child process re-imports this module; without the
# __main__ guard, processes would be created recursively without end
if __name__ == '__main__':
    print('Current process ID:', os.getpid())
    p1 = Process(target=fun, args=('p1',))
    p2 = Process(target=fun, args=('p2',))
    p1.start()
    p2.start()
    # p1 = MyProcess('p1')
    # p2 = MyProcess('p2')
    # p1.start()
    # p2.start()
It is worth noting that interprocess communication requires passing the data to each process explicitly. Although the queue may look like a global variable, each process runs independently and does not share data with the others.
from multiprocessing import Process, Queue
from time import sleep

class MyProcess(Process):
    def __init__(self, name, mq):
        super().__init__()
        self.name = name
        self.mq = mq

    def run(self):
        print(f'Process: {self.name}, start')
        temp = self.mq.get()
        print(f'get Data: {temp}')
        sleep(2)
        print(f'put Data: {temp}1')
        self.mq.put(temp + '1')
        print(f'Process: {self.name} end')

if __name__ == '__main__':
    mq = Queue()
    mq.put('1')
    mq.put('2')
    mq.put('3')
    # Process list
    p_list = []
    for i in range(3):
        p1 = MyProcess(f'p{i}', mq)
        p_list.append(p1)
        p1.start()
    # Make the main process wait for every child process
    for p in p_list:
        p.join()
    for i in range(3):
        print(mq.get())
The Pipe() function returns a pair (conn1, conn2) representing the two ends of a pipe.
import multiprocessing
from time import sleep

def func1(conn1):
    sub_info = 'Hello!'
    print(f'Process 1--{multiprocessing.current_process().pid} sends data: {sub_info}')
    sleep(1)
    conn1.send(sub_info)
    print(f'From process 2: {conn1.recv()}')
    sleep(1)

def func2(conn2):
    sub_info = 'Hi there!'
    print(f'Process 2--{multiprocessing.current_process().pid} sends data: {sub_info}')
    sleep(1)
    conn2.send(sub_info)
    print(f'From process 1: {conn2.recv()}')
    sleep(1)

if __name__ == '__main__':
    conn1, conn2 = multiprocessing.Pipe()
    process1 = multiprocessing.Process(target=func1, args=(conn1,))
    process2 = multiprocessing.Process(target=func2, args=(conn2,))
    # Start the child processes
    process1.start()
    process2.start()
from multiprocessing import Process, Manager

def func(name, m_list, m_dict):
    m_dict['age'] = 19
    m_list.append("I'm a handsome man!!")

if __name__ == '__main__':
    # Like multiprocessing.Queue, Manager objects are shared by passing them to each process
    # Only one child process is used here, but two would communicate in the same way
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append('I am PD!!!')
        p1 = Process(target=func, args=('p1', m_list, m_dict))
        p1.start()
        p1.join()
        # Print while the manager is still running
        print(m_list)
        print(m_dict)
A process pool provides a specified number of processes to the user. When a new request is submitted to the pool and the pool is not full, a new process is created to execute the request. If the number of processes in the pool has already reached the specified maximum, the request waits and is executed as soon as a process in the pool becomes idle.
Advantages of using process pools
Pool(processes): creates a process pool object; processes is the number of processes in the pool.
pool.apply_async(func, args, kwds): asynchronous execution; puts a task into the pool's queue. func is the task function, args passes arguments to func as a tuple, and kwds passes arguments to func as a dictionary. Return value: an object representing the submitted task; its get() method returns the return value of the task function.
pool.apply(func, args, kwds): synchronous execution; puts a task into the pool's queue and waits for the result. The parameters are the same as for apply_async.
pool.close(): closes the process pool so that no new tasks can be submitted.
pool.join(): waits for the processes in the pool to finish and reclaims them.
pool.map(func, iter): similar to Python's built-in map function; func is the function to execute and iter is the iterable of inputs.
from multiprocessing import Pool
import os
from time import sleep

def func(name):
    print(f'Current process ID: {os.getpid()}, {name}')
    sleep(2)
    return name

def func2(args):
    print(f'callback: {args}')

if __name__ == '__main__':
    pool = Pool(5)
    pool.apply_async(func=func, args=('pd',), callback=func2)
    pool.apply_async(func=func, args=('pdd',), callback=func2)
    pool.apply_async(func=func, args=('cpdd',), callback=func2)
    pool.apply_async(func=func, args=('Hello pd',))
    pool.apply_async(func=func, args=('Hello pdd',))
    pool.apply_async(func=func, args=('Hello cpdd',))
    pool.apply_async(func=func, args=('bye pd',))
    pool.apply_async(func=func, args=('bye pdd',))
    pool.apply_async(func=func, args=('bye cpdd',))
    pool.close()  # Stop accepting new tasks (a with block calls terminate() on exit instead)
    pool.join()   # Wait for the submitted tasks to finish
Using map in a functional style:
from multiprocessing import Pool
import os
from time import sleep

def func1(name):
    print(f'Current process ID: {os.getpid()}, {name}')
    sleep(2)
    return name

if __name__ == '__main__':
    with Pool(5) as pool:
        args = pool.map(func1, ('pd', 'pdd', 'cpdd', 'Hello pd', 'Hello pdd',
                                'Hello cpdd', 'bye pd', 'bye pdd', 'bye cpdd'))
        for a in args:
            print(a)
A coroutine, in full a "cooperative routine", is used to make tasks cooperate. Coroutines run inside a thread, are more lightweight than threads, and are scheduled by the programmer's own code rather than by the operating system.
When an I/O operation blocks, the CPU sits idle waiting for it to return. With coroutines, other tasks can run in the meantime, and the program comes back to process the data once the I/O result arrives. This makes full use of the I/O waiting time and improves efficiency.
Note:
asyncio coroutines are a good way to write crawlers, often better than multithreading or multiprocessing, because creating new threads and processes is comparatively expensive.
Disadvantages of coroutines
async
Declares a function as asynchronous. An asynchronous function can be suspended while it runs so that other asynchronous functions can execute. When the suspension condition ends (say the condition is asyncio.sleep(5), so after 5 seconds), execution comes back and resumes.
await
Declares a suspension point. For example, when an asynchronous program reaches a step that needs a long wait, it suspends there and other asynchronous programs run.
import time
import asyncio

async def func1():
    for i in range(1, 4):
        print(f'pd: call {i} to headquarters!!')
        await asyncio.sleep(1)
    return 'pd call finished... please respond'

async def func2():
    for k in range(1, 4):
        print(f'Headquarters: call {k} to PD!')
        await asyncio.sleep(1)
    return 'Headquarters call finished... please respond'

async def main():
    res = await asyncio.gather(func1(), func2())
    # await suspends main() until gather() has finished
    # gather() runs func1() and func2() concurrently, so their output alternates
    # The return value is a list of the coroutines' return values
    print(res)

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Elapsed time: {end - start}')
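To see how coroutines use the I/O waiting time, the sketch below runs three simulated downloads together. asyncio.sleep stands in for real network I/O, and the names fetch and main are chosen for this illustration only.

```python
import asyncio
import time


async def fetch(name, delay):
    # asyncio.sleep stands in for a network request that takes `delay` seconds
    await asyncio.sleep(delay)
    return f"{name} done"


async def main():
    start = time.perf_counter()
    # The three waits overlap, so the total is close to the longest single wait
    results = await asyncio.gather(
        fetch("a", 0.2),
        fetch("b", 0.2),
        fetch("c", 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)
    print(f"elapsed: {elapsed:.2f}s")
```

Run one after another, the three waits would add up to about 0.6 seconds. Run with gather(), the total stays close to 0.2 seconds, all in a single thread.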