程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

Python coroutine & asyncio & asynchronous programming

編輯:Python

Why talk about ?

  • More and more students are asking async Asynchronous related issues , And this part of the knowledge is not easy to learn ( Asynchronous non-blocking 、asyncio)
  • There are more and more asynchronous related topics and frameworks , for example :tornado、fastapi、django 3.x asgi 、aiohttp It's all asynchronous -> Lifting performance .

How to explain ?

  • The first part : coroutines .
  • The second part :asyncio The module is programmed asynchronously .
  • The third part : Practical cases .

1. coroutines

A coroutine is not provided by a computer , Programmers artificially create .

coroutines (Coroutine), It can also be called a microthread , It's a context switch technology in user mode . In short , In fact, code blocks are switched and executed through a thread . for example :

def func1():
print(1)
...
print(2)
def func2():
print(3)
...
print(4)
func1()
func2()

There are several ways to implement a collaborative process :

  • greenlet, Early modules .
  • yield keyword .
  • asyncio Decorator (py3.4)
  • async、await keyword (py3.5)【 recommend 】

1.1 greenlet Implement coroutines

pip3 install greenlet
from greenlet import greenlet
def func1():
print(1) # The first 1 Step : Output 1
gr2.switch() # The first 3 Step : Switch to func2 function 
print(2) # The first 6 Step : Output 2
gr2.switch() # The first 7 Step : Switch to func2 function , Continue to execute backward from the last execution position 
def func2():
print(3) # The first 4 Step : Output 3
gr1.switch() # The first 5 Step : Switch to func1 function , Continue to execute backward from the last execution position 
print(4) # The first 8 Step : Output 4
gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # The first 1 Step : To carry out func1 function 

1.2 yield keyword

def func1():
yield 1
yield from func2()
yield 2
def func2():
yield 3
yield 4
f1 = func1()
for item in f1:
print(item)

1.3 asyncio

stay python3.4 And later .

import asyncio
@asyncio.coroutine
def func1():
print(1)
# The Internet IO request : Download a picture 
yield from asyncio.sleep(2) # encounter IO The long-running , Automation switches to tasks Other tasks in 
print(2)
@asyncio.coroutine
def func2():
print(3)
# The Internet IO request : Download a picture 
yield from asyncio.sleep(2) # encounter IO The long-running , Automation switches to tasks Other tasks in 
print(4)
tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

Be careful : encounter IO Blocking automatic switching

1.4 async & await keyword

stay python3.5 And later .

import asyncio
async def func1():
print(1)
# The Internet IO request : Download a picture 
await asyncio.sleep(2) # encounter IO The long-running , Automation switches to tasks Other tasks in 
print(2)
async def func2():
print(3)
# The Internet IO request : Download a picture 
await asyncio.sleep(2) # encounter IO The long-running , Automation switches to tasks Other tasks in 
print(4)
tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

2. The significance of synergy

In a thread, if you encounter IO Waiting time , Threads don't wait , Use your spare time to do something else .

Case study : To download three pictures ( The Internet IO).

  • The ordinary way ( Sync )

    """ pip3 install requests """
    import requests
    def download_image(url):
    print(" Start the download :",url)
    # Send network request , Download the pictures 
    response = requests.get(url)
    print(" Download complete ")
    # Save the image to a local file 
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
    file_object.write(response.content)
    if __name__ == '__main__':
    url_list = [
    'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
    'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
    'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    for item in url_list:
    download_image(item)
    
  • Coroutines way ( asynchronous )

    """ Download pictures using third-party modules aiohttp, Please install in advance :pip3 install aiohttp """
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import aiohttp
    import asyncio
    async def fetch(session, url):
    print(" Send a request :", url)
    async with session.get(url, verify_ssl=False) as response:
    content = await response.content.read()
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
    file_object.write(content)
    print(' Download complete ',url)
    async def main():
    async with aiohttp.ClientSession() as session:
    url_list = [
    'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
    'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
    'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list ]
    await asyncio.wait(tasks)
    if __name__ == '__main__':
    asyncio.run( main() )
    

3. Asynchronous programming

3.1 The event loop

Understanding becomes an endless cycle , To detect and execute some code .

# Pseudo code
Task list = [ Mission 1, Mission 2, Mission 3,... ]
while True:
List of executable tasks , List of completed tasks = Go to the task list and check all the tasks , take ' Executable ' and ' Completed ' My task is to return to
for Ready for mission in List of executable tasks :
Perform the tasks that are ready
for Completed tasks in List of completed tasks :
Remove... From the task list Completed tasks
If Task list All the tasks in have been completed , Then stop the cycle
import asyncio
# To generate or get an event loop 
loop = asyncio.get_event_loop()
# Put the task on ` Task list `
loop.run_until_complete( Mission )

3.2 Quick start

Coroutines function , When defining functions async def Function name .

Coroutine object , perform Coroutines function () Get the coroutine object .

async def func():
pass
result = func()

Be careful : Execute the coroutine function to create a coroutine object , Function internal code will not execute .

If you want to run the internal code of the coroutine function , It is necessary to hand over the collaboration object to the event loop for processing .

import asyncio
async def func():
print(" Come and fuck me !")
result = func()
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )
asyncio.run( result ) # python3.7 

3.3 await

await + Waiting objects ( Coroutine object 、Future、Task object -> IO wait for )

Example 1:

import asyncio
async def func():
print(" Come and play ")
response = await asyncio.sleep(2)
print(" end ",response)
asyncio.run( func() )

Example 2:

import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return ' Return value '
async def func():
print(" Execute the internal code of the coroutine function ")
# encounter IO Operation suspends the current orchestration ( Mission ), etc. IO After the operation is completed, continue to execute . When the current schedule is suspended , The event loop can be used to execute other coroutines ( Mission ).
response = await others()
print("IO End of request , The result is :", response)
asyncio.run( func() )

Example 3:

import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return ' Return value '
async def func():
print(" Execute the internal code of the coroutine function ")
# encounter IO Operation suspends the current orchestration ( Mission ), etc. IO After the operation is completed, continue to execute . When the current schedule is suspended , The event loop can be used to execute other coroutines ( Mission ).
response1 = await others()
print("IO End of request , The result is :", response1)
response2 = await others()
print("IO End of request , The result is :", response2)
asyncio.run( func() )

await Is to wait for the value of the object to reach the result before continuing down .

3.4 Task object

Tasks are used to schedule coroutines concurrently.

When a coroutine is wrapped into a Task with functions like asyncio.create_task() the coroutine is automatically scheduled to run soon.

vernacular : Add multiple tasks to the event loop .

Tasks For concurrent scheduling coroutines , adopt asyncio.create_task( Coroutine object ) The way to create Task object , This allows the coroutine to join the event loop and wait for the scheduled execution . Besides using asyncio.create_task() Function , You can also use lower level loop.create_task() or ensure_future() function . Manual instantiation is not recommended Task object .

Be careful :asyncio.create_task() Function in Python 3.7 Is added to . stay Python 3.7 Before , You can switch to a lower level asyncio.ensure_future() function .

Example 1:

import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return " Return value "
async def main():
print("main Start ")
# establish Task object , The current execution func Add a function task to the event loop .
task1 = asyncio.create_task( func() )
# establish Task object , The current execution func Add a function task to the event loop .
task2 = asyncio.create_task( func() )
print("main end ")
# When executing a coroutine, you encounter IO In operation , Will automatically switch to perform other tasks .
# Here await It is to wait for all the corresponding coroutines to be executed and get the results 
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
asyncio.run( main() )

Example 2:

import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return " Return value "
async def main():
print("main Start ")
task_list = [
asyncio.create_task(func(), name='n1'),
asyncio.create_task(func(), name='n2')
]
print("main end ")
done, pending = await asyncio.wait(task_list, timeout=None)
print(done)
asyncio.run(main())

Example 3:

import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return " Return value "
task_list = [
func(),
func(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

3.5 asyncio.Future object

A Futureis a special low-level awaitable object that represents an eventual result of an asynchronous operation.

Task Inherit Future,Task Inside object await The processing of the results is based on Future It's from the object .

Example 1:

async def main():
# Get the current event loop 
loop = asyncio.get_running_loop()
# Create a task (Future object ), It's a task of doing nothing .
fut = loop.create_future()
# Waiting for the final result of the mission (Future object ), If there is no result, we will wait forever .
await fut
asyncio.run( main() )

Example 2:

import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("666")
async def main():
# Get the current event loop 
loop = asyncio.get_running_loop()
# Create a task (Future object ), No behavior , Then the task will never know when to end .
fut = loop.create_future()
# Create a task (Task object ), The binding set_after function , The function is inside 2s after , Will give fut assignment .
# I.e. manual setting future The end result of the mission , that fut And that's it .
await loop.create_task( set_after(fut) )
# wait for Future Object acquisition final result , Otherwise, just wait 
data = await fut
print(data)
asyncio.run( main() )

3.5 concurrent.futures.Future object

Use thread pool 、 Objects used by the process pool to implement asynchronous operations .

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
return 123
# Creating a thread pool 
pool = ThreadPoolExecutor(max_workers=5)
# Create a process pool 
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
print(fut)

There may be cross time when writing code in the future . for example :crm project 80% They are all based on asynchronous programming + MySQL( I won't support it )【 Threads 、 Process to do asynchronous programming 】.

import time
import asyncio
import concurrent.futures
def func1():
# Some time-consuming operation 
time.sleep(2)
return "SB"
async def main():
loop = asyncio.get_running_loop()
# 1. Run in the default loop's executor ( Default ThreadPoolExecutor )
# First step : Internal will call first ThreadPoolExecutor Of submit Method to apply for a thread to execute in the thread pool func1 function , And return a concurrent.futures.Future object 
# The second step : call asyncio.wrap_future take concurrent.futures.Future The object is packaged as asycio.Future object .
# because concurrent.futures.Future Object does not support await grammar , So it needs to be packaged as asycio.Future object Can be used .
fut = loop.run_in_executor(None, func1)
result = await fut
print('default thread pool', result)
# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom thread pool', result)
# 3. Run in a custom process pool:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom process pool', result)
asyncio.run( main() )

Case study :asyncio + Asynchronous modules are not supported

import asyncio
import requests
async def download_image(url):
# Send network request , Download the pictures ( Meet the network to download pictures IO request , Automation switches to other tasks )
print(" Start the download :", url)
loop = asyncio.get_event_loop()
# requests Module does not support asynchronous operation by default , So we use thread pool to implement .
future = loop.run_in_executor(None, requests.get, url)
response = await future
print(' Download complete ')
# Save the image to a local file 
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [ download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete( asyncio.wait(tasks) )

3.7 Asynchronous iterator

What is an asynchronous iterator

Realized __aiter__() and __anext__() Object of method .__anext__ Must return a awaitable object .async for Can handle asynchronous iterators __anext__() Method returns the waiting object , Until it triggers a StopAsyncIteration abnormal . from PEP 492 introduce .

What are asynchronous iteratable objects ?

Can be found in async for The object used in the statement . Must pass through it __aiter__() Method returns a asynchronous iterator. from PEP 492 introduce .

import asyncio
class Reader(object):
""" Custom asynchronous iterators ( It's also an asynchronous iterative object ) """
def __init__(self):
self.count = 0
async def readline(self):
# await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run( func() )

3.8 Asynchronous context manager

Such objects are defined by __aenter__() and __aexit__() Method async with Control the environment in the statement . from PEP 492 introduce .

import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = conn
async def do_something(self):
# Asynchronous operation database 
return 666
async def __aenter__(self):
# Asynchronously linked database 
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
# Closing database links asynchronously 
await asyncio.sleep(1)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run( func() )

4.uvloop

yes asyncio An alternative to the event loop . The event loop > Default asyncio The event loop of .

pip3 install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# To write asyncio Code for , Consistent with the previous code .
# The automation of the internal event loop becomes uvloop
asyncio.run(...)

Be careful : One asgi -> uvicorn What's used internally is uvloop

5. Practical cases

5.1 asynchronous redis

In the use of python Code operation redis when , link / operation / Disconnection is all network IO.

pip3 install aioredis

Example 1:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aioredis
async def execute(address, password):
print(" Start execution ", address)
# The Internet IO operation : establish redis Connect 
redis = await aioredis.create_redis(address, password=password)
# The Internet IO operation : stay redis Set hash value in car, Three key value pairs are set internally , namely : redis = { car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# The Internet IO operation : Go to redis Get the value 
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# The Internet IO operation : close redis Connect 
await redis.wait_closed()
print(" end ", address)
asyncio.run( execute('redis://47.93.4.198:6379', "root!2345") )

Example 2:

import asyncio
import aioredis
async def execute(address, password):
print(" Start execution ", address)
# The Internet IO operation : First, connect 47.93.4.197:6379, encounter IO The task is automatically switched , De link 47.93.4.198:6379
redis = await aioredis.create_redis_pool(address, password=password)
# The Internet IO operation : encounter IO Will automatically switch tasks 
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# The Internet IO operation : encounter IO Will automatically switch tasks 
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# The Internet IO operation : encounter IO Will automatically switch tasks 
await redis.wait_closed()
print(" end ", address)
task_list = [
execute('redis://47.93.4.197:6379', "root!2345"),
execute('redis://47.93.4.198:6379', "root!2345")
]
asyncio.run(asyncio.wait(task_list))

5.2 asynchronous MySQL

pip3 install aiomysql

Example 1:

import asyncio
import aiomysql
async def execute():
# The Internet IO operation : Connect MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
# The Internet IO operation : establish CURSOR
cur = await conn.cursor()
# The Internet IO operation : perform SQL
await cur.execute("SELECT Host,User FROM user")
# The Internet IO operation : obtain SQL result 
result = await cur.fetchall()
print(result)
# The Internet IO operation : Close links 
await cur.close()
conn.close()
asyncio.run(execute())

Example 2:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aiomysql
async def execute(host, password):
print(" Start ", host)
# The Internet IO operation : First, connect 47.93.40.197, encounter IO The task is automatically switched , De link 47.93.40.198:6379
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# The Internet IO operation : encounter IO Will automatically switch tasks 
cur = await conn.cursor()
# The Internet IO operation : encounter IO Will automatically switch tasks 
await cur.execute("SELECT Host,User FROM user")
# The Internet IO operation : encounter IO Will automatically switch tasks 
result = await cur.fetchall()
print(result)
# The Internet IO operation : encounter IO Will automatically switch tasks 
await cur.close()
conn.close()
print(" end ", host)
task_list = [
execute('47.93.41.197', "root!2345"),
execute('47.93.40.197', "root!2345")
]
asyncio.run(asyncio.wait(task_list))

5.3 FastAPI frame

install

pip3 install fastapi
pip3 install uvicorn (asgi Internally based on uvloop)

Example : luffy.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
# Create a redis Connection pool 
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)
@app.get("/")
def index():
""" Common operation interface """
return {
"message": "Hello World"}
@app.get("/red")
async def red():
""" Asynchronous operation interface """
print(" The request came. ")
await asyncio.sleep(3)
# Get a connection from the connection pool 
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
# Set the value 
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# Read the values 
result = await redis.hgetall('car', encoding='utf-8')
print(result)
# Connection pool 
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")

5.4 Reptiles

pip3 install aiohttp
import aiohttp
import asyncio
async def fetch(session, url):
print(" Send a request :", url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print(" Get the results :", url, len(text))
return text
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://python.org',
'https://www.baidu.com',
'https://www.pythonav.com'
]
tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list]
done,pending = await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run( main() )

summary

The greatest significance : Utilize its... Through a thread IO Wait for time to do something else .


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved