Celery 是一款非常簡單、靈活、可靠的分布式系統,可用於處理大量消息,並且提供了一整套操作此系統的一系列工具,同時Celery 是一款消息隊列工具,可用於處理實時數據以及任務調度。
異步任務和定時任務
表示中間人,作用是負責接收生產者發布的任務並將任務存入隊列,然後等待任務的消費者也就是下面的Worker來處理。但是Celery本身不提供隊列服務,但是提供了配置項來來實現,一般通過Redis或RabbitMQ實現隊列服務。
字面意思是工人,實際上是執行任務的消費者,它實時監控消息隊列,如果有任務就獲取任務並執行它。
定時任務調度器,根據配置定時相關參數將指定的任務按照指定的時間發送給Broker(中間人)。
用於存儲任務的執行結果。可以配置redis或者database作為backend
比方說現在站點注冊需要在用戶注冊完成後發送激活郵件給用戶,而後台發送郵件時間需要一定時間,而又不能同步等待郵件發送完成再響應頁面,這樣用戶體驗非常不好,這個時候我們就需要一個異步框架(celery)來幫我們完成這些任務。
本地測試環境:Windows 10+Python 3.6+Celery 4.3.0+redis 2.4.5pip install redis
安裝的是客戶端方便連接的,本地也還要起一個redis服務端)
通過pycharm初始化名為celerydemo的項目
新建tasks.py文件,內容如下:
from celery import Celery
#第一個參數"my_task"是celery實例應用名
app=Celery("my_task",broker="redis://localhost:6379/0",backend="redis://localhost:6379/1")
@app.task
def send_mail():
print("發送郵件中****************************")
return "郵件發送成功"
新建app.py文件,內容如下:
from tasks import send_mail
if __name__ == '__main__':
result=send_mail.delay()
print(result)
通過pycharm打開terminal,如下圖所示:
執行命令celery worker -A tasks -l info -P eventlet
-A表示當前的任務的模塊名,這裡就是task.py的文件名;
-l表示celery的日志等級如info、debug
-P 表示Pool implementation,線程池實現類?
再新開一個terminal窗口執行命令python app.py
,結果如下圖:
上圖紅色箭頭所指的需要關注,可以看到該任務已經執行完畢,並且返回值也輸出來了
再通過pycharm打開python console界面,依次執行如下命令
>>>from tasks import send_mail
>>>send_mail.name
'tasks.send_mail'
>>>send_mail.app
<Celery tasks at 0x22353e56978>
>>>result=send_mail.delay()
result
<AsyncResult: dccd1c2b-8737-4bf7-afd8-594887b52fe8>
>>>result.ready()
True
>>>result.get()
'郵件發送成功'
在根目錄下新建名為celery_app的python package
在celery_app的目錄下新建celeryconfig.py文件,內容如下:
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 導入指定的任務模塊
CELERY_IMPORTS=['celery_app.task']
關於更多的配置參數請參考Celery配置參數以及celery的configuration.
在celery_app的__init__.py初始化celery應用實例,內容如下:
from celery import Celery
app=Celery("demo")
# 可以從配置對象中進行加載配置。
app.config_from_object("celery_app.celeryconfig")
在celery_app目錄下新建task.py文件,內容如下:
from celery_app import app
@app.task
def send_mail():
print("發送郵件中****************************")
return "郵件發送成功"
打開terminal窗口在根目錄下執行如下命令:celery worker -A celery_app -l info -P eventlet
,結果如下圖:
可以看到配置文件內容生效以及celery也識別了創建的任務
再新開一個terminal窗口執行命令python app.py
,結果如下圖:
再打開python console界面操作如下:
>>> from celery_app.task import send_mail
>>>send_mail.name
'celery_app.task.send_mail'
>>> result=send_mail.delay()
>>> result.ready()
True
>>> result.get()
'郵件發送成功'
from celery.schedules import crontab,timedelta
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 導入指定的任務模塊
CELERY_IMPORTS=['celery_app.task']
# 定時任務
CELERYBEAT_SCHEDULE={
'task1':{
'task':'celery_app.task.send_mail',#具體到函數的路徑
'schedule':timedelta(seconds=10),#每10秒鐘執行郵件發送
}
}
celery beat -A celery_app -l info
,結果如下:celery worker -A celery_app -l info -P eventlet
:not enough values to unpack (expected 3, got 0)
win10上更對(https://www.oneisall.top)運行celery4.x就會出現這個問題,需要通過pip install eventlet
,同時在啟動worker添加一個參數,如:celery -A <mymodule> worker -l info -P eventlet
即可
在[tasks]可以看到任務,但是通過調用result.get()
提示任務未注冊
這種情況出現的原因可能是你兩次執行的導入方式不一樣,從而導致自動生成的任務名稱不一樣,簡單的方法就是給任務顯式添加任務名可參考該篇文章Celery-4.1 用戶指南: Task描述
RROR/MainProcess] consumer: Cannot connect to redis
出現連接不上redis,可以嘗試將localhost改成127.0.0.1
無法接收任務,執行不執行,但是celery處於就緒狀態
在視圖調用任務時候是否添加了delay()
方法
報Object of type ‘byte’ is not JSON serializable
可以在django的settings.py 裡面關於celery的配置參數添加如下內容:
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
添加pickle,這樣的話任務函數如果含有對象參數,執行任務時就不會報對象序列化的問題,關於pickle的具體內容可參考pickle模塊詳解。
歡迎大家訪問我的的博客[未雨晴空博客]