Celery Distributed Task Queue

Celery is a distributed task queue written in Python; for more about Celery, see the official website.

To use Celery we need a Broker (to carry task messages) and a Backend (to store results); here we use Redis for both.

Celery does not support Windows (official Windows support was dropped in Celery 4).

Installation

For convenience, we use Docker here to quickly spin up a Redis service.

➜  ~ docker pull redis
➜  ~ docker run -d --name redis -p 6379:6379 redis

I have already created a virtual environment with pyenv, so I just switch to it:

➜  ~ pyenv activate venv
(venv) ➜  ~ pip install celery

Since both our Broker and Backend use Redis, we also need to install the redis module:

(venv) ➜  ~ pip install redis
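
To confirm that the broker is reachable before going further, here is a quick sanity check (a minimal sketch, assuming Redis is listening on the default localhost:6379) using the redis module we just installed:

import redis

# Assumes the Dockerized Redis above is on the default localhost:6379
r = redis.Redis(host='localhost', port=6379)
print(r.ping())  # prints True when Redis is reachable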

Usage

Application

Create a file named tasks.py to hold our tasks:

from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def add(x, y):
    return x + y

The first argument to Celery is the name of this app; the broker and backend arguments specify the URL of the message broker and of the result backend.

Above we defined a task named add, which returns the sum of x and y.
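
For reference, the Redis URL can also be spelled out in full; 'redis://localhost' simply falls back to the default port 6379 and database 0. An equivalent, more explicit form:

from celery import Celery

# Full form: redis://:password@hostname:port/db_number
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')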

Startup

(venv) ➜  ~ celery -A tasks worker --loglevel=info

When you run the start command, you must be in the same directory as tasks.py. On success, the output should look like this:

(venv) ➜  ~ celery -A tasks worker --loglevel=info

celery@ShengdeMacBook-Pro.local v4.1.0 (latentcall)

Darwin-17.3.0-x86_64-i386-64bit 2018-01-22 14:33:49

[config]
.> app:         tasks:0x1069cd3c8
.> transport:   redis://localhost:6379//
.> results:     redis://localhost/
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2018-01-22 14:33:49,334: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-01-22 14:33:49,350: INFO/MainProcess] mingle: searching for neighbors
[2018-01-22 14:33:50,376: INFO/MainProcess] mingle: all alone
[2018-01-22 14:33:50,394: INFO/MainProcess] celery@ShengdeMacBook-Pro.local ready.

Calling tasks

Open a new terminal, switch into the virtual environment, and start the Python interactive shell:

>>> from tasks import add
# Calling the function without delay runs it as an ordinary function
>>> add(1,2)
3
# Once an asynchronous task has been queued, an AsyncResult instance is returned
>>> add.delay(1,2)
<AsyncResult: e167ae38-0e53-4592-ad0d-6c685f16168c>
>>> result = add.delay(1,2)
# get fetches the result; calling get before the task has finished will block
>>> result.get()
3
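
Incidentally, delay() is just a shortcut for apply_async(), which accepts extra execution options. A brief sketch, continuing in the same shell (countdown is one such option):

>>> result = add.apply_async((1, 2))                # equivalent to add.delay(1, 2)
>>> result = add.apply_async((1, 2), countdown=10)  # run no sooner than 10 seconds from now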

To test the asynchronous effect with a longer-running job, add a new task named sleep to tasks.py:

from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

@app.task
def add(x, y):
    return x + y

@app.task
def sleep(t):
    time.sleep(t)
    return str(time.time())

After adding the task we need to stop the celery worker and start it again; on restart you will see that the newly added sleep task now shows up under [tasks]:

(venv) ➜  ~ celery -A tasks worker --loglevel=info

celery@ShengdeMacBook-Pro.local v4.1.0 (latentcall)

Darwin-17.3.0-x86_64-i386-64bit 2018-01-22 14:34:27

[config]
.> app:         tasks:0x105647400
.> transport:   redis://localhost:6379//
.> results:     redis://localhost/
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add
  . tasks.sleep

[2018-01-22 14:34:27,548: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-01-22 14:34:27,563: INFO/MainProcess] mingle: searching for neighbors
[2018-01-22 14:34:28,588: INFO/MainProcess] mingle: all alone
[2018-01-22 14:34:28,619: INFO/MainProcess] celery@ShengdeMacBook-Pro.local ready.

Enter the Python interactive shell again and call the new task:

>>> from tasks import sleep
# Create a task that sleeps for 10 seconds
>>> task = sleep.delay(10)
# Calling get before the task has finished, with a 1-second timeout: the task is not done yet, so a celery.exceptions.TimeoutError is raised
>>> task.get(timeout=1)
Traceback (most recent call last):
  File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 256, in _wait_for_pending
    on_interval=on_interval):
  File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 55, in drain_events_until
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/result.py", line 194, in get
    on_message=on_message,
  File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 189, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 260, in _wait_for_pending
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
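
One way to handle this is to catch the exception yourself; a minimal sketch, reusing the same sleep task:

from celery.exceptions import TimeoutError
from tasks import sleep

task = sleep.delay(10)
try:
    print(task.get(timeout=1))
except TimeoutError:
    print('task is still running')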

If you want to know whether a long-running task has finished, you can catch the exception as in the sketch above and handle it accordingly, but Celery provides a more convenient method:

>>> task.ready()
False
>>> task.ready()
True
>>> task.get()
'1516603282.2424462'
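
Besides ready(), AsyncResult exposes a state attribute you can poll; a small sketch (note that the STARTED state is only reported if the task_track_started setting is enabled):

from tasks import sleep

task = sleep.delay(10)
print(task.state)   # e.g. PENDING while queued or running, SUCCESS when done
if task.ready():    # True once the task finished, whether it succeeded or failed
    print(task.get())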

For more about the AsyncResult object, see: http://docs.celeryproject.org/en/latest/reference/celery.result.html

Using Celery in a project

# Project directory
(venv) ➜  ~ mkdir celery_tasks
# Celery configuration file
(venv) ➜  ~ touch celery_tasks/celery.py
# Task 1
(venv) ➜  ~ touch celery_tasks/task_1.py
# Task 2
(venv) ➜  ~ touch celery_tasks/task_2.py

celery_tasks/celery.py:

from __future__ import absolute_import
from celery import Celery

app = Celery('celery_tasks',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['celery_tasks.task_1', 'celery_tasks.task_2'])

app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

celery_tasks/task_1.py:

from __future__ import absolute_import
from .celery import app


@app.task
def add(x, y):
    return x + y

celery_tasks/task_2.py:

from __future__ import absolute_import
import time
from .celery import app


@app.task
def sleep(t):
    time.sleep(t)
    return str(time.time())

Start the worker:

(venv) ➜  ~ celery -A celery_tasks worker -l info

celery@ShengdeMacBook-Pro.local v4.1.0 (latentcall)

Darwin-17.3.0-x86_64-i386-64bit 2018-01-22 14:59:33

[config]
.> app:         celery_tasks:0x109e29ef0
.> transport:   redis://localhost:6379//
.> results:     redis://localhost/
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . celery_tasks.task_1.add
  . celery_tasks.task_2.sleep

[2018-01-22 14:59:34,042: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-01-22 14:59:34,072: INFO/MainProcess] mingle: searching for neighbors
[2018-01-22 14:59:35,109: INFO/MainProcess] mingle: all alone
[2018-01-22 14:59:35,148: INFO/MainProcess] celery@ShengdeMacBook-Pro.local ready.

Open a new window to test:

(venv) ➜  ~ python
>>> from celery_tasks import task_1,task_2
>>> task_1.add.delay(1,2)
<AsyncResult: 8621cc57-ddf1-4188-a21d-78678611c735>
>>> task_2.sleep.delay(5)
<AsyncResult: a842caa1-3162-460c-b49a-860797f7784b>
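
Tasks defined this way can also be composed with signatures; a brief sketch of Celery's chain primitive, where each result is fed into the next task's first argument:

>>> from celery import chain
>>> from celery_tasks.task_1 import add
>>> res = chain(add.s(1, 2), add.s(10))()  # runs add(1, 2), then add(3, 10)
>>> res.get()
13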

Running in the background

(venv) ➜  ~ celery multi start celery_demo -A celery_tasks -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
	> celery_demo@ShengdeMacBook-Pro.local: OK
(venv) ➜  ~ ps -ef | grep celery_demo | grep -v grep
  501 30862     1   0  3:03下午 ??         0:00.72 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid --hostname=celery_demo@ShengdeMacBook-Pro.local
  501 30900 30862   0  3:03下午 ??         0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid --hostname=celery_demo@ShengdeMacBook-Pro.local
  501 30901 30862   0  3:03下午 ??         0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid --hostname=celery_demo@ShengdeMacBook-Pro.local
  501 30902 30862   0  3:03下午 ??         0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid --hostname=celery_demo@ShengdeMacBook-Pro.local
  501 30903 30862   0  3:03下午 ??         0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid --hostname=celery_demo@ShengdeMacBook-Pro.local

This runs as a daemon: one master process plus worker child processes, whose count defaults to the number of CPU cores (four here).
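
To verify from Python that the daemonized node is alive, you can ping it through the app's control interface (a sketch; ping() returns None if no worker replies in time), and the reply should look something like this:

>>> from celery_tasks.celery import app
>>> app.control.inspect().ping()
{'celery_demo@ShengdeMacBook-Pro.local': {'ok': 'pong'}}

To restart or stop the daemonized node, use celery multi restart / stop / stopwait: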

(venv) ➜  ~ celery multi restart celery_demo -A celery_tasks -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
	> celery_demo@ShengdeMacBook-Pro.local: TERM -> 31132
> Waiting for 1 node -> 31132.....
	> celery_demo@ShengdeMacBook-Pro.local: OK
> Restarting node celery_demo@ShengdeMacBook-Pro.local: OK
> Waiting for 1 node -> None...
(venv) ➜  ~ celery multi stop celery_demo -A celery_tasks -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
	> celery_demo@ShengdeMacBook-Pro.local: TERM -> 30862
(venv) ➜  ~ celery multi stopwait celery_demo -A celery_tasks -l info

Afterword

Celery can handle not only asynchronous tasks but also periodic (scheduled) tasks. It also integrates deeply with Django: the django-celery-results library stores task results in Django models, and the django-celery-beat library keeps periodic-task schedules in the database instead of defining them in a configuration file.
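
As a taste of the periodic-task support, here is a minimal sketch of the beat_schedule setting from the Celery documentation (the schedule is picked up by a separate beat process, started with celery -A tasks beat):

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Run tasks.add every 30 seconds with arguments (16, 16)
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
    # Or use crontab syntax: every Monday at 7:30 in the morning
    'add-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}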