Python開發【模塊】:Celery 分佈式異步消息任務隊列

Celery

前言:python

Celery 是一個 基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery, 舉幾個實例場景中可用的例子:linux

  • 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。 
  • 你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福

Celery有如下優勢:redis

  • 簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的
  • 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
  • 快速:一個單進程的celery每分鐘可處理上百萬個任務
  • 靈活: 幾乎celery的各個組件均可以被擴展及自定製

Celery基本工做流程圖:app

 

一、 Celery安裝使用異步

Celery須要在linux的環境下運行:分佈式

# 安裝
[root@localhost celerys]# pip3 install celery

# 進入python import無異常表示安裝成功
[root@localhost celerys]# python3
>>> import celery

Celery的默認broker是RabbitMQ, 僅需配置一行就能夠函數

broker_url = 'amqp://guest:guest@localhost:5672//'

使用Redis作broker也能夠url

broker_url = 'redis://localhost:6379/0'
#redis://:password@hostname:port/db_number

 

二、簡單使用spa

建立一個任務文件就叫tasks.py:orm

from celery import Celery
import time 

app = Celery('cly',                                        # 任意
             broker='redis://192.168.1.166:6379/0',        # 中間件
             backend='redis://localhost')                  # 數據存儲
 
@app.task
def add(x,y):
    time.sleep(10)
    print("running...",x,y)
    return x+y

啓動Celery Worker來開始監聽並執行任務:

# 加入環境變量
[root@localhost ~]# PATH=$PATH:/usr/local/python3.5/bin/

# 啓動一個worker
[root@localhost celerys]# celery -A tasks worker --loglevel=info

調用任務:

[root@localhost celerys]# python3 
Python 3.5.2 (default, Jul  7 2017, 23:36:01) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add                   # import add
>>> add.delay(4,6)                          # 執行函數
<AsyncResult: 4b5a8ab6-693c-4ce5-b779-305cfcdf70cd>   # 返回taskid
>>> result = add.delay(4,6)                 # 執行函數
>>> result.get()                            # 同步獲取結果,一直等待
10

>>> result.get(timeout=1)                   # 設置超時時間,過時錯誤異常
Traceback (most recent call last):
    --strip--
celery.exceptions.TimeoutError: The operation timed out.

>>> result = add.delay(4,'a')               # 執行錯誤命令
>>> result.get()                            # get後獲取到錯誤信息,觸發異常
Traceback (most recent call last):
     --strip--
celery.backends.base.TypeError: unsupported operand type(s) for +: 'int' and 'str'
>>> result = add.delay(4,'a')
>>> result.get(propagate=False)             # propagate=False 不觸發異常,獲取錯誤信息
TypeError("unsupported operand type(s) for +: 'int' and 'str'",)
>>> result.traceback                        # 獲取具體錯誤信息 log打印用
'Traceback (most recent call last):\n  File "/usr/local/python3.5/lib/python3.5/site-packages/celery/app/trace.py", line 367, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "/usr/local/python3.5/lib/python3.5/site-packages/celery/app/trace.py", line 622, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "/data/celerys/tasks.py", line 12, in add\n    return x+y\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n'

此時worker端收到的信息:

[2017-07-08 03:12:22,565: WARNING/PoolWorker-1] running...     # 獲取到任務
[2017-07-08 03:12:22,565: WARNING/PoolWorker-1] 4
[2017-07-08 03:12:22,565: WARNING/PoolWorker-1] 6              # 任務執行完畢數據存儲到backend端
[2017-07-08 03:12:22,567: INFO/PoolWorker-1] Task tasks.add[683e395e-48b9-4d32-b3bb-1492c62af393] succeeded in 10.01260852499945s: 10

查看broker(即192.168.1.166)端數據:

[root@localhost redis-3.0.6]# src/redis-cli 
127.0.0.1:6379> keys *
1) "_kombu.binding.celeryev"
2) "unacked_mutex"
3) "_kombu.binding.celery.pidbox"
4) "_kombu.binding.celery"

執行完後,backend端的數據:

[root@localhost redis-3.0.6]# src/redis-cli   # 程序get後,數據未被刪除
127.0.0.1:6379> keys *
1) "celery-task-meta-683e395e-48b9-4d32-b3bb-1492c62af393"
相關文章
相關標籤/搜索