Why talk about coroutines? And how should they be explained?
A coroutine is not something provided by the operating system; it is something programmers construct themselves.
A coroutine, also called a micro-thread, is a user-mode context-switching technique. In short, it is a way to switch between and execute different blocks of code within a single thread. For example:
def func1():
    print(1)
    ...
    print(2)

def func2():
    print(3)
    ...
    print(4)

func1()
func2()
There are several ways to implement coroutines: the greenlet third-party module, the yield keyword, the asyncio decorator (Python 3.4+), and the async/await keywords (Python 3.5+). First, with greenlet:
pip3 install greenlet
from greenlet import greenlet

def func1():
    print(1)        # Step 2: print 1
    gr2.switch()    # Step 3: switch to the func2 function
    print(2)        # Step 6: print 2
    gr2.switch()    # Step 7: switch to func2 and resume from where it last stopped

def func2():
    print(3)        # Step 4: print 3
    gr1.switch()    # Step 5: switch to func1 and resume from where it last stopped
    print(4)        # Step 8: print 4

gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()        # Step 1: run the func1 function
Using the yield keyword:

def func1():
    yield 1
    yield from func2()
    yield 2

def func2():
    yield 3
    yield 4

f1 = func1()
for item in f1:
    print(item)
Using the asyncio decorator, available in Python 3.4 and later:
import asyncio

@asyncio.coroutine
def func1():
    print(1)
    # Network IO request: download an image
    yield from asyncio.sleep(2)  # On a long-running IO operation, automatically switch to another task in tasks
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    # Network IO request: download an image
    yield from asyncio.sleep(2)  # On a long-running IO operation, automatically switch to another task in tasks
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
Note: when an IO block is encountered, the event loop switches tasks automatically.
Using the async and await keywords, available in Python 3.5 and later:
import asyncio

async def func1():
    print(1)
    # Network IO request: download an image
    await asyncio.sleep(2)  # On a long-running IO operation, automatically switch to another task in tasks
    print(2)

async def func2():
    print(3)
    # Network IO request: download an image
    await asyncio.sleep(2)  # On a long-running IO operation, automatically switch to another task in tasks
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
The point: when a thread hits IO wait time, it does not sit idle; it uses that time to do something else.
Case study: download three images (network IO).
The ordinary (synchronous) way:
""" pip3 install requests """
import requests
def download_image(url):
print(" Start the download :",url)
# Send network request , Download the pictures
response = requests.get(url)
print(" Download complete ")
# Save the image to a local file
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
if __name__ == '__main__':
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
for item in url_list:
download_image(item)
The coroutine (asynchronous) way:
""" Download pictures using third-party modules aiohttp, Please install in advance :pip3 install aiohttp """
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import aiohttp
import asyncio
async def fetch(session, url):
print(" Send a request :", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
print(' Download complete ',url)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
tasks = [ asyncio.create_task(fetch(session, url)) for url in url_list ]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run( main() )
The event loop can be understood as an endless loop that keeps checking tasks and executing their code when they are ready.
# Pseudocode
task_list = [task1, task2, task3, ...]
while True:
    runnable_tasks, finished_tasks = check every task in task_list and return the 'runnable' and 'finished' ones
    for ready_task in runnable_tasks:
        run ready_task
    for finished_task in finished_tasks:
        remove finished_task from task_list
    if every task in task_list is finished, stop the loop
import asyncio

# Generate or get an event loop
loop = asyncio.get_event_loop()
# Put the task onto the 'task list' and run it until it completes
loop.run_until_complete(task)
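Putting the two pieces together, here is a minimal runnable sketch; the coroutine name say_hello is illustrative and not part of the original example:

import asyncio

async def say_hello():
    # Hypothetical example coroutine; any coroutine object can be handed to the loop.
    await asyncio.sleep(1)
    print("hello")

loop = asyncio.get_event_loop()
loop.run_until_complete(say_hello())  # the loop drives the coroutine until it finishes
# On Python 3.7+, asyncio.run(say_hello()) is the equivalent shorthand.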
Coroutine function: a function defined with async def function_name.
Coroutine object: the object returned by calling a coroutine function().
async def func():
    pass

result = func()
Note: calling a coroutine function creates a coroutine object; the code inside the function does not run.
To run the body of the coroutine function, the coroutine object must be handed over to the event loop.
import asyncio

async def func():
    print("Running the coroutine body!")

result = func()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
asyncio.run(result)  # Python 3.7+
await + awaitable object (a coroutine object, Future, or Task object -> an IO wait).
Example 1:
import asyncio

async def func():
    print("Let's play")
    response = await asyncio.sleep(2)
    print("End", response)

asyncio.run(func())
Example 2:
import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return 'return value'

async def func():
    print("Run the body of the coroutine function")
    # An IO operation suspends the current coroutine (task); execution resumes once the IO completes.
    # While the current coroutine is suspended, the event loop can run other coroutines (tasks).
    response = await others()
    print("IO request finished, result:", response)

asyncio.run(func())
Example 3:
import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return 'return value'

async def func():
    print("Run the body of the coroutine function")
    # An IO operation suspends the current coroutine (task); execution resumes once the IO completes.
    # While the current coroutine is suspended, the event loop can run other coroutines (tasks).
    response1 = await others()
    print("IO request finished, result:", response1)
    response2 = await others()
    print("IO request finished, result:", response2)

asyncio.run(func())
await waits until the awaited object produces its result before execution continues.
Tasks are used to schedule coroutines concurrently. When a coroutine is wrapped into a Task with functions like asyncio.create_task(), the coroutine is automatically scheduled to run soon.
In plain terms: Tasks add multiple tasks to the event loop.
Tasks are used to schedule coroutines concurrently. Creating a Task object with asyncio.create_task(coroutine object) lets the coroutine join the event loop and wait to be scheduled. Besides asyncio.create_task(), you can also use the lower-level loop.create_task() or ensure_future() functions; manually instantiating Task objects is not recommended.
Note: asyncio.create_task() was added in Python 3.7. Before Python 3.7, you can fall back to the lower-level asyncio.ensure_future() function.
Example 1:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "return value"

async def main():
    print("main start")
    # Create a Task object and add the func task to the event loop.
    task1 = asyncio.create_task(func())
    # Create a Task object and add the func task to the event loop.
    task2 = asyncio.create_task(func())
    print("main end")
    # When a coroutine hits an IO operation, the loop automatically switches to other tasks.
    # Here await waits for the corresponding coroutines to finish and collects their results.
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)

asyncio.run(main())
Example 2:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "return value"

async def main():
    print("main start")
    task_list = [
        asyncio.create_task(func(), name='n1'),
        asyncio.create_task(func(), name='n2')
    ]
    print("main end")
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)

asyncio.run(main())
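asyncio.wait returns two sets of Task objects: done (finished) and pending (still unfinished, which can only happen when a timeout is given). A small sketch, not part of the original, of how the results could be read back out of the done set inside main():

for finished_task in done:
    # get_name() is available on Task objects since Python 3.8;
    # result() returns the task's return value, or re-raises its exception.
    print(finished_task.get_name(), finished_task.result())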
Example 3:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "return value"

task_list = [
    func(),
    func(),
]

# No event loop exists at this point, so asyncio.create_task() cannot be called here;
# bare coroutine objects are passed instead, and asyncio.wait wraps them into tasks.
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
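Note that passing bare coroutine objects to asyncio.wait() was deprecated in Python 3.8 and removed in Python 3.11. On newer interpreters, a roughly equivalent sketch uses asyncio.gather inside a coroutine:

import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "return value"

async def main():
    # gather accepts coroutine objects directly and returns their results in order
    results = await asyncio.gather(func(), func())
    print(results)

asyncio.run(main())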
A Future is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
Task inherits from Future; the way an awaited Task object produces its result is built on top of Future.
Example 1:
import asyncio

async def main():
    # Get the currently running event loop
    loop = asyncio.get_running_loop()
    # Create a Future object; it is a task that does nothing.
    fut = loop.create_future()
    # Wait for the Future object's final result; since it never gets one, this waits forever.
    await fut

asyncio.run(main())
Example 2:
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")

async def main():
    # Get the currently running event loop
    loop = asyncio.get_running_loop()
    # Create a Future object with no behaviour attached, so on its own it would never know when to finish.
    fut = loop.create_future()
    # Create a Task object bound to the set_after function, which assigns a result to fut after 2 seconds.
    # Manually setting the Future's result is what marks fut as finished.
    await loop.create_task(set_after(fut))
    # Wait for the Future object to produce its final result; otherwise keep waiting.
    data = await fut
    print(data)

asyncio.run(main())
concurrent.futures.Future: the Future object used by thread pools and process pools to implement asynchronous operations.
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)
    return 123

# Create a thread pool
pool = ThreadPoolExecutor(max_workers=5)

# Create a process pool
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)
    print(fut)
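The loop above only prints the concurrent.futures.Future objects themselves. A small sketch, not in the original, of how the actual return values could be collected (blocking until each call finishes):

futures = [pool.submit(func, i) for i in range(10)]
for fut in futures:
    # result() blocks until the submitted call completes and returns its value (123 here)
    print(fut.result())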
The two kinds of Future may be mixed in real code. For example: a CRM project that is 80% asyncio-based asynchronous programming plus MySQL, whose driver does not support asyncio, so that part is made asynchronous with threads or processes.
import time
import asyncio
import concurrent.futures

def func1():
    # Some time-consuming operation
    time.sleep(2)
    return "SB"

async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor (default ThreadPoolExecutor)
    # Step 1: internally calls ThreadPoolExecutor's submit method to run func1 in a pool thread,
    #         returning a concurrent.futures.Future object.
    # Step 2: calls asyncio.wrap_future to wrap the concurrent.futures.Future object into an asyncio.Future object,
    #         because concurrent.futures.Future does not support the await syntax.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print('custom thread pool', result)

    # 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print('custom process pool', result)

asyncio.run(main())
Case study: asyncio + a module that does not support async (requests).
import asyncio
import requests

async def download_image(url):
    # Send the network request and download the image (on network IO, automatically switch to other tasks)
    print("Start downloading:", url)
    loop = asyncio.get_event_loop()
    # The requests module does not support asynchronous operation, so a thread pool is used instead.
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('Download complete')
    # Save the image to a local file
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)

if __name__ == '__main__':
    url_list = [
        'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
    tasks = [download_image(url) for url in url_list]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
What is an asynchronous iterator?
An object that implements the __aiter__() and __anext__() methods. __anext__() must return an awaitable object. async for handles the awaitables returned by the asynchronous iterator's __anext__() method until it raises a StopAsyncIteration exception. Introduced by PEP 492.
What is an asynchronously iterable object?
An object that can be used in an async for statement. It must return an asynchronous iterator from its __aiter__() method. Introduced by PEP 492.
import asyncio

class Reader(object):
    """ A custom asynchronous iterator (which is also an asynchronously iterable object) """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val

async def func():
    obj = Reader()
    async for item in obj:
        print(item)

asyncio.run(func())
An asynchronous context manager is an object that, by defining the __aenter__() and __aexit__() methods, controls the environment inside an async with statement. Introduced by PEP 492.
import asyncio

class AsyncContextManager:
    def __init__(self):
        self.conn = None

    async def do_something(self):
        # Asynchronously operate on the database
        return 666

    async def __aenter__(self):
        # Asynchronously connect to the database
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Asynchronously close the database connection
        await asyncio.sleep(1)

async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())
uvloop is an alternative implementation of the asyncio event loop; its event loop is faster than asyncio's default event loop.
pip3 install uvloop
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Write asyncio code exactly as before.
# Internally, the event loop automatically becomes uvloop.
asyncio.run(...)
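A minimal runnable sketch, assuming uvloop is installed; the coroutine body is illustrative:

import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def main():
    # Ordinary asyncio code; it now runs on a uvloop-backed event loop.
    await asyncio.sleep(1)
    print("done on uvloop")

asyncio.run(main())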
Note: the ASGI server uvicorn uses uvloop internally.
Case: aioredis. When Python code operates Redis, connecting, issuing commands, and disconnecting are all network IO.
pip3 install aioredis
Example 1:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aioredis

async def execute(address, password):
    print("Start", address)
    # Network IO operation: create the redis connection
    redis = await aioredis.create_redis(address, password=password)
    # Network IO operation: set the hash 'car' in redis with three key-value pairs, i.e. redis = { car: {key1: 1, key2: 2, key3: 3} }
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Network IO operation: read the value back from redis
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)

    redis.close()
    # Network IO operation: close the redis connection
    await redis.wait_closed()
    print("End", address)

asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))
Example 2:
import asyncio
import aioredis

async def execute(address, password):
    print("Start", address)
    # Network IO operation: connect to 47.93.4.197:6379 first; on IO the task switches automatically and connects to 47.93.4.198:6379
    redis = await aioredis.create_redis_pool(address, password=password)
    # Network IO operation: switches tasks automatically on IO
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Network IO operation: switches tasks automatically on IO
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)

    redis.close()
    # Network IO operation: switches tasks automatically on IO
    await redis.wait_closed()
    print("End", address)

task_list = [
    execute('redis://47.93.4.197:6379', "root!2345"),
    execute('redis://47.93.4.198:6379', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
pip3 install aiomysql
Example 1:
import asyncio
import aiomysql

async def execute():
    # Network IO operation: connect to MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql')
    # Network IO operation: create a cursor
    cur = await conn.cursor()
    # Network IO operation: execute the SQL
    await cur.execute("SELECT Host,User FROM user")
    # Network IO operation: fetch the SQL result
    result = await cur.fetchall()
    print(result)
    # Network IO operation: close the connection
    await cur.close()
    conn.close()

asyncio.run(execute())
Example 2:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aiomysql

async def execute(host, password):
    print("Start", host)
    # Network IO operation: connect to the first MySQL host; on IO the task switches automatically and connects to the second host
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    # Network IO operation: switches tasks automatically on IO
    cur = await conn.cursor()
    # Network IO operation: switches tasks automatically on IO
    await cur.execute("SELECT Host,User FROM user")
    # Network IO operation: switches tasks automatically on IO
    result = await cur.fetchall()
    print(result)
    # Network IO operation: switches tasks automatically on IO
    await cur.close()
    conn.close()
    print("End", host)

task_list = [
    execute('47.93.41.197', "root!2345"),
    execute('47.93.40.197', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
Install:
pip3 install fastapi
pip3 install uvicorn (an ASGI server, internally based on uvloop)
Example : luffy.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()

# Create a redis connection pool
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)


@app.get("/")
def index():
    """ An ordinary (synchronous) endpoint """
    return {"message": "Hello World"}


@app.get("/red")
async def red():
    """ An asynchronous endpoint """
    print("The request arrived")
    await asyncio.sleep(3)
    # Get a connection from the connection pool
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)
    # Set values
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Read the values back
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    # Return the connection to the pool
    REDIS_POOL.release(conn)
    return result


if __name__ == '__main__':
    uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
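A quick way to see the benefit: while one request to /red is waiting on its 3-second sleep, the worker can still serve other requests. A small client-side sketch, not from the original, assuming the server above is running locally on port 5000 and its Redis is reachable:

import asyncio
import aiohttp

async def hit(session, path):
    async with session.get("http://127.0.0.1:5000" + path) as resp:
        print(path, resp.status)

async def main():
    async with aiohttp.ClientSession() as session:
        # Fire several requests at once; the /red requests overlap instead of queuing for 3 s each.
        await asyncio.gather(hit(session, "/red"), hit(session, "/red"), hit(session, "/"))

asyncio.run(main())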
pip3 install aiohttp
import aiohttp
import asyncio

async def fetch(session, url):
    print("Send request:", url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("Got result:", url, len(text))
        return text

async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'https://www.pythonav.com'
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        done, pending = await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())
The greatest significance of asynchronous programming: within a single thread, use IO wait time to do other work.
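To make that concrete, a minimal illustrative sketch comparing serial and concurrent waits: two 2-second IO waits take about 4 seconds when awaited one after another, but about 2 seconds when run concurrently.

import asyncio
import time

async def io_work():
    await asyncio.sleep(2)   # stands in for a network/disk IO wait

async def serial():
    await io_work()
    await io_work()

async def concurrent():
    # Both waits overlap because the loop switches between the two tasks.
    await asyncio.gather(io_work(), io_work())

start = time.time()
asyncio.run(serial())
print("serial:", round(time.time() - start, 1), "seconds")      # ~4.0

start = time.time()
asyncio.run(concurrent())
print("concurrent:", round(time.time() - start, 1), "seconds")  # ~2.0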