程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python異步任務處理框架——celery

編輯:Python

關於Celery

Celery 是一款非常簡單、靈活、可靠的分布式系統,可用於處理大量消息,並且提供了一整套操作此系統的一系列工具,同時Celery 是一款消息隊列工具,可用於處理實時數據以及任務調度。

Celery關鍵詞概念

  • Task

    異步任務和定時任務

  • Broker

    表示中間人,作用是負責接收生產者發布的任務並將任務存入隊列,然後等待任務的消費者也就是下面的Worker來處理。但是Celery本身不提供隊列服務,但是提供了配置項來來實現,一般通過Redis或RabbitMQ實現隊列服務。

  • Worker

    字面意思是工人,實際上是執行任務的消費者,它實時監控消息隊列,如果有任務就獲取任務並執行它。

  • Beat

    定時任務調度器,根據配置定時相關參數將指定的任務按照指定的時間發送給Broker(中間人)。

  • Backend

    用於存儲任務的執行結果。可以配置redis或者database作為backend

celery的使用方式

比方說現在站點注冊需要在用戶注冊完成後發送激活郵件給用戶,而後台發送郵件時間需要一定時間,而又不能同步等待郵件發送完成再響應頁面,這樣用戶體驗非常不好,這個時候我們就需要一個異步框架(celery)來幫我們完成這些任務。

基於命令行下使用(非django環境下)

  • 本地測試環境:Windows 10+Python 3.6+Celery 4.3.0+redis 2.4.5
    pip install redis 安裝的是客戶端方便連接的,本地也還要起一個redis服務端

  • 通過pycharm初始化名為celerydemo的項目

  • 新建tasks.py文件,內容如下:

    from celery import Celery
    #第一個參數"my_task"是celery實例應用名
    app=Celery("my_task",broker="redis://localhost:6379/0",backend="redis://localhost:6379/1")
    @app.task
    def send_mail():
    print("發送郵件中****************************")
    return "郵件發送成功"
    
  • 新建app.py文件,內容如下:

    from tasks import send_mail
    if __name__ == '__main__':
    result=send_mail.delay()
    print(result)
    
  • 通過pycharm打開terminal,如下圖所示:

  • 執行命令celery worker -A tasks -l info -P eventlet

    -A表示當前的任務的模塊名,這裡就是task.py的文件名;
    -l表示celery的日志等級如info、debug
    -P 表示Pool implementation,線程池實現類?

  • 再新開一個terminal窗口執行命令python app.py,結果如下圖:
    上圖紅色箭頭所指的需要關注,可以看到該任務已經執行完畢,並且返回值也輸出來了

  • 再通過pycharm打開python console界面,依次執行如下命令

    >>>from tasks import send_mail
    >>>send_mail.name
    'tasks.send_mail'
    >>>send_mail.app
    <Celery tasks at 0x22353e56978>
    >>>result=send_mail.delay()
    result
    <AsyncResult: dccd1c2b-8737-4bf7-afd8-594887b52fe8>
    >>>result.ready()
    True
    >>>result.get()
    '郵件發送成功'
    

基於配置方式使用

  • 在根目錄下新建名為celery_app的python package

  • 在celery_app的目錄下新建celeryconfig.py文件,內容如下:

    BROKER_URL = 'redis://127.0.0.1:6379/0'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # 導入指定的任務模塊
    CELERY_IMPORTS=['celery_app.task']
    

    關於更多的配置參數請參考Celery配置參數以及celery的configuration.

  • 在celery_app的__init__.py初始化celery應用實例,內容如下:

    from celery import Celery
    app=Celery("demo")
    # 可以從配置對象中進行加載配置。
    app.config_from_object("celery_app.celeryconfig")
    
  • 在celery_app目錄下新建task.py文件,內容如下:

    from celery_app import app
    @app.task
    def send_mail():
    print("發送郵件中****************************")
    return "郵件發送成功"
    
  • 打開terminal窗口在根目錄下執行如下命令:
    celery worker -A celery_app -l info -P eventlet,結果如下圖:
    可以看到配置文件內容生效以及celery也識別了創建的任務

  • 再新開一個terminal窗口執行命令python app.py,結果如下圖:

  • 再打開python console界面操作如下:

    >>> from celery_app.task import send_mail
    >>>send_mail.name
    'celery_app.task.send_mail'
    >>> result=send_mail.delay()
    >>> result.ready()
    True
    >>> result.get()
    '郵件發送成功'
    

定時任務使用

  • 將celeryconfig.py裡面的內容修改成如下:
    from celery.schedules import crontab,timedelta
    BROKER_URL = 'redis://127.0.0.1:6379/0'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    # 導入指定的任務模塊
    CELERY_IMPORTS=['celery_app.task']
    # 定時任務
    CELERYBEAT_SCHEDULE={
    
    'task1':{
    
    'task':'celery_app.task.send_mail',#具體到函數的路徑
    'schedule':timedelta(seconds=10),#每10秒鐘執行郵件發送
    }
    }
    
  • 打開terminal執行命令celery beat -A celery_app -l info,結果如下:
    *再開一個terminal界面,執行以下命令:
    celery worker -A celery_app -l info -P eventlet
    因為設置的秒級別的,所以一旦執行beat命令則會立刻發送定時任務之後再等待0秒重復執行

參考資料

  • celery-3.1.7中文文檔(該文檔詳細說明了命令行參數的用法)
  • celery官方文檔
  • celery-4.3.0中文文檔

遇到的問題以及解決辦法

  • not enough values to unpack (expected 3, got 0)
    win10上更對(https://www.oneisall.top)運行celery4.x就會出現這個問題,需要通過pip install eventlet,同時在啟動worker添加一個參數,如:celery -A <mymodule> worker -l info -P eventlet即可

  • 在[tasks]可以看到任務,但是通過調用result.get()提示任務未注冊
    這種情況出現的原因可能是你兩次執行的導入方式不一樣,從而導致自動生成的任務名稱不一樣,簡單的方法就是給任務顯式添加任務名可參考該篇文章Celery-4.1 用戶指南: Task描述

  • RROR/MainProcess] consumer: Cannot connect to redis
    出現連接不上redis,可以嘗試將localhost改成127.0.0.1

  • 無法接收任務,執行不執行,但是celery處於就緒狀態
    在視圖調用任務時候是否添加了delay()方法

  • 報Object of type ‘byte’ is not JSON serializable
    可以在django的settings.py 裡面關於celery的配置參數添加如下內容:

    CELERY_TASK_SERIALIZER = 'pickle'
    CELERY_RESULT_SERIALIZER = 'pickle'
    CELERY_ACCEPT_CONTENT = ['pickle', 'json']
    

    添加pickle,這樣的話任務函數如果含有對象參數,執行任務時就不會報對象序列化的問題,關於pickle的具體內容可參考pickle模塊詳解。
    歡迎大家訪問我的的博客[未雨晴空博客]


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved