Celery: a distributed task queue
 2018-02-20

Install

Usage

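  1. Define a task function

```python
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def add(x, y):
    return x + y
```

`tasks` is the name of the current module, i.e. this code is saved in `tasks.py`. `broker='amqp://localhost'` selects RabbitMQ as the message queue; Redis also works, with `'redis://localhost'`. `backend` stores the results of tasks. The `amqp` result backend was removed in Celery 5.0; on current versions use `rpc://` instead.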

  2. Launch a worker

```bash
celery -A tasks worker --loglevel=info
```

In a Django project, replace `tasks` with the project name. `-A` is short for `--app`.

  3. Call the task
    >>> from tasks import add
    >>> result = add.delay(4, 4)
    >>> result.ready()
    True
    >>> result.get(timeout=1)
    8
    

    `result.state` shows the task's current state. If it stays at `PENDING`, check that `result.backend` is configured correctly.

  4. Config
    • Configure directly
      app.conf.update(
       task_serializer='json',
       accept_content=['json'],  # Ignore other content
       result_serializer='json',
       timezone='Europe/Oslo',
       enable_utc=True,
      )
      
    • Load the configuration from a module
      app.config_from_object('celeryconfig')
      

      celeryconfig.py

```python
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
```


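* Configure task routing and rate limits (celeryconfig.py), `Error`

```python
# Send tasks.add to the low-priority queue
# (a worker has to consume it, e.g. celery -A tasks worker -Q low-priority)
task_routes = {
    'tasks.add': 'low-priority',
}
# Limit tasks.add to 10 calls per minute
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}
```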
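To make the rate-limit notation concrete: a value such as `'10/m'` means 10 tasks per minute, and the suffixes `/s`, `/m` and `/h` stand for seconds, minutes and hours. The helper below is hypothetical and not part of Celery; it only converts such a string into the average gap between two task starts. Note that Celery applies the limit per worker instance, not globally.

```python
# Hypothetical helper, not Celery's own code: illustrates the notation only.
SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 3600}

def seconds_between_tasks(rate_limit):
    """Convert a rate string such as '10/m' into the average gap in seconds."""
    count, _, unit = rate_limit.partition('/')
    # A bare number without a suffix is read as tasks per second
    window = SECONDS_PER_UNIT[unit or 's']
    return window / float(count)

print(seconds_between_tasks('10/m'))  # 6.0
```

With `'10/m'`, a single worker therefore starts `tasks.add` about once every 6 seconds on average.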
  5. Others
    • Define the app in one main file, for example in celery.py
      app = Celery('proj',
              broker='amqp://',
              backend='amqp://',
              include=['proj.tasks'])
      

      proj/tasks.py

      from .celery import app
      @app.task
      def ...
       ...
      

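Periodic tasks

  1. Define a periodic task (periodic_task.py)
    from celery import Celery

    app = Celery()

    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Call test('hello') every 10 seconds
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    @app.task
    def test(msg):
        print(msg)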


  2. Start the beat scheduler

```bash
celery -A periodic_task beat
```
  3. Start the worker
    celery -A periodic_task worker
    

    Output:

    [2017-11-07 11:17:36,061: WARNING/ForkPoolWorker-2] hello
    [2017-11-07 11:17:46,058: WARNING/ForkPoolWorker-1] hello
    [2017-11-07 11:17:56,060: WARNING/ForkPoolWorker-2] hello
    [2017-11-07 11:18:06,060: WARNING/ForkPoolWorker-1] hello
    [2017-11-07 11:18:16,060: WARNING/ForkPoolWorker-2] hello
    [2017-11-07 11:18:26,059: WARNING/ForkPoolWorker-1] hello
    [2017-11-07 11:18:36,058: WARNING/ForkPoolWorker-2] hello
    [2017-11-07 11:18:46,059: WARNING/ForkPoolWorker-1] hello
    [2017-11-07 11:18:56,062: WARNING/ForkPoolWorker-2] hello
    
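  4. Advanced settings
    # Needs: from celery.schedules import crontab
    # Goes inside setup_periodic_tasks, next to the 10-second task.
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )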
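To see what the schedule `crontab(hour=7, minute=30, day_of_week=1)` amounts to, the standard-library sketch below computes the next Monday 07:30 after a given moment. It is not Celery's scheduling code: the function name is hypothetical and time zones are ignored. One detail to watch is that Celery's `day_of_week` counts from 0 for Sunday, so 1 is Monday, while `datetime.weekday()` uses 0 for Monday.

```python
from datetime import datetime, timedelta

def next_monday_0730(now):
    """Return the first Monday 07:30 strictly after `now` (naive datetimes)."""
    # datetime.weekday(): Monday is 0, so this is the number of days until Monday
    days_ahead = (0 - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=7, minute=30, second=0, microsecond=0)
    # On a Monday after 07:30 the slot has already passed: take next week
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate

# 2017-11-07 was a Tuesday, the day of the log output shown earlier
print(next_monday_0730(datetime(2017, 11, 7, 11, 17)))  # 2017-11-13 07:30:00
```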
    
  5. Reference blog post: http://www.cnblogs.com/alex3714/p/6351797.html