最近研究了下異步任務神器-Celery,發現很是好用,能夠說是高可用,假如你發出一個任務執行命令給 Celery,只要 Celery 的執行單元 (worker)
在運行,那麼它必定會執行;若是執行單元 (worker) 出現故障,如斷電,斷網狀況下,只要執行單元 (worker) 恢復運行,那麼它會繼續執行你已經發出的命令。這一點有很強的實用價值:假若有交易系統接到了大量交易請求,主機卻掛了,但前端用戶仍能夠繼續發交易請求,發送交易請求後,用戶無需等待。待主機恢復後,已發出的交易請求能夠繼續執行,只不過用戶收到交易確認的時間延長而已,但並不影響用戶體驗。前端
它是一個異步任務調度工具,用戶使用 Celery 產生任務,借用中間人來傳遞任務,任務執行單元從中間人那裏消費任務。任務執行單元能夠單機部署,也能夠分佈式部署,所以 Celery 是一個高可用的生產者消費者模型的異步任務隊列。你能夠將你的任務交給 Celery 處理,也可讓 Celery 自動按 crontab 那樣去自動調度任務,而後去作其餘事情,你能夠隨時查看任務執行的狀態,也可讓 Celery 執行完成後自動把執行結果告訴你。node
高併發的請求任務。互聯網已經普及,人們的衣食住行中產生的交易均可以線上進行,這就避免不了某些時間極高的併發任務請求,如公司中常見的購買理財、學生繳費,在理財產品投放市場後、開學前的一段時間,交易量猛增,確認交易時間較長,此時能夠把交易請求任務交給 Celery 去異步執行,執行完再將結果返回給用戶。用戶提交後不須要等待,任務完成後會通知到用戶(購買成功或繳費成功),提升了網站的總體吞吐量和響應時間,幾乎不須要增長硬件成本便可知足高併發。python
定時任務。在雲計算,大數據,集羣等技術愈來愈普及,生產環境的機器也愈來愈多,定時任務是避免不了的,若是每臺機器上運行着本身的 crontab 任務,管理起來至關麻煩,例如當進行災備切換時,某些 crontab 任務可能須要單獨手工調起,給運維人員形成極大的麻煩,有了 Celery ,你能夠集中管理全部機器的定時任務,並且災備不管什麼時候切換,crontab 任務總能正確的執行。web
異步任務。 一些耗時較長的操做,好比 I/O 操做,網絡請求,能夠交給 Celery 去異步執行,用戶提交後能夠作其餘事情,當任務完成後將結果返回用戶便可,可提升用戶體驗。正則表達式
純 Python 編寫,開源。這已是站在巨人的肩膀上了,雖然 Celery 是由純 Python 編寫的,但協議能夠用任何語言實現。迄今,已有 Ruby 實現的 RCelery 、node.js 實現的 node-celery 以及一個 PHP 客戶端 ,語言互通也能夠經過 using webhooks 實現。redis
靈活的配置。默認的配置已經知足絕大多數需求,所以你不須要編寫配置文件基本就可使用,固然若是有個性化地定製,你能夠選擇使用配置文件,也能夠將配置寫在源代碼文件裏。sql
方便監控。任務的全部狀態,均在你的掌握之下。mongodb
完善的錯誤處理。數據庫
靈活的任務隊列和任務路由。你能夠很是方便地將一個任務運行在你指定的隊列上,這叫任務路由。json
學習一個工具,最好先從它的架構理解,輔以快速入門的代碼來實踐,最深刻的就是閱讀他的源碼了,下圖是 Celery 的架構圖。
任務生產者 :調用Celery提供的API,函數,裝飾器而產生任務並交給任務隊列的都是任務生產者。
任務調度 Beat:Celery Beat進程會讀取配置文件的內容,週期性的將配置中到期須要執行的任務發送給任務隊列
中間人(Broker):Celery 用消息通訊,一般使用中間人(Broker)在客戶端和 worker 以前傳遞,這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給 worker。官方給出的實現Broker的工具備:
名稱 | 狀態 | 監視 | 遠程控制 |
---|---|---|---|
RabbitMQ | 穩定 | 是 | 是 |
Redis | 穩定 | 是 | 是 |
Mongo DB | 實驗性 | 是 | 是 |
Beanstalk | 實驗性 | 否 | 否 |
Amazon SQS | 實驗性 | 否 | 否 |
Couch DB | 實驗性 | 否 | 否 |
Zookeeper | 實驗性 | 否 | 否 |
Django DB | 實驗性 | 否 | 否 |
SQLAlchemy | 實驗性 | 否 | 否 |
Iron MQ | 第三方 | 否 | 否 |
在實際使用中咱們選擇 RabbitMQ 或 Redis 做爲中間人便可。
執行單元 worker:worker 是任務執行單元,是屬於任務隊列的消費者,它持續地監控任務隊列,當隊列中有新地任務時,它便取出來執行。worker 能夠運行在不一樣的機器上,只要它指向同一個中間人便可,worker還能夠監控一個或多個任務隊列, Celery 是分佈式任務隊列的重要緣由就在於 worker 能夠分佈在多臺主機中運行。修改配置文件後不須要重啓 worker,它會自動生效。
任務結果存儲backend:用來持久存儲 Worker 執行任務的結果,Celery支持不一樣的方式存儲任務的結果,包括AMQP,Redis,memcached,MongoDb,SQLAlchemy等。
以 Python3.6.5 版本爲例。
pip install celery #安裝celery pip install celery[librabbitmq,redis,auth,msgpack] #安裝celery對應的依賴
celery其餘的依賴包以下:
序列化:
celery[auth]:使用auth序列化。
celery[msgpack]:使用msgpack序列化。
celery[yaml]:使用yaml序列化。
併發:
celery[eventlet]:使用eventlet池。
celery[gevent]:使用gevent池。
celery[threads]:使用線程池。
傳輸和後端:
celery[librabbitmq]:使用librabbitmq的C庫.
celery[redis]:使用Redis做爲消息傳輸方式或結果後端。
celery[mongodb]:使用MongoDB做爲消息傳輸方式(實驗性),或是結果後端(已支持)。
celery[sqs]:使用AmazonSQS做爲消息傳輸方式(實驗性)。
celery[memcache]:使用memcache做爲結果後端。
celery[cassandra]:使用ApacheCassandra做爲結果後端。
celery[couchdb]:使用CouchDB做爲消息傳輸方式(實驗性)。
celery[couchbase]:使用CouchBase做爲結果後端。
celery[beanstalk]:使用Beanstalk做爲消息傳輸方式(實驗性)。
celery[zookeeper]:使用Zookeeper做爲消息傳輸方式。
celery[zeromq]:使用ZeroMQ做爲消息傳輸方式(實驗性)。
celery[sqlalchemy]:使用SQLAlchemy做爲消息傳輸方式(實驗性),或做爲結果後端(已支持)。
celery[pyro]:使用Pyro4消息傳輸方式(實驗性)。
celery[slmq]:使用SoftLayerMessageQueue傳輸(實驗性)。
經過源碼安裝:
$ wget http://download.redis.io/releases/redis-4.0.11.tar.gz $ tar xzf redis-4.0.11.tar.gz $ cd redis-4.0.11 $ make
修改 redis 配置文件 redis.conf,修改bind = 127.0.0.0.1爲bind = 0.0.0.0,意思是容許遠程訪問redis數據庫。
啓動 redis-server
$ cd src $ ./redis-server ../redis.conf
功能:模擬一個耗時操做,並打印 worker 所在機器的 IP 地址,中間人和結果存儲都使用 redis 數據庫。
#encoding=utf-8 #filename my_first_celery.py from celery import Celery import time import socket app = Celery(''tasks'', broker='redis://127.0.0.1:6379/0',backend ='redis://127.0.0.1:6379/0' ) def get_host_ip(): """ 查詢本機ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): time.sleep(3) # 模擬耗時操做 s = x + y print("主機IP {}: x + y = {}".format(get_host_ip(),s)) return s
啓動這個 worker:
celery -A my_first_celery worker -l info
這裏,-A 表示咱們的程序的模塊名稱,worker 表示啓動一個執行單元,-l 是批 -level,表示打印的日誌級別。可使用 celery –help 命令來查看celery命令的幫助文檔。執行命令後,worker界面展現信息以下:
aaron@ubuntu:~/project$ celery -A my_first_celery worker -l info
-------------- celery@ubuntu v4.2.1 (windowlicker) ---- **** ----- --- * *** * -- Linux-4.10.0-37-generic-x86_64-with-Ubuntu-16.04-xenial 2018-08-27 22:46:00 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f1ce0747080 - ** ---------- .> transport: redis://127.0.0.1:6379/0 - ** ---------- .> results: redis://127.0.0.1:6379/0 - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . my_first_celery.add [2018-08-27 22:46:00,726: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0 [2018-08-27 22:46:00,780: INFO/MainProcess] mingle: searching for neighbors [2018-08-27 22:46:02,075: INFO/MainProcess] mingle: all alone [2018-08-27 22:46:02,125: INFO/MainProcess] celery@ubuntu ready.
已經至關清晰了。
若是你不想使用 celery 命令來啓動 worker,可直接使用文件來驅動,修改 my_first_celery.py (增長入口函數main)
if __name__ == '__main__': app.start()
再執行
python my_first_celery.py worker
便可。
在 my_first_celery.py 的同級目錄下編寫以下腳本 start_task.py以下。
from my_first_celery import add #導入咱們的任務函數add import time result = add.delay(12,12) #異步調用,這一步不會阻塞,程序會當即往下運行 while not result.ready():# 循環檢查任務是否執行完畢 print(time.strftime("%H:%M:%S")) time.sleep(1) print(result.get()) #獲取任務的返回結果 print(result.successful()) #判斷任務是否成功執行
執行
python start_task.py
結果以下所示:
22:50:59 22:51:00 22:51:01 24 True
發現等待了大約3秒鐘後,任務返回告終果24,而且是成功完成,此時worker界面增長的信息以下:
[2018-08-27 22:50:58,840: INFO/MainProcess] Received task: my_first_celery.add[a0c4bb6b-17af-474c-9eab-407d593a7807] [2018-08-27 22:51:01,898: WARNING/ForkPoolWorker-1] 主機IP 192.168.195.128: x + y = 24 [2018-08-27 22:51:01,915: INFO/ForkPoolWorker-1] Task my_first_celery.add[a0c4bb6b-17af-474c-9eab-407d593a7807] succeeded in 3.067237992000173s: 24
這裏的信息很是詳細,其中a0c4bb6b-17af-474c-9eab-407d593a7807是taskid,只要指定了 backend,根據這個 taskid 能夠隨時去 backend 去查找運行結果,使用方法以下:
>>> from my_first_celery import add >>> taskid= 'a0c4bb6b-17af-474c-9eab-407d593a7807' >>> add.AsyncResult(taskid).get() 24 >>>#或者 >>> from celery.result import AsyncResult >>> AsyncResult(taskid).get() 24
重要說明:若是想遠程執行 worker 機器上的做業,請將 my_first_celery.py 和 start_tasks.py 複製到遠程主機上(須要安裝
celery),修改 my_first_celery.py 指向同一個中間人和結果存儲,再執行 start_tasks.py 便可遠程執行 worker 機器上的做業。my_first_celery.add函數的代碼不是必須的,你也要以這樣調用任務:
from my_first_celery import app app.send_task("my_first_celery.add",args=(1,3))
在生產環境中每每有大量的任務須要調度,單獨一個文件是不方便的,celery 固然支持模塊化的結構,我這裏寫了一個用於學習的 Celery 小型工程項目,含有隊列操做,任務調度等實用操做,目錄樹以下所示:
其中 init.py是空文件,目的是告訴 Python myCeleryProj 是一個可導入的包.
app.py
from celery import Celery app = Celery("myCeleryProj", include=["myCeleryProj.tasks"]) app.config_from_object("myCeleryProj.settings") if __name__ == "__main__": app.start()
settings.py
from kombu import Queue import re from datetime import timedelta from celery.schedules import crontab CELERY_QUEUES = ( # 定義任務隊列 Queue("default", routing_key="task.#"), # 路由鍵以「task.」開頭的消息都進default隊列 Queue("tasks_A", routing_key="A.#"), # 路由鍵以「A.」開頭的消息都進tasks_A隊列 Queue("tasks_B", routing_key="B.#"), # 路由鍵以「B.」開頭的消息都進tasks_B隊列 ) CELERY_TASK_DEFAULT_QUEUE = "default" # 設置默認隊列名爲 default CELERY_TASK_DEFAULT_EXCHANGE = "tasks" CELERY_TASK_DEFAULT_EXCHANGE_TYPE = "topic" CELERY_TASK_DEFAULT_ROUTING_KEY = "task.default" CELERY_ROUTES = ( [ ( re.compile(r"myCeleryProj\.tasks\.(taskA|taskB)"), {"queue": "tasks_A", "routing_key": "A.import"}, ), # 將tasks模塊中的taskA,taskB分配至隊列 tasks_A ,支持正則表達式 ( "myCeleryProj.tasks.add", {"queue": "default", "routing_key": "task.default"}, ), # 將tasks模塊中的add任務分配至隊列 default ], ) # CELERY_ROUTES = ( # [ # ("myCeleryProj.tasks.*", {"queue": "default"}), # 將tasks模塊中的全部任務分配至隊列 default # ], # ) # CELERY_ROUTES = ( # [ # ("myCeleryProj.tasks.add", {"queue": "default"}), # 將add任務分配至隊列 default # ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),# 將taskA任務分配至隊列 tasks_A # ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),# 將taskB任務分配至隊列 tasks_B # ], # ) BROKER_URL = "redis://127.0.0.1:6379/0" # 使用redis 做爲消息代理 CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0" # 任務結果存在Redis CELERY_RESULT_SERIALIZER = "json" # 讀取任務結果通常性能要求不高,因此使用了可讀性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間,不建議直接寫86400,應該讓這樣的magic數字表述更明顯 CELERYBEAT_SCHEDULE = { "add": { "task": "myCeleryProj.tasks.add", "schedule": timedelta(seconds=10), "args": (10, 16), }, "taskA": { "task": "myCeleryProj.tasks.taskA", "schedule": crontab(hour=21, minute=10), }, "taskB": { "task": "myCeleryProj.tasks.taskB", "schedule": crontab(hour=21, minute=12), }, }
tasks.py
import os from myCeleryProj.app import app import time import socket def get_host_ip(): """ 查詢本機ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] finally: s.close() return ip @app.task def add(x, y): s = x + y time.sleep(3) # 模擬耗時操做 print("主機IP {}: x + y = {}".format(get_host_ip(), s)) return s @app.task def taskA(): print("taskA begin...") print(f"主機IP {get_host_ip()}") time.sleep(3) print("taskA done.") @app.task def taskB(): print("taskB begin...") print(f"主機IP {get_host_ip()}") time.sleep(3) print("taskB done.")
readme.txt
#啓動 worker #分別在三個終端窗口啓動三個隊列的worker,執行命令以下所示: celery -A myCeleryProj.app worker -Q default -l info celery -A myCeleryProj.app worker -Q tasks_A -l info celery -A myCeleryProj.app worker -Q tasks_B -l info #固然也能夠一次啓動多個隊列,以下則表示一次啓動兩個隊列tasks_A,tasks_B。 celery -A myCeleryProj.app worker -Q tasks_A,tasks_B -l info #則表示一次啓動兩個隊列tasks_A,tasks_B。 #最後咱們再開啓一個窗口來調用task: 注意觀察worker界面的輸出 >>> from myCeleryProj.tasks import * >>> add.delay(4,5);taskA.delay();taskB.delay() #同時發起三個任務 <AsyncResult: 21408d7b-750d-4c88-9929-fee36b2f4474> <AsyncResult: 737b9502-77b7-47a6-8182-8e91defb46e6> <AsyncResult: 69b07d94-be8b-453d-9200-12b37a1ca5ab> #也可使用下面的方法調用task >>> from myCeleryProj.app import app >>> app.send_task(myCeleryProj.tasks.add,args=(4,5) >>> app.send_task(myCeleryProj.tasks.taskA) >>> app.send_task(myCeleryProj.tasks.taskB)