20 python--celery

Celery概述

關於celery的定義,首先來看官方網站:Celery(芹菜) 是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。html

簡單來看,是一個基於python開發的分佈式異步消息任務隊列,持使用任務隊列的方式在分佈的機器、進程、線程上執行任務調度。經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery, 舉幾個實例場景中可用的例子:python

  1. 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。   
  2. 你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福 。  

Celery 在執行任務時須要經過一箇中間人(消息中間件)來接收和發送任務消息,以及存儲任務結果,完整的中間人列表請查閱官方網站redis

PS:任務隊列是一種在線程或機器間分發任務的機制。架構

PS:消息隊列的輸入是工做的一個單元,稱爲任務,獨立的工做(Worker)進程持續監視隊列中是否有須要處理的新任務。 併發

Celery簡介

Celery 系統可包含多個職程和中間人,以此得到高可用性和橫向擴展能力,其基本架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。app

  1. 消息中間件,Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成,通常使用rabbitMQ or Redis,固然其餘的還有MySQL以及Mongodb。
  2. 任務執行單元,Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。
  3. 任務結果存儲,Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等。

Celery的主要特色:異步

  1. 簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的
  2. 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
  3. 快速:一個單進程的celery每分鐘可處理上百萬個任務
  4. 靈活: 幾乎celery的各個組件均可以被擴展及自定製

Celery基本工做流程圖

一、用戶應用程序講任務經過celery放入Broker中。分佈式

  二、多個worker經過Broker獲取任務並執行。工具

  三、worker執行完成後,會把任務的結果、狀態等信息返回到Broker中存儲,供用戶程序讀取。網站

PS:Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程。

Celery模塊的基本使用

# 利用pip3命令安裝celery模塊

pip3 install celery
 
用來定義任務列表,這裏任務文件的名稱叫作task.py(注意後面會用到文件名)。
from celery import Celery
 
app = Celery('task',  # 是當前模塊的名稱,這個參數是必須的,這樣的話名稱能夠自動生成
    broker="redis://192.168.100.83:6379/0",     # 中間人的地址
    backend="redis://192.168.100.83:6379/1"   # 結果數據存放地址
)
 
@app.task     # 使用celery標識一個任務,多個任務都須要使用該裝飾器 
def add(x,y):    
    return x+y

其餘中間件

1 # rabbitmq
2 broker = 'amqp://user:password@ip:5672//'
3 
4 # redis
5 broker = 'redis://passwordf@ip:6379/db'

運行一個worker

elery -A task worker --loglevel=debug
 
# -A參數表示的是Celery APP的名稱,task就是APP的名稱(應用文件名)
# worker表示是一個執行任務角色
# loglevel=info記錄日誌類型默認是info

生產者

form ling import add
re = add.delay(10, 20)
print(re.result)    #獲取結果
print(re.ready)        #是否處理
print(re.get(timeout=1))        #獲取結果
print(re.status)        #是否處理
相關文章
相關標籤/搜索