在學習Celery以前,我先簡單的去了解了一下什麼是生產者消費者模式。
生產者消費者模式
在實際的軟件開發過程當中,常常會碰到以下場景:某個模塊負責產生數據,這些數據由另外一個模塊來負責處理(此處的模塊是廣義的,能夠是類、函數、線程、進程等)。產生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。
單單抽象出生產者和消費者,還夠不上是生產者消費者模式。該模式還須要有一個緩衝區處於生產者和消費者之間,做爲一箇中介。生產者把數據放入緩衝區,而消費者從緩衝區取出數據,以下圖所示:
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過消息隊列(緩衝區)來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給消息隊列,消費者不找生產者要數據,而是直接從消息隊列裏取,消息隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。這個消息隊列就是用來給生產者和消費者解耦的。------------->這裏又有一個問題,什麼叫作解耦?
解耦:假設生產者和消費者分別是兩個類。若是讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。未來若是消費者的代碼發生變化,可能會影響到生產者。而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也就相應下降了。生產者直接調用消費者的某個方法,還有另外一個弊端。因爲函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回以前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。緩衝區還有另外一個好處。若是製造數據的速度時快時慢,緩衝區的好處就體現出來了。當數據製造快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中。等生產者的製造速度慢下來,消費者再慢慢處理掉。
由於太抽象,看過網上的說明以後,經過個人理解,我舉了個例子:吃包子。
假如你很是喜歡吃包子(吃起來根本停不下來),今天,你媽媽(生產者)在蒸包子,廚房有張桌子(緩衝區),你媽媽將蒸熟的包子盛在盤子(消息)裏,而後放到桌子上,你正在看巴西奧運會,看到蒸熟的包子放在廚房桌子上的盤子裏,你就把盤子取走,一邊吃包子一邊看奧運。在這個過程當中,你和你媽媽使用同一個桌子放置盤子和取走盤子,這裏桌子就是一個共享對象。生產者添加食物,消費者取走食物。桌子的好處是,你媽媽不用直接把盤子給你,只是負責把包子裝在盤子裏放到桌子上,若是桌子滿了,就再也不放了,等待。並且生產者還有其餘事情要作,消費者吃包子比較慢,生產者不能一直等消費者吃完包子把盤子放回去再去生產,由於吃包子的人有不少,若是這期間你好朋友來了,和你一塊兒吃包子,生產者不用關注是哪一個消費者去桌子上拿盤子,而消費者只去關注桌子上有沒有放盤子,若是有,就端過來吃盤子中的包子,沒有的話就等待。對應關係以下圖:
考察了一下,原來當初設計這個模式,主要就是用來處理併發問題的,而Celery就是一個用python寫的並行分佈式框架。
而後我接着去學習Celery
Celery的定義
Celery(芹菜)是一個簡單、靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。
我比較喜歡的一點是:Celery支持使用任務隊列的方式在分佈的機器、進程、線程上執行任務調度。而後我接着去理解什麼是任務隊列。
任務隊列
任務隊列是一種在線程或機器間分發任務的機制。
消息隊列
消息隊列的輸入是工做的一個單元,稱爲任務,獨立的職程(Worker)進程持續監視隊列中是否有須要處理的新任務。
Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程,職程對消息進行處理。以下圖所示:
Celery 系統可包含多個職程和中間人,以此得到高可用性和橫向擴展能力。
Celery的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這裏我先去了解RabbitMQ,Redis。
任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這裏我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。
而後我接着去安裝Celery,在安裝Celery以前,我已經在本身虛擬機上安裝好了Python,版本是2.7,是爲了更好的支持Celery的3.0以上的版本。
由於涉及到消息中間件,因此我先去選擇一個在我工做中要用到的消息中間件(在Celery幫助文檔中稱呼爲中間人<broker>),爲了更好的去理解文檔中的例子,我安裝了兩個中間件,一個是RabbitMQ,一個redis。
在這裏我就先根據Celery3.1的幫助文檔安裝和設置RabbitMQ, 要使用 Celery,咱們須要建立一個 RabbitMQ 用戶、一個虛擬主機,而且容許這個用戶訪問這個虛擬主機。下面是我在我的虛擬機Ubuntu14.04上的設置:
$ sudo rabbitmqctl add_user forward password
#建立了一個RabbitMQ用戶,用戶名爲forward,密碼是password
$ sudo rabbitmqctl add_vhost ubuntu
#建立了一個虛擬主機,主機名爲ubuntu
$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"
#容許用戶forward訪問虛擬主機ubuntu,由於RabbitMQ經過主機名來與節點通訊
$ sudo rabbitmq-server
以後我啓用RabbitMQ服務器,結果以下,成功運行:
以後我安裝Redis,它的安裝比較簡單,以下:
$ sudo pip install redis
而後進行簡單的配置,只須要設置 Redis 數據庫的位置:
BROKER_URL = 'redis://localhost:6379/0'
URL的格式爲:
redis://:password@hostname:port/db_number
URL Scheme 後的全部字段都是可選的,而且默認爲 localhost 的 6379 端口,使用數據庫 0。個人配置是:
redis://:password@ubuntu:6379/5
以後安裝Celery,我是用標準的Python工具pip安裝的,以下:
$ sudo pip install celery
爲了測試Celery可否工做,我運行了一個最簡單的任務,編寫tasks.py,以下圖所示:
編輯保存退出後,我在當前目錄下運行以下命令:
$ celery -A tasks worker --loglevel=info
#查詢文檔,瞭解到該命令中-A參數表示的是Celery APP的名稱,這個實例中指的就是tasks.py,後面的tasks就是APP的名稱,worker是一個執行任務角色,後面的loglevel=info記錄日誌類型默認是info,這個命令啓動了一個worker,用來執行程序中add這個加法任務(task)。
而後看到界面顯示結果以下:
咱們能夠看到Celery正常工做在名稱ubuntu的虛擬主機上,版本爲3.1.23,在下面的[config]中咱們能夠看到當前APP的名稱tasks,運輸工具transport就是咱們在程序中設置的中間人redis://127.0.0.1:6379/5,result咱們沒有設置,暫時顯示爲disabled,而後咱們也能夠看到worker缺省使用perfork來執行併發,當前併發數顯示爲1,而後能夠看到下面的[queues]就是咱們說的隊列,當前默認的隊列是celery,而後咱們看到下面的[tasks]中有一個任務tasks.add.
瞭解了這些以後,根據文檔我從新打開一個terminal,而後執行Python,進入Python交互界面,用delay()方法調用任務,執行以下操做:
這個任務已經由以前啓動的Worker異步執行了,而後我打開以前啓動的worker的控制檯,對輸出進行查看驗證,結果以下:
綠色部分第一行說明worker收到了一個任務:tasks.add,這裏咱們和以前發送任務返回的AsyncResult對比咱們發現,每一個task都有一個惟一的ID,第二行說明了這個任務執行succeed,執行結果爲12。
查看資料說調用任務後會返回一個AsyncResult實例,可用於檢查任務的狀態,等待任務完成或獲取返回值(若是任務失敗,則爲異常和回溯)。但這個功能默認是不開啓的,須要設置一個 Celery 的結果後端(backend),這塊我在下一個例子中進行了學習。
經過這個例子後我對Celery有了初步的瞭解,而後我在這個例子的基礎上去進一步的學習。
由於Celery是用Python編寫的,因此爲了讓代碼結構化一些,就像一個應用,我使用python包,建立了一個celery服務,命名爲pj。文件目錄以下:
celery.py
from __future __ import absolute_import
#定義將來文件的絕對進口,並且絕對進口必須在每一個模塊的頂部啓用。
from celery import Celery
#從celery導入Celery的應用程序接口
App.config_from_object(‘pj.config’)
#從config.py中導入配置文件
if __name__ == ‘__main__’:
app.start()
#執行當前文件,運行celery
app = Celery(‘pj’,
broker=‘redis://localhost’,
backend=‘redis://localhost’,
include=[‘pj.tasks’]
)
#首先建立了一個celery實例app,實例化的過程當中,制定了任務名pj(與當前文件的名字相同),Celery的第一個參數是當前模塊的名稱,在這個例子中就是pj,後面的參數能夠在這裏直接指定,也能夠寫在配置文件中,咱們能夠調用config_from_object()來讓Celery實例加載配置模塊,個人例子中的配置文件起名爲config.py,配置文件以下:
在配置文件中咱們能夠對任務的執行等進行管理,好比說咱們可能有不少的任務,可是我但願有些優先級比較高的任務先被執行,而不但願先進先出的等待。那麼須要引入一個隊列的問題. 也就是說在個人broker的消息存儲裏面有一些隊列,他們並行運行,可是worker只從對應 的隊列裏面取任務。在這裏咱們但願tasks.py中的add先被執行。task中我設置了兩個任務:
因此我經過from celery import group引入group,用來建立並行執行的一組任務。而後這塊現須要理解的就是這個@app.task,@符號在python中用做函數修飾符,到這塊我又回頭去看python的裝飾器(在代碼運行期間動態增長功能的方式)究竟是如何實現的,在這裏的做用就是經過task()裝飾器在可調用的對象(app)上建立一個任務。
瞭解完裝飾器後,我回過頭去整理配置的問題,前面提到任務的優先級問題,在這個例子中若是咱們想讓add這個加法任務優先於subtract減法任務被執行,咱們能夠將兩個任務放到不一樣的隊列中,由咱們決定先執行哪一個任務,咱們能夠在配置文件中這樣配置:
先了解了幾個經常使用的參數的含義:
Exchange:交換機,決定了消息路由規則;
Queue:消息隊列;
Channel:進行消息讀寫的通道;
Bind:綁定了Queue和Exchange,意即爲符合什麼樣路由規則的消息,將會放置入哪個消息隊列;
我將add這個函數任務放在了一個叫作for_add的隊列裏面,將subtract這個函數任務放在了一個叫作for_subtract的隊列裏面,而後我在當前應用目錄下執行命令:
這個worker就只負責處理for_add這個隊列的任務,執行這個任務:
任務已經被執行,我在worker控制檯查看結果:
能夠看到worker收到任務,而且執行了任務。
在這裏咱們仍是在交互模式下手動去執行,咱們想要crontab的定時生成和執行,咱們能夠用celery的beat去週期的生成任務和執行任務,在這個例子中我但願每10秒鐘產生一個任務,而後去執行這個任務,我能夠這樣配置:
使用了scheduler,要制定時區:CELERY_TIMEZONE = 'Asia/Shanghai',啓動celery加上-B的參數:
而且要在config.py中加入from datetime import timedelta。
更近一步,若是我但願在每週四的19點30分生成任務,分發任務,讓worker取走執行,能夠這樣配置:
看完這些基礎的東西,我回過頭對celery在回顧了一下,用圖把它的框架大體畫出來,以下圖: