1. A conventional Sync syntax request example
2. Asynchronous requests
3. The generator-based coroutine
3.1 Generators
3.2 Using generators to implement coroutines
In I/O-heavy scenarios, programs written with Async syntax finish the same task in less time and with fewer resources. This article introduces how Python's Async syntax implements coroutines.

As usual, before looking at how Async syntax is implemented, let's start with a Sync syntax example. Suppose there is an HTTP request; the program fetches the response for that request and prints it out. The code is as follows:
```python
import socket


def request(host: str) -> None:
    """Simulate the request and print the response body"""
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.connect((host, 80))
    sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
    response_bytes: bytes = b""
    chunk: bytes = sock.recv(4096)
    while chunk:
        response_bytes += chunk
        chunk = sock.recv(4096)
    print("\n".join(response_bytes.decode().split("\r\n")))


if __name__ == "__main__":
    request("so1n.me")
```
Running the program produces the expected output: the upper part is the HTTP response header and the lower part is the HTTP response body. You can see that the server asks us to re-request over https. The output is as follows:
```
HTTP/1.1 301 Moved Permanently
Server: GitHub.com
Content-Type: text/html
Location: https://so1n.me/
X-GitHub-Request-Id: A744:3871:4136AF:48BD9F:6188DB50
Content-Length: 162
Accept-Ranges: bytes
Date: Mon, 08 Nov 2021 08:11:37 GMT
Via: 1.1 varnish
Age: 104
Connection: close
X-Served-By: cache-qpg1272-QPG
X-Cache: HIT
X-Cache-Hits: 2
X-Timer: S1636359097.026094,VS0,VE0
Vary: Accept-Encoding
X-Fastly-Request-ID: 22fa337f777553d33503cee5282598c6a293fb5e

<html>
<head><title>301 Moved Permanently</title></head>
<body>
<center><h1>301 Moved Permanently</h1></center>
<hr><center>nginx</center>
</body>
</html>
```
This article is not about how HTTP requests are implemented, so the protocol details don't matter. What matters is that in this code the socket calls are blocking by default: when a thread calls connect or recv, the program pauses until the operation completes. (send does not normally need to wait; with high and low watermarks you would first wait for drain before calling send, but this small demo does not need a drain method.) When downloading many web pages at once, this behaves just like the previous article: most of the time is spent waiting on I/O while the CPU stays idle. A thread pool can work around this, but the cost is high, and the operating system usually limits the number of threads a process, a user, or a machine can use. Coroutines have none of these restrictions: they occupy fewer resources and hit no system bottleneck.
Asynchrony lets a single thread handle concurrent operations, but as noted above, sockets block by default, so we need to set the socket to non-blocking. socket provides the setblocking method, which lets the developer choose whether to block. After non-blocking mode is set, the connect and recv calls need to change as well.
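The effect of setblocking(False) can be seen locally with a socketpair, no network required. A small illustrative sketch (the variable names are mine, not from the article):

```python
import socket

# A connected pair of local sockets -- demonstrates non-blocking
# behavior without any network access.
a, b = socket.socketpair()
a.setblocking(False)

# With no data queued, a non-blocking recv raises BlockingIOError
# immediately instead of pausing the thread.
try:
    a.recv(4096)
    raised = False
except BlockingIOError:
    raised = True

b.send(b"hello")
data = a.recv(4096)  # data is now queued, so this returns at once
a.close()
b.close()
```

The same `BlockingIOError` family of exceptions is what the non-blocking connect below raises.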
Because nothing blocks anymore, the program returns from connect immediately. Python's socket layer is implemented in C, and the C code raises an exception after a non-blocking socket.connect call, so we need to catch it, like this:
```python
import socket

sock: socket.SocketType = socket.socket()
sock.setblocking(False)
try:
    sock.connect(("so1n.me", 80))
except BlockingIOError:
    pass
```
After this bit of setup the connection attempt is under way, but we still don't know when the connection will be established. Calling send before it is established raises an error, so we could keep polling send until it stops raising and treat that as success (real code would also need a timeout):
```python
while True:
    try:
        sock.send(request)  # request: the encoded request bytes
        break
    except OSError:
        pass
```
But this makes the CPU spin idly, wasting performance, and we can't do anything else in the meantime. It's like phoning the restaurant over and over after ordering takeout to ask whether the food is ready: a waste of phone charges. If instead the restaurant calls us when the meal is done, there is only one call, which is much more economical (and is how it normally works).
This is where the event loop comes in. Unix-like systems provide a function called select, which waits for an event to occur before invoking the registered listener. Its original implementation did not perform very well and was superseded on Linux by epoll, though the interface is similar. In Python, these different event notification mechanisms are wrapped in the selectors library, whose DefaultSelector picks the best select-style implementation available on the current system.
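A quick way to see which implementation the current platform gets (the selector class names are from the standard selectors module):

```python
import selectors

# DefaultSelector is an alias for the most efficient implementation the
# platform offers: EpollSelector on Linux, KqueueSelector on BSD/macOS,
# with SelectSelector as the portable fallback.
selector = selectors.DefaultSelector()
name = type(selector).__name__
selector.close()

# The plain select()-based implementation is always importable directly.
fallback = selectors.SelectSelector()
fallback.close()
```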
Leaving the principles of the event loop aside for the moment, its two most important parts are right there in its name: the events and the loop. In Python, you can register an event with the event loop like this:
```python
def demo():
    pass


selector.register(fd, EVENT_WRITE, demo)
```
The event loop will listen on the file descriptor fd; when the descriptor triggers a write event (EVENT_WRITE), the event loop tells us that the registered function demo can be called. However, if you run the code above as is, the program seems to end without running anything. It actually did run: the registration completed, and the loop is now waiting for the developer to collect its events and take the next step. So we just need to append the following code:
```python
while True:
    for key, mask in selector.select():
        key.data()
```
Now the program runs continuously. When an event is captured, the for loop hands it to us; key.data is the callback function we registered. When the event occurs we are notified, retrieve the callback, and run it. With this understood, we can write our first concurrent program. It implements a simple piece of I/O multiplexing logic; the code and comments are as follows:
```python
import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

# Select the event loop
selector: DefaultSelector = DefaultSelector()
# Used to determine whether any events are still running
running_cnt: int = 0


def request(host: str) -> None:
    """Simulate the request and print the response body"""
    # Tell the main function that this event is still running
    global running_cnt
    running_cnt += 1

    # Initialize the socket
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((host, 80))
    except BlockingIOError:
        pass

    response_bytes: bytes = b""

    def read_response() -> None:
        """Receive response data and determine whether the request has ended"""
        nonlocal response_bytes
        chunk: bytes = sock.recv(4096)
        print(f"recv {host} body success")
        if chunk:
            response_bytes += chunk
        else:
            # No data means the request is over; cancel the listener
            selector.unregister(sock.fileno())
            global running_cnt
            running_cnt -= 1

    def connected() -> None:
        """Callback for when the socket establishes the connection"""
        # Cancel the write listener
        selector.unregister(sock.fileno())
        print(f"{host} connect success")
        # Send the request, listen for read events, and register the
        # corresponding response handler
        sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
        selector.register(sock.fileno(), EVENT_READ, read_response)

    selector.register(sock.fileno(), EVENT_WRITE, connected)


if __name__ == "__main__":
    # Issue multiple requests at the same time
    request("so1n.me")
    request("github.com")
    request("google.com")
    request("baidu.com")

    # Loop while events are still running
    while running_cnt > 0:
        # Wait for the event loop to report completed events
        for key, mask in selector.select():
            key.data()
```
This code first registers four requests, each registering a connection-established callback, then enters the event loop, handing control over to it. When the event loop reports that a socket connection has been established, the program unregisters that callback, sends the request, registers a read-event callback, and gives control back to the event loop. The response handler runs only when response data arrives, and the program exits only after all responses have been received in full.
Here is one of my execution results:
so1n.me connect success
github.com connect success
google.com connect success
recv google.com body success
recv google.com body success
baidu.com connect success
recv github.com body success
recv github.com body success
recv baidu.com body success
recv baidu.com body success
recv so1n.me body success
recv so1n.me body success
You can see that the execution order is random rather than strictly so1n.me, github.com, google.com, baidu.com, and that everything finishes quickly: the whole program takes roughly as long as the single slowest response.
But notice that this program contains two callbacks. Callbacks make code strange to read and hurt readability; they easily lead to callback hell, and when a callback raises an error it is hard to know what caused it, because its context is lost, which makes troubleshooting confusing. Programmers are generally not satisfied with code that is merely fast; what we really want is code that is fast yet as simple as Sync code: highly readable and easy to debug. The design pattern that combines these properties is called the coroutine.
Coroutines appeared very early. Unlike threads, they are not scheduled by the system; a coroutine pauses voluntarily and waits for the event loop to notify it to resume. Because coroutines are implemented at the software level, there are many ways to build them. Here we discuss coroutines based on generators, because generators, like coroutines, have ways to suspend, yield control, and resume (and can use throw to raise an error inside), and they closely resemble Async-syntax coroutines. By understanding generator-based coroutines, you can learn how Async coroutines are implemented.
Before understanding generator-based coroutines, you first need to understand generators. A Python generator function differs from an ordinary function only in that it contains the keyword yield; the difference is visible in their bytecode:
```
In [1]: import dis

# An ordinary function
In [2]: def aaa(): pass

In [3]: dis.dis(aaa)
  1           0 LOAD_CONST               0 (None)
              2 RETURN_VALUE

# An ordinary function calling another function
In [4]: def bbb():
   ...:     aaa()
   ...:

In [5]: dis.dis(bbb)
  2           0 LOAD_GLOBAL              0 (aaa)
              2 CALL_FUNCTION            0
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

# A plain generator function
In [6]: def ccc(): yield

In [7]: dis.dis(ccc)
  1           0 LOAD_CONST               0 (None)
              2 YIELD_VALUE
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE
```
Above are the bytecodes of an ordinary function, an ordinary function that calls a function, and a plain generator function. The simplest function only needs LOAD_CONST to push the constant None onto its stack, then returns it via RETURN_VALUE. The ordinary function with a call first uses LOAD_GLOBAL to push the global aaa onto its stack, calls it with CALL_FUNCTION, pops the call's return value off the stack with POP_TOP, pushes None with LOAD_CONST, and finally returns it.
The generator function is different: it first pushes None onto its stack with LOAD_CONST, returns it through YIELD_VALUE, then uses POP_TOP to pop the stack and pushes None again, finally returning through RETURN_VALUE. The bytecode shows clearly that yield splits the frame's execution in two, so a single function call can return multiple times, which fits perfectly with a coroutine that waits many times.
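As a side note, the presence of yield is recorded at compile time on the function's code object, so a generator function can be recognized without ever calling it. A quick sketch using the standard inspect module (the function names here are illustrative):

```python
import inspect


def plain():
    pass


def gen():
    yield


# The `yield` keyword sets the CO_GENERATOR flag on the compiled code
# object, so a generator function is identifiable without calling it.
plain_is_gen = bool(plain.__code__.co_flags & inspect.CO_GENERATOR)
gen_is_gen = bool(gen.__code__.co_flags & inspect.CO_GENERATOR)

# inspect wraps the same check in a helper.
assert inspect.isgeneratorfunction(gen) and not inspect.isgeneratorfunction(plain)
```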
Next, let's look at a generator in use. This generator has two yield calls and returns the string 'None' at the end; the code is as follows:
```
In [8]: def demo():
   ...:     a = 1
   ...:     b = 2
   ...:     print('aaa', locals())
   ...:     yield 1
   ...:     print('bbb', locals())
   ...:     yield 2
   ...:     return 'None'
   ...:

In [9]: demo_gen = demo()

In [10]: demo_gen.send(None)
aaa {'a': 1, 'b': 2}
Out[10]: 1

In [11]: demo_gen.send(None)
bbb {'a': 1, 'b': 2}
Out[11]: 2

In [12]: demo_gen.send(None)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-12-8f8cb075d6af> in <module>
----> 1 demo_gen.send(None)

StopIteration: None
```
This code first creates a generator object demo_gen through a function call. The first send call returns 1, the second returns 2, and the third raises StopIteration with the message None. Notice that both the first print (aaa) and the second print (bbb) can see the function's local variables: even though execution resumes in separate steps, the locals they read stay consistent. This means that if you use a generator to simulate a coroutine, it still reads its current context correctly, which is perfect.
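The reason the locals survive is that the paused stack frame stays alive on the generator object itself, where it can even be inspected from outside through the standard gi_frame attribute. A small sketch:

```python
def demo():
    a = 1
    b = 2
    yield 1
    yield 2


gen = demo()
gen.send(None)  # run up to the first yield and pause there

# The suspended frame is kept alive on the generator object, so its
# local variables can be read between resumptions.
frame_locals = dict(gen.gi_frame.f_locals)
```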
In addition, Python supports delegating to another generator through the yield from syntax. The code is as follows:
```
In [1]: def demo_gen_1():
   ...:     for i in range(3):
   ...:         yield i
   ...:

In [2]: def demo_gen_2():
   ...:     yield from demo_gen_1()
   ...:

In [3]: demo_gen_obj = demo_gen_2()

In [4]: demo_gen_obj.send(None)
Out[4]: 0

In [5]: demo_gen_obj.send(None)
Out[5]: 1

In [6]: demo_gen_obj.send(None)
Out[6]: 2

In [7]: demo_gen_obj.send(None)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-7-f9922a2f64c9> in <module>
----> 1 demo_gen_obj.send(None)

StopIteration:
```
yield from makes it easy for one generator to call another. If every generator function is viewed as a coroutine, yield from conveniently implements calls between coroutines. Moreover, the traceback a generator produces when it raises is very readable, and generators also support throw for raising an exception inside them, which lets us inject an exception into a running coroutine, such as Cancel. The demo code is as follows:
```
In [1]: def demo_exc():
   ...:     yield 1
   ...:     raise RuntimeError()
   ...:

In [2]: def demo_exc_1():
   ...:     for i in range(3):
   ...:         yield i
   ...:

In [3]: demo_exc_gen = demo_exc()

In [4]: demo_exc_gen.send(None)
Out[4]: 1

In [5]: demo_exc_gen.send(None)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-5-09fbb75fdf7d> in <module>
----> 1 demo_exc_gen.send(None)

<ipython-input-1-69afbc1f9c19> in demo_exc()
      1 def demo_exc():
      2     yield 1
----> 3     raise RuntimeError()

RuntimeError:

In [6]: demo_exc_gen_1 = demo_exc_1()

In [7]: demo_exc_gen_1.send(None)
Out[7]: 0

In [8]: demo_exc_gen_1.send(None)
Out[8]: 1

In [9]: demo_exc_gen_1.throw(RuntimeError)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-9-1a1cc55d71f4> in <module>
----> 1 demo_exc_gen_1.throw(RuntimeError)

<ipython-input-2-2617b2366dce> in demo_exc_1()
      1 def demo_exc_1():
      2     for i in range(3):
----> 3         yield i

RuntimeError:
```
You can see that when an exception is thrown mid-run, the error is very clear and the stack trace is obvious. Also note that an exception injected with throw is raised at the yield where the generator is currently suspended, which is why calling Cancel on a coroutine does not cancel it instantly: the cancellation takes effect at the next suspension point.
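A minimal sketch of this throw behavior: the injected exception surfaces at the paused yield, and the generator may even catch it and keep going (the function names here are illustrative):

```python
def worker():
    try:
        yield 1
    except RuntimeError:
        # The exception injected by throw() surfaces at the yield where
        # the generator is currently paused, so it can be caught here.
        yield "cleaned up"


gen = worker()
first = gen.send(None)                 # pause at `yield 1`
after_throw = gen.throw(RuntimeError)  # raised inside, caught, yields again
```

This is exactly the mechanism that lets a coroutine run cleanup code when it is cancelled.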
We have now seen that the generator is a programming model that fits coroutines very well, and we know which generator APIs we need. Next we can imitate Asyncio's interface to implement a simple coroutine framework.
First, Asyncio has a class called Future, which represents a coroutine waiting for a result that will arrive in the future. Below is a simple Future modelled on asyncio.Future; its API is not as complete as asyncio.Future's. The code and comments are as follows:
```python
import asyncio
from typing import Any, Callable, List, Optional


class Status:
    """Used to track the Future's state"""
    pending: int = 1
    finished: int = 2
    cancelled: int = 3


class Future(object):
    def __init__(self) -> None:
        """On initialization the Future is pending, waiting for set_result"""
        self.status: int = Status.pending
        self._result: Any = None
        self._exception: Optional[Exception] = None
        self._callbacks: List[Callable[['Future'], None]] = []

    def add_done_callback(self, fn: Callable[['Future'], None]) -> None:
        """Add a callback to run on completion"""
        self._callbacks.append(fn)

    def cancel(self) -> bool:
        """Cancel the current Future"""
        if self.status != Status.pending:
            return False
        self.status = Status.cancelled
        for fn in self._callbacks:
            fn(self)
        return True

    def set_exception(self, exc: Exception) -> None:
        """Set an exception"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set exc")
        self._exception = exc
        self.status = Status.finished

    def set_result(self, result: Any) -> None:
        """Set the result"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set result")
        self.status = Status.finished
        self._result = result
        for fn in self._callbacks:
            fn(self)

    def result(self) -> Any:
        """Get the result"""
        if self.status == Status.cancelled:
            raise asyncio.CancelledError
        elif self.status != Status.finished:
            raise RuntimeError("Result is not ready")
        elif self._exception is not None:
            raise self._exception
        return self._result

    def __iter__(self):
        """Simulate a coroutine with a generator: yield until the result
        notification arrives, then return the result"""
        if self.status == Status.pending:
            yield self
        return self.result()
```
You can think of this Future as a state machine: it starts in the pending state, and as the program runs we switch its state. Through the __iter__ method, a caller can use yield from Future() to wait on the Future itself until the event notification arrives and the result can be fetched.
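To see the waiting mechanism in isolation, here is a stripped-down, self-contained miniature of the same __iter__ pattern (MiniFuture and waiter are hypothetical names, not part of the article's code), driven by hand instead of by an event loop:

```python
class MiniFuture:
    """A hypothetical, minimal stand-in for the Future above."""

    def __init__(self):
        self.done = False
        self._result = None

    def set_result(self, result):
        self._result = result
        self.done = True

    def __iter__(self):
        if not self.done:
            # Hand ourselves to whoever drives the generator,
            # signalling "no result yet".
            yield self
        return self._result


def waiter(fut):
    # `yield from` delegates to MiniFuture.__iter__ and receives its
    # return value once the result has been set.
    result = yield from fut
    return result


fut = MiniFuture()
gen = waiter(fut)
pending = gen.send(None)  # the future itself comes back: still waiting
fut.set_result("ok")
try:
    gen.send(None)        # resume past the yield; __iter__ returns
    final = None
except StopIteration as exc:
    final = exc.value
```

The driving code at the bottom is exactly the job the Task class below automates.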
But notice that this Future is not self-driving: the code that iterates it has no idea when set_result was called. In Asyncio, a class called Task drives the Future: it arranges the execution steps of a coroutine and is responsible for running it in the event loop. It has two main behaviors:
1. On initialization, it first activates the generator with a send call.
2. Each time it is woken, it immediately schedules the next wait, unless the generator raises StopIteration.
There is also a method for cancelling the hosted coroutine (in the real code, Task inherits from Future, so it carries some of Future's behavior). The simplified code is as follows:
```python
import asyncio
from typing import Generator


class Task:
    def __init__(self, coro: Generator) -> None:
        # Initialize the state
        self.cancelled: bool = False
        self.coro: Generator = coro
        # Prime the generator with an already-finished Future
        f: Future = Future()
        f.set_result(None)
        self.step(f)

    def cancel(self) -> None:
        """Cancel the hosted coro"""
        self.coro.throw(asyncio.CancelledError)

    def step(self, f: Future) -> None:
        """Advance the coro one step. After the first activation, a
        completion callback is registered on each new Future, until the
        coro is cancelled or raises StopIteration."""
        try:
            _future = self.coro.send(f.result())
        except asyncio.CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return
        _future.add_done_callback(self.step)
```
With Future and Task encapsulated like this, you can quickly try the effect:
```
In [2]: def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
   ...:     result = yield from f
   ...:     print(flag_int, result)
   ...:
   ...: future: Future = Future()
   ...: for i in range(3):
   ...:     coro = wait_future(future, i)
   ...:     # Host the wait_future coroutine; the inner Future is also
   ...:     # hosted via yield from
   ...:     Task(coro)
   ...:
   ...: print('ready')
   ...: future.set_result('ok')
   ...:
   ...: future = Future()
   ...: Task(wait_future(future, 3)).cancel()
   ...:
ready
0 ok
1 ok
2 ok
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-2-2d1b04db2604> in <module>
     12
     13 future = Future()
---> 14 Task(wait_future(future, 3)).cancel()

<ipython-input-1-ec3831082a88> in cancel(self)
     81
     82     def cancel(self) -> None:
---> 83         self.coro.throw(asyncio.CancelledError)
     84
     85     def step(self, f: Future) -> None:

<ipython-input-2-2d1b04db2604> in wait_future(f, flag_int)
      1 def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
----> 2     result = yield from f
      3     print(flag_int, result)

<ipython-input-1-ec3831082a88> in __iter__(self)
     68         """Simulate a coroutine with a generator"""
     69         if self.status == Status.pending:
---> 70             yield self
     71         return self.result()

CancelledError:
```
This program initializes a Future, passes it to wait_future to produce a generator, and hands that generator to a Task for hosting and priming. Because the Future is bound inside the generator function wait_future through yield from, what actually gets primed is the Future's __iter__ method up to its yield self, where the code pauses and yields back.
After all the tasks are primed, calling the Future's set_result method moves the Future to the finished state. set_result runs the registered callbacks, which invoke the Task's step method; step's send call resumes the code inside the Future's __iter__ at yield self, execution continues to the return, and the result is delivered. The output confirms this: after ready is printed, the corresponding results are printed in turn. Finally, the test of the cancel method makes the Future raise an exception, and the resulting traceback is easy to understand and to follow back to the call site.
Now that Future and Task are up and running, they can be integrated with the program we wrote at the beginning. The code is as follows:
```python
class HttpRequest(object):
    def __init__(self, host: str):
        """Initialize the variables and the sock"""
        self._host: str = host
        global running_cnt
        running_cnt += 1
        self.url: str = f"http://{host}"
        self.sock: socket.SocketType = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((host, 80))
        except BlockingIOError:
            pass

    def read(self) -> Generator[Future, None, bytes]:
        """Get response data from the socket, set it on a Future, and
        deliver it through Future.__iter__ via the chunk_future variable"""
        f: Future = Future()
        selector.register(
            self.sock.fileno(),
            EVENT_READ,
            lambda: f.set_result(self.sock.recv(4096)),
        )
        chunk_future = yield from f
        selector.unregister(self.sock.fileno())
        return chunk_future  # type: ignore

    def read_response(self) -> Generator[Future, None, bytes]:
        """Receive response data and determine whether the request has ended"""
        response_bytes: bytes = b""
        chunk = yield from self.read()
        while chunk:
            response_bytes += chunk
            chunk = yield from self.read()
        return response_bytes

    def connected(self) -> Generator[Future, None, None]:
        """Wait for the socket connection to be established"""
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))
        yield f
        selector.unregister(self.sock.fileno())
        print(f"{self._host} connect success")

    def request(self) -> Generator[Future, None, None]:
        # Send the request, listen for read events, and register the
        # corresponding response handler
        yield from self.connected()
        self.sock.send(f"GET {self.url} HTTP/1.0\r\nHost: {self._host}\r\n\r\n".encode("ascii"))
        response = yield from self.read_response()
        print(f"request {self._host} success, length:{len(response)}")
        global running_cnt
        running_cnt -= 1


if __name__ == "__main__":
    # Issue multiple requests at the same time
    Task(HttpRequest("so1n.me").request())
    Task(HttpRequest("github.com").request())
    Task(HttpRequest("google.com").request())
    Task(HttpRequest("baidu.com").request())

    # Loop while events are still running
    while running_cnt > 0:
        # Wait for the event loop to report completed events
        for key, mask in selector.select():
            key.data()
```
This code uses Future and generator methods to decouple the callback functions as much as possible. If you ignore the connected and read methods of HttpRequest, the whole thing reads essentially like synchronous code, only ceding control through yield and yield from and regaining it through the event loop. And as the exception examples above showed, troubleshooting remains convenient: none of the drawbacks of callbacks are left, and developers can simply program with a synchronous mindset. Our event loop, however, is only a bare-bones example; the socket machinery is not encapsulated and some commonly used APIs are missing. All of that is what Python packages officially into the Asyncio library, with which we can write nearly ideal Async-syntax code.
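For comparison, here is a sketch of the same requests written against the real Asyncio streams API (asyncio.open_connection, asyncio.gather are standard library calls); treat it as illustrative rather than production code, and note that running main() performs real network I/O:

```python
import asyncio


async def request(host: str) -> bytes:
    # open_connection wraps the socket setup, the event-loop
    # registration, and the read/write callbacks written by hand above.
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(f"GET / HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
    await writer.drain()
    response = await reader.read()  # read until EOF
    writer.close()
    await writer.wait_closed()
    return response


async def main() -> None:
    # gather runs the coroutines concurrently, playing the role of the
    # hand-rolled Task class above.
    for body in await asyncio.gather(request("so1n.me"), request("github.com")):
        print(len(body))

# Performs real network I/O, so it is not invoked here:
# asyncio.run(main())
```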
NOTE: Because ordinary generators and coroutines could not interoperate cleanly through the yield from syntax, Python 3.5 introduced native coroutines, which use await instead.
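Native coroutines swap __iter__ for __await__, which must return an iterator in the same way. A small illustrative sketch (the class and names are hypothetical) driving an await by hand, exactly as Task drove the generators above:

```python
class Waiter:
    """A hypothetical awaitable: `await` calls __await__, which must
    return an iterator -- usually a generator, just like __iter__."""

    def __await__(self):
        yield "not ready"  # suspension point, analogous to `yield self`
        return 42


async def main():
    return await Waiter()


coro = main()
paused_value = coro.send(None)  # the value yielded at the suspension point
try:
    coro.send(None)
    final = None
except StopIteration as exc:
    final = exc.value
```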
This concludes the article on how Python's Async syntax implements coroutines.