Preface
1. The Asyncio entry point
2. The difference between the two ways of calling a Coroutine
3. Task and Future
3.1. Future
3.2. Task
4. Summary
Preface

The previous article, "The Implementation of Async Syntax Coroutines in Python", introduced how Python implements coroutines with generators and how Python Asyncio uses Future and Task to schedule them. In Python Asyncio, Coroutines, Tasks and Futures are all awaitable objects. Using Asyncio often involves converting between these three and scheduling them, and developers tend to be confused about their concepts and roles. This article explains how the three relate to each other and what each one does.
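As a quick check of the claim that all three kinds of object are awaitable, here is a minimal sketch. The coroutine name work is hypothetical and used only for illustration; inspect.isawaitable and the asyncio calls are standard library.

```python
import asyncio
import inspect


async def work():  # hypothetical coroutine, for illustration only
    return 1


async def main():
    coro = work()                                        # a Coroutine object
    task = asyncio.create_task(work())                   # a Task
    future = asyncio.get_running_loop().create_future()  # a Future
    future.set_result(1)
    # All three report as awaitable ...
    checks = [inspect.isawaitable(obj) for obj in (coro, task, future)]
    # ... and all three can be used with await.
    results = [await coro, await task, await future]
    return checks, results


checks, results = asyncio.run(main())
print(checks, results)
```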
1. The Asyncio entry point

A coroutine can be seen as a special case of execution inside a thread: entering a coroutine and switching between coroutines are both scheduled by the event loop. In recent versions of Python, the entry point for coroutines is asyncio.run. Once the program reaches asyncio.run, it can loosely be thought of as switching from thread mode to coroutine mode (this is only a mental model; for the computer there is no such distinction). The following is a minimal coroutine example:

    import asyncio

    async def main():
        await asyncio.sleep(0)

    asyncio.run(main())

In this code, both the main function and asyncio.sleep are Coroutines. main is called through asyncio.run, after which the program enters coroutine mode. The core call inside asyncio.run is Runner.run, whose code is as follows:

    class Runner:
        ...
        def run(self, coro, *, context=None):
            """Run a coroutine inside the embedded event loop."""
            # Code omitted ...

            # Wrap the coroutine in a Task
            task = self._loop.create_task(coro, context=context)

            # Code omitted ...

            try:
                # run_until_complete also accepts a Future or a coroutine;
                # a coroutine passed in directly is wrapped in a Task as well
                return self._loop.run_until_complete(task)
            except exceptions.CancelledError:
                # Code omitted ...

Other functionality and the initialization code have been removed from this excerpt. The main job of this function is to turn a Coroutine object into a Task object via loop.create_task, and then wait for that Task to finish via loop.run_until_complete.
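The two steps described above can be reproduced by hand. The following is only a sketch of the idea under simplifying assumptions (no signal handling, context handling or shutdown of leftover tasks, all of which the real Runner takes care of); it is not the library's implementation.

```python
import asyncio


async def main():
    await asyncio.sleep(0)
    return "finished"


# Create a loop manually, roughly what asyncio.run does internally.
loop = asyncio.new_event_loop()
try:
    # Step 1: turn the Coroutine object into a Task.
    task = loop.create_task(main())
    is_task = isinstance(task, asyncio.Task)
    # Step 2: run the loop until that Task has finished.
    result = loop.run_until_complete(task)
finally:
    loop.close()

print(is_task, result)
```

In everyday code asyncio.run(main()) remains the better choice, since it also cleans up the loop and any Tasks that are still pending.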
As you can see, asyncio does not schedule a Coroutine directly. It first converts it into a Task and then schedules the Task, because the smallest unit of scheduling in the asyncio event loop is the Task. However, not every Coroutine call in asyncio is converted into a Task before being awaited. The asyncio.sleep in the example, for instance, is awaited directly inside the main function, so it is not converted; it is simply awaited. The call graph produced by an analysis tool looked as follows:

[Figure: call graph from main to asyncio.sleep]
In this diagram there is no loop.create_task call, or any other Coroutine-to-Task conversion, between the main function and asyncio.sleep. No conversion is needed here, not because of some special optimization, but by design: the awaited asyncio.sleep Coroutine is driven by the Task that the main Coroutine was converted into, and that Task keeps scheduling it until it finishes.
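One way to observe this is to ask, from inside a coroutine, which Task is currently driving it. In the sketch below the helper name who_runs_me is hypothetical; asyncio.current_task is a standard library function.

```python
import asyncio


async def who_runs_me():  # hypothetical helper, for illustration only
    # Return the Task that is currently driving this coroutine.
    return asyncio.current_task()


async def main():
    main_task = asyncio.current_task()
    # Awaited directly: no new Task, the main Task drives the coroutine.
    direct = await who_runs_me()
    # Wrapped with create_task: a separate Task drives the coroutine.
    wrapped = await asyncio.create_task(who_runs_me())
    return direct is main_task, wrapped is main_task


same_when_direct, same_when_wrapped = asyncio.run(main())
print(same_when_direct, same_when_wrapped)  # True False
```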
2. The difference between the two ways of calling a Coroutine

Before looking at how a Task is scheduled, let's go back to the original example and see how calling through a Task differs from awaiting a Coroutine directly. The following code explicitly converts the Coroutine into a Task and then awaits it:

    import asyncio

    async def main():
        await asyncio.create_task(asyncio.sleep(0))

    asyncio.run(main())

This code looks no different from the original example. But after a few changes, such as adding some sleep time and more Coroutine calls, the role of the Task object becomes visible. Now prepare two files with the following code:

    # demo_coro.py
    import asyncio
    import time

    async def main():
        await asyncio.sleep(1)
        await asyncio.sleep(2)

    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)
    # Output: 3.0028765201568604

    # demo_task.py
    import asyncio
    import time

    async def main():
        task_1 = asyncio.create_task(asyncio.sleep(1))
        task_2 = asyncio.create_task(asyncio.sleep(2))
        await task_1
        await task_2

    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)
    # Output: 2.0027475357055664

demo_coro.py awaits twice, and the program takes about 3 seconds in total. demo_task.py converts the two Coroutine objects into Task objects before awaiting them twice, and the program takes about 2 seconds. In other words, the run time of demo_task.py is close to the duration of its longest-running Task, whereas the run time of demo_coro.py is roughly the sum of the run times of its two Coroutine objects.
The reason is that when a Coroutine object is awaited directly, the calling code waits until that Coroutine has finished before moving on. A Task is different: at the moment it is created it registers itself with the event loop and waits to be scheduled, and a Task object is returned for the developer to await. Because asyncio.sleep is a pure IO-style wait that does not block the loop, converting the two asyncio.sleep Coroutines into Tasks lets them run concurrently.
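That a Task is registered with the event loop at creation time, and starts running as soon as the creator yields, can be made visible by recording the order of events. This is a small sketch; the names worker and log are hypothetical.

```python
import asyncio

log = []  # hypothetical event log, for illustration only


async def worker(name):
    log.append(f"{name} started")
    await asyncio.sleep(0)
    log.append(f"{name} finished")


async def main():
    # The Task is scheduled here, but it has not run a single step yet.
    task = asyncio.create_task(worker("task"))
    log.append("main: task created")
    # Yield to the event loop once; the new Task gets its first step
    # even though main has not awaited it.
    await asyncio.sleep(0)
    log.append("main: after first yield")
    await task


asyncio.run(main())
print(log)
```

The worker has already started by the time main resumes after its first yield, which is exactly what lets the two sleeps in demo_task.py overlap.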
3. Task and Future

The code above can run concurrently through Tasks because a Task has methods that interact with the event loop, and it is these methods that make concurrent Coroutine calls possible. However, Task is a subclass of Future, so to understand Task you first need to understand Future.
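The subclass relationship is easy to confirm at runtime. The sketch below also shows that a Task accepts add_done_callback, a method it inherits from Future; the variable names are hypothetical.

```python
import asyncio


async def main():
    task = asyncio.create_task(asyncio.sleep(0, result="done"))
    seen = []
    # add_done_callback is part of the Future interface that Task inherits.
    task.add_done_callback(lambda t: seen.append(t.result()))
    value = await task
    return isinstance(task, asyncio.Future), value, seen


outcome = asyncio.run(main())
print(issubclass(asyncio.Task, asyncio.Future), outcome)
```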
3.1. Future

Like a Coroutine, a Future can yield control and receive a result. The difference is that a Future is also a stateful container that can only passively trigger callbacks. It starts in the Pending state and can then be cancelled, given a result, or given an exception. Once one of these operations has been applied, the Future moves into the corresponding state, which is irreversible, and uses loop.call_soon to invoke every callback registered on it. It also has __iter__ and __await__ methods, so it can be used with await and yield from. Its main code is as follows:

    class Future:
        ...
        def set_result(self, result):
            """Set the result and schedule the callbacks."""
            if self._state != _PENDING:
                raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
            self._result = result
            self._state = _FINISHED
            self.__schedule_callbacks()

        def set_exception(self, exception):
            """Set the exception and schedule the callbacks."""
            if self._state != _PENDING:
                raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
            if isinstance(exception, type):
                exception = exception()
            if type(exception) is StopIteration:
                raise TypeError("StopIteration interacts badly with generators "
                                "and cannot be raised into a Future")
            self._exception = exception
            self._state = _FINISHED
            self.__schedule_callbacks()
            self.__log_traceback = True

        def __await__(self):
            """Mark the Future as blocking so it can be used with await or yield from."""
            if not self.done():
                self._asyncio_future_blocking = True
                yield self  # This tells Task to wait for completion.
            if not self.done():
                raise RuntimeError("await wasn't used with future")
            return self.result()  # May raise too.

        __iter__ = __await__  # make compatible with 'yield from'.

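The state transitions described above can be observed through the public API. This is a small sketch with hypothetical variable names; it shows that callbacks are scheduled with call_soon rather than run immediately, and that a finished Future cannot be completed a second time.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    states = [fut.done()]            # pending at first
    called = []
    fut.add_done_callback(lambda f: called.append(f.result()))

    fut.set_result("ok")             # pending -> finished (irreversible)
    states.append(fut.done())
    before_yield = list(called)      # callback only scheduled, not yet run

    await asyncio.sleep(0)           # let the event loop run the callback
    after_yield = list(called)

    try:
        fut.set_result("again")      # the state can no longer change
    except asyncio.InvalidStateError:
        second_set_failed = True
    else:
        second_set_failed = False
    return states, before_yield, after_yield, second_set_failed


report = asyncio.run(main())
print(report)
```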
Looking at this code alone, it is hard to see why the coroutine below continues past await future once set_result has been called on the future:

    async def demo(future: asyncio.Future):
        await future
        print("aha")

This is because a Future, like a Coroutine, cannot schedule itself. It can only be scheduled by a Task working together with the event loop.
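To see the cooperation in action, the demo coroutine can be wrapped in a Task while something else completes the Future later. This is a sketch: the return values and the 0.01 second delay are arbitrary choices made for illustration.

```python
import asyncio


async def demo(future):
    value = await future          # the driving Task parks here
    return f"aha: {value}"


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # The Task drives demo() and registers itself as a callback on the Future.
    waiter = asyncio.create_task(demo(future))
    # Something else (here a timer) completes the Future later.
    loop.call_later(0.01, future.set_result, 42)
    return await waiter


result = asyncio.run(main())
print(result)  # aha: 42
```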
3.2. Task

Task is a subclass of Future. Besides inheriting all of Future's methods, it adds two important methods, __step and __wakeup, which give a Task the ability to schedule. This is something neither a Coroutine nor a Future has. The main scheduling-related code of Task is as follows (see the comments for explanations):

    class Task(futures._PyFuture):  # Inherit Python Task implementation
                                    # from a Python Future implementation.

        _log_destroy_pending = True

        def __init__(self, coro, *, loop=None, name=None, context=None):
            super().__init__(loop=loop)
            # Some initialization code omitted ...
            # The coroutine managed by this Task
            self._coro = coro
            if context is None:
                self._context = contextvars.copy_context()
            else:
                self._context = context

            # Right after initialization, use loop.call_soon to ask the event
            # loop to run this Task's __step method the next time it is free
            self._loop.call_soon(self.__step, context=self._context)

        def __step(self, exc=None):
            coro = self._coro
            # Bookkeeping for asyncio introspection
            _enter_task(self._loop, self)
            # Call either coro.throw(exc) or coro.send(None).
            try:
                if exc is None:
                    # Drive the managed coroutine with send. This either
                    # returns what the coroutine yields or raises StopIteration.
                    # When a Future or Task is awaited, the yielded value is
                    # that Future or Task itself.
                    result = coro.send(None)
                else:
                    # Throw the exception into the coroutine
                    result = coro.throw(exc)
            except StopIteration as exc:
                # StopIteration means the coroutine has finished
                if self._must_cancel:
                    # The Task was cancelled right before the coroutine
                    # stopped, so perform the cancellation
                    self._must_cancel = False
                    super().cancel(msg=self._cancel_message)
                else:
                    # Store the return value as the result
                    super().set_result(exc.value)
            # Handling of other exceptions omitted ...
            else:
                # No exception was raised
                blocking = getattr(result, '_asyncio_future_blocking', None)
                if blocking is not None:
                    # As the Future code shows, an object that has the
                    # _asyncio_future_blocking attribute is a Future or a Task,
                    # which means this Task is waiting on another Future or Task
                    # Checks on the yielded Future omitted ...
                    if blocking:
                        # The Future or Task is still pending. This Task gives
                        # up control of the event loop and asks the pending
                        # Future or Task to wake it up when it completes
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                    else:
                        # The Future was yielded with a bare yield instead of
                        # yield from / await
                        new_exc = RuntimeError(
                            f'yield was used instead of yield from '
                            f'in task {self!r} with {result!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)

                elif result is None:
                    # A bare yield: the managed coroutine gives up control for
                    # one iteration of the event loop. There is no Future or
                    # Task to wait on, so schedule the next step right away
                    self._loop.call_soon(self.__step, context=self._context)
            finally:
                _leave_task(self._loop, self)
                self = None  # Needed to break cycles when an exception occurs.

        def __wakeup(self, future):
            # Called when the Future or Task this Task was waiting on completes
            try:
                # Collect the Future's outcome; if it failed, the exception is
                # passed back into this Task
                future.result()
            except BaseException as exc:
                # This may also be a cancellation.
                self.__step(exc)
            else:
                # The Task does not need the result value of the Future it
                # waited on; as the note below says, this makes scheduling faster
                # Don't pass the value of `future.result()` explicitly,
                # as `Future.__iter__` and `Future.__await__` don't need it.
                # If we call `_step(value, None)` instead of `_step()`,
                # Python eval loop would use `.send(value)` method call,
                # instead of `__next__()`, which is slower for futures
                # that return non-generator iterators from their `__iter__`.
                self.__step()
            self = None  # Needed to break cycles when an exception occurs.

This excerpt is long mainly because of Task's __step method. Boiled down, it does three things:
1. It drives the Coroutine to its next step through send or throw.
2. It adds a callback to the Future or Task it is waiting on, so that it is notified on completion and regains control.
3. It yields through loop.call_soon, handing control back to the event loop.
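The three jobs listed above can be sketched with a toy scheduler. Everything here (ToyFuture, ToyTask, call_soon, run_until_idle, the ready queue) is a hypothetical simplification written for this article, not asyncio's implementation: there is no cancellation, no exception handling and no timers.

```python
from collections import deque

ready = deque()  # hypothetical stand-in for the event loop's ready queue


def call_soon(callback, *args):
    ready.append((callback, args))


def run_until_idle():
    # A toy "event loop": run callbacks until none are left.
    while ready:
        callback, args = ready.popleft()
        callback(*args)


class ToyFuture:
    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def result(self):
        return self._result

    def add_done_callback(self, callback):
        if self._done:
            call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            call_soon(callback, self)
        self._callbacks.clear()

    def __await__(self):
        if not self._done:
            yield self  # hand this future to the ToyTask driving us
        return self._result


class ToyTask(ToyFuture):
    def __init__(self, coro):
        super().__init__()
        self._coro = coro
        call_soon(self._step)  # register the first step with the loop

    def _step(self):
        try:
            yielded = self._coro.send(None)  # job 1: drive the coroutine
        except StopIteration as stop:
            self.set_result(stop.value)      # coroutine finished
        else:
            if isinstance(yielded, ToyFuture):
                yielded.add_done_callback(self._wakeup)  # job 2: wait
            else:
                call_soon(self._step)        # job 3: yield to the loop

    def _wakeup(self, future):
        self._step()


events = []


async def child(fut):
    events.append("child waits")
    value = await fut
    events.append(f"child got {value}")
    return value * 2


async def parent(fut):
    result = await child(fut)  # plain await: the same ToyTask drives child
    events.append(f"parent got {result}")
    return result


fut = ToyFuture()
task = ToyTask(parent(fut))
call_soon(fut.set_result, 21)
run_until_idle()
print(task.result(), events)
```

Note that only one ToyTask exists here: the child coroutine is reached through the parent's send call, and the future it yields travels all the way up to the task's _step.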
Source code analysis alone can be hard to follow, so the two examples from section 2 are used below to briefly describe how a Task and the event loop schedule each other. First, demo_coro, in which there is only one Task:

    # demo_coro.py
    import asyncio
    import time

    async def main():
        await asyncio.sleep(1)
        await asyncio.sleep(2)

    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)
    # Output: 3.0028765201568604

The first step in this example is to turn main into a Task and call its __step method, which in turn calls send(None) on the main() Coroutine.

Execution then goes straight into the await asyncio.sleep(1) Coroutine inside main. asyncio.sleep(1) first creates a Future object, asks the event loop through loop.call_later (which is built on loop.call_at) to complete this Future after 1 second, and then yields the Future. Control returns to the Task's __step method, which sees that the send call produced a Future, adds a callback to it so that the Future wakes the Task up when it completes, and gives control back to the event loop. One second later the event loop completes the Future, and execution reaches the Future's callback, which is the Task's __wakeup method. The Task's __step is therefore called again, this time running into await asyncio.sleep(2), and the same process repeats. Once both asyncio.sleep calls have finished, the Task's __step sends send(None) to the Coroutine once more and catches a StopIteration exception. The Task then sets its result through set_result, and its scheduling ends.
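The behaviour of asyncio.sleep described in this walkthrough can be imitated in a few lines. The function my_sleep below is a hypothetical, simplified stand-in (the real asyncio.sleep handles more cases, such as a delay of zero and cancellation races); it only shows the create-a-Future, set-a-timer, await-the-Future pattern.

```python
import asyncio
import time


async def my_sleep(delay, result=None):  # hypothetical, simplified sleep
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Ask the event loop to complete the Future after `delay` seconds.
    handle = loop.call_later(delay, future.set_result, result)
    try:
        # Yields the Future up to the Task that is driving this coroutine.
        return await future
    finally:
        handle.cancel()  # no-op if the timer has already fired


async def main():
    start = time.monotonic()
    value = await my_sleep(0.1, result="woke up")
    return value, time.monotonic() - start


value, elapsed = asyncio.run(main())
print(value, round(elapsed, 2))
```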
As you can see, demo_coro.py has only one Task that interacts with the event loop. The event loop always starts from a Task, the Task drives a Coroutine, and the __step method links the subsequent Futures, Tasks and Coroutines into a chain. demo_task.py is different: it creates two additional Tasks. Its code is as follows:

    # demo_task.py
    import asyncio
    import time

    async def main():
        task_1 = asyncio.create_task(asyncio.sleep(1))
        task_2 = asyncio.create_task(asyncio.sleep(2))
        await task_1
        await task_2

    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)
    # Output: 2.0027475357055664

This example starts the same way as demo_coro, but things differ once execution enters the main function. The function first creates two Tasks, task_1 and task_2. Each of them uses send in its own __step method to start its asyncio.sleep Coroutine, and then waits for the corresponding Future to notify it of completion. The main Task that created these two Tasks takes "control" of them through await task_1 and await task_2 in the main function. First comes await task_1: the __step method of the main Task calls send and gets back the Future corresponding to task_1 (task_1 itself, since a Task is a Future), so it adds a callback to it in order to be notified when it finishes and then take its own next step. The same goes for task_2. When both Tasks have completed, the main Task catches a StopIteration exception, sets its result through set_result, and its scheduling ends.
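The number of Tasks involved in each example can be checked with asyncio.all_tasks, which returns the Tasks of the running loop that have not finished yet. This is a sketch with shortened sleep times; the function names count_coro and count_task are hypothetical.

```python
import asyncio


async def count_coro():
    # demo_coro style: direct awaits, so only the main Task exists.
    await asyncio.sleep(0)
    return len(asyncio.all_tasks())


async def count_task():
    # demo_task style: two extra Tasks are registered with the loop.
    task_1 = asyncio.create_task(asyncio.sleep(0.01))
    task_2 = asyncio.create_task(asyncio.sleep(0.01))
    count = len(asyncio.all_tasks())
    await task_1
    await task_2
    return count


coro_count = asyncio.run(count_coro())
task_count = asyncio.run(count_task())
print(coro_count, task_count)  # 1 3
```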
The obvious difference between demo_task.py and demo_coro.py is that the main Task creates two Tasks during its lifetime and, through await, takes charge of them, and those two Tasks in turn let the two Coroutines run concurrently. It follows that while the event loop is running, the number of Coroutines running concurrently never exceeds the number of Tasks registered in the event loop. In addition, if the main Task does not explicitly await a Task it created, that Task escapes and is no longer managed by the main Task, as shown below:

    # demo_task.py
    import asyncio
    import time

    def mutli_task():
        task_1 = asyncio.create_task(asyncio.sleep(1))
        task_2 = asyncio.create_task(asyncio.sleep(2))

    async def main():
        mutli_task()
        await asyncio.sleep(1.5)

    s_t = time.time()
    asyncio.run(main())
    print(time.time() - s_t)
    # Output: 1.5027475357055664

In this code, the main Task creates two Tasks while executing mutli_task, but the result of coro.send(None) in __step is the Future produced by await asyncio.sleep(1.5). The main Task can therefore only call add_done_callback on that Future to register its own __wakeup method. As a result, the main Task only takes charge of the Future from await asyncio.sleep(1.5), while the Tasks created by mutli_task escape and each become the top Task of a separate chain.
However, asyncio.run only waits for the main Task: the event loop keeps running until the main Task finishes, and then the program exits with it, which is why the program runs for only about 1.5 seconds.
Because the other Tasks are also registered with the event loop, the loop still runs task_1 to completion. task_2, however, sleeps for 2 seconds, so when the program is about to exit the event loop finds a Task that has not finished. asyncio.run cancels that Task during shutdown and cleans it up (a pending Task that is destroyed without being cancelled would instead produce a "Task was destroyed but it is pending!" warning).
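The fate of the two escaped Tasks can be inspected after the program's main Task has finished. The sketch below shortens the sleep times and keeps the Tasks in a module-level list, a hypothetical addition made only so that they can be examined afterwards.

```python
import asyncio

tasks = []  # kept only so the Tasks can be inspected after the run


def mutli_task():
    tasks.append(asyncio.create_task(asyncio.sleep(0.01)))  # finishes in time
    tasks.append(asyncio.create_task(asyncio.sleep(5)))     # still pending at exit


async def main():
    mutli_task()
    await asyncio.sleep(0.05)


asyncio.run(main())
first_finished = tasks[0].done() and not tasks[0].cancelled()
second_cancelled = tasks[1].cancelled()
print(first_finished, second_cancelled)
```

If the results of such Tasks matter, the usual remedy is to keep references to them and await them (for example with asyncio.gather) before main returns.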
4. Summary

After going through the source code of Task and Future in depth, their roles in asyncio should be clear. We also saw that Task and Future are both coupled to the loop to some degree, and that the loop can itself create Tasks and Futures. So to really understand how asyncio schedules, one more step is needed: studying the event loop itself in order to understand the design of asyncio as a whole.