celery 使用 - 3

# celery 使用

1.broker

2.基礎案例

使用redis做爲broker和brokend。python

建立tasks.pyredis

# tasks.py
di = 'redis://:****@localhost:6379/0'
app = Celery('tasks', backend=di, broker=di)

@app.task
def add(x, y):
    return x + y

運行:
celery -A tasks worker -l info -P eventlet併發

建立temp.py
# temp.py
from tasks import add
rv = add.delay(4, 4)app

2.1 運行結果:

運行tasksasync

E:\python\code test>celery -A tasks worker -l info -P eventlet

 -------------- celery@*** v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Windows0 2019-09-21 22:08:04
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1aebfdcf98
- ** ---------- .> transport:   redis://:**@localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2019-09-21 22:08:04,802: INFO/MainProcess] Connected to redis://:**@192.168.199
.113:6379/0
[2019-09-21 22:08:04,813: INFO/MainProcess] mingle: searching for neighbors
[2019-09-21 22:08:05,849: INFO/MainProcess] mingle: all alone
[2019-09-21 22:08:05,886: INFO/MainProcess] celery@*** ready.
[2019-09-21 22:08:05,905: INFO/MainProcess] pidbox: Connected to redis://:**@...../0.

運行temp測試

[2019-09-21 22:11:27,198: INFO/MainProcess] Received task: tasks.add[06d745c6-53
18-4f48-8a1e-2ab8f8563994]
[2019-09-21 22:11:27,200: INFO/MainProcess] Task tasks.add[06d745c6-5318-4f48-8a
1e-2ab8f8563994] succeeded in 0.0s: 8
[2019-09-21 22:11:31,935: INFO/MainProcess] Received task: tasks.add[115c3b5d-eb
a7-472b-86ab-bd356f650e13]
[2019-09-21 22:11:31,936: INFO/MainProcess] Task tasks.add[115c3b5d-eba7-472b-86
ab-bd356f650e13] succeeded in 0.0s: 8

2.2 問題

在運行時出現兩個問題:this

  1. redis-py版本問題,目前爲2.*,要求升級
    pip install --upgrade redis
    升級到4.***
  2. 報錯ValueError: not enough values to unpack (expected 3, got 0)
    解決方法:
    看別人描述大概就是說win10上運行celery4.x就會出現這個問題,解決辦法以下,原理未知:pwa

    安裝`eventlet
    pip install eventlet線程

    而後啓動worker的時候加一個參數,以下:
    celery -A worker -l info -P eventlet
    而後就能夠正常的調用了。
    rest

3.複雜一點的測試環境

通常而言,celery項目的代碼分爲三部分:

  1. worker定義
  2. tasks定義
  3. tasks添加

結構:

proj/__init__.py
    /celery_worker.py  # worker定義
    /celery_tasks.py   # tasks定義
    /celery_run.py      # 調用

proj/celery_worker.py

 # celery test -- worker
from celery import Celery

di_broker = 'redis://:123@192.168.199.113:6379/0'
di_backend = 'redis://:123@192.168.199.113:6379/1'

def create_worker():
    # app = Celery('tasks', broker=di)
    app = Celery('tasks',
             backend=di_backend,
             broker=di_broker,
            include=['code_2.celery_tasks'])

    app.conf.update(result_expires=3600,)
    return app

app = create_worker()


if __name__ == '__main__':
    app.start()
    

proj/celery_tasks.py

from celery_worker import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

proj/celery_run.py

# celery test
from celery_tasks import add
rv = add.delay(4, 4)

out = rv.get(timeout=1)
print(out)
out = rv.ready()
print(out)

start the woker
celery -A celery_tasks worker -l info -P eventlet

stopping the woker
ctrl+c

實驗環境搭建完成,下面測試複雜一點的功能。

4.calling tasks

接口

add(4, 4) # 本地調用
add.delay(4, 4) # worker執行
This method is actually a star-argument shortcut to another method called apply_async():
add.apply_async((2, 2))
能夠使用更多參數
add.apply_async((2, 2), queue='lopri', countdown=10)
上句表明任務發送到lopri隊列,至少等待10秒才執行

每一個任務都會被賦與一個id
The delay and apply_async methods return an AsyncResult instance
若是指定了backend,能夠查看任務的執行狀況

res = add.delay(2, 2)
res.get(timeout=1)
4

You can find the task’s id by looking at the id attribute:
res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

You can also inspect the exception and traceback if the task raised an exception, in fact result.get() will propagate any errors by default:
res = add.delay(2)
res.get(timeout=1)

If you don’t wish for the errors to propagate then you can disable that by passing the propagate argument:
res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)

5.server/worker

5.1 基礎講解

(vir_venv) E:\python\code>celery -A celery_tasks worker -l info -P eventlet

 -------------- celery@** v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Windows-8.1-6.3. 2019-09-22 10:50:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x38ac527d30
- ** ---------- .> transport:   redis://:**@***:6379/0
- ** ---------- .> results:     redis://:**@***:6379/1
- *** --- * --- .> concurrency: 4 (eventlet) # 併發數
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

這裏使用eventlet表明每次任務都是在單獨線程中執行。
task events參數決定是否監視worker

5.2 後臺運行

celery multi start worker1 -A celery_worker -l info 
celery  multi restart w1 -A proj -l info  
celery multi stop w1 -A proj -l info
# 等待執行完成
celery multi stopwait w1 -A proj -l info 

6.task組合結構/工做流

task支持下面的方法:


add.signature((2, 2), countdown=10)
tasks.add(2, 2)
There’s also a shortcut using star arguments:

add.s(2, 2)
tasks.add(2, 2)

def func2():
r = add.s(2,2)
pr_type(r)
rv = r.delay()
out = rv.get(timeout=5)
print(out)
out = rv.ready()
print(out)

看起來它像partial,實質也是對tasks的一個封裝,使用它的目的是爲了構造更復雜的任務結構。
支持的組合結構以下:
group chain chord map starmap chunks

以group爲例:

>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
相關文章
相關標籤/搜索