Preface
1. Basic introduction
2. How EventLoop scheduling is implemented
3. Handling of network IO events
Preface

The article "Python Asyncio: the relationship and roles of the awaitable objects Coroutine, Task and Future" introduced Python's awaitable objects, in particular the Task object, which can drive itself once it has been started. However, a single Task object can only drive one execution chain. To run several chains concurrently, an EventLoop is still needed to schedule and drive them. This article reads the source code of Python's asyncio library to understand how the EventLoop works.
1. Basic introduction

Python's asyncio is a large and comprehensive library with many features. Apart from the three kinds of awaitable objects, the logic related to core scheduling lives mainly in three files: runners.py, base_events.py and events.py.
runners.py has one main class, Runner. Its main job is to initialize the event loop when the program enters coroutine mode, and to clean up coroutines, async generators and other in-memory objects when it leaves coroutine mode. ("Coroutine mode" is only a conceptual aid. To the computer there is no such distinction.)
Besides the EventLoop interface and the functions for getting and setting the EventLoop, events.py contains two objects that the EventLoop can schedule: Handle and TimerHandle. They can be thought of as containers through which the EventLoop calls other objects, linking the object to be scheduled with the event loop. Their implementation is very simple. For Handle, the source code is as follows:
```python
# Excerpt from asyncio/events.py; some irrelevant code has been removed
import contextvars
from asyncio import format_helpers


class Handle:
    def __init__(self, callback, args, loop, context=None):
        # Initialize the context so the Handle knows which context it runs in
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False

    def cancel(self):
        # Put the current Handle into the cancelled state
        if not self._cancelled:
            self._cancelled = True
            self._callback = None
            self._args = None

    def cancelled(self):
        return self._cancelled

    def _run(self):
        # Run the real callback; context.run makes sure it executes in the Handle's own context
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args)
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            self._loop.call_exception_handler(context)
```
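To see what "running in its own context" means in practice, here is a small sketch that uses the public `context=` parameter of `loop.call_soon`. The `request_id` variable and the helper names are made up for illustration; they are not part of asyncio.

```python
import asyncio
import contextvars

# Hypothetical context variable, used only for this demonstration
request_id = contextvars.ContextVar("request_id", default="unset")


def report(seen):
    # Reads whatever value is visible in the context the Handle runs in
    seen.append(request_id.get())


def run_context_demo():
    loop = asyncio.new_event_loop()
    seen = []
    try:
        ctx = contextvars.copy_context()
        ctx.run(request_id.set, "req-42")  # the value is set only inside ctx
        loop.call_soon(report, seen, context=ctx)  # Handle._run uses ctx.run(...)
        loop.call_soon(report, seen)  # no context: Handle copies the current one
        loop.call_soon(loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return seen
```

`run_context_demo()` should return `["req-42", "unset"]`: the first callback sees the value set in its private context, while the second runs in a copy of the caller's context, where the variable was never set.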
The source shows that Handle is very simple: it can be cancelled, and it runs its callback in its own context. TimerHandle inherits from Handle and adds a few more attributes related to time and ordering. Its source code is as follows:
```python
class TimerHandle(Handle):
    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        self._when = when
        self._scheduled = False

    def __hash__(self):
        return hash(self._when)

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def cancel(self):
        if not self._cancelled:
            # Notify the event loop that this TimerHandle has been cancelled
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        return self._when
```
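Because TimerHandle compares by `_when`, the `heapq` module can always keep the earliest deadline at the top of the heap. The sketch below uses a hypothetical `MiniTimer` class (not part of asyncio) that keeps only this ordering logic:

```python
import heapq


class MiniTimer:
    """Hypothetical stand-in for TimerHandle: ordered only by its deadline."""

    def __init__(self, when, name):
        self.when = when
        self.name = name

    def __lt__(self, other):
        # heapq only needs "<", just like TimerHandle.__lt__ compares _when
        return self.when < other.when


def pop_order(timers):
    heap = []
    for timer in timers:
        heapq.heappush(heap, timer)
    # Popping repeatedly yields the timers in deadline order
    return [heapq.heappop(heap).name for _ in range(len(heap))]
```

For example, `pop_order([MiniTimer(3, "c"), MiniTimer(1, "a"), MiniTimer(2, "b")])` returns `["a", "b", "c"]`, regardless of the order in which the timers were pushed.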
As the code shows, these two objects are very simple, and when using Python's asyncio we do not use them directly. Instead, the loop.call_xxx family of methods wraps a call into a Handle object, which then waits for the EventLoop to execute it. The loop.call_xxx methods can therefore be seen as the EventLoop's registration operations: basically every non-IO asynchronous operation registers its calls with the EventLoop through a loop.call_xxx method. For example, a Task object registers itself with the EventLoop by calling loop.call_soon during initialization. loop.call_soon is very simple to implement; its source code is as follows:
```python
class BaseEventLoop:
    ...
    def call_soon(self, callback, *args, context=None):
        # Check whether the event loop is closed; if so, raise immediately
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        return handle

    def _call_soon(self, callback, args, context):
        # Wrap the call in a Handle so the event loop can invoke it easily
        handle = events.Handle(callback, args, self, context)
        # Append the Handle to _ready, where it waits to be called
        self._ready.append(handle)
        return handle
```
call_soon has only about ten lines of really relevant code: it wraps a call into a Handle and appends it to self._ready, which registers the call with the event loop.
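The behaviour described above can be observed with the public API. In this sketch, callbacks registered with `call_soon` run in FIFO order, and a cancelled Handle is still sitting in `self._ready` but is skipped when the loop reaches it. The function and label names are illustrative only.

```python
import asyncio


def run_callbacks():
    loop = asyncio.new_event_loop()
    order = []
    try:
        loop.call_soon(order.append, "first")
        skipped = loop.call_soon(order.append, "skipped")  # returns the Handle
        loop.call_soon(order.append, "third")
        skipped.cancel()  # the Handle stays in _ready but is marked cancelled
        loop.call_soon(loop.stop)  # stop after the callbacks above have run
        loop.run_forever()
    finally:
        loop.close()
    return order
```

`run_callbacks()` returns `["first", "third"]`.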
Besides the loop.call_soon family, loop.call_xxx includes two other methods, loop.call_at and loop.call_later. They are similar to loop.call_soon but take an extra time parameter that tells the EventLoop when the call may run. Calls registered through loop.call_at and loop.call_later are pushed onto self._scheduled with heapq, Python's heap module. The code is as follows:
```python
class BaseEventLoop:
    ...
    def call_later(self, delay, callback, *args, context=None):
        if delay is None:
            raise TypeError('delay must not be None')
        # Convert the relative delay into an absolute time and delegate to call_at
        timer = self.call_at(self.time() + delay, callback, *args,
                             context=context)
        return timer

    def call_at(self, when, callback, *args, context=None):
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        # Create a TimerHandle and push it onto the event loop's _scheduled heap,
        # where it waits to be called
        timer = events.TimerHandle(when, callback, args, self, context)
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer
```
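A quick sketch of the difference between the two methods: `call_later` takes a delay relative to now, while `call_at` takes an absolute time on the loop's own clock (`loop.time()`). Because both end up in the same heap, the callbacks fire in deadline order, not registration order. The delays and labels are arbitrary values chosen for the demonstration.

```python
import asyncio


def run_timers():
    loop = asyncio.new_event_loop()
    fired = []
    try:
        now = loop.time()
        # Registered first, but due later
        loop.call_later(0.05, fired.append, "later-0.05s")
        # call_at expects an absolute time on the loop's monotonic clock
        loop.call_at(now + 0.01, fired.append, "at-0.01s")
        loop.call_later(0.1, loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return fired
```

`run_timers()` returns `["at-0.01s", "later-0.05s"]`.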
2. How EventLoop scheduling is implemented

The article "Python Asyncio: the relationship and roles of the awaitable objects Coroutine, Task and Future" already showed that the runner starts EventLoop scheduling by calling loop.run_until_complete with the main Task. So the analysis of EventLoop scheduling should start from loop.run_until_complete. The corresponding source code is as follows:
```python
class BaseEventLoop:
    def run_until_complete(self, future):
        ...
        new_task = not futures.isfuture(future)
        # Convert the coroutine into a Task so the event loop can schedule it;
        # the Task is the event loop's smallest unit of scheduling.
        # Note that the event loop is not registered in the global variable yet,
        # so it has to be passed in explicitly.
        # While the Task is being created, it has already registered itself
        # with the event loop through loop.call_soon and waits to be scheduled.
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        # When this task finishes, the event loop has nothing left to schedule,
        # so the callback stops the loop and the program returns from
        # coroutine mode to thread mode
        future.add_done_callback(_run_until_complete_cb)
        try:
            # Start running the event loop
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def run_forever(self):
        # Initialization work
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        # Install asyncgen hooks so the loop can track async generators
        # and finalize the ones the user did not close
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            # Record this loop as the running loop (per thread),
            # so the current event loop can be fetched at any point
            events._set_running_loop(self)
            while True:
                # One round of scheduling
                self._run_once()
                if self._stopping:
                    break
        finally:
            # Stop the loop and clean up resources
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
```
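Two consequences of this code can be checked directly: `run_until_complete` accepts a bare coroutine (it is wrapped into a Task by `ensure_future`), and if the loop is stopped before that Task finishes, the `if not future.done()` branch raises `RuntimeError`. The coroutine names below are made up for the sketch.

```python
import asyncio


async def compute():
    await asyncio.sleep(0)  # yield to the event loop once
    return 42


async def stop_early(loop):
    loop.stop()  # sets _stopping, so run_forever returns after this pass
    await asyncio.sleep(0.1)  # the Task is still pending when the loop stops


def run_until_complete_demo():
    loop = asyncio.new_event_loop()
    try:
        result = loop.run_until_complete(compute())
        try:
            loop.run_until_complete(stop_early(loop))
            error = None
        except RuntimeError as exc:
            error = str(exc)
    finally:
        loop.close()
    return result, error
```

`run_until_complete_demo()` returns `(42, "Event loop stopped before Future completed.")`.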
The source code is not complicated. Its main logic is to turn the coroutine into a Task object, which calls loop.call_soon during initialization to register itself with the EventLoop. Finally, the loop inside loop.run_forever keeps running until _stopping is set to True:
```python
while True:
    # One round of scheduling
    self._run_once()
    if self._stopping:
        break
```
As you can see, this code only keeps the event loop running until it is asked to stop. The real core of scheduling is the _run_once method, whose source code is as follows:
```python
class BaseEventLoop:
    ...
    def _run_once(self):
        # self._scheduled is a list used as a heap; it only holds TimerHandles
        sched_count = len(self._scheduled)
        ##############################################
        # Stage 1: tidy up self._scheduled           #
        ##############################################
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Entered when more than 100 timers are scheduled and more than
            # 50% of them are cancelled: remove all cancelled timers at once
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    # Mark the handle as no longer scheduled; it is dropped from _scheduled
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            # Rebuild the heap
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # With few cancelled handles, only the cancelled handles at the top
            # of the heap are popped and marked as not scheduled; the rest of
            # the heap is not traversed
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        ######################################################
        # Stage 2: compute the timeout and wait for IO events #
        ######################################################
        timeout = None
        # If handles are ready, or the loop is stopping, do not wait,
        # so that dispatching happens as soon as possible
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            # The handle at the top of the heap gives the shortest timeout,
            # capped at MAXIMUM_SELECT_TIMEOUT to stay within system limits
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        # The event loop waits until an event arrives or the timeout expires
        event_list = self._selector.select(timeout)

        ##################################################################
        # Stage 3: move callbacks and due TimerHandles into self._ready  #
        ##################################################################
        # Turn the received events into callbacks and append them to _ready
        self._process_events(event_list)

        # Move handles in self._scheduled that are due (TimerHandles) into _ready.
        # end_time is the current time plus one clock tick; presumably this lets
        # timers that fall due within the clock resolution run in this pass
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        ###############################################################
        # Stage 4: run every ready handle's callback in its context   #
        ###############################################################
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            # Cancelled handles are not called
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        # Log callbacks that take too long; developers should optimize them
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.
```
The source shows that the first stage of scheduling tidies up self._scheduled. This uses a heap, because heap ordering is very efficient for scheduling. There are two tidy-up paths, though; my guess is that when many timers have been cancelled, rebuilding the heap in one traversal is more efficient than popping them one by one. Once self._scheduled is tidied, the second stage waits for the system event loop to return events. If self._ready already has data, it does not wait at all (the timeout is 0) and moves on immediately, so that callbacks can be dispatched as soon as possible. After getting the events from the system event loop, the third stage passes them to self._process_events, which puts the callbacks for those events into self._ready, and also moves the TimerHandles that are due from self._scheduled into self._ready. Finally, the fourth stage goes through all the Handles in self._ready and runs them one by one (this can be thought of as the EventLoop handing control back to the corresponding calling logic). That completes one round of scheduling, and the next round begins.
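The four stages can be condensed into a toy loop to make the control flow easier to follow. This is a simplified sketch, not asyncio's implementation: `ToyHandle` and `ToyLoop` are hypothetical, there is no IO, and `time.sleep` stands in for `self._selector.select(timeout)`. Unlike the real loop, it busy-spins if nothing is ready or scheduled, so it must always be given a way to stop.

```python
import heapq
import itertools
import time
from collections import deque


class ToyHandle:
    """Hypothetical, stripped-down stand-in for asyncio's Handle/TimerHandle."""

    _seq = itertools.count()  # tie-breaker so equal deadlines keep FIFO order

    def __init__(self, callback, args, when=None):
        self.callback = callback
        self.args = args
        self.when = when
        self.cancelled = False
        self.seq = next(ToyHandle._seq)

    def cancel(self):
        self.cancelled = True

    def __lt__(self, other):
        # Only timer handles (when is not None) are compared inside the heap
        return (self.when, self.seq) < (other.when, other.seq)


class ToyLoop:
    """Sketch of the four stages of _run_once; time.sleep replaces select()."""

    def __init__(self):
        self._ready = deque()
        self._scheduled = []
        self._stopping = False

    def time(self):
        return time.monotonic()

    def call_soon(self, callback, *args):
        handle = ToyHandle(callback, args)
        self._ready.append(handle)
        return handle

    def call_later(self, delay, callback, *args):
        handle = ToyHandle(callback, args, when=self.time() + delay)
        heapq.heappush(self._scheduled, handle)
        return handle

    def stop(self):
        self._stopping = True

    def _run_once(self):
        # Stage 1: pop cancelled timers from the top of the heap
        while self._scheduled and self._scheduled[0].cancelled:
            heapq.heappop(self._scheduled)
        # Stage 2: do not wait if work is ready; otherwise wait for the next timer
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0].when - self.time())
        if timeout:
            time.sleep(timeout)  # stands in for self._selector.select(timeout)
        # Stage 3: move due timers into the ready queue
        now = self.time()
        while self._scheduled and self._scheduled[0].when <= now:
            self._ready.append(heapq.heappop(self._scheduled))
        # Stage 4: run only what is ready now; new callbacks wait for the next pass
        for _ in range(len(self._ready)):
            handle = self._ready.popleft()
            if not handle.cancelled:
                handle.callback(*handle.args)

    def run_forever(self):
        while True:
            self._run_once()
            if self._stopping:
                break
        self._stopping = False


def toy_demo():
    loop = ToyLoop()
    log = []
    loop.call_later(0.02, log.append, "timer")
    dropped = loop.call_later(0.01, log.append, "cancelled")
    dropped.cancel()  # removed from the heap top in stage 1
    loop.call_soon(log.append, "soon")
    loop.call_later(0.04, loop.stop)
    loop.run_forever()
    return log
```

`toy_demo()` returns `["soon", "timer"]`: the ready callback runs on the first pass without waiting, the cancelled timer never runs, and the loop sleeps only until the next deadline.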
Note: because of limitations of the system event loop, file IO is generally executed in threads instead. For details, see: github.com/python/asyn…
3. Handling of network IO events

The analysis of EventLoop scheduling above skipped the concrete logic of self._process_events, because the _process_events method has no concrete implementation in the BaseEventLoop class in asyncio/base_events.py. Network IO events have to be handled by the system's event loop, so the logic related to the system event loop lives in the BaseSelectorEventLoop class in asyncio/selector_events.py. BaseSelectorEventLoop wraps the selectors module to interact with the system event loop, so that callers do not have to deal with registering and unregistering the file descriptors of sockets themselves. Below, using BaseSelectorEventLoop's built-in self-pipe as an example, we analyze how BaseSelectorEventLoop handles network IO events.
Before the analysis, let's look at an example:
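```python
import asyncio
import threading


def task():
    print("task")


def run_loop_inside_thread(loop):
    loop.run_forever()


# new_event_loop() instead of get_event_loop(): calling get_event_loop()
# when no loop is running is deprecated in recent Python versions
loop = asyncio.new_event_loop()
threading.Thread(target=run_loop_inside_thread, args=(loop,)).start()
loop.call_soon(task)
```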
If you run this example directly, it does not print task. (In an IDE's DEBUG mode the thread starts more slowly, so loop.call_soon may run before loop.run_forever and task does get printed.) The reason is that after loop.run_forever is called, nothing is ready or scheduled, so the timeout is None and the EventLoop stays blocked in this call:
```python
event_list = self._selector.select(timeout)
```
Calling loop.call_soon therefore only appends the Handle to self._ready; it does not wake up the EventLoop, so the callback is not scheduled right away. If call_soon is replaced with call_soon_threadsafe, task is printed normally, because call_soon_threadsafe makes one extra call to self._write_to_self. Its source code is as follows:
```python
class BaseEventLoop:
    ...
    def call_soon_threadsafe(self, callback, *args, context=None):
        """Like call_soon(), but thread-safe."""
        self._check_closed()
        handle = self._call_soon(callback, args, context)
        self._write_to_self()
        return handle
```
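Here is a sketch of the corrected version of the earlier example, written so that it terminates. The loop runs in a worker thread, the main thread schedules `task` with `call_soon_threadsafe`, and a `threading.Event` (added only for this illustration) confirms that the callback actually ran, and ran in the loop's thread.

```python
import asyncio
import threading


def demo_threadsafe():
    loop = asyncio.new_event_loop()
    ran_in = []
    done = threading.Event()

    def task():
        ran_in.append(threading.current_thread().name)
        done.set()

    worker = threading.Thread(target=loop.run_forever, name="loop-thread")
    worker.start()
    try:
        # _write_to_self wakes up select(), so task runs promptly
        loop.call_soon_threadsafe(task)
        finished = done.wait(timeout=2)
    finally:
        # Stopping from another thread also needs the thread-safe variant
        loop.call_soon_threadsafe(loop.stop)
        worker.join()
        loop.close()
    return finished, ran_in
```

`demo_threadsafe()` returns `(True, ["loop-thread"])`. Note that `loop.stop` is scheduled with `call_soon_threadsafe` as well; a plain `call_soon` would leave the worker blocked in `select()`.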
Because this call involves IO, we need to look at the BaseSelectorEventLoop class. Next, using the network IO operations of the self-pipe, we analyze how the EventLoop handles IO events (only the reader side is shown; the writer side works in a similar way). The corresponding source code is as follows:
```python
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    ############
    # Creation #
    ############
    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            # Pick the best selector available on this platform
            selector = selectors.DefaultSelector()
        self._selector = selector
        # Create the self-pipe
        self._make_self_pipe()
        self._transports = weakref.WeakValueDictionary()

    def _make_self_pipe(self):
        # Create the pair of connected sockets that make up the self-pipe
        self._ssock, self._csock = socket.socketpair()
        # Make both sockets non-blocking
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        # Register the read callback for the server-side socket (ssock)
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _add_reader(self, fd, callback, *args):
        # Check whether the event loop is closed
        self._check_closed()
        # Wrap the callback in a Handle
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # Not registered with the system event loop yet: register it
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            # Already registered: update the mask and replace the reader
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _read_from_self(self):
        # Drain all the data from the socket
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    ###########
    # Removal #
    ###########
    def _close_self_pipe(self):
        # Unregister the self-pipe's file descriptor
        self._remove_reader(self._ssock.fileno())
        # Close the sockets
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _remove_reader(self, fd):
        # If the event loop is already closed, there is nothing to do
        if self.is_closed():
            return False
        try:
            # Check whether the file descriptor is registered with the selector
            key = self._selector.get_key(fd)
        except KeyError:
            # Not registered: nothing to remove
            return False
        else:
            # Registered: remove the read interest
            mask, (reader, writer) = key.events, key.data
            # Clear the read bit; the remaining mask tells whether other events are left
            mask &= ~selectors.EVENT_READ
            if not mask:
                # No events left: unregister the descriptor from the selector
                self._selector.unregister(fd)
            else:
                # Otherwise keep the descriptor registered with the remaining events
                self._selector.modify(fd, mask, (None, writer))

            # If there was a reader, cancel it
            if reader is not None:
                reader.cancel()
                return True
            else:
                return False
```
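The mask bookkeeping in `_add_reader` and `_remove_reader` can be reproduced with the standard `selectors` module alone. This sketch is not asyncio code: the strings `"reader"` and `"writer"` are placeholders for the Handle objects asyncio stores in `key.data`.

```python
import selectors
import socket


def mask_demo():
    sel = selectors.DefaultSelector()
    ssock, csock = socket.socketpair()
    ssock.setblocking(False)
    csock.setblocking(False)
    fd = ssock.fileno()
    try:
        # First registration: read interest only, data is a (reader, writer) tuple
        sel.register(fd, selectors.EVENT_READ, ("reader", None))
        # Adding write interest: OR the new bit into the existing mask
        key = sel.get_key(fd)
        sel.modify(fd, key.events | selectors.EVENT_WRITE, (key.data[0], "writer"))
        both = sel.get_key(fd).events
        # Like _remove_reader: clear the read bit and keep the writer
        key = sel.get_key(fd)
        mask = key.events & ~selectors.EVENT_READ
        if mask:
            sel.modify(fd, mask, (None, key.data[1]))
        else:
            sel.unregister(fd)
        after = sel.get_key(fd)
        return both, after.events, after.data
    finally:
        sel.close()
        ssock.close()
        csock.close()
```

`mask_demo()` returns the combined mask `EVENT_READ | EVENT_WRITE`, then `EVENT_WRITE` with data `(None, "writer")` once the read interest has been removed.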
The creation part of the source shows that at startup the EventLoop creates a pair of connected sockets and makes them non-blocking, then wraps the corresponding callback in a Handle object and registers it with the system event loop (deletion performs the reverse operations). From then on the system event loop keeps listening for that event, which means the EventLoop's execution blocks in the following call while it waits for an event:
```python
event_list = self._selector.select(timeout)
```
If loop.call_soon_threadsafe is executed at this point, it writes a single byte through _write_to_self:
```python
def _write_to_self(self):
    csock = self._csock
    if csock is None:
        return

    try:
        csock.send(b'\0')
    except OSError:
        if self._debug:
            logger.debug("Fail to write a null byte into the "
                         "self-pipe socket",
                         exc_info=True)
```
Because data has been written to csock, the corresponding ssock receives a read event. The system event loop returns as soon as it is notified, and the EventLoop gets the event list and hands it to _process_events. The relevant code is as follows:
```python
class BaseSelectorEventLoop:
    def _process_events(self, event_list):
        for key, mask in event_list:
            # Get the data stored for the event; key.data was registered
            # as a (reader, writer) tuple, so unpack it here
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                # If the reader Handle is marked as cancelled, remove the file descriptor
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    # Otherwise schedule it by adding it to self._ready
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                # The writer side works the same way
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _add_callback(self, handle):
        # Append the callback Handle to _ready
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)

    def _remove_reader(self, fd):
        # If the event loop is already closed, there is nothing to do
        if self.is_closed():
            return False
        try:
            # Check whether the file descriptor is registered with the selector
            key = self._selector.get_key(fd)
        except KeyError:
            # Not registered: nothing to remove
            return False
        else:
            # Registered: remove the read interest
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # No events left: unregister the descriptor from the selector
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False
```
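The whole self-pipe round trip (register the reader, write one byte, `select()` returns, unpack the `(reader, writer)` tuple, queue the reader, run it) can be mimicked with `selectors` and `socket` directly. This is a simplified sketch: the local `ready` deque and `read_from_self` function are hypothetical stand-ins for `self._ready` and `_read_from_self`.

```python
import selectors
import socket
from collections import deque


def self_pipe_demo():
    sel = selectors.DefaultSelector()
    ssock, csock = socket.socketpair()
    ssock.setblocking(False)
    csock.setblocking(False)
    ready = deque()
    drained = []

    def read_from_self():
        # Mirrors _read_from_self: drain every byte that was written
        while True:
            try:
                data = ssock.recv(4096)
            except BlockingIOError:
                break
            if not data:
                break
            drained.append(data)

    try:
        # Mirrors _add_reader: key.data is a (reader, writer) tuple
        sel.register(ssock, selectors.EVENT_READ, (read_from_self, None))
        before = sel.select(timeout=0)  # nothing written yet, so no events
        csock.send(b"\0")  # mirrors _write_to_self
        events = sel.select(timeout=1)
        # Mirrors _process_events: unpack the tuple and queue the reader
        for key, mask in events:
            reader, writer = key.data
            if mask & selectors.EVENT_READ and reader is not None:
                ready.append(reader)
        # Mirrors stage 4 of _run_once: run what was queued
        while ready:
            ready.popleft()()
        return len(before), b"".join(drained)
    finally:
        sel.close()
        ssock.close()
        csock.close()
```

`self_pipe_demo()` returns `(0, b"\x00")`: no event before the write, and exactly the one null byte drained after it.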
As the code shows, _process_events goes through the file descriptors that have events, takes the Handle stored in each event's data and adds it to self._ready. The EventLoop then runs it in the fourth stage of the same _run_once pass.
As you can see, handling network IO events is not complicated, because the system event loop does much of the work for us. Still, doing all of this by hand for every network IO operation of our own would be tedious. Fortunately, the asyncio library has already wrapped it for us, so we only need to call its APIs, which is much more convenient.
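For comparison, this is roughly what "just calling" the wrapped APIs looks like. On a selector-based loop, `loop.sock_sendall` and `loop.sock_recv` add and remove the readers and writers for the socket's file descriptor internally when the socket is not ready, so the coroutine never touches `selectors`. The socket pair and the `b"ping"` payload are for illustration only.

```python
import asyncio
import socket


async def echo_once():
    loop = asyncio.get_running_loop()
    left, right = socket.socketpair()
    # The loop's sock_* helpers require non-blocking sockets
    left.setblocking(False)
    right.setblocking(False)
    try:
        await loop.sock_sendall(left, b"ping")
        data = await loop.sock_recv(right, 4)
    finally:
        left.close()
        right.close()
    return data
```

`asyncio.run(echo_once())` returns `b"ping"`.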