Celery
1. 简介
Celery是使用python编写的分布式任务调度框架。
它有几个主要的概念:
- celery应用
- 用户编写的代码脚本,用来定义要执行的任务,然后通过broker将任务发送到消息队列中
- broker
- 代理,通过消息队列在客户端和worker之间进行协调。
- celery本身并不包含消息队列,它支持一下消息队列
- RabbitMQ
- Rdis
- Amazon SQS
- Zookeeper
- 更多关于Broker见官方文档
- backend
- 数据库,用来存储任务返回的结果。
- worker
- 工人,用来执行broker分派的任务。
- 任务
- 任务,定义的需要执行的任务
版本要求
Celery5.1 要求:
- python(3.6,3.7,3.8)
Celery 是一个资金最少的项目,所以我们不支持 Microsoft Windows。
更多更详细的版本要求见官方文档
安装
使用pip安装:
pip install -U Celery
捆绑包
Celery还定义了一组包,用于安装Celery和给定的依赖项。
可以在pip命令中实现中括号来指定这些依赖项。
pip install "celery[librabbitmq]"
pip install "celery[librabbitmq,redis,auth,msgpack]"
具体支持的依赖包见官方文档
2. 简单使用
1. 选择一个broker
使用celery首先需要选择一个消息队列。安装任意你熟悉的前面提到的celery支持的消息队列。
2. 编写一个celery应用
首先我们需要编写一个celery应用,它用来创建任务和管理wokers,它要能够被其他的模块导入。
创建一个tasks.py
文件:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
第一个参数tasks
是当前模块的名称,它可以省略,建议以当前模块名为名称。
第二个关键字参数broker='redis://localhost:6379/0'
指定我们使用redis作为消息队列,并指定连接地址。
3.运行celery的worker服务
cd到tasks.py所在目录,然后运行下面的命令来启动worker服务
celery -A tasks worker --loglevel=INFO
4. 调用任务
>>> from tasks import add
>>> add.delay(4,4)
通过调用任务的delay
来执行对应的任务。celery会把执行命令发送到broker,broker再将消息发送给worker服务来执行,如果一切正常你将会在worker服务的日志中看到接收任务和执行任务的日志。
5. 保存结果
如果你想要跟踪任务的状态以及保存任务的返回结果,celery需要把它发送到某个地方。celery提供多种结果后端。
我们这里以reids为例,修改tasks.py
中的代码,添加一个redis后端。
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
更多结果后端见官方文档。
重新启动worker服务,重新打开python解释器
>>> from tasks import add
>>> result = add.delay(4,4)
ready()
方法返回任务是否执行完成:
>>> result.ready()
False
还可以等待结果完成,但很少使用这种方法,因为它将异步调用转换为同步调用
>>> result.get(timeout=1)
8
3. 在应用中使用celery
创建项目
项目结构:
proj/__init__.py
/celery.py
/tasks.py
proj/celery.py
from celery import Celery
app = Celery('proj',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['proj.tasks'])
# 配置
app.conf.update(
result_expires=3600, # 结果过期时间
)
在这个模块中我们创建了一个Celery
模块。要在你的项目中使用celery只需要导入此实例。
proj/tasks.py
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
启动worker
celery -A proj worker -l INFO
调用任务
>>> from proj.tasks import add
>>> add.delay(2, 2)
4. 在django中使用celery
要在你的django项目中使用celery,首先需要定义一个Celery的实例。
如果你又django项目如下:
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py
那么推荐的方法是创建一个新的proj/proj/celery.py
模块来定义芹菜实例:
file:proj/proj/celery.py
import os
from celery import Celery
# 为`celery`设置默认的django设置模块
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# 设置配置来源
app.config_from_object('django.conf:settings', namespace='CELERY')
# 加载所有的已注册django应用中的任务
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
然后你需要在你的proj/proj/__init__.py
模块中导入这个应用程序。这样就可以保证Django
启动时加载应用程序,以便于@shared_task
装饰器的使用。
proj/proj/__init__.py
:
from .celery import app as celery_app
__all__ = ('celery_app',)
请注意,此示例项目布局适用于较大的项目,对于简单的项目,可以使用包含定义应用程序和任务的单个模块。
接下来我们来解释一下celery.py
中的代码,首先,我们设置celery
命令行程序的环境变量DJANGO_SETTINGS_MODULE
的默认值:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
这一行的作用是加载当前django项目的环境设置,特别是当需要在异步任务中用到orm。它必须在创建应用程序实例之前。
app = Celery('proj')
我们还添加了Django设置模块作为Celery的配置源。这意味着我们不必使用多个配置文件,而是直接在Django的配置文件中配置Celery。
app.config_from_object('django.conf:settings', namespace='CELERY')
大写命名空间意味着所有Celery配置项
必须以大写指定,并以CELERY_
开头,因此例如broker_url
设置变为CELERY_BROKER_URL
。
例如,Django项目的配置文件可能包括:
settings.py
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30*60
接下来,可重用应用程序的常见做法是在单独的tasks.py
模块中定义所有任务,Celery
有一种方法可以自动发现这些模块:
app.autodiscover_tasks()
使用上面的行,Celery 将按照tasks.py
约定自动从所有已安装的应用程序中发现任务:
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
这样就不必手动将各个模块添加到CELERY_IMPORTS
设置中。
使用@shared_task
装饰器
我们编写的任务可能会存在于可重用的应用程序中,而可重用的应用程序不能依赖与项目本身,因此无法直接导入celery应用实例。
@shared_task
装饰器可以让我们无需任何具体的celery实例创建任务:
demoapp/tasks.py
# Create your tasks here
from demoapp.models import Widget
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def xsum(numbers):
return sum(numbers)
@shared_task
def count_widgets():
return Widget.objects.count()
@shared_task
def rename_widget(widget_id, name):
w = Widget.objects.get(id=widget_id)
w.name = name
w.save()
欢迎来到testingpai.com!
注册 关于