簡介
Celery 是使用 python 編寫的分布式任務調度框架。
它有幾個主要的概念:
celery 應用
用戶編寫的代碼腳本,用來定義要執行的任務,然後通過 broker 將任務發送到消息隊列中
broker
代理,通過消息隊列在客戶端和 worker 之間進行協調。
celery 本身並不包含消息隊列,它支持一下消息隊列RabbitMQRdisAmazon SQSZookeeper
更多關於 Broker 見官方文檔
backend
數據庫,用來存儲任務返回的結果。
worker
工人,用來執行 broker 分派的任務。
任務
任務,定義的需要執行的任務
版本要求
Celery5.1 要求:
python(3.6,3.7,3.8)
Celery 是一個資金最少的項目,所以我們不支持 Microsoft Windows。
更多更詳細的版本要求見官方文檔
安裝
使用 pip 安裝:
pip install -U Celery
捆綁包
Celery 還定義了一組包,用於安裝 Celery 和給定的依賴項。
可以在 pip 命令中實現中括號來指定這些依賴項。
pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"
具體支持的依賴包見官方文檔。
簡單使用
1. 選擇一個 broker
使用 celery 首先需要選擇一個消息隊列。安裝任意你熟悉的前面提到的 celery 支持的消息隊列。
2. 編寫一個 celery 應用
首先我們需要編寫一個 celery 應用,它用來創建任務和管理 wokers,它要能夠被其他的模塊導入。
創建一個tasks.py 文件:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
第一個參數tasks是當前模塊的名稱,它可以省略,建議以當前模塊名為名稱。
第二個關鍵字參數 broker='redis://localhost:6379/0’指定我們使用 Redis 作為消息隊列,並指定連接地址。
3.運行 celery 的 worker 服務
cd 到 tasks.py 所在目錄,然後運行下面的命令來啟動 worker 服務
celery -A tasks worker --loglevel=INFO
4. 調用任務
from tasks import add
add.delay(4,4)
通過調用任務的 delay 來執行對應的任務。celery 會把執行命令發送到 broker,broker 再將消息發送給 worker 服務來執行,如果一切正常你將會在 worker 服務的日志中看到接收任務和執行任務的日志。
5. 保存結果
如果你想要跟蹤任務的狀態以及保存任務的返回結果,celery 需要把它發送到某個地方。celery 提供多種結果後端。
我們這裡以 reids 為例,修改 tasks.py中的代碼,添加一個 Redis 後端。
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
更多結果後端見官方文檔。
重新啟動 worker 服務,重新打開 python 解釋器
from tasks import add
result = add.delay(4,4)
ready()方法返回任務是否執行完成:
result.ready()
False
還可以等待結果完成,但很少使用這種方法,因為它將異步調用轉換為同步調用
result.get(timeout=1)
8
在應用中使用 celery
創建項目
項目結構:
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['proj.tasks']
)# 配置
app.conf.update(
result_expires=3600, # 結果過期時間
)
在這個模塊中我們創建了一個 Celery 模塊。要在你的項目中使用 celery 只需要導入此實例。
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.tas
kdef xsum(numbers)
return sum(numbers)
啟動 worker
celery -A proj worker -l INFO
調用任務
from proj.tasks import add
add.delay(2, 2)
在 django 中使用celery
要在你的 django 項目中使用 celery,首先需要定義一個 Celery 的實例。
如果你又 django 項目如下:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那麼推薦的方法是創建一個新的proj/proj/celery.py模塊來定義芹菜實例:file:proj/proj/celery.py
import os
from celery import Celery
# 為`celery`設置默認的django設置模塊
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')
app = Celery('proj')
# 設置配置來源
app.config_from_object('django.conf:settings',namespace='CELERY')
# 加載所有的已注冊django應用中的任務
app.autodiscover_tasks()
@app.task(bind=True) def debug_task(self): print(f'Request: {
self.request!r}')
然後你需要在你的 proj/proj/init.py模塊中導入這個應用程序。這樣就可以保證 Django 啟動時加載應用程序,以便於 @shared_task 裝飾器的使用。
proj/proj/init.py:
from .celery import app as celery_app
__all__ = ('celery_app',)
請注意,此示例項目布局適用於較大的項目,對於簡單的項目,可以使用包含定義應用程序和任務的單個模塊。
接下來我們來解釋一下 celery.py 中的代碼,首先,我們設置celery命令行程序的環境變量DJANGO_SETTINGS_MODULE的默認值:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
這一行的作用是加載當前 django 項目的環境設置,特別是當需要在異步任務中用到 ORM。它必須在創建應用程序實例之前。
app = Celery('proj')
我們還添加了 Django 設置模塊作為 Celery 的配置源。這意味著我們不必使用多個配置文件,而是直接在 Django 的配置文件中配置 Celery。
app.config_from_object('django.conf:settings', namespace='CELERY')
大寫命名空間意味著所有Celery配置項必須以大寫指定,並以 CELERY_ 開頭,因此例如broker_url 設置變為 CELERY_BROKER_URL。
例如,Django 項目的配置文件可能包括:
settings.py
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60
接下來,可重用應用程序的常見做法是在單獨的tasks.py模塊中定義所有任務Celery有一種方法可以自動發現這些模塊:
app.autodiscover_tasks()
使用上面的行,Celery 將按照tasks.py 約定自動從所有已安裝的應用程序中發現任務:
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
這樣就不必手動將各個模塊添加到CELERY_IMPORTS 設置中。
使用 @shared_task 裝飾器
我們編寫的任務可能會存在於可重用的應用程序中,而可重用的應用程序不能依賴與項目本身,因此無法直接導入 celery 應用實例。
@shared_task裝飾器可以讓我們無需任何具體的 celery 實例創建任務:demoapp/tasks.py
# Create your tasks here
from demoapp.models import Widget
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
最後感謝每一個認真閱讀我文章的人,下面這個網盤鏈接也是我費了幾天時間整理的非常全面的,希望也能幫助到有需要的你!
這些資料,對於想轉行做【軟件測試】的朋友來說應該是最全面最完整的備戰倉庫,這個倉庫也陪伴我走過了最艱難的路程,希望也能幫助到你!凡事要趁早,特別是技術行業,一定要提升技術功底。希望對大家有所幫助……
如果你不想一個人野蠻生長,找不到系統的資料,問題得不到幫助,堅持幾天便放棄的感受的話,可以點擊下方小卡片加入我們群,大家可以一起討論交流,裡面會有各種軟件測試資料和技術交流。
點擊文末小卡片領取敲字不易,如果此文章對你有幫助的話,點個贊收個藏來個關注,給作者一個鼓勵。也方便你下次能夠快速查找。
零基礎轉行軟件測試:25天從零基礎轉行到入職軟件測試崗,今天學完,明天就業。【包括功能/接口/自動化/python自動化測試/性能/測試開發】
自動化測試進階:2022B站首推超詳細python自動化軟件測試實戰教程,備戰金三銀四跳槽季,進階學完暴漲20K