Celery學習筆記

轉載請註明出處:點我

 

個人第一篇博客!嘿嘿!html

在公司實習,接觸到的第一個項目就用到了Celery,以前是徹底沒有接觸過Celery這玩意,而後花了點時間仔細的研究了下怎麼用。在學習過程當中也遇到了些問題,因此把本身的學習過程記錄下來,供他人蔘考下。python

先說一下個人實驗環境:兩臺ubuntu的機子,一臺win7的機子,都安裝好了必須的軟件。用戶名爲atsgxxx的機子跑的是ubuntu的系統,Redis就運行在這個上面,另一臺ubuntu的機子的用戶名是sclu084。redis

Celery數據庫

那麼什麼是Celery呢?ubuntu

Celery是一個用Python開發的異步的分佈式任務調度模塊。服務器

 

Celery自己不包含消息服務,使用第三方消息服務,也就是Broker,來傳遞任務,目前支持的有Rebbimq,Redis,數據庫以及其餘的一些好比Amazon SQS,Monogdb和IronMQ 網絡

 

由於項目裏面用的是Redis,因此這裏以Redis做爲Broker。app

安裝Celery異步

sudo apt-get install celery分佈式

使用Redis做爲Broker的話,能夠二者一塊安裝

sudo pip install -U celery[redis]

固然若是正式生產環境中,有可能redis服務器和Celery在不一樣的機器上面的話,就要二者單獨安裝

sudo apt-get install redis-server 這個命令能夠安裝redis,包括了redi-cli工具

第一個簡單的例子

這個例子來自於Celery的官方文檔。先看代碼:

 

1 from celery import Celery
2 app = Celery('tasks',broker="redis://127.0.0.1:6379/0")
3 
4 @app.task
5 def add(x,y):
6         return x + y

 

把代碼保存爲tasks.py文件(這個例子運行在atsgxxx這臺機器上,上面運行了Redis,因此broker是127.0.0.1)。而後再terminal下啓動worker。

celery -A tasks worker -l info

這個命令會啓動一個worker來執行task。執行完這條命令後,不出意外的出現下面這個界面的話表示worker已經啓動成功,正在等待執行任務。

而後啓動另一個終端,進入python工做環境,執行任務,以下圖所示:

調用delay函數便可啓動add這個任務,add函數的參數爲4,4,這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數已經執行函數的參數,還有一些其餘信息,具體的能夠看Celery的文檔。

由於以前已經啓動了一個worker,這個worker會等待broker中的消息,一旦收到消息就會馬上執行消息
啓動了一個任務以後,能夠看到以前啓動的worker已經開始執行任務了。效果以下圖所示:

從上圖中能夠看到,任務已經被執行成功。

Celery與分佈式

既然Celery是一個分佈式的任務調度模塊,那麼Celery是如何跟分佈式掛上鉤的呢?首先得明白什麼是分佈式。個人理解是所謂的分佈式就是由多臺分佈在不一樣地方的計算機經過網絡共同完成任務。在Celery裏面,就能夠是多臺不一樣的計算機執行不一樣的任務或者是相同的任務。
若是要說Celery的分佈式應用的話,我以爲就要提到Celery的消息路由機制,就要提一下AMQP協議。具體的能夠查看AMQP的文檔。簡單地說就是能夠有多個消息隊列(Message Queue),不一樣的消息能夠指定發送給不一樣的Message Queue,而這是經過Exchange來實現的。發送消息到Message Queue中時,能夠指定routiing_key,Exchange經過routing_key來把消息路由(routes)到不一樣的Message Queue中去。具體的能夠參考下這個網頁,上面講的很詳細的了。

如今來看下代碼:(代碼實現的功能是在兩臺ubuntu上面啓動worker,每一個worker執行指定的Queue中的Task,而後在win7上面執行消息。同時演示了默認消息隊列的使用。)

 1 from celery import Celery
 2 
 3 app = Celery()
 4 app.config_from_object("celeryconfig")
 5 
 6 @app.task
 7 def taskA(x,y):
 8     return x + y
 9 
10 @app.task
11 def taskB(x,y,z):
12      return x + y + z
13 
14 @app.task
15 def add(x,y):
16     return x + y

上面的tasks.py中,首先定義了一個Celery對象,而後用celeryconfig.py對celery對象進行設置,以後再分別定義了三個task,分別是taskA,taskB和add。接下來看一下celeryconfig.py文件

 1 from kombu import Exchange,Queue
 2 
 3 BROKER_URL = "redis://10.32.105.227:6379/0" CELERY_RESULT_BACKEND = "redis://10.32.105.227:6379/0"
 4 
 5 CELERY_QUEUES = (
     Queue("default",Exchange("default"),routing_key="default"),
     Queue("for_task_A",Exchange("for_task_A"),routing_key="task_a"),
     Queue("for_task_B",Exchange("for_task_B"),routing_key="task_a")
   )
6 7 CELERY_ROUTES = { 8 'tasks.taskA':{"queue":"for_task_A","routing_key":"task_a"}, 9 'tasks.taskB":{"queue":"for_task_B","routing_key:"task_b"} 10 }

在celeryconfig.py文件中,首先設置了brokel以及result_backend,接下來定義了三個Message Queue,而且指明瞭Queue對應的Exchange(當使用Redis做爲broker時,Exchange的名字必須和Queue的名字同樣)以及routing_key的值。

如今如今其中一臺ubuntu上面啓動一個worker,這個worker只執行for_task_A隊列中的消息,這是經過在啓動worker是使用-Q Queue_Name參數指定的。

celery -A tasks worker -l info -n worker.%h -Q for_task_A

其中-n參數表示這個worker的name,-Q參數指定了這個worker執行for_task_A隊列中的消息。執行結果以下圖所示:

上面的執行結果代表名字爲worker.atsgxxx的任務已經啓動,等待執行for_task_A中的任務。

而後再win7上面執行taskA任務。在win7上,進入CMD,切換當前目錄到代碼坐在的工程下,啓動python,執行下面代碼啓動taskA:

from tasks import *

task_A_re = taskA.delay(100,200)

執行完上面的代碼以後,task_A消息會被當即發送到for_task_A隊列中去。此時已經啓動的worker.atsgxxx 會當即執行taskA任務。效果以下圖所示:

能夠看到taskA已經被worker.atsgxxx執行成功.

而後再win7上面查看taskA的執行狀態:

也顯示taskA已經成功被執行了。

重複上面的過程,在另一臺機器上啓動一個worker專門執行for_task_B中的任務,在win7上執行taskB任務。整個過程及結果以下面的圖所示:

在上面的tasks.py文件中還定義了add任務,可是在celeryconfig.py文件中沒有指定這個任務route到那個Queue中去執行,此時執行add任務的時候,add會route到Celery默認的名字叫作celery的隊列中去。

下面如今wind7上面執行add任務,而後再另一個終端上面啓動一個worker執行名字爲celery的隊列中的消息(這個名字叫作celery的Queue不是咱們定義的,是Celery默認的)。結果以下圖所示:

此時能夠看到add任務的狀態是PENDING,表示尚未被執行,由於這個消息沒有在celeryconfig.py文件中指定應該route到哪個Queue中,因此會被髮送到默認的名字爲celery的Queue中,可是咱們尚未啓動worker執行celery中的任務。接下來咱們在啓動一個worker執行celery隊列中的任務。

 1 celery -A tasks worker -l info -n worker.%h -Q celery 

而後再查看add的狀態,會發現狀態由PENDING變成了SUCCESS。效果以下圖所示:

Celery與定時任務

在celery中執行定時任務很是簡單,只須要設置celery對象的CELERYBEAT_SCHEDULE屬性便可。

下面咱們接着上面的代碼,在celeryconfig.py中添加CELERYBEAT_SCHEDULE變量:

 

 1 CELERY_TIMEZONE = 'UTC'
 2 CELERYBEAT_SCHEDULE = {
 3     'taskA_schedule' : {
 4         'task':'tasks.taskA',
 5         'schedule':20,
 6         'args':(5,6)
 7     },
 8     'taskB_scheduler' : {
 9         'task':"tasks.taskB",
10         "schedule":200,
11         "args":(10,20,30)
12     },
13     'add_schedule': {
14         "task":"tasks.add",
15         "schedule":10,
16         "args":(1,2)
17     }
18 }

 

其中定義了3個定時任務,即每隔20s執行taskA任務,參數爲(5,6),每隔200s執行taskB任務,參數爲(10,20,30),每隔10s執行add任務,參數爲(1,2).經過下列命令啓動一個定時任務:

 1 celery -A tasks beat 

使用beat參數便可啓動定時任務。

下面分別在三臺機器上面啓動三個worker分別執行for_task_A,for_task_B和celery這三個Queue中的任務。啓動以後,再在其中一臺機器上面啓動定時任務。結果以下圖所示(第一張爲啓動定時任務一段時間後的截圖):

 

能夠看到一旦scheduler啓動起來,就會按照CELERYBEAT_SCHEDULE指定的時間執行指定的任務。而後已經啓動的worker已接受到一消息就會執行任務,以下圖所示:

相關文章
相關標籤/搜索