Asynchronous task processing with celery in tornado (Part 1)

1. Introduction

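tornado-celery is a non-blocking Celery client for the Tornado web framework.
With tornado-celery, time-consuming work can be pushed onto a task queue for processing:
the tasks are defined in celery, and tornado can then call them the same way it calls AsyncHTTPClient.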



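Two key concepts in Celery: Broker and Backend
Broker : this is simply the message queue mentioned above, used to send and receive messages.
         Several brokers are available: RabbitMQ, Redis, a database, and so on.
Backend: stores task results and state; it can be a database backend or a cache backend.
         Several backends are available:
         http://docs.celeryproject.org/en/latest/configuration.html#celery-result-backend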
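
To make the two roles concrete, here is a toy in-process model written for Python 3. It is not Celery and not how Celery is implemented: a queue.Queue stands in for the Broker, a plain dict stands in for the Backend, and the names send_task and worker are made up for this illustration.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for the message queue (Broker)
backend = {}             # stands in for the result store (Backend)


def add(x, y):
    return x + y


def send_task(func, *args):
    """Client side: publish a task message and return its id."""
    task_id = str(uuid.uuid4())
    backend[task_id] = {"status": "PENDING", "result": None}
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Worker side: consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        backend[task_id] = {"status": "SUCCESS", "result": func(*args)}


task_id = send_task(add, 4, 4)
thread = threading.Thread(target=worker)
thread.start()
broker.put(None)   # tell the toy worker to stop after the queued task
thread.join()
print(backend[task_id])
```

The client only ever touches the queue and the result store, never the worker itself, which is the property that lets a real broker and backend live on another machine.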


2. Asynchronous task processing with tornado and Redis

2.1 Setting up the environment

1. Install tornado

See the article "Installing Python 2.7.3 and Tornado on CentOS 6.4"

2. Install tornado-celery

Installing tornado-celery is very easy:
$ pip install tornado-celery
Downloading/unpacking tornado-celery
  Downloading tornado-celery-0.3.4.tar.gz
  Running setup.py egg_info for package tornado-celery 
... 
Successfully installed tornado-celery celery pika pytz billiard kombu anyjson amqp
Cleaning up...

3. Install celery

$ pip install celery
Requirement already satisfied (use --upgrade to upgrade): celery in /usr/local/python2.7.3/lib/python2.7/site-packages
Requirement already satisfied (use --upgrade to upgrade): pytz>dev in /usr/local/python2.7.3/lib/python2.7/site-packages (from celery)
Requirement already satisfied (use --upgrade to upgrade): billiard>=3.3.0.19,<3.4 in /usr/local/python2.7.3/lib/python2.7/site-packages (from celery)
Requirement already satisfied (use --upgrade to upgrade): kombu>=3.0.24,<3.1 in /usr/local/python2.7.3/lib/python2.7/site-packages (from celery)
Requirement already satisfied (use --upgrade to upgrade): anyjson>=0.3.3 in /usr/local/python2.7.3/lib/python2.7/site-packages (from kombu>=3.0.24,<3.1->celery)
Requirement already satisfied (use --upgrade to upgrade): amqp>=1.4.6,<2.0 in /usr/local/python2.7.3/lib/python2.7/site-packages (from kombu>=3.0.24,<3.1->celery)
Cleaning up...

4. Install tornado-redis

$ pip install tornado-redis
Downloading/unpacking tornado-redis
  Downloading tornado-redis-2.4.18.tar.gz
  Running setup.py egg_info for package tornado-redis
    
Installing collected packages: tornado-redis
  Running setup.py install for tornado-redis
    
Successfully installed tornado-redis
Cleaning up...

5. Install redis

$ yum install redis

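Redis is one kind of NoSQL database, and it is fairly efficient in both response time and hit rate.
The project needed a centralized, horizontally scalable caching framework, so I did a little research.
Redis and memcached do differ in performance (for a detailed comparison see http://timyang.net/data/mcdb-tt-redis/),
but either one would meet the project's current needs. Redis is the more capable of the two, though:
it supports list and set operations, and glob-style pattern lookups on keys.
Most of the results the project caches are lists, so when entries are added to or changed in a list
redis shows a big advantage (memcached can only reload the whole list, whereas redis can append to or modify it in place).
1). Download redis
Download page: http://code.google.com/p/redis/downloads/list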

2). Install redis
After downloading, extract the archive into any directory, for example /usr/local/redis-2.6.14
$ tar zxvf redis-2.6.14.tar.gz 
Then enter the redis directory and build it
$ cd /usr/local/redis-2.6.14
$ make  

Copy files
# This is the configuration file redis starts with
$ cp redis.conf /etc/
# The following is very handy: the commands can then be run from anywhere, without the ./ prefix
$ cp ./src/redis-benchmark ./src/redis-cli ./src/redis-server /usr/bin/

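Set the memory overcommit policy (optional; choose it according to the server's actual situation)
$ echo 1 > /proc/sys/vm/overcommit_memory
Possible values: 0, 1, 2.

0: the kernel checks whether enough memory is available for the requesting process.
   If there is, the allocation is allowed; otherwise it fails and an error is returned to the process.
1: the kernel allows every allocation, regardless of the current memory state.
2: the kernel does not overcommit: an allocation is refused once total committed memory would exceed
   swap plus a configurable share of physical RAM (vm.overcommit_ratio).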

Alternatively, edit /etc/sysctl.conf and add the following options so that memory usage does not keep growing
vm.dirty_ratio = 1
vm.dirty_background_ratio=1
vm.dirty_writeback_centisecs=2
vm.dirty_expire_centisecs=3
vm.drop_caches=3
vm.swappiness =100
vm.vfs_cache_pressure=163
vm.overcommit_memory=1
vm.lowmem_reserve_ratio=32 32 8
kern.maxvnodes=3
$ sysctl -p

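One thing to note: when redis dumps its data it forks a child process, and in theory the child occupies as much memory as the parent.
If the parent uses 8 GB, for example, another 8 GB has to be available for the child; if the machine cannot provide that,
the redis server often goes down, or I/O load climbs and performance drops.
So the better overcommit setting here is 1 (the kernel allows every allocation, regardless of the current memory state).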
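
Before changing the setting it can help to see what the machine currently uses. The following is a minimal Python 3 sketch that reads /proc/sys/vm/overcommit_memory; the helper names describe_overcommit and current_overcommit are made up for this example, and the descriptions are my own short summaries of the kernel's three modes.

```python
import os

OVERCOMMIT_PATH = "/proc/sys/vm/overcommit_memory"

MODES = {
    0: "heuristic overcommit (kernel default)",
    1: "always overcommit",
    2: "strict accounting, no overcommit beyond the commit limit",
}


def describe_overcommit(value):
    """Map a vm.overcommit_memory value to a short description."""
    return MODES.get(int(value), "unknown mode")


def current_overcommit(path=OVERCOMMIT_PATH):
    """Return the current setting, or None where the file does not exist."""
    if not os.path.exists(path):
        return None
    with open(path) as handle:
        return int(handle.read().strip())


mode = current_overcommit()
if mode is None:
    print("vm.overcommit_memory is not available on this system")
else:
    print(mode, "->", describe_overcommit(mode))
```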

Open the redis port by editing the firewall configuration file
$ vi /etc/sysconfig/iptables  
...
# Add the port rule
# For redis
-A INPUT -m state --state NEW -m tcp -p tcp --dport 6379 -j ACCEPT 
...
COMMIT
[save and exit]
 
Reload the rules
    service iptables restart   

3). Start the redis service
    [root@Architect redis-2.6.14]# pwd  
    /usr/local/redis-2.6.14  
    [root@Architect redis-2.6.14]# redis-server /etc/redis.conf  &
redis-server /etc/redis.conf            
                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 2.6.14 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in stand alone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 5897
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               


[5897] 10 Dec 17:00:42.661 # Server started, Redis version 2.6.14
[5897] 10 Dec 17:00:42.661 * The server is now ready to accept connections on port 6379

Check the processes to confirm that redis has started
[root@Architect redis-2.6.14]# ps -ef | grep redis  
    root       401 29222  0 18:06 pts/3    00:00:00 grep redis  
    root     29258     1  0 16:23 ?        00:00:00 redis-server /etc/redis.conf  

$ redis-cli ping 
PONG
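
The same liveness check can be done from Python without any client library. This is a minimal Python 3 sketch that speaks the Redis wire protocol (RESP) over a plain socket; encode_command and ping are names made up for this example, and the host and port defaults assume the server started above.

```python
import socket


def encode_command(*parts):
    """Encode a command as a RESP array of bulk strings."""
    chunks = [b"*%d\r\n" % len(parts)]
    for part in parts:
        data = part.encode("utf-8")
        chunks.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(chunks)


def ping(host="127.0.0.1", port=6379, timeout=2.0):
    """Send PING and return True if the server answers +PONG."""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(encode_command("PING"))
        return conn.recv(64).startswith(b"+PONG")
```

Calling ping() against a running, unauthenticated server should return True; if nothing is listening on the port, socket.create_connection raises an OSError instead.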

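If the redis service fails to start here, the cause is usually a problem in redis.conf;
check the file, or overwrite it with a known-good configuration file, to avoid wasted effort.
It is also recommended to edit /etc/redis.conf so that the redis process runs as a background daemon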

    # By default Redis does not run as a daemon. Use 'yes' if you need it.  
    # Note that Redis will write a pid file in /var/run/redis.pid when daemonized.  
    daemonize yes  

4). Test redis against the running instance
    [root@Architect redis-2.6.14]# redis-cli  
    redis> set name songbin  
    OK  
    redis> get name   
    "songbin"  

5). Stop the redis service
    redis-cli shutdown  

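After the redis service shuts down, the cached data is automatically dumped to disk; the file is the one
set by the dbfilename option in redis.conf (dump.rdb). To force the data to be written to disk, use one of the following commands
$ redis-cli save
$ redis-cli -p 6380 save   (with an explicit port)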

2.2 Example program

1. Install the redis package for python

$ pip install redis

2. Edit tasks.py

Edit the file in the current directory
# vim tasks.py
#!/usr/bin/env python
# File: tasks.py
#

from time import sleep
from celery import Celery
 
backend = 'redis://127.0.0.1:6379/0'
broker = 'redis://127.0.0.1:6379/1'
 
app = Celery('tasks', backend=backend, broker=broker)
 
@app.task
def add(x, y):
     sleep(10)
     return x + y

3. Run the celery worker

$ celery -A tasks worker --loglevel=info
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).

User information: uid=0 euid=0 gid=0 egid=0

This message means that celery refuses to start the worker as root while it accepts pickle-serialized messages. Workaround:
[root]$ export C_FORCE_ROOT="true"
[root ]$ celery -A tasks worker --loglevel=debug
celery -A tasks worker --loglevel=info
/usr/local/python2.7.3/lib/python2.7/site-packages/celery/platforms.py:766: RuntimeWarning: 
You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
[2014-12-10 23:02:26,993: WARNING/MainProcess] /usr/local/python2.7.3/lib/python2.7/site-packages/celery/apps/worker.py:161: 
CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.
  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
 
 -------------- celery@ltv_13 v3.1.17 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-2.6.18-194.el5-x86_64-with-glibc2.3
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x15ea8250
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                
[tasks]
  . tasks.add

[2014-12-10 23:02:27,516: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2014-12-10 23:02:27,524: INFO/MainProcess] mingle: searching for neighbors
[2014-12-10 23:02:29,074: INFO/MainProcess] mingle: all alone
[2014-12-10 23:02:29,080: WARNING/MainProcess] celery@ltv_13 ready.
This means the worker started successfully

If the following warning appears:
[2014-12-11 16:04:08,223: WARNING/MainProcess] /usr/local/python2.7.3/lib/python2.7/site-packages/celery/apps/worker.py:161:
CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.
  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
then the serializers need to be configured in tasks.py:
# vim tasks.py
#!/usr/bin/env python
# File: tasks.py
#

from time import sleep
from celery import Celery
 
backend = 'redis://127.0.0.1:6379/0'
broker = 'redis://127.0.0.1:6379/1'

app = Celery('tasks', backend=backend, broker=broker)
#app.conf.CELERY_TASK_SERIALIZER='json'
#app.conf.CELERY_ACCEPT_CONTENT=['json']
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
    )

@app.task
def add(x, y):
     sleep(10)
     return x + y

Test code:
[root]$ cat test.py 
from tasks import add
if __name__ == '__main__':
    for i in range(100):
        for j in range(100):
            kk=add.delay(i, j)
            kk.ready()
            kk.get()

[root]$ python ./test.py

In the celery worker you can see the messages being consumed
[2014-12-11 15:43:04,136: INFO/MainProcess] Received task: tasks.add[a0d1facd-39e8-44f6-9dd9-8980dbfca41b]
[2014-12-11 15:43:14,138: INFO/MainProcess] Task tasks.add[a0d1facd-39e8-44f6-9dd9-8980dbfca41b] succeeded in 10.0008870028s: 0
[2014-12-11 15:43:14,638: INFO/MainProcess] Received task: tasks.add[6357f049-ae5a-4690-8ac7-2ff91b9d21c9]
[2014-12-11 15:43:24,639: INFO/MainProcess] Task tasks.add[6357f049-ae5a-4690-8ac7-2ff91b9d21c9] succeeded in 10.0008919984s: 1
[2014-12-11 15:43:25,140: INFO/MainProcess] Received task: tasks.add[787039c5-bf6d-49e3-980b-912c0b743351]
[2014-12-11 15:43:35,141: INFO/MainProcess] Task tasks.add[787039c5-bf6d-49e3-980b-912c0b743351] succeeded in 10.0006869994s: 2
[2014-12-11 15:43:35,642: INFO/MainProcess] Received task: tasks.add[71826656-1b25-425d-884d-423d642ad6fe]
[2014-12-11 15:43:45,643: INFO/MainProcess] Task tasks.add[71826656-1b25-425d-884d-423d642ad6fe] succeeded in 10.000723999s: 3
[2014-12-11 15:43:46,144: INFO/MainProcess] Received task: tasks.add[eea8cbb3-c526-4c27-94b2-2cb1446b78f1]
[2014-12-11 15:43:56,145: INFO/MainProcess] Task tasks.add[eea8cbb3-c526-4c27-94b2-2cb1446b78f1] succeeded in 10.0006980002s: 4
[2014-12-11 15:43:56,646: INFO/MainProcess] Received task: tasks.add[b04058d7-9ac1-4979-a4ce-eb262c9ad2a4]
[2014-12-11 15:44:06,647: INFO/MainProcess] Task tasks.add[b04058d7-9ac1-4979-a4ce-eb262c9ad2a4] succeeded in 10.0008420013s: 5
[2014-12-11 15:44:07,148: INFO/MainProcess] Received task: tasks.add[ca5ebf48-591b-43dc-b542-a36a5bdc66b5]
[2014-12-11 15:44:17,149: INFO/MainProcess] Task tasks.add[ca5ebf48-591b-43dc-b542-a36a5bdc66b5] succeeded in 10.0005079992s: 6
[2014-12-11 15:44:17,649: INFO/MainProcess] Received task: tasks.add[0ec250b1-07b5-4df6-a06e-94ad232d5e73]
[2014-12-11 15:44:27,650: INFO/MainProcess] Task tasks.add[0ec250b1-07b5-4df6-a06e-94ad232d5e73] succeeded in 10.0003799982s: 7
...

4. Test from the python shell

$ python
>>> from tasks import add
>>> r = add.delay(4, 4)
>>> r.ready() # False within the first 10s because add sleeps for 10s; True after that
False
>>> r.status # task state
'PENDING'
>>> r.result  # the task's return value
8
>>> r.status
'SUCCESS'
>>> r.status
'SUCCESS'

If the test above fails, redis (or the worker) is not working properly.

5. Test with Tornado

The complete test code is as follows:
#!/usr/bin/env python
#
# -*- coding:utf-8 -*-
#
# Author : Hank
# E-mail :
# File   : tornado_redis.py
#
import json
import time
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.httpclient
import tcelery
import tasks
from tasks import add 


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        print "CALLING get()"
        xxx = 10
        yyy = 2
        tasks.add.apply_async(args=[xxx,yyy])


application = tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    application.listen(10001)
    tornado.ioloop.IOLoop.instance().start()

Invoke it:
$ curl -i "http://10.2.175.13:10001" &
$ curl -i "http://10.2.175.13:10001" &

Check the result in the celery worker:
On the celery side you can see that the messages were delivered:
[2014-12-15 11:33:52,869: INFO/MainProcess] Received task: tasks.add[8336c4cc-84f4-4f5c-92b1-a54289526f56]
[2014-12-15 11:34:00,686: INFO/MainProcess] Received task: tasks.add[4971754a-3d62-40e6-8bfa-6666f83a2d2d]
[2014-12-15 11:34:02,871: INFO/MainProcess] Task tasks.add[8336c4cc-84f4-4f5c-92b1-a54289526f56] succeeded in 10.0005960017s: 12
[2014-12-15 11:34:10,687: INFO/MainProcess] Task tasks.add[4971754a-3d62-40e6-8bfa-6666f83a2d2d] succeeded in 10.0011309981s: 12

6. Limitations of using tornado and celery asynchronously

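I went through a lot of material and tried many approaches, and this is my conclusion so far (anyone who finds a way to disprove it is welcome to):
when tornado and celery are used together, there is no way to register a callback that the celery worker invokes back in tornado.
Something of this form is not possible:
tasks.add.apply_async(args=[xxx,yyy], callback=on_result)

def on_result(self, resp):
    self.write(resp)
    self.finish()

So the only option is to send the time-consuming task to celery asynchronously; celery processes it once it is received,
and the task's result then has to be checked afterwards in some other way.
For a way to get the behaviour above, see my write-up "Implementing asynchronous tasks in Tornado"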
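
One way to "check the result afterwards" without blocking the event loop is to poll. The following is a Python 3 sketch under stated assumptions, not the approach from the write-up mentioned above: FakeResult is a hypothetical stand-in for celery's AsyncResult (only ready() and result are used), and wait_for_result is a made-up helper. With a Tornado version whose IOLoop runs on asyncio, a coroutine like this could presumably be awaited from an async handler, but that wiring is not shown here.

```python
import asyncio
import time


class FakeResult:
    """Hypothetical stand-in for celery's AsyncResult (ready() and result)."""

    def __init__(self, value, delay):
        self._value = value
        self._done_at = time.monotonic() + delay

    def ready(self):
        return time.monotonic() >= self._done_at

    @property
    def result(self):
        return self._value if self.ready() else None


async def wait_for_result(async_result, interval=0.05, timeout=5.0):
    """Poll ready() until the task finishes, yielding to the loop in between."""
    deadline = time.monotonic() + timeout
    while not async_result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        await asyncio.sleep(interval)
    return async_result.result


value = asyncio.run(wait_for_result(FakeResult(12, delay=0.2)))
print(value)
```

Polling trades latency for simplicity: the result is noticed at most one interval late, and each ready() call on a real result object costs a round trip to the backend.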

2.3 Celery 3.1 documentation

http://docs.jinkan.org/docs/celery/userguide/calling.html#guide-calling
Calling Tasks
Contents:
. Basics
. Linking (callbacks/errbacks)
. ETA and countdown
. Expiration
. Message Sending Retry
. Serializers
. Compression
. Connections
. Routing options

1. Basics

This document describes Celery's uniform "Calling API" used by task instances and the canvas.
The API defines a standard set of execution options, as well as three methods:
.  apply_async(args[, kwargs[, …]])
   Sends a task message.
.  delay(*args, **kwargs)
   Shortcut to send a task message, but does not support execution options.
.  calling (__call__)
   Applying an object supporting the calling API (e.g. add(2, 2)) means that the task will be executed 
   in the current process, and not by a worker (a message will not be sent).

2. Quick Cheat Sheet

.  T.delay(arg, kwarg=value)
   always a shortcut to .apply_async.
.  T.apply_async((arg, ), {'kwarg': value})
.  T.apply_async(countdown=10)
   executes 10 seconds from now.
.  T.apply_async(eta=now + timedelta(seconds=10))
   executes 10 seconds from now, specified using eta
.  T.apply_async(countdown=60, expires=120)
   executes in one minute from now, but expires after 2 minutes.
.  T.apply_async(expires=now + timedelta(days=2))
   expires in 2 days, set using datetime.

3.Example

The delay() method is convenient as it looks like calling a regular function:
   task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
Using apply_async() instead you have to write:
  task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

So delay is clearly convenient, but if you want to set additional execution 
options you have to use apply_async.
The rest of this document will go into the task execution options in detail. 
All examples use a task called add, returning the sum of two arguments:

@app.task
def add(x, y):
    return x + y

Tip
If the task is not registered in the current process you can use send_task() to call the task by name instead.

There’s another way…
You will learn more about this later while reading about the Canvas,
but subtasks are objects used to pass around the signature of a task invocation
(for example to send it over the network), and they also support the Calling API:
    task.s(arg1, arg2, kwarg1='x', kwarg2='y').apply_async()

4. Linking (callbacks/errbacks)

Celery supports linking tasks together so that one task follows another. 
The callback task will be applied with the result of the parent task as a partial argument:
  add.apply_async((2, 2), link=add.s(16))

What is s?
The add.s call used here is called a subtask, I talk more about subtasks in the canvas guide, 
where you can also learn about chain, which is a simpler way to chain tasks together.
In practice the link execution option is considered an internal primitive, 
and you will probably not use it directly, but rather use chains instead.

Here the result of the first task (4) will be sent to a new task that adds 16 to the previous result, 
forming the expression (2 + 2) + 16 = 20
You can also cause a callback to be applied if task raises an exception (errback), 
but this behaves differently from a regular callback in that it will be passed the 
id of the parent task, not the result. This is because it may not always be possible 
to serialize the exception raised, and so this way the error callback requires a result 
backend to be enabled, and the task must retrieve the result of the task instead.

This is an example error callback:
@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, exc, result.traceback))

it can be added to the task using the link_error execution option:
    add.apply_async((2, 2), link_error=error_handler.s())
In addition, both the link and link_error options can be expressed as a list:
    add.apply_async((2, 2), link=[add.s(16), other_task.s()])
The callbacks/errbacks will then be called in order, and all callbacks will 
be called with the return value of the parent task as a partial argument.
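
The "partial argument" behaviour can be modelled in a few lines of plain Python 3. This is a toy illustration only: Signature and apply_with_links are made-up names, everything runs synchronously in one process, and none of it is Celery's implementation.

```python
class Signature:
    """Toy model of a task signature with partially bound arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __call__(self, parent_result):
        # The parent's return value is prepended to the stored arguments.
        return self.func(parent_result, *self.args)


def add(x, y):
    return x + y


def apply_with_links(func, args, link=()):
    """Run func, then feed its result to every callback in order."""
    result = func(*args)
    callback_results = [callback(result) for callback in link]
    return result, callback_results


result, linked = apply_with_links(add, (2, 2), link=[Signature(add, 16)])
print(result, linked)
```

Here the parent computes 2 + 2, and the linked signature then computes 4 + 16, which is the (2 + 2) + 16 expression from the text.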

5. ETA and countdown

The ETA (estimated time of arrival) lets you set a specific date and time 
that is the earliest time at which your task will be executed. countdown is a shortcut 
to set eta by seconds into the future.


>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
4
The task is guaranteed to be executed at some time after the specified date and time, 
but not necessarily at that exact time. Possible reasons for broken deadlines may include
 many items waiting in the queue, or heavy network latency. To make sure your 
tasks are executed in a timely manner you should monitor the queue for congestion. 
Use Munin, or similar tools, to receive alerts, so appropriate action can be 
taken to ease the workload. See Munin.


While countdown is an integer, eta must be a datetime object, 
specifying an exact date and time (including millisecond precision, 
and timezone information):
>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)
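
The relationship between countdown and eta is just date arithmetic. The sketch below (Python 3, standard library only) shows that arithmetic with timezone-aware UTC datetimes; eta_from_countdown and is_due are hypothetical helpers for illustration and are not part of Celery.

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(countdown, now=None):
    """Return the earliest run time for a task delayed by `countdown` seconds."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown)


def is_due(eta, now=None):
    """A worker may run the task once the current time has reached the ETA."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= eta


start = datetime(2014, 12, 10, 23, 0, 0, tzinfo=timezone.utc)
eta = eta_from_countdown(3, now=start)
print(eta.isoformat())
print(is_due(eta, now=start), is_due(eta, now=start + timedelta(seconds=5)))
```

Note that is_due only says when a task becomes eligible; as the text explains, the actual start can be later if the queue is congested.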
Expiration
The expires argument defines an optional expiry time, either as seconds after task publish, 
or a specific date and time using datetime:

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10),
...                 expires=datetime.now() + timedelta(days=1))
When a worker receives an expired task it will mark the task as REVOKED (TaskRevokedError).

6. Message Sending Retry

Celery will automatically retry sending messages in the event of connection failure, 
and retry behavior can be configured – like how often to retry, or a maximum number 
of retries – or disabled all together.

To disable retry you can set the retry execution option to False:
  add.apply_async((2, 2), retry=False)
Related Settings:
  CELERY_TASK_PUBLISH_RETRY
  CELERY_TASK_PUBLISH_RETRY_POLICY
Retry Policy
A retry policy is a mapping that controls how retries behave, and can contain the following keys:
. max_retries
  Maximum number of retries before giving up, in this case 
  the exception that caused the retry to fail will be raised.

  A value of 0 or None means it will retry forever.
  The default is to retry 3 times.
. interval_start
  Defines the number of seconds (float or integer) to wait between retries. 
  Default is 0, which means the first retry will be instantaneous.
. interval_step
  On each consecutive retry this number will be added to 
  the retry delay (float or integer). Default is 0.2.
. interval_max
  Maximum number of seconds (float or integer) to wait between retries. Default is 0.2.

For example, the default policy correlates to:
add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})
the maximum time spent retrying will be 0.4 seconds. 
It is set relatively short by default because a connection failure could lead
to a retry pile effect if the broker connection is down:
e.g. many web server processes waiting to retry blocking other incoming requests.
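
The 0.4 second figure can be reproduced with a little arithmetic. This Python 3 sketch encodes my reading of the four policy keys (wait interval_start before the first retry, add interval_step each time, cap at interval_max); retry_delays is a made-up helper, not the library's code, and it assumes max_retries is a positive integer rather than 0 or None.

```python
def retry_delays(max_retries=3, interval_start=0, interval_step=0.2,
                 interval_max=0.2):
    """Return the wait (in seconds) before each retry under the policy."""
    delays = []
    for attempt in range(max_retries):
        delay = interval_start + attempt * interval_step
        delays.append(min(delay, interval_max))
    return delays


default_delays = retry_delays()
print(default_delays, sum(default_delays))
```

With the defaults the three waits are 0, 0.2 and 0.2 seconds, so a publisher gives up after roughly 0.4 seconds of waiting.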

7. Serializers

Security
The pickle module allows for execution of arbitrary functions, please see the security guide.
Celery also comes with a special serializer that uses cryptography to sign your messages.


Data transferred between clients and workers needs to be serialized, so every message 
in Celery has a content_type header that describes the serialization method used to encode it.
The default serializer is pickle, but you can change this using the CELERY_TASK_SERIALIZER setting, 
or for each individual task, or even per message.
There’s built-in support for pickle, JSON, YAML and msgpack, and you can also add your own 
custom serializers by registering them into the Kombu serializer registry (see ref:kombu:guide-serialization).


Each option has its advantages and disadvantages.
json – JSON is supported in many programming languages, is now
    a standard part of Python (since 2.6), and is fairly fast to decode using the modern Python 
    libraries such as cjson or simplejson.
    The primary disadvantage to JSON is that it limits you to the following data types: 
    strings, Unicode, floats, boolean, dictionaries, and lists. Decimals and dates are notably missing.
    Also, binary data will be transferred using Base64 encoding, which will cause the 
    transferred data to be around 34% larger than an encoding which supports native binary types.
    However, if your data fits inside the above constraints and you need cross-language support, 
    the default setting of JSON is probably your best choice.
    See http://json.org for more information.
pickle – If you have no desire to support any language other than
    Python, then using the pickle encoding will gain you the support of all 
    built-in Python data types (except class instances), smaller messages 
    when sending binary files, and a slight speedup over JSON processing.
    See http://docs.python.org/library/pickle.html for more information.
yaml – YAML has many of the same characteristics as json,
    except that it natively supports more data types (including dates, recursive references, etc.)
    However, the Python libraries for YAML are a good bit slower than the libraries for JSON.
    If you need a more expressive set of data types and need to maintain cross-language compatibility, 
    then YAML may be a better fit than the above.
    See http://yaml.org/ for more information.
msgpack – msgpack is a binary serialization format that is closer to JSON
    in features. It is very young however, and support should be considered experimental at this point.
    See http://msgpack.org/ for more information.
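
The JSON limitations described above are easy to check with the standard library alone. This Python 3 sketch tests which argument values survive json.dumps and measures the Base64 overhead for a binary payload; json_safe is a made-up helper and nothing here touches Celery itself.

```python
import base64
import json
import os
from datetime import datetime
from decimal import Decimal


def json_safe(value):
    """Return True if `value` survives json.dumps without a custom encoder."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


print(json_safe({"x": 2, "y": [1.5, "two", None, True]}))
print(json_safe(datetime(2014, 12, 10)))
print(json_safe(Decimal("1.10")))

# Binary payloads have to be wrapped, for example in Base64: every 3 raw
# bytes become 4 encoded characters, roughly a one-third size increase.
raw = os.urandom(3000)
encoded = base64.b64encode(raw)
print(len(raw), len(encoded))
```

A check like json_safe can be run over task arguments in a test suite before switching CELERY_TASK_SERIALIZER to 'json'.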


The encoding used is available as a message header, so the worker knows how to deserialize any task. 
If you use a custom serializer, this serializer must be available for the worker.


The following order is used to decide which serializer to use when sending a task:
1) The serializer execution option.
2) The Task.serializer attribute
3) The CELERY_TASK_SERIALIZER setting.

Example setting a custom serializer for a single task invocation:
>>> add.apply_async((10, 10), serializer='json')


8. Compression
Celery can compress the messages using either gzip, or bzip2. 
You can also create your own compression schemes and register them in the kombu compression registry.
The following order is used to decide which compression scheme to use when sending a task:
1). The compression execution option.
2). The Task.compression attribute.
3). The CELERY_MESSAGE_COMPRESSION attribute.


Example specifying the compression used when calling a task:
>>> add.apply_async((2, 2), compression='zlib')
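
To get a feel for what compression buys, the sketch below compresses a repetitive JSON body with Python's zlib and bz2 modules (Python 3, standard library only). The payload shape is invented for the example, and the codecs are called directly rather than through Celery; as far as I know the 'zlib' name used above selects the same gzip-family codec, but treat that as an assumption.

```python
import bz2
import json
import zlib

# A repetitive JSON body, loosely shaped like a large task-argument payload.
payload = json.dumps({"args": [list(range(100))] * 50, "kwargs": {}}).encode("utf-8")

zlib_body = zlib.compress(payload)
bz2_body = bz2.compress(payload)

print(len(payload), len(zlib_body), len(bz2_body))

# Decompression must give back the exact original bytes.
assert zlib.decompress(zlib_body) == payload
assert bz2.decompress(bz2_body) == payload
```

Compression only pays off for large or repetitive messages; for small payloads such as add(2, 2) the codec overhead can outweigh the savings.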

9. Connections
Automatic Pool Support
Since version 2.3 there is support for automatic connection pools,
so you don’t have to manually handle connections and publishers to reuse connections.
The connection pool is enabled by default since version 2.5.
See the BROKER_POOL_LIMIT setting for more information.

You can handle the connection manually by creating a publisher:
numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        # Reuse one publisher for every message instead of opening a new one each time.
        for args in numbers:
            res = add.apply_async(args, publisher=publisher)
            results.append(res)
print([res.get() for res in results])


Though this particular example is much better expressed as a group:


>>> from celery import group


>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.subtask(n) for n in numbers).apply_async()


>>> res.get()
[4, 8, 16, 32]


10. Routing options
Celery can route tasks to different queues.
Simple routing (name <-> name) is accomplished using the queue option:
   add.apply_async(queue='priority.high')
You can then assign workers to the priority.high queue by using the workers -Q argument:
   $ celery worker -l info -Q celery,priority.high
See also
Hard-coding queue names in code is not recommended, 
the best practice is to use configuration routers (CELERY_ROUTES).
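
A minimal sketch of such a configuration, assuming the Celery 3.1 setting name CELERY_ROUTES mentioned above and the tasks.add task from this article; the module name celeryconfig.py is only an example.

```python
# celeryconfig.py (hypothetical module name)

# Send tasks.add to the priority.high queue; tasks without an entry keep
# using the default "celery" queue.
CELERY_ROUTES = {
    "tasks.add": {"queue": "priority.high"},
}
```

With a mapping like this in the configuration, callers can use add.delay(2, 2) unchanged and the queue name lives in one place.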


To find out more about routing, please see Routing Tasks.


11. Advanced Options
These options are for advanced users who want to take use of AMQP’s full routing capabilities. 
Interested parties may read the routing guide.
. exchange
  Name of exchange (or a kombu.entity.Exchange) to send the message to.
. routing_key
  Routing key used to determine.
. priority
  A number between 0 and 9, where 0 is the highest priority.


Supported by: redis, beanstalk