Install
- Broker: RabbitMQ
sudo apt-get install rabbitmq-server
- Celery: A distributed task queue
pip install celery
Usage
- define a function ```python from celery import Celery
app = Celery(‘tasks’, backend=’amqp’, broker=’amqp://’)
@app.task def add(x, y): return x+y
tasks是当前模块的名称,即当前文件写在`tasks.py`中,`broker`='amqp://localhost'代表所用的messageQueue是rabbitmq,也可以采用redis,为`'redis://localhost'`。backend用来保存task的运行结果。
2. Lanuch a worker
```bash
celery -A tasks worker --loglevel=info
如果是在django项目中,把tasks改为项目名称。-A为–app。
- use task
>>> from tasks import add >>> result = add.delay(4, 4) >>> result.ready() True >>> result.get(timeout=1) 8
通过
result.state
可以查看状态,如果一直处于PENDING状态,可以查看result.backend
是否正确配置。 - Config
- 直接配置
app.conf.update( task_serializer='json', accept_content=['json'], # Ignore other content result_serializer='json', timezone='Europe/Oslo', enable_utc=True, )
- 从模块中读配置文件
app.config_from_object('celeryconfig')
celeryconfig.py
```python broker_url = ‘pyamqp://’ result_backend = ‘rpc://’
- 直接配置
task_serializer = ‘json’ result_serializer = ‘json’ accept_content = [‘json’] timezone = ‘Europe/Oslo’ enable_utc = True
* 配置任务速率(celeryconfig.py), `Error`
```python
# 加入到低优先级队列
task_routes = {
'tasks.add': 'low-priority',
}
# tasks.add的速度限制为1分钟10次
task_annotations = {
'tasks.add': {'rate_limit': '10/m'}
}
- Others
- app定义在主要的文件中,如
celery.py
中定义app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks'])
proj/tasks.py
from .celery.py import app @app.task def ... ...
- app定义在主要的文件中,如
- 延迟执行+简化参数
>>> s1 = add.signature((2, 2), countdown = 10) >>> res = s1.delay() >>> res.state 'PENDING' >>> res.state 'SUCCESS'
- Timezone
app.conf.timezone = 'Europe/London'
- rabbitmq C实现
$ pip install librabbitmq
- 定时任务
- periodic_task.py ```python from celery import Celery from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): sender.add_periodic_task(10.0, test.s(‘hello’), name=’add every 10’)
@app.task def test(msg): print(msg)
2. 启动定时检测
```bash
celery -A periodic_task beat
- 启动worker
celery -A periodic_task worker
结果:
[2017-11-07 11:17:36,061: WARNING/ForkPoolWorker-2] hello [2017-11-07 11:17:46,058: WARNING/ForkPoolWorker-1] hello [2017-11-07 11:17:56,060: WARNING/ForkPoolWorker-2] hello [2017-11-07 11:18:06,060: WARNING/ForkPoolWorker-1] hello [2017-11-07 11:18:16,060: WARNING/ForkPoolWorker-2] hello [2017-11-07 11:18:26,059: WARNING/ForkPoolWorker-1] hello [2017-11-07 11:18:36,058: WARNING/ForkPoolWorker-2] hello [2017-11-07 11:18:46,059: WARNING/ForkPoolWorker-1] hello [2017-11-07 11:18:56,062: WARNING/ForkPoolWorker-2] hello
- 高级设置
# Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), )
- 参考博客
http://www.cnblogs.com/alex3714/p/6351797.html