分佈式異步任務隊列神器-Celery

最近研究了下異步任務神器-Celery,發現很是好用,能夠說是高可用,假如你發出一個任務執行命令給 Celery,只要 Celery 的執行單元 (worker)
在運行,那麼它必定會執行;若是執行單元 (worker) 出現故障,如斷電,斷網狀況下,只要執行單元 (worker) 恢復運行,那麼它會繼續執行你已經發出的命令。這一點有很強的實用價值:假若有交易系統接到了大量交易請求,主機卻掛了,但前端用戶仍能夠繼續發交易請求,發送交易請求後,用戶無需等待。待主機恢復後,已發出的交易請求能夠繼續執行,只不過用戶收到交易確認的時間延長而已,但並不影響用戶體驗。前端

Celery 簡介

它是一個異步任務調度工具,用戶使用 Celery 產生任務,借用中間人來傳遞任務,任務執行單元從中間人那裏消費任務。任務執行單元能夠單機部署,也能夠分佈式部署,所以 Celery 是一個高可用的生產者消費者模型的異步任務隊列。你能夠將你的任務交給 Celery 處理,也可讓 Celery 自動按 crontab 那樣去自動調度任務,而後去作其餘事情,你能夠隨時查看任務執行的狀態,也可讓 Celery 執行完成後自動把執行結果告訴你。node

應用場景:

  1. 高併發的請求任務。互聯網已經普及,人們的衣食住行中產生的交易均可以線上進行,這就避免不了某些時間極高的併發任務請求,如公司中常見的購買理財、學生繳費,在理財產品投放市場後、開學前的一段時間,交易量猛增,確認交易時間較長,此時能夠把交易請求任務交給 Celery 去異步執行,執行完再將結果返回給用戶。用戶提交後不須要等待,任務完成後會通知到用戶(購買成功或繳費成功),提升了網站的總體吞吐量和響應時間,幾乎不須要增長硬件成本便可知足高併發。python

  2. 定時任務。在雲計算,大數據,集羣等技術愈來愈普及,生產環境的機器也愈來愈多,定時任務是避免不了的,若是每臺機器上運行着本身的 crontab 任務,管理起來至關麻煩,例如當進行災備切換時,某些 crontab 任務可能須要單獨手工調起,給運維人員形成極大的麻煩,有了 Celery ,你能夠集中管理全部機器的定時任務,並且災備不管什麼時候切換,crontab 任務總能正確的執行。web

  3. 異步任務。 一些耗時較長的操做,好比 I/O 操做,網絡請求,能夠交給 Celery 去異步執行,用戶提交後能夠作其餘事情,當任務完成後將結果返回用戶便可,可提升用戶體驗。正則表達式

Celery 的優勢

  1. 純 Python 編寫,開源。這已是站在巨人的肩膀上了,雖然 Celery 是由純 Python 編寫的,但協議能夠用任何語言實現。迄今,已有 Ruby 實現的 RCelery 、node.js 實現的 node-celery 以及一個 PHP 客戶端 ,語言互通也能夠經過 using webhooks 實現。redis

  2. 靈活的配置。默認的配置已經知足絕大多數需求,所以你不須要編寫配置文件基本就可使用,固然若是有個性化地定製,你能夠選擇使用配置文件,也能夠將配置寫在源代碼文件裏。sql

  3. 方便監控。任務的全部狀態,均在你的掌握之下。mongodb

  4. 完善的錯誤處理。數據庫

  5. 靈活的任務隊列和任務路由。你能夠很是方便地將一個任務運行在你指定的隊列上,這叫任務路由。json

Celery 的架構

學習一個工具,最好先從它的架構理解,輔以快速入門的代碼來實踐,最深刻的就是閱讀他的源碼了,下圖是 Celery 的架構圖。

 
celery架構.png

任務生產者 :調用Celery提供的API,函數,裝飾器而產生任務並交給任務隊列的都是任務生產者。

任務調度 Beat:Celery Beat進程會讀取配置文件的內容,週期性的將配置中到期須要執行的任務發送給任務隊列

中間人(Broker):Celery 用消息通訊,一般使用中間人(Broker)在客戶端和 worker 以前傳遞,這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給 worker。官方給出的實現Broker的工具備:

名稱 狀態 監視 遠程控制
RabbitMQ 穩定
Redis 穩定
Mongo DB 實驗性
Beanstalk 實驗性
Amazon SQS 實驗性
Couch DB 實驗性
Zookeeper 實驗性
Django DB 實驗性
SQLAlchemy 實驗性
Iron MQ 第三方

在實際使用中咱們選擇 RabbitMQ 或 Redis 做爲中間人便可。

執行單元 worker:worker 是任務執行單元,是屬於任務隊列的消費者,它持續地監控任務隊列,當隊列中有新地任務時,它便取出來執行。worker 能夠運行在不一樣的機器上,只要它指向同一個中間人便可,worker還能夠監控一個或多個任務隊列, Celery 是分佈式任務隊列的重要緣由就在於 worker 能夠分佈在多臺主機中運行。修改配置文件後不須要重啓 worker,它會自動生效。

任務結果存儲backend:用來持久存儲 Worker 執行任務的結果,Celery支持不一樣的方式存儲任務的結果,包括AMQP,Redis,memcached,MongoDb,SQLAlchemy等。

Celery 的使用示例:

以 Python3.6.5 版本爲例。

1. 安裝 python 庫:celery,redis。

pip install celery #安裝celery pip install celery[librabbitmq,redis,auth,msgpack] #安裝celery對應的依賴 

celery其餘的依賴包以下:
序列化:
celery[auth]:使用auth序列化。
celery[msgpack]:使用msgpack序列化。
celery[yaml]:使用yaml序列化。
併發:
celery[eventlet]:使用eventlet池。
celery[gevent]:使用gevent池。
celery[threads]:使用線程池。
傳輸和後端:
celery[librabbitmq]:使用librabbitmq的C庫.
celery[redis]:使用Redis做爲消息傳輸方式或結果後端。
celery[mongodb]:使用MongoDB做爲消息傳輸方式(實驗性),或是結果後端(已支持)。
celery[sqs]:使用AmazonSQS做爲消息傳輸方式(實驗性)。
celery[memcache]:使用memcache做爲結果後端。
celery[cassandra]:使用ApacheCassandra做爲結果後端。
celery[couchdb]:使用CouchDB做爲消息傳輸方式(實驗性)。
celery[couchbase]:使用CouchBase做爲結果後端。
celery[beanstalk]:使用Beanstalk做爲消息傳輸方式(實驗性)。
celery[zookeeper]:使用Zookeeper做爲消息傳輸方式。
celery[zeromq]:使用ZeroMQ做爲消息傳輸方式(實驗性)。
celery[sqlalchemy]:使用SQLAlchemy做爲消息傳輸方式(實驗性),或做爲結果後端(已支持)。
celery[pyro]:使用Pyro4消息傳輸方式(實驗性)。
celery[slmq]:使用SoftLayerMessageQueue傳輸(實驗性)。

2. 安裝 Redis,以 ubuntu 操做系統爲例(若是使用 RabbitMQ,本身裝一下就能夠)。

經過源碼安裝:

$ wget http://download.redis.io/releases/redis-4.0.11.tar.gz $ tar xzf redis-4.0.11.tar.gz $ cd redis-4.0.11 $ make 

修改 redis 配置文件 redis.conf,修改bind = 127.0.0.0.1爲bind = 0.0.0.0,意思是容許遠程訪問redis數據庫。

啓動 redis-server

$ cd src
$ ./redis-server ../redis.conf

3. 第一個 celery 應用程序。

功能:模擬一個耗時操做,並打印 worker 所在機器的 IP 地址,中間人和結果存儲都使用 redis 數據庫。

#encoding=utf-8 #filename my_first_celery.py from celery import Celery import time import socket app = Celery(''tasks'', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' ) def get_host_ip(): """ 查詢本機ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): time.sleep(3) # 模擬耗時操做 s = x + y print("主機IP {}: x + y = {}".format(get_host_ip(),s)) return s 

啓動這個 worker:

celery -A my_first_celery worker -l info

這裏,-A 表示咱們的程序的模塊名稱,worker 表示啓動一個執行單元,-l 是批 -level,表示打印的日誌級別。可使用 celery –help 命令來查看celery命令的幫助文檔。執行命令後,worker界面展現信息以下:

aaron@ubuntu:~/project$ celery -A my_first_celery worker -l info 
 
 -------------- celery@ubuntu v4.2.1 (windowlicker) ---- **** ----- --- * *** * -- Linux-4.10.0-37-generic-x86_64-with-Ubuntu-16.04-xenial 2018-08-27 22:46:00 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f1ce0747080 - ** ---------- .> transport: redis://127.0.0.1:6379/0 - ** ---------- .> results: redis://127.0.0.1:6379/0 - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . my_first_celery.add [2018-08-27 22:46:00,726: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0 [2018-08-27 22:46:00,780: INFO/MainProcess] mingle: searching for neighbors [2018-08-27 22:46:02,075: INFO/MainProcess] mingle: all alone [2018-08-27 22:46:02,125: INFO/MainProcess] celery@ubuntu ready. 

已經至關清晰了。
若是你不想使用 celery 命令來啓動 worker,可直接使用文件來驅動,修改 my_first_celery.py (增長入口函數main)

if __name__ == '__main__': app.start() 

再執行

python my_first_celery.py worker

便可。

4. 調用任務

在 my_first_celery.py 的同級目錄下編寫以下腳本 start_task.py以下。

from my_first_celery import add #導入咱們的任務函數add import time result = add.delay(12,12) #異步調用,這一步不會阻塞,程序會當即往下運行 while not result.ready():# 循環檢查任務是否執行完畢 print(time.strftime("%H:%M:%S")) time.sleep(1) print(result.get()) #獲取任務的返回結果 print(result.successful()) #判斷任務是否成功執行 

執行

python start_task.py

結果以下所示:

22:50:59 22:51:00 22:51:01 24 True 

發現等待了大約3秒鐘後,任務返回告終果24,而且是成功完成,此時worker界面增長的信息以下:

[2018-08-27 22:50:58,840: INFO/MainProcess] Received task: my_first_celery.add[a0c4bb6b-17af-474c-9eab-407d593a7807] [2018-08-27 22:51:01,898: WARNING/ForkPoolWorker-1] 主機IP 192.168.195.128: x + y = 24 [2018-08-27 22:51:01,915: INFO/ForkPoolWorker-1] Task my_first_celery.add[a0c4bb6b-17af-474c-9eab-407d593a7807] succeeded in 3.067237992000173s: 24 

這裏的信息很是詳細,其中a0c4bb6b-17af-474c-9eab-407d593a7807是taskid,只要指定了 backend,根據這個 taskid 能夠隨時去 backend 去查找運行結果,使用方法以下:

>>> from my_first_celery import add >>> taskid= 'a0c4bb6b-17af-474c-9eab-407d593a7807' >>> add.AsyncResult(taskid).get() 24 >>>#或者 >>> from celery.result import AsyncResult >>> AsyncResult(taskid).get() 24 

重要說明:若是想遠程執行 worker 機器上的做業,請將 my_first_celery.py 和 start_tasks.py 複製到遠程主機上(須要安裝
celery),修改 my_first_celery.py 指向同一個中間人和結果存儲,再執行 start_tasks.py 便可遠程執行 worker 機器上的做業。my_first_celery.add函數的代碼不是必須的,你也要以這樣調用任務:

from my_first_celery import app app.send_task("my_first_celery.add",args=(1,3)) 

5. 第一個 celery 項目

在生產環境中每每有大量的任務須要調度,單獨一個文件是不方便的,celery 固然支持模塊化的結構,我這裏寫了一個用於學習的 Celery 小型工程項目,含有隊列操做,任務調度等實用操做,目錄樹以下所示:

 
celeryproj.png

其中 init.py是空文件,目的是告訴 Python myCeleryProj 是一個可導入的包.
app.py

from celery import Celery app = Celery("myCeleryProj", include=["myCeleryProj.tasks"]) app.config_from_object("myCeleryProj.settings") if __name__ == "__main__": app.start() 

settings.py

from kombu import Queue import re from datetime import timedelta from celery.schedules import crontab CELERY_QUEUES = ( # 定義任務隊列 Queue("default", routing_key="task.#"), # 路由鍵以「task.」開頭的消息都進default隊列 Queue("tasks_A", routing_key="A.#"), # 路由鍵以「A.」開頭的消息都進tasks_A隊列 Queue("tasks_B", routing_key="B.#"), # 路由鍵以「B.」開頭的消息都進tasks_B隊列 ) CELERY_TASK_DEFAULT_QUEUE = "default" # 設置默認隊列名爲 default CELERY_TASK_DEFAULT_EXCHANGE = "tasks" CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic" CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default" CELERY_ROUTES = ( [ ( re.compile(r"myCeleryProj\.tasks\.(taskA|taskB)"), {"queue": "tasks_A", "routing_key": "A.import"}, ), # 將tasks模塊中的taskA,taskB分配至隊列 tasks_A ,支持正則表達式 ( "myCeleryProj.tasks.add", {"queue": "default", "routing_key": "task.default"}, ), # 將tasks模塊中的add任務分配至隊列 default ], ) # CELERY_ROUTES = ( # [ # ("myCeleryProj.tasks.*", {"queue": "default"}), # 將tasks模塊中的全部任務分配至隊列 default # ], # ) # CELERY_ROUTES = ( # [ # ("myCeleryProj.tasks.add", {"queue": "default"}), # 將add任務分配至隊列 default # ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 將taskA任務分配至隊列 tasks_A # ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 將taskB任務分配至隊列 tasks_B # ], # ) BROKER_URL = "redis://127.0.0.1:6379/0" # 使用redis 做爲消息代理 CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" # 任務結果存在Redis CELERY_RESULT_SERIALIZER = "json" # 讀取任務結果通常性能要求不高,因此使用了可讀性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間,不建議直接寫86400,應該讓這樣的magic數字表述更明顯 CELERYBEAT_SCHEDULE = { "add": { "task": "myCeleryProj.tasks.add", "schedule": timedelta(seconds=10), "args": (10, 16), }, "taskA": { "task": "myCeleryProj.tasks.taskA", "schedule": crontab(hour=21, minute=10), }, "taskB": { "task": "myCeleryProj.tasks.taskB", "schedule": crontab(hour=21, minute=12), }, } 

tasks.py

import os from myCeleryProj.app import app import time import socket def get_host_ip(): """ 查詢本機ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): s = x + y time.sleep(3) # 模擬耗時操做 print("主機IP {}: x + y = {}".format(get_host_ip(), s)) return s @app.task def taskA(): print("taskA begin...") print(f"主機IP {get_host_ip()}") time.sleep(3) print("taskA done.") @app.task def taskB(): print("taskB begin...") print(f"主機IP {get_host_ip()}") time.sleep(3) print("taskB done.") 

readme.txt

#啓動 worker #分別在三個終端窗口啓動三個隊列的worker,執行命令以下所示: celery -A myCeleryProj.app worker -Q default -l info celery -A myCeleryProj.app worker -Q tasks_A -l info celery -A myCeleryProj.app worker -Q tasks_B -l info #固然也能夠一次啓動多個隊列,以下則表示一次啓動兩個隊列tasks_A,tasks_B。 celery -A myCeleryProj.app worker -Q tasks_A,tasks_B -l info #則表示一次啓動兩個隊列tasks_A,tasks_B。 #最後咱們再開啓一個窗口來調用task: 注意觀察worker界面的輸出 >>> from myCeleryProj.tasks import * >>> add.delay(4,5);taskA.delay();taskB.delay() #同時發起三個任務 <AsyncResult: 21408d7b-750d-4c88-9929-fee36b2f4474> <AsyncResult: 737b9502-77b7-47a6-8182-8e91defb46e6> <AsyncResult: 69b07d94-be8b-453d-9200-12b37a1ca5ab> #也可使用下面的方法調用task >>> from myCeleryProj.app import app >>> app.send_task(myCeleryProj.tasks.add,args=(4,5) >>> app.send_task(myCeleryProj.tasks.taskA) >>> app.send_task(myCeleryProj.tasks.taskB)
相關文章
相關標籤/搜索