import uuid
class Future:
def __init__(self, loop):
self._result = None
self._done = False
self._callbacks = []
self._loop = loop
# 給_result 屬性賦值,_result 的值結束耗時操作返回的數據
def set_result(self, data):
if self._done:
raise RuntimeError("Future 對象不能重復設置值")
self._done = True
self._result = data
if isinstance(data, Future):
self._result = data._result
for callback in self._callbacks:
self._loop.add_ready_task(callback)
# 獲取 future對象中的result 值
def result(self):
if self._done:
return self._result
raise RuntimeError("Future對象 值結果還沒就緒")
# await 等待
def __await__(self):
# yield 在異步協程中的作用就是:當執行到調用系統發起io操作後,暫停函數的執行,
# 將當前 future 對象返回,並讓出執行權
yield self
return self._result
# 添加回調事件,當 set_result方法被調用的時候,將協程的回調對象放到事件循環中進行執行
def add_done_callback(self, callback, *args):
self._callbacks.append(callback)
class Task(Future):
def __init__(self, core, loop):
super(Task, self).__init__(loop)
# core 就是一個協程任務
self.core = core
self._task_id = uuid.uuid4()
self._future = None
# run方法相當於啟動器,啟動協程任務函數,io耗時操作都必須與future對象進行關聯,當執行到 await future對象的時候
# await 觸發future 對象中的 __await__ 方法,yield 暫停函數執行,並返回當前future對象,
# t = self.core.send(Node)執行結束, 此時 future 是執行io操作的Future 對象
def run(self):
try:
print(f"{
self._task_id} 任務 開始執行")
future = self.core.send(None)
except StopIteration:
self.set_result(self._future)
print(f"{
self._task_id} 協程任務 執行結束")
print("-" * 50)
# 當 self.core 第一次send的時候不會出現報錯,並將執行io操作中的future對象返回回來,
# future 對象中執行io操作的地方與系統進行交換,當io操作執行完成後會調用future 對象中的 set_result 方法,
# set_result 方法 將io結果掛到future 屬性中,並將回調函數重新放到事件循環中進行執行
else:
print(f"{
self._task_id} 任務 執行到io耗時操作,將執行權讓出去,設置io回調通知")
print("-" * 50)
future.add_done_callback(self)
self._future = future
import collections
import heapq
import time
from random import random, randint
from threading import Thread
from async_future_task import Future, Task
class EventLoop:
loop = None
# 單例,事件循環只能有一個
def __new__(cls, *args, **kwargs):
if not cls.loop:
cls.loop = super().__new__(cls)
return cls.loop
def __init__(self):
# 已經准備好可以運行的任務隊列
self._ready_que = collections.deque()
# 延時任務列表
self._scheduled = []
self.stop = False
# 創建協程任務對象,並添加到可執行隊列中
def create_task(self, core, *args):
task = Task(core, self)
self._ready_que.append(task)
return task
# 添加任務到延時任務隊列中
def add_delay_task(self, delay, callback, *args):
t = time.time() + delay
heapq.heappush(self._scheduled, (t, callback, args))
# 添加可執行的任務到任務隊列中, 這個函數主要是給future對象進行添加回調任務
def add_ready_task(self, task, *args):
self._ready_que.append(task)
def run_forever(self):
while True:
self.exec_task()
if self.stop and len(self._scheduled) == 0 and len(self._ready_que) == 0:
break
def stop_exit(self):
self.stop = True
# 執行任務
def exec_task(self):
t = time.time()
len_scheduled = len(self._scheduled)
for i in range(len_scheduled):
task = heapq.heappop(self._scheduled)
if task[0] <= t:
self._ready_que.append((task[1], task[2]))
else:
heapq.heappush(self._scheduled, task)
break
len_ready = len(self._ready_que)
for i in range(len_ready):
task = self._ready_que.popleft()
# 如果是task 是 Task 對象的話就執行 run方法
if isinstance(task, Task):
task.run()
# 如果不是Task對象的話 就把task當做函數來執行
else:
task[0](*task[1])
# 這是用戶層, 用戶只需要 await 框架的異步方法就可以了,
# 不需要關系框架底部是如何實現的
async def get_baidu():
# 在調用fake_io 後等待future 對象,此時會觸發 future 對象中的 __await__ 方法,又因為 __await__
# 方法中有 yield , 它會暫停函數的執行,返回future本身對象
data = await aiohttp_request_url()
print("異步任務結束, io操作獲取到的值是: ", data)
return data
# aiohttp_request_url 模擬的是異步 http請求,
# 該方法模擬的是框架封裝好的、執行調用系統io的步驟
async def aiohttp_request_url():
# 創建future 等待對象
future = Future(loop)
# 執行io耗時操作,此時並不等待,只調用,不等待,將耗時操作托管給系統,
# 系統執行完io耗時操作,自動回調future set_result 方法, fake_io 模擬調用系統發起io操作,系統自動回調結果
fake_io(future)
data = await future
# 可以在await 獲取到data 或進行一些數據的處理
return data
def fake_io(future):
def sleep():
global task_run_time
# 隨機休眠 0-1秒
task_time = random()
task_run_time += task_time
time.sleep(task_time)
# io耗時操作執行完成,模擬系統回調 set_result 方法,給future對象設置隨機值
data = randint(1, 10)
future.set_result(data)
Thread(target=sleep).start()
loop = EventLoop()
start_time = time.time()
task_run_time = 0
for _ in range(1000):
loop.create_task(get_baidu())
loop.add_delay_task(2, loop.stop_exit)
loop.run_forever()
print(f"所有任務執行時間:{
task_run_time}, 實際執行時間{
time.time() - start_time}")
參考b站大佬 DavyCloud asyncio系列教程