celery 實例進階

認識

這裏有幾個概念,task、worker、broker。
顧名思義,task 就是老闆交給你的各類任務,worker 就是你手下幹活的人員。

那什麼是 Broker 呢?

老闆給你下發任務時,你需要把它記下來。這個「它」可以是你隨身攜帶的本子,也可以是電腦裏的記事本或者 Excel,或者是你的任何時間管理工具。

Broker  則是 Celery 記錄task的地方。
作爲一個任務管理者的你,將老闆(前端程序)安排給你的工作(Task)記錄到你的本子(Broker)裏。接下來,你就安排你手下的IT程序猿們(Worker),都到你的本子(Broker)裏來取走工作(Task)。

1. broker爲rabbitmq

#tasks.py

from celery import Celery

app = Celery('tasks', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    return x + y

啓動

celery -A tasks worker --loglevel=info

運行

>>> from tasks import add
>>> add(1, 3)
4
>>> add.delay(1,3)
<AsyncResult: 07614cef-f314-4c7b-a33f-92c080cadb83>
>>> 

注:delay 是使用異步的方式,會把任務壓入到消息隊列;否則(直接調用 add(1, 3))不會使用消息隊列。

文件名爲 tasks.py,則其中代碼 app = Celery('tasks', broker=...),Celery 第一個參數爲工程名,啓動時也是 celery -A tasks worker --loglevel=info

對比

注:投入到指定的隊列用:add.apply_async((1, 3), queue='queue_add1')(delay 會把所有關鍵字參數原樣傳給任務函數,不能用來指定隊列)

test_2.py

from celery import Celery

app = Celery('proj', broker='amqp://admin:admin@localhost:5672', include='test_2')

@app.task
def add(x, y):
    return x + y

2. 以python+文件名的方式啓動

例1:

#test.py

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    """Return ``x + y`` after a 5 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()

啓動

python test.py worker 

celery 默認啓動的 worker 數爲內核個數;如果要指定啓動個數,用參數 -c,例如:

python test.py worker -c 2

例2:

#test.py

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    """Return ``x + y`` after a 2 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(2)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()

#eg.py

from test import *
import time

# Send three add(1, 3) tasks and keep the AsyncResult handle of each one.
rev = [add.delay(1, 3) for _ in range(3)]

print("len rev: %d" % len(rev))

# Poll until every result is ready.  The pause happens once per pass
# rather than once per pending result, so the wait between checks stays
# at one second no matter how many tasks are still running.
while not all(result.ready() for result in rev):
    time.sleep(1)
    print("sleep 1")
print("_____________________>")

3. broker爲redis

#test_redis.py

from celery import Celery
import time
#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

@app.task
def add(x, y):
    """Return ``x + y`` after a 5 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()

啓動

python test_redis.py worker -c 2

測試

from celery import group
from test_redis import *
# Five one-task groups are sent; only the handle of the last one is kept,
# so only that group's result is read back below.
for _ in range(5):
    g = group(add.s(2, 3)).apply_async()

# get() blocks until the group has finished and returns its results.
for ret in g.get():
    print(ret)
print("end-----------------------------------")

結果

5
end-----------------------------------

4. 兩個隊列(redis)

#test_redis.py

from celery import Celery
import time
#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

@app.task
def add(x, y):
    """Return ``x + y`` after a 5 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()

#test_redis_2.py

from celery import Celery
import time
#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis_2', backend='redis', broker='redis://100.69.201.116:7001')

@app.task
def add_2(x, y):
    """Return ``x + y`` after a 5 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("=======>")
    time.sleep(5)
    print("<=================")
    return x + y

if __name__ == "__main__":
    app.start()

測試

from celery import group
from test_redis import *
from test_redis_2 import *
# One group per app: each pair in ll becomes one task signature.
ll = [(1, 2), (3, 4), (5, 6)]
g = group(add.s(x, y) for x, y in ll).apply_async()
for ret in g.get():
    print(ret)
print("end redis_1 -----------------------------------")

ll = [(1, 2), (3, 4), (5, 6)]
g = group(add_2.s(x, y) for x, y in ll).apply_async()
for ret in g.get():
    # %-formatting keeps the output identical on Python 2 and 3.
    print(": %s" % (ret,))
print("end redis_2 -----------------------------------")

結果

3
7
11
end redis_1 -----------------------------------
: 3
: 7
: 11
end redis_2 -----------------------------------

5. 兩個隊列(同一個rabbitmq)

註:需要提前設置好隊列

##例1

#test.py

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//')

@app.task
def add(x, y):
    """Return ``x + y`` after a 5 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()

 

#test_2.py

from celery import Celery
import time
app = Celery('test_2', backend='amqp', broker='amqp://admin:admin@localhost:5672//hwzh')

@app.task
def add_2(x, y):
    """Return ``x + y`` after a 5 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("=====>")
    time.sleep(5)
    print("<==========")
    return x + y

if __name__ == "__main__":
    app.start()

測試

from celery import group
from test import *
from test_2 import *

# One group per app: each pair in ll becomes one task signature.
ll = [(1, 2), (3, 4), (7, 8)]
g = group(add.s(x, y) for x, y in ll).apply_async()
for ret in g.get():
    print(ret)

ll = [(1, 2), (3, 4), (7, 8)]
g = group(add_2.s(x, y) for x, y in ll).apply_async()
for ret in g.get():
    print(ret)

結果

3
7
15
3
7
15

 

##例2

#test.py

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//mq4')

@app.task
def add(x, y):
    """Return ``x + y`` after a 2 second pause.

    A marker line is printed before and after the pause.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(2)
    print("<--------------")
    return x + y

@app.task
def sum(x, y):
    """Return ``x + y`` after a 2 second pause.

    A marker line is printed before and after the pause.

    NOTE: the name hides the built-in ``sum()`` in this module and in any
    module that star-imports it; it is kept because callers use this name.
    """
    # Single-argument print() gives the same output on Python 2 and 3.
    print("------>")
    time.sleep(2)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()

#eg2.py

from test import *
import time

# Send three add(1, 3) tasks followed by three sum(1, 3) tasks and keep
# the AsyncResult handle of each one (sum here is the imported task).
rev = [add.delay(1, 3) for _ in range(3)]
rev.extend(sum.delay(1, 3) for _ in range(3))

print("len rev: %d" % len(rev))

# Poll until every result is ready.  The pause happens once per pass
# rather than once per pending result, so the wait between checks stays
# at one second no matter how many tasks are still running.
while not all(result.ready() for result in rev):
    time.sleep(1)
    print("sleep 1")
print("_____________________>")

6. 保存結果

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://admin:admin@localhost')

@app.task
def add(x, y): 
    return x + y

啓動

celery -A tasks_1 worker --loglevel=info

與前例不一樣:

- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f8057931810
- ** ---------- .> transport: amqp://admin:**@localhost:5672//
- ** ---------- .> results: amqp

運行

>>> from tasks_1 import add
>>> result = add.delay(1, 3)
>>> result.ready()
True
>>> result.get()
4

7. 多個隊列

from celery import Celery
from kombu import Exchange, Queue
BROKER_URL = 'amqp://admin:admin@localhost//'
app = Celery('tasks', backend='amqp',broker=BROKER_URL)
app.conf.update(
    # Route keys have to be the full task name.  A task declared with a
    # bare @app.task is named "<module>.<function>", so each key carries
    # the "tasks." prefix (this assumes the file is saved as tasks.py).
    CELERY_ROUTES={
        "tasks.add1": {"queue": "queue_add1"},
        "tasks.add2": {"queue": "queue_add2"},
        "tasks.add3": {"queue": "queue_add3"},
        "tasks.add4": {"queue": "queue_add4"},
    },
)
@app.task
def add1(x, y):
     return x + y

@app.task
def add2(x, y):
     return x + y

@app.task
def add3(x, y):
     return x + y

@app.task
def add4(x, y):
     return x + y

8. 消息路由

文件:tasks.py

from celery import Celery, platforms
import time
import os

app = Celery('proj', broker='amqp://admin:admin@ip:5672',
             include=['tasks']
             )
app.conf.update(
    CELERY_ROUTES={
        'tasks.fun_1': {
            'queue': "q_1" 
        },
        'tasks.fun_2': {
            'queue': "q_2"
        }
    }
)
platforms.C_FORCE_ROOT = True 

@app.task
def fun_1(n):
    """Print a marker followed by ``n`` and return 1."""
    # %-formatting keeps the output identical on Python 2 and 3.
    print("(((((((((((((((func_1 %s" % (n,))
    return 1

@app.task
def fun_2(n):
    """Print ``n`` followed by a marker and return 2."""
    # %-formatting keeps the output identical on Python 2 and 3.
    print("%s )))))))))))))))" % (n,))
    return 2

if __name__ == "__main__":
    app.start()

啓動

python tasks.py worker -c 2 -Q q_1
python tasks.py worker -c 2 -Q q_2

兩個消息隊列:q_1, q_2,調用示例

>>> from tasks import *
>>> fun_1(1)
(((((((((((((((func_1 1
1
>>> fun_1.delay(1)
<AsyncResult: 528a2ad1-bc16-4bdc-beff-cd166fe3e885>
>>> fun_2.delay(2)
<AsyncResult: ee5881eb-b384-4a39-ba00-08aa8ee53504>

9. worker 內啓多進程

#tasks.py

from celery import Celery
import time
import multiprocessing as mp

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")

def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

@app.task
def fun_1(n):
    """Run ``test_func(i)`` for every ``i`` in ``range(n)`` on a process pool.

    The ``daemon`` flag of the current process is cleared while the pool is
    created and set to ``True`` again afterwards.  The values returned by
    ``test_func`` are not collected; the task always returns 1.
    """
    curr_proc = mp.current_process()
    curr_proc.daemon = False
    try:
        p = mp.Pool(mp.cpu_count())
    finally:
        # Put the flag back even when the pool could not be created.
        curr_proc.daemon = True
    for i in range(n):
        p.apply_async(test_func, args=(i,))
    # No more work will be submitted; wait for the submitted calls to end.
    p.close()
    p.join()
    return 1

if __name__ == "__main__":
    app.start()

說明

直接啓動多進程是肯定不可以的,因爲 worker 進程是守護進程(curr_proc.daemon=True),所以啓多進程之前主動設置爲非守護進程:curr_proc.daemon=False,啓動了以後再設回守護進程

#tasks_callback.py

from celery import Celery
import time
import multiprocessing as mp

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks_callback")
rev = []
def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

def callback_log(rev_val):
    rev.append(rev_val)

@app.task
def fun_1(n):
    """Run ``test_func(i)`` for every ``i`` in ``range(n)`` on a process pool.

    Each finished call hands its return value to ``callback_log``, which
    appends it to the module-level ``rev`` list; ``rev`` is printed before
    and after the run.  The ``daemon`` flag of the current process is
    cleared while the pool is created and set to ``True`` again afterwards.
    The task always returns 1.
    """
    # %-formatting keeps the output identical on Python 2 and 3.
    print("before rev: %s" % (rev,))
    curr_proc = mp.current_process()
    curr_proc.daemon = False
    try:
        p = mp.Pool(mp.cpu_count())
    finally:
        # Put the flag back even when the pool could not be created.
        curr_proc.daemon = True
    for i in range(n):
        p.apply_async(test_func, args=(i,), callback=callback_log)
    # No more work will be submitted; wait for the submitted calls to end.
    p.close()
    p.join()
    print("after rev: %s" % (rev,))
    return 1

if __name__ == "__main__":
    app.start()

10. 常用參數配置

1. CELERYD_PREFETCH_MULTIPLIER

同時預取的消息個數。比如 CELERYD_PREFETCH_MULTIPLIER=2,那麼對於 1 個 worker,如果現在有一個任務的狀態是 STARTED,那麼還可以有 2 個處於 RECEIVED 狀態(如果有的話)。這樣就避免了消息很多時全部被分下去、後起來的 worker 領不到消息的尷尬。

參考代碼

from celery import Celery, platforms
import time
import os

app = Celery('proj', broker='amqp://admin:admin@localhost:5672',
             include=['tasks']
             )
app.conf.update(
    CELERYD_PREFETCH_MULTIPLIER=2,
    CELERY_ROUTES={
        'tasks.fun_1': {
            'queue': "q_1"
        },
        'tasks.fun_2': {
            'queue': "q_2"
        }
    }
)
platforms.C_FORCE_ROOT = True

@app.task
def fun_1(n):
    """Print a marker followed by ``n``, pause 20 seconds and return 1."""
    # %-formatting keeps the output identical on Python 2 and 3.
    print("(((((((((((((((func_1 %s" % (n,))
    time.sleep(20)
    return 1

@app.task
def fun_2(n):
    """Print ``n`` followed by a marker and return 2."""
    # %-formatting keeps the output identical on Python 2 and 3.
    print("%s )))))))))))))))" % (n,))
    return 2

調用

>>> from tasks import *
>>> fun_1.delay(3)
<AsyncResult: 609f2216-6785-409e-9f6f-85ae3fcce084>
>>> fun_1.delay(3)
<AsyncResult: 0230b8bd-b237-40ef-bc73-88929f8f8290>
>>> fun_1.delay(3)
<AsyncResult: 8fce172a-93c9-41f8-8c08-377a4363389c>
>>> fun_1.delay(3)

 

參考:http://windrocblog.sinaapp.com/?p=1585

相關文章
相關標籤/搜索