爲啥要學這個?在作測試的時候,對於一些特殊場景,好比凌晨3點執行一批測試集,或者在前端發送100個請求時,而每一個請求響應至少1s以上,用戶不可能等着後端執行完成後,將結果返回給前端,這個時候須要一個異步任務隊列。而python提供一個分佈式異步消息任務隊列------- Celery。php
任務隊列通常用於線程或計算機之間分配工做的一種機制。html
任務隊列的輸入是一個稱爲任務的工做單元,有專門的職程(Worker)進行不斷的監視任務隊列,進行執行新的任務工做。前端
Celery 經過消息機制進行通訊,一般使用中間人(Broker)做爲客戶端和職程(Worker)調節。啓動一個任務,客戶端向消息隊列發送一條消息,而後中間人(Broker)將消息傳遞給一個職程(Worker),最後由職程(Worker)進行執行中間人(Broker)分配的任務。node
Celery 能夠有多個職程(Worker)和中間人(Broker),用來提升Celery的高可用性以及橫向擴展能力。python
Celery 是用 Python 編寫的,但協議能夠用任何語言實現。除了 Python 語言實現以外,還有Node.js的node-celery和php的celery-php。git
能夠經過暴露 HTTP 的方式進行,任務交互以及其它語言的集成開發。github
Celery 是一個異步任務隊列,一個Celery有三個核心組件:web
一、Celery 客戶端: 用於發佈後臺做業;當與 Flask 一塊兒工做的時候,客戶端與 Flask 應用一塊兒運行。redis
二、Celery workers: 運行後臺做業的進程。Celery 支持本地和遠程的 workers,能夠在本地服務器上啓動一個單獨的 worker,也能夠在遠程服務器上啓動worker,須要拷貝代碼;數據庫
三、消息代理: 客戶端經過消息隊列和 workers 進行通訊,Celery 支持多種方式來實現這些隊列。最經常使用的代理就是 RabbitMQ 和 Redis。
前面引言中已經說了兩種,這裏再列舉一下:
一、Celery 是一個 基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理,
若是你的業務場景中須要用到異步任務,就能夠考慮使用celery
二、你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,
你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情
三、Celery 在執行任務時須要經過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 通常使用rabbitMQ or Redis
一、簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的
二、高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
三、快速:一個單進程的celery每分鐘可處理上百萬個任務
四、靈活: 幾乎celery的各個組件均可以被擴展及自定製
草圖:
user:用戶程序,用於告知celery去執行一個任務。就是produce消息提供者。
broker: 存聽任務(依賴RabbitMQ或Redis,進行存儲)
worker:執行任務
一、方便查看定時任務的執行狀況, 如 是否成功, 當前狀態, 執行任務花費的時間等.
二、使用功能齊備的管理後臺或命令行添加,更新,刪除任務.
三、方便把任務和配置管理相關聯.
四、可選 多進程, Eventlet 和 Gevent 三種模型併發執行.
五、提供錯誤處理機制.
六、提供多種任務原語, 方便實現任務分組,拆分,和調用鏈.
七、支持多種消息代理和存儲後端.
八、Celery 是語言無關的.它提供了python 等常見語言的接口支持.
畫一個簡單的架構圖,幫助理解:
角色:
一、Celery Beat : 任務調度器. Beat 進程會讀取配置文件的內容, 週期性的將配置中到期須要執行的任務發送給任務隊列.
2、Celery Worker : 執行任務的消費者, 一般會在多臺服務器運行多個消費者, 提升運行效率.
三、Broker : 消息代理, 隊列自己. 也稱爲消息中間件. 接受任務生產者發送過來的任務消息, 存進隊列再按序分發給任務消費方(一般是消息隊列或者數據庫).
四、Producer : 任務生產者. 調用 Celery API , 函數或者裝飾器, 而產生任務並交給任務隊列處理的都是任務生產者.
5、Result Backend : 任務處理完成以後保存狀態信息和結果, 以供查詢.
任務執行方式:
1.發佈者發佈任務(WEB 應用)
2.任務調度定期發佈任務(定時任務)
依賴庫:
billiard : 基於 Python2.7 的 multisuprocessing 而改進的庫, 主要用來提升性能和穩定性. librabbitmp : C 語言實現的 Python 客戶端 kombu : Celery 自帶的用來收發消息的庫, 提供了符合 Python 語言習慣的, 使用 AMQP 協議的高級藉口. 這三個庫, 都由 Celery 的開發者開發和維護.
使用於生產環境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.
這裏我使用redis。
BROKER_URL = 'redis://localhost:6379' #代理人 CELERY_RESULT_BACKEND = 'redis://localhost:6379' #結果存儲地址 CELERY_ACCEPT_CONTENT = ['application/json'] #指定任務接收的內容序列化類型 CELERY_TASK_SERIALIZER = 'json' #任務序列化方式 CELERY_RESULT_SERIALIZER = 'json' #任務結果序列化方式 CELERY_TASK_RESULT_EXPIRES = 12 * 30 #超過期間 CELERY_MESSAGE_COMPRESSION = 'zlib' #是否壓縮 CELERYD_CONCURRENCY = 4 #併發數默認已CPU數量定 CELERYD_PREFETCH_MULTIPLIER = 4 #celery worker 每次去redis取任務的數量 CELERYD_MAX_TASKS_PER_CHILD = 3 #每一個worker最多執行3個任務就摧毀,避免內存泄漏 CELERYD_FORCE_EXECV = True #能夠防止死鎖 CELERY_ENABLE_UTC = False #關閉時區 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定時任務調度器
一、delay
task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
二、apply_async
delay 其實是 apply_async 的別名, 還可使用以下方法調用, 可是 apply_async 支持更多的參數:
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
支持參數:
一、countdown : 等待一段時間再執行
add.apply_async((2,3), countdown=5)
二、eta : 定義任務的開始時間.
add.apply_async((2,3), eta=now+tiedelta(second=10))
三、expires : 設置超時時間.
add.apply_async((2,3), expires=60)
四、retry : 定時若是任務失敗後, 是否重試.
add.apply_async((2,3), retry=False)
五、retry_policy : 重試策略.
一、max_retries : 最大重試次數, 默認爲 3 次.
二、interval_start : 重試等待的時間間隔秒數, 默認爲 0 , 表示直接重試不等待.
三、interval_step : 每次重試讓重試間隔增長的秒數, 能夠是數字或浮點數, 默認爲 0.2
四、interval_max : 重試間隔最大的秒數, 即 經過 interval_step 增大到多少秒以後, 就不在增長了, 能夠是數字或者浮點數, 默認爲 0.2 .
自定義發佈者,交換機,路由鍵, 隊列, 優先級,序列方案和壓縮方法:
task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')
在客戶端和消費者之間傳輸數據須要 序列化和反序列化. Celery 支出的序列化方案以下所示:
CELERY_TASK_SERIALIZER=json
官方:https://docs.celeryproject.org/en/latest/index.html
中文手冊:https://www.celerycn.io/
以上就是Celery的簡介,有興趣的朋友能夠看看,或者進羣討論交流一下。