import uuid
class Future:
    def __init__(self, loop):
        self._result = None
        self._done = False
        self._callbacks = []
        self._loop = loop
    # Assign the _result attribute; _result ends up holding the data returned by the
    # time-consuming operation.
    def set_result(self, data):
        if self._done:
            raise RuntimeError("Future result cannot be set more than once")
        self._done = True
        self._result = data
        # If the result is itself a Future (a finished Task), unwrap its value.
        if isinstance(data, Future):
            self._result = data._result
        for callback in self._callbacks:
            self._loop.add_ready_task(callback)
    # Get the Future object's result value.
    def result(self):
        if self._done:
            return self._result
        raise RuntimeError("Future result is not ready yet")
    # await support.
    def __await__(self):
        # yield's role in a coroutine: once execution reaches the point where the io operation
        # has been handed to the system, pause the function, return the current Future object,
        # and give up control.
        yield self
        return self._result
    # Register a callback: when set_result is called, the coroutine's callback object is put
    # back into the event loop for execution.
    def add_done_callback(self, callback, *args):
        self._callbacks.append(callback)
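# Illustration only (a standalone sketch, not used by the framework): completing a Future and
# seeing its callbacks handed back to the loop. _StubLoop is a hypothetical stand-in that just
# records whatever would be re-scheduled.
def _demo_future():
    class _StubLoop:
        def __init__(self):
            self.ready = []
        def add_ready_task(self, task, *args):
            self.ready.append(task)
    stub = _StubLoop()
    fut = Future(stub)
    fut.add_done_callback("pretend-task")  # normally this is a Task object
    fut.set_result(42)                     # marks the future done and re-schedules its callbacks
    print(fut.result(), stub.ready)        # -> 42 ['pretend-task']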
class Task(Future):
    def __init__(self, core, loop):
        super(Task, self).__init__(loop)
        # core is the coroutine task being driven
        self.core = core
        self._task_id = uuid.uuid4()
        self._future = None
    # run acts as the starter: it drives the coroutine task function. A time-consuming io
    # operation must be tied to a Future object; when `await future` is reached, await triggers
    # the Future's __await__ method, yield pauses the function and returns the current Future,
    # so `future = self.core.send(None)` finishes with the Future performing the io operation.
    def run(self):
        try:
            print(f"{self._task_id} task started")
            future = self.core.send(None)
        except StopIteration:
            self.set_result(self._future)
            print(f"{self._task_id} coroutine task finished")
            print("-" * 50)
        # The first send on self.core raises no StopIteration; it returns the Future tied to
        # the running io operation. That Future hands the io work to the system; when the io
        # finishes, the system calls the Future's set_result method, which attaches the io
        # result to the Future and puts the callback back into the event loop for execution.
        else:
            print(f"{self._task_id} task reached a time-consuming io operation; "
                  "yielding control and registering an io completion callback")
            print("-" * 50)
            future.add_done_callback(self)
            self._future = future
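# Illustration only (a standalone sketch): Task.run is built on the plain coroutine protocol --
# core.send(None) runs the coroutine until the yield inside an __await__, and the final return
# value travels back on StopIteration.value.
def _demo_send_protocol():
    class _ManualAwaitable:
        def __await__(self):
            yield self            # hand control back to whoever is driving the coroutine
            return "io-result"    # becomes the value of the await expression on resumption
    async def _coro():
        return await _ManualAwaitable()
    core = _coro()
    pending = core.send(None)     # runs until `yield self`; pending is the awaitable itself
    print("yielded:", pending)
    try:
        core.send(None)           # resume past the yield; the coroutine now finishes
    except StopIteration as exc:
        print("coroutine returned:", exc.value)   # -> io-result
# End of the async_future_task module; the event-loop module below imports Future and Task
# from it.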
import collections
import heapq
import time
from random import random, randint
from threading import Thread
from async_future_task import Future, Task
class EventLoop:
    loop = None
    # Singleton: there can only be one event loop.
    def __new__(cls, *args, **kwargs):
        if not cls.loop:
            cls.loop = super().__new__(cls)
        return cls.loop
    def __init__(self):
        # Queue of tasks that are ready to run
        self._ready_que = collections.deque()
        # Heap of delayed (scheduled) tasks
        self._scheduled = []
        self.stop = False
    # Create a coroutine Task object and add it to the ready queue.
    def create_task(self, core, *args):
        task = Task(core, self)
        self._ready_que.append(task)
        return task
    # Add a task to the delayed task heap.
    def add_delay_task(self, delay, callback, *args):
        t = time.time() + delay
        heapq.heappush(self._scheduled, (t, callback, args))
    # Add a runnable task to the ready queue; this is mainly used by Future objects to
    # re-schedule their callback tasks.
    def add_ready_task(self, task, *args):
        self._ready_que.append(task)
    def run_forever(self):
        while True:
            self.exec_task()
            if self.stop and len(self._scheduled) == 0 and len(self._ready_que) == 0:
                break
    def stop_exit(self):
        self.stop = True
    # Execute tasks.
    def exec_task(self):
        t = time.time()
        # Move every delayed task whose deadline has passed into the ready queue.
        len_scheduled = len(self._scheduled)
        for i in range(len_scheduled):
            task = heapq.heappop(self._scheduled)
            if task[0] <= t:
                self._ready_que.append((task[1], task[2]))
            else:
                heapq.heappush(self._scheduled, task)
                break
        len_ready = len(self._ready_que)
        for i in range(len_ready):
            task = self._ready_que.popleft()
            # If the task is a Task object, execute its run method.
            if isinstance(task, Task):
                task.run()
            # Otherwise treat task as a (callback, args) pair and call it as a plain function.
            else:
                task[0](*task[1])
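# Illustration only (a standalone sketch, not called by the script below): a plain callback
# scheduled with add_delay_task moves from the _scheduled heap into _ready_que once its
# deadline passes, and exec_task then invokes it as task[0](*task[1]). Note that EventLoop is
# a singleton, so calling this re-initialises the one shared loop instance.
def _demo_delay_callback():
    demo_loop = EventLoop()
    demo_loop.add_delay_task(0.1, print, "delayed callback fired")
    demo_loop.stop_exit()      # ask run_forever to exit once both queues are empty
    demo_loop.run_forever()    # busy-waits ~0.1s, then prints the message and returns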
# This is the user layer: users only need to await the framework's asynchronous method;
# they do not need to care how the framework is implemented underneath.
async def get_baidu():
    # Awaiting aiohttp_request_url eventually waits on the Future tied to fake_io, which
    # triggers the Future's __await__ method; because __await__ contains a yield, execution is
    # paused here and the Future object itself is returned to the driver.
    data = await aiohttp_request_url()
    print("Asynchronous task finished; value obtained by the io operation:", data)
    return data
# aiohttp_request_url simulates an asynchronous http request; this method stands in for the
# framework-encapsulated step of issuing the io call to the system.
async def aiohttp_request_url():
    # Create the Future to wait on.
    future = Future(loop)
    # Start the time-consuming io operation without waiting: just fire the call and hand the
    # work over to the system. When the system finishes the io, it automatically calls back the
    # Future's set_result method. fake_io simulates asking the system to start the io operation
    # and the system calling back with the result.
    fake_io(future)
    data = await future
    # Once the await returns, data is available here and could be post-processed.
    return data
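# Illustration only (a standalone sketch): `await` delegates through nested coroutines, so the
# Future yielded inside the innermost __await__ surfaces directly at Task.run's core.send(None)
# call -- user coroutine, framework coroutine and Future form one chained generator.
def _demo_await_delegation():
    inner_future = Future(None)   # no loop needed: the demo never completes this future
    async def inner():
        return await inner_future
    async def outer():
        return await inner()
    core = outer()
    yielded = core.send(None)         # drives through outer -> inner -> Future.__await__
    print(yielded is inner_future)    # -> True: the same Future object bubbles all the way up
    core.close()                      # tidy up the suspended coroutine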
def fake_io(future):
    def sleep():
        global task_run_time
        # Sleep for a random 0-1 seconds to imitate a time-consuming io operation.
        task_time = random()
        task_run_time += task_time
        time.sleep(task_time)
        # The io operation has finished: simulate the system calling back set_result and give
        # the Future object a random value.
        data = randint(1, 10)
        future.set_result(data)
    Thread(target=sleep).start()
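# For comparison (a hedged sketch, not used by this script): with the real asyncio library the
# worker thread would hand its result back to the loop thread via call_soon_threadsafe, because
# asyncio futures should not be completed directly from another thread. aio_loop and aio_future
# are assumed to be an asyncio event loop and an asyncio.Future created by the caller.
def fake_io_with_asyncio(aio_loop, aio_future):
    def sleep():
        time.sleep(random())
        # schedule set_result on the loop thread instead of calling it from this worker thread
        aio_loop.call_soon_threadsafe(aio_future.set_result, randint(1, 10))
    Thread(target=sleep).start()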
loop = EventLoop()
start_time = time.time()
task_run_time = 0
for _ in range(1000):
    loop.create_task(get_baidu())
loop.add_delay_task(2, loop.stop_exit)
loop.run_forever()
print(f"Combined execution time of all tasks: {task_run_time}, actual elapsed time: {time.time() - start_time}")
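# For comparison (a hedged sketch, kept inert here): roughly the same workload written against
# the real asyncio library, which this mini framework imitates. asyncio.sleep replaces the
# thread-based fake_io, and asyncio.gather drives the 1000 coroutines.
import asyncio
async def get_data_asyncio():
    await asyncio.sleep(random())   # yields to the real event loop instead of blocking a thread
    return randint(1, 10)
async def main_asyncio():
    results = await asyncio.gather(*(get_data_asyncio() for _ in range(1000)))
    print("first five results:", results[:5])
# To try it, run: asyncio.run(main_asyncio())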
Reference: DavyCloud's asyncio tutorial series on Bilibili.