Celery It's a very simple one 、 flexible 、 Reliable distributed system , Can be used to handle a large number of messages , It also provides a set of tools for operating the system , meanwhile Celery Is a message queuing tool , It can be used to process real-time data and task scheduling .
Asynchronous tasks and scheduled tasks
Represents an intermediary , It is responsible for receiving the tasks published by the producer and storing the tasks in the queue , Then the consumers waiting for the task are the following Worker To deal with it . however Celery It does not provide queue service itself , But configuration items are provided to implement , Usually by Redis or RabbitMQ Implement queue service .
It literally means workers , It's actually the consumer who performs the task , It monitors message queues in real time , If there is a task, get the task and execute it .
Timed task scheduler , Send the specified task to at the specified time according to the configuration timing related parameters Broker( A middleman ).
Used to store the execution results of tasks . You can configure the redis perhaps database As backend
For example, the site registration needs to send an activation email to the user after the user registration is completed , It takes some time to send mail in the background , And you can't synchronously wait for the mail to be sent before responding to the page , This is a very bad user experience , At this time, we need an asynchronous framework (celery) To help us accomplish these tasks .
Local test environment :Windows 10+Python 3.6+Celery 4.3.0+redis 2.4.5pip install redis
The installation is convenient for the client to connect , There will also be a local redis Server side )
adopt pycharm The initialization name is celerydemo Project
newly build tasks.py file , The contents are as follows :
from celery import Celery
# The first parameter "my_task" yes celery Instance application name
app=Celery("my_task",broker="redis://localhost:6379/0",backend="redis://localhost:6379/1")
@app.task
def send_mail():
print(" Sending mail ****************************")
return " Mail sent successfully "
newly build app.py file , The contents are as follows :
from tasks import send_mail
if __name__ == '__main__':
result=send_mail.delay()
print(result)
adopt pycharm open terminal, As shown in the figure below :
Carry out orders celery worker -A tasks -l info -P eventlet
-A Indicates the module name of the current task , Here is the task.py The name of the file ;
-l Express celery The log level of is as follows info、debug
-P Express Pool implementation, Thread pool implementation class ?
Open a new one terminal Windows execute commands python app.py
, The results are as follows :
The red arrow in the above figure indicates the need for attention , You can see that the task has been executed , And the return value is also output
Re pass pycharm open python console Interface , Execute the following commands in turn
>>>from tasks import send_mail
>>>send_mail.name
'tasks.send_mail'
>>>send_mail.app
<Celery tasks at 0x22353e56978>
>>>result=send_mail.delay()
result
<AsyncResult: dccd1c2b-8737-4bf7-afd8-594887b52fe8>
>>>result.ready()
True
>>>result.get()
' Mail sent successfully '
Create a new file named... Under the root directory celery_app Of python package
stay celery_app Create a new celeryconfig.py file , The contents are as follows :
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
# Import the specified task module
CELERY_IMPORTS=['celery_app.task']
For more configuration parameters, please refer to Celery Configuration parameters as well as celery Of configuration.
stay celery_app Of __init__.py initialization celery Application example , The contents are as follows :
from celery import Celery
app=Celery("demo")
# You can load configuration from the configuration object .
app.config_from_object("celery_app.celeryconfig")
stay celery_app New under the directory task.py file , The contents are as follows :
from celery_app import app
@app.task
def send_mail():
print(" Sending mail ****************************")
return " Mail sent successfully "
open terminal The window executes the following command under the root directory :celery worker -A celery_app -l info -P eventlet
, The results are as follows :
You can see that the contents of the configuration file are effective and celery It also identifies the tasks created
Open a new one terminal Windows execute commands python app.py
, The results are as follows :
And on again python console The interface operation is as follows :
>>> from celery_app.task import send_mail
>>>send_mail.name
'celery_app.task.send_mail'
>>> result=send_mail.delay()
>>> result.ready()
True
>>> result.get()
' Mail sent successfully '
from celery.schedules import crontab,timedelta
BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
CELERY_TIMEZONE = 'Asia/Shanghai'
# Import the specified task module
CELERY_IMPORTS=['celery_app.task']
# Timing task
CELERYBEAT_SCHEDULE={
'task1':{
'task':'celery_app.task.send_mail',# The path to the function
'schedule':timedelta(seconds=10),# Every time 10 Seconds to send mail
}
}
celery beat -A celery_app -l info
, give the result as follows :celery worker -A celery_app -l info -P eventlet
:not enough values to unpack (expected 3, got 0)
win10 Better (https://www.oneisall.top) function celery4.x That's the problem , Need to pass through pip install eventlet
, At the same time starting worker Add a parameter , Such as :celery -A <mymodule> worker -l info -P eventlet
that will do
stay [tasks] You can see the task , But by calling result.get()
Prompt that the task is not registered
The reason for this may be that the import methods you perform twice are different , As a result, the names of the automatically generated tasks are different , The simple way is to give the task Explicitly add task name Please refer to this article Celery-4.1 User guide : Task describe
RROR/MainProcess] consumer: Cannot connect to redis
The connection failed redis, You can try to localhost Change to 127.0.0.1
Unable to receive task , Execute not execute , however celery In a ready state
Whether the view is added when invoking the task delay()
Method
newspaper Object of type ‘byte’ is not JSON serializable
Can be in django Of settings.py On the inside celery Add the following content to the configuration parameters of :
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
add to pickle, In this case, if the task function contains object parameters , The problem of object serialization will not be reported during task execution , About pickle Please refer to pickle Module details .
Welcome to my blog [ No rain and clear sky blog ]