Python 並行分佈式框架 Celery

Celery 簡介

 

        除了redis,還可使用另一個神器---Celery。Celery是一個異步任務的調度工具。css

        Celery 是 Distributed Task Queue,分佈式任務隊列,分佈式決定了能夠有多個 worker 的存在,隊列表示其是異步操做,即存在一個產生任務提出需求的工頭,和一羣等着被分配工做的碼農。html

        在 Python 中定義 Celery 的時候,咱們要引入 Broker,中文翻譯過來就是「中間人」的意思,在這裏 Broker 起到一箇中間人的角色。在工頭提出任務的時候,把全部的任務放到 Broker 裏面,在 Broker 的另一頭,一羣碼農等着取出一個個任務準備着手作。python

        這種模式註定了整個系統會是個開環系統,工頭對於碼農們把任務作的怎樣是不知情的。因此咱們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像咱們的 Broker,也是存儲任務的信息用的,只不過這裏存的是那些任務的返回結果。咱們能夠選擇只讓錯誤執行的任務返回結果到 Backend,這樣咱們取回結果,即可以知道有多少任務執行失敗了。mysql

        Celery(芹菜)是一個異步任務隊列/基於分佈式消息傳遞的做業隊列。它側重於實時操做,但對調度支持也很好。Celery用於生產系統天天處理數以百萬計的任務。Celery是用Python編寫的,但該協議能夠在任何語言實現。它也能夠與其餘語言經過webhooks實現。Celery建議的消息隊列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和數據庫(使用SQLAlchemy的或Django的 ORM) 。Celery是易於集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包便可。git

 

 

 

Celery 介紹

 

在Celery中幾個基本的概念,須要先了解下,否則不知道爲何要安裝下面的東西。概念:Broker、Backend。github

什麼是broker?web

broker是一個消息傳輸的中間件,能夠理解爲一個郵箱。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,然後celery的worker將會取到消息,進行對於的程序執行。好吧,這個郵箱能夠當作是一個消息隊列。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 消息隊列 ,用來發送和接受消息。這個Broker有幾個方案可供選擇:RabbitMQ (消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等redis

什麼是backend?sql

一般程序發送的消息,發完就完了,可能都不知道對方時候接受了。爲此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。Backend是在Celery的配置中的一個配置項 CELERY_RESULT_BACKEND ,做用是保存結果和狀態,若是你須要跟蹤任務的狀態,那麼須要設置這一項,能夠是Database backend,也能夠是Cache backend,具體能夠參考這裏: CELERY_RESULT_BACKEND 。mongodb

對於 brokers,官方推薦是 rabbitmq 和 redis,至於 backend,就是數據庫。爲了簡單能夠都使用 redis。

我本身演示使用RabbitMQ做爲Broker,用MySQL做爲backend。

來一張圖,這是在網上最多的一張Celery的圖了,確實描述的很是好

這裏寫圖片描述

Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。

消息中間件

Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, RedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。

任務結果存儲

Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。

這裏我先不去看它是如何存儲的,就先選用redis來存儲任務執行結果。

由於涉及到消息中間件(在Celery幫助文檔中稱呼爲中間人<broker>),爲了更好的去理解文檔中的例子,能夠安裝兩個中間件,一個是RabbitMQ,一個redis。

根據 Celery的幫助文檔 安裝和設置RabbitMQ, 要使用 Celery,須要建立一個 RabbitMQ 用戶、一個虛擬主機,而且容許這個用戶訪問這個虛擬主機。

[plain]  view plain  copy
 
  1. $ sudo rabbitmqctl add_user forward password     #建立了一個RabbitMQ用戶,用戶名爲forward,密碼是password  
  2. $ sudo rabbitmqctl add_vhost ubuntu              #建立了一個虛擬主機,主機名爲ubuntu  
  3.   
  4. # 設置權限。容許用戶forward訪問虛擬主機ubuntu,由於RabbitMQ經過主機名來與節點通訊  
  5. $ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"       
  6. $ sudo rabbitmq-server    # 啓用RabbitMQ服務器  

結果以下,成功運行:

安裝Redis,它的安裝比較簡單

[plain]  view plain  copy
 
  1. $ sudo pip install redis  

而後進行簡單的配置,只須要設置 Redis 數據庫的位置:
BROKER_URL = 'redis://localhost:6379/0'

URL的格式爲:
redis://:password@hostname:port/db_number
URL Scheme 後的全部字段都是可選的,而且默認爲 localhost 的 6479 端口,使用數據庫 0。個人配置是:

redis://:password@ubuntu:6379/5

安裝Celery,我是用標準的Python工具pip安裝的,以下:

[plain]  view plain  copy
 
  1. $ sudo pip install celery  

 

 

Celery 是一個強大的 分佈式任務隊列 的 異步處理框架,它可讓任務的執行徹底脫離主程序,甚至能夠被分配到其餘主機上運行。咱們一般使用它來實現異步任務(async task)和定時任務(crontab)。咱們須要一個消息隊列來下發咱們的任務。首先要有一個消息中間件,此處選擇rabbitmq (也可選擇 redis 或 Amazon Simple Queue Service(SQS)消息隊列服務)。推薦 選擇 rabbitmq 。使用RabbitMQ是官方特別推薦的方式,所以我也使用它做爲咱們的broker。它的架構組成以下圖:

Celery_framework

 

能夠看到,Celery 主要包含如下幾個模塊:

  • 任務模塊 Task

    包含異步任務和定時任務。其中,異步任務一般在業務邏輯中被觸發併發往任務隊列,而定時任務由 Celery Beat 進程週期性地將任務發往任務隊列。

  • 消息中間件 Broker

    Broker,即爲任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 自己不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。

  • 任務執行單元 Worker

    Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。

  • 任務結果存儲 Backend

    Backend 用於存儲任務的執行結果,以供查詢。同消息中間件同樣,存儲也可以使用 RabbitMQ, redis 和 MongoDB 等。

 

 

 

安裝

 

有了上面的概念,須要安裝這麼幾個東西:RabbitMQ、SQLAlchemy、Celery

安裝rabbitmq

官網安裝方法:http://www.rabbitmq.com/install-windows.html

啓動管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 
啓動rabbitmq:sbin/rabbitmq-server -detached

rabbitmq已經啓動,能夠打開頁面來看看 
地址:http://localhost:15672/#/ 

用戶名密碼都是guest 。如今能夠進來了,能夠看到具體頁面。 關於rabbitmq的配置,網上不少 本身去搜如下就ok了。

消息中間件有了,如今該來代碼了,使用  celeby官網代碼。

剩下兩個都是Python的東西了,直接pip安裝就行了,對於歷來沒有安裝過mysql驅動的同窗可能須要安裝MySQL-python。安裝完成以後,啓動服務: $ rabbitmq-server[回車]。啓動後不要關閉窗口, 下面操做新建窗口(Tab)。

 

安裝celery
Celery能夠經過pip自動安裝,若是你喜歡使用虛擬環境安裝能夠先使用virtualenv建立一個本身的虛擬環境。反正我喜歡使用virtualenv創建本身的環境。

[plain]  view plain  copy
 
  1. pip install celery  

 

http://www.open-open.com/lib/view/open1441161168878.html

 

開始使用 Celery

 

使用celery包含三個方面:1. 定義任務函數。2. 運行celery服務。3. 客戶應用程序的調用。

建立一個文件 tasks.py輸入下列代碼:

[python]  view plain  copy
 
  1. from celery import Celery  
  2.   
  3. broker = 'redis://127.0.0.1:6379/5'  
  4. backend = 'redis://127.0.0.1:6379/6'  
  5.   
  6.   
  7. app = Celery('tasks', broker=broker, backend=backend)  
  8.  
  9. @app.task  
  10. def add(x, y):  
  11.     return x + y  

上述代碼導入了celery,而後建立了celery 實例 app,實例化的過程當中指定了任務名tasks(和文件名一致),傳入了broker和backend。而後建立了一個任務函數add。下面啓動celery服務。在當前命令行終端運行(分別在 env1 和 env2 下執行):

[plain]  view plain  copy
 
  1. celery -A tasks worker  --loglevel=info  

目錄結構 (celery -A tasks worker --loglevel=info 這條命令當前工做目錄必須和 tasks.py 所在的目錄相同。即 進入tasks.py所在目錄執行這條命令。

使用 python 虛擬環境 模擬兩個不一樣的 主機。

此時會看見一對輸出。包括註冊的任務啦。

 

交互式客戶端程序調用方法

打開一個命令行,進入Python環境。

[plain]  view plain  copy
 
  1. In [0]:from tasks import add  
  2. In [1]: r = add.delay(2, 2)  
  3. In [2]: add.delay(2, 2)  
  4. Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>  
  5.   
  6. In [3]: r = add.delay(3, 3)  
  7.   
  8. In [4]: r.re  
  9. r.ready   r.result  r.revoke  
  10.   
  11. In [4]: r.ready()  
  12. Out[4]: True  
  13.   
  14. In [6]: r.result  
  15. Out[6]: 6  
  16.   
  17. In [7]: r.get()  
  18. Out[7]: 6  

調用 delay 函數便可啓動 add 這個任務。這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數、函數的參數以及其餘信息,具體的能夠看 Celery官方文檔。這個時候 worker 會等待 broker 中的消息,一旦收到消息就會馬上執行消息。

啓動了一個任務以後,能夠看到以前啓動的worker已經開始執行任務了。

如今是在python環境中調用的add函數,實際上一般在應用程序中調用這個方法。

注意:若是把返回值賦值給一個變量,那麼原來的應用程序也會被阻塞,須要等待異步任務返回的結果。所以,實際使用中,不須要把結果賦值。

 

應用程序中調用方法

新建一個 main.py 文件 代碼以下:

[python]  view plain  copy
 
  1. from tasks import add    
  2.   
  3. r = add.delay(2, 2)    
  4. r = add.delay(3, 3)    
  5. print r.ready()    
  6. print r.result      
  7. print r.get()    

在celery命令行能夠看見celery執行的日誌。打開 backend的redis,也能夠看見celery執行的信息。

使用  Redis Desktop Manager 查看 Redis 數據庫內容如圖:

 

使用配置文件

Celery 的配置比較多,能夠在 官方配置文檔:http://docs.celeryproject.org/en/latest/userguide/configuration.html  查詢每一個配置項的含義。

上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先建立一個python包,celery服務,姑且命名爲proj。目錄文件以下:

proj tree . ├── __init__.py ├── celery.py # 建立 celery 實例 ├── config.py # 配置文件 └── tasks.py # 任務函數

首先是 celery.py

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. # -*- coding:utf-8 -*-  
  3.   
  4. from __future__ import absolute_import  
  5. from celery import Celery  
  6.   
  7. app = Celery('proj', include=['proj.tasks'])  
  8.   
  9. app.config_from_object('proj.config')  
  10.   
  11. if __name__ == '__main__':  
  12.     app.start()  

這一次建立 app,並無直接指定 broker 和 backend。而是在配置文件中。

config.py

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. # -*- coding:utf-8 -*-  
  3.   
  4. from __future__ import absolute_import  
  5.   
  6. CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'  
  7. BROKER_URL = 'redis://127.0.0.1:6379/6'  

剩下的就是tasks.py

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. # -*- coding:utf-8 -*-  
  3.   
  4. from __future__ import absolute_import  
  5. from proj.celery import app  
  6.  
  7. @app.task  
  8. def add(x, y):  
  9.     return x + y  

使用方法也很簡單,在 proj 的同一級目錄執行 celery

[plain]  view plain  copy
 
  1. celery -A proj worker -l info  

如今使用任務也很簡單,直接在客戶端代碼調用 proj.tasks 裏的函數便可。

 

指定 路由 到的 隊列

Celery的官方文檔 。先看代碼(tasks.py):

[python]  view plain  copy
 
  1. from celery import Celery  
  2.   
  3. app = Celery()  
  4. app.config_from_object("celeryconfig")  
  5.  
  6. @app.task  
  7. def taskA(x,y):  
  8.     return x + y  
  9.  
  10. @app.task  
  11. def taskB(x,y,z):  
  12.      return x + y + z  
  13.  
  14. @app.task  
  15. def add(x,y):  
  16.     return x + y  

上面的tasks.py中,首先定義了一個Celery對象,而後用celeryconfig.py對celery對象進行設置,以後再分別定義了三個task,分別是taskA,taskB和add。接下來看一下celeryconfig.py 文件

[python]  view plain  copy
 
  1. from kombu import Exchange,Queue  
  2.   
  3. BROKER_URL = "redis://10.32.105.227:6379/0" CELERY_RESULT_BACKEND = "redis://10.32.105.227:6379/0"  
  4.   
  5. CELERY_QUEUES = (  
  6.    Queue("default",Exchange("default"),routing_key="default"),   
  7.    Queue("for_task_A",Exchange("for_task_A"),routing_key="task_a"),  
  8.    Queue("for_task_B",Exchange("for_task_B"),routing_key="task_a")   
  9.  )  
  10.       
  11. CELERY_ROUTES = {  
  12.     'tasks.taskA':{"queue":"for_task_A","routing_key":"task_a"},  
  13.     'tasks.taskB":{"queue":"for_task_B","routing_key:"task_b"}  
  14.  }   

在 celeryconfig.py 文件中,首先設置了brokel以及result_backend,接下來定義了三個Message Queue,而且指明瞭Queue對應的Exchange(當使用Redis做爲broker時,Exchange的名字必須和Queue的名字同樣)以及routing_key的值。

如今在一臺主機上面啓動一個worker,這個worker只執行for_task_A隊列中的消息,這是經過在啓動worker是使用-Q Queue_Name參數指定的。

[plain]  view plain  copy
 
  1. celery -A tasks worker -l info -n worker.%h -Q for_task_A  

而後到另外一臺主機上面執行taskA任務。首先 切換當前目錄到代碼所在的工程下,啓動python,執行下面代碼啓動taskA:

[python]  view plain  copy
 
  1. from tasks import *  
  2.   
  3. task_A_re = taskA.delay(100,200)  

執行完上面的代碼以後,task_A消息會被當即發送到for_task_A隊列中去。此時已經啓動的worker.atsgxxx 會當即執行taskA任務。

重複上面的過程,在另一臺機器上啓動一個worker專門執行for_task_B中的任務。修改上一步驟的代碼,把 taskA 改爲 taskB 並執行。

[python]  view plain  copy
 
  1. from tasks import *  
  2.   
  3. task_B_re = taskB.delay(100,200)  

在上面的 tasks.py 文件中還定義了add任務,可是在celeryconfig.py文件中沒有指定這個任務route到那個Queue中去執行,此時執行add任務的時候,add會route到Celery默認的名字叫作celery的隊列中去。

由於這個消息沒有在celeryconfig.py文件中指定應該route到哪個Queue中,因此會被髮送到默認的名字爲celery的Queue中,可是咱們尚未啓動worker執行celery中的任務。接下來咱們在啓動一個worker執行celery隊列中的任務。

[plain]  view plain  copy
 
  1. celery -A tasks worker -l info -n worker.%h -Q celery   

而後再查看add的狀態,會發現狀態由PENDING變成了SUCCESS。

 

Scheduler ( 定時任務,週期性任務 )

http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

一種常見的需求是每隔一段時間執行一個任務。

在celery中執行定時任務很是簡單,只須要設置celery對象的CELERYBEAT_SCHEDULE屬性便可。

配置以下

config.py

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. # -*- coding:utf-8 -*-  
  3.   
  4. from __future__ import absolute_import  
  5.   
  6. CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'  
  7. BROKER_URL = 'redis://127.0.0.1:6379/6'  
  8.   
  9. CELERY_TIMEZONE = 'Asia/Shanghai'  
  10.   
  11. from datetime import timedelta  
  12.   
  13. CELERYBEAT_SCHEDULE = {  
  14.     'add-every-30-seconds': {  
  15.          'task': 'proj.tasks.add',  
  16.          'schedule': timedelta(seconds=30),  
  17.          'args': (16, 16)  
  18.     },  
  19. }  

注意配置文件須要指定時區。這段代碼表示每隔30秒執行 add 函數。一旦使用了 scheduler, 啓動 celery須要加上-B 參數。

[plain]  view plain  copy
 
  1. celery -A proj worker -B -l info  

設置多個定時任務

[plain]  view plain  copy
 
  1. CELERY_TIMEZONE = 'UTC'  
  2. CELERYBEAT_SCHEDULE = {  
  3.     'taskA_schedule' : {  
  4.         'task':'tasks.taskA',  
  5.         'schedule':20,  
  6.         'args':(5,6)  
  7.     },  
  8.     'taskB_scheduler' : {  
  9.         'task':"tasks.taskB",  
  10.         "schedule":200,  
  11.         "args":(10,20,30)  
  12.     },  
  13.     'add_schedule': {  
  14.         "task":"tasks.add",  
  15.         "schedule":10,  
  16.         "args":(1,2)  
  17.     }  
  18. }  

定義3個定時任務,即每隔20s執行taskA任務,參數爲(5,6),每隔200s執行taskB任務,參數爲(10,20,30),每隔10s執行add任務,參數爲(1,2).經過下列命令啓動一個定時任務: celery -A tasks beat。使用 beat 參數便可啓動定時任務。

 

crontab

計劃任務固然也能夠用crontab實現,celery也有crontab模式。修改 config.py

[python]  view plain  copy
 
  1. #!/usr/bin/env python  
  2. # -*- coding:utf-8 -*-  
  3.   
  4. from __future__ import absolute_import  
  5.   
  6. CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'  
  7. BROKER_URL = 'redis://127.0.0.1:6379/6'  
  8.   
  9. CELERY_TIMEZONE = 'Asia/Shanghai'  
  10.   
  11. from celery.schedules import crontab  
  12.   
  13. CELERYBEAT_SCHEDULE = {  
  14.     # Executes every Monday morning at 7:30 A.M  
  15.     'add-every-monday-morning': {  
  16.         'task': 'tasks.add',  
  17.         'schedule': crontab(hour=7, minute=30, day_of_week=1),  
  18.         'args': (16, 16),  
  19.     },  
  20. }  

scheduler的切分度很細,能夠精確到秒。crontab模式就不用說了。

固然celery還有更高級的用法,好比 多個機器 使用,啓用多個 worker併發處理 等。

 

發送任務到隊列中

apply_async(args[, kwargs[, …]])、delay(*args, **kwargs) :http://docs.celeryproject.org/en/master/userguide/calling.html

send_task  :http://docs.celeryproject.org/en/master/reference/celery.html#celery.Celery.send_task

[plain]  view plain  copy
 
  1. from celery import Celery  
  2. celery = Celery()  
  3. celery.config_from_object('celeryconfig')  
  4. send_task('tasks.test1', args=[hotplay_id, start_dt, end_dt], queue='hotplay_jy_queue')    

 

 

Celery 監控 和 管理  以及 命令幫助

輸入 celery -h 能夠看到 celery 的命令和幫助

更詳細的幫助能夠看官方文檔:http://docs.celeryproject.org/en/master/userguide/monitoring.html

 

 

Celery 官網 示例

 

官網示例:http://docs.celeryproject.org/en/master/getting-started/first-steps-with-celery.html#first-steps

Python 並行分佈式框架 Celery 超詳細介紹

 

 

一個簡單例子

 

第一步

編寫簡單的純python函數

[python]  view plain  copy
 
  1. def say(x,y):  
  2.     return x+y  
  3.   
  4. if __name__ == '__main__':  
  5.     say('Hello','World')  

 

第二步

若是這個函數不是簡單的輸出兩個字符串相加,而是須要查詢數據庫或者進行復雜的處理。這種處理須要耗費大量的時間,仍是這種方式執行會是多麼糟糕的事情。爲了演示這種現象,可使用sleep函數來模擬高耗時任務。

[python]  view plain  copy
 
  1. import time  
  2.   
  3. def say(x,y):  
  4.     time.sleep(5)  
  5.     return x+y  
  6.   
  7. if __name__ == '__main__':  
  8.     say('Hello','World')  

 

第三步

這時候咱們可能會思考怎麼使用多進程或者多線程去實現這種任務。對於多進程與多線程的不足這裏不作討論。如今咱們能夠想一想celery到底能不能解決這種問題。

[python]  view plain  copy
 
  1. import time  
  2. from celery import Celery  
  3.   
  4. app = Celery('sample',broker='amqp://guest@localhost//')  
  5.  
  6. @app.task  
  7. def say(x,y):  
  8.     time.sleep(5)  
  9.     return x+y  
  10.   
  11. if __name__ == '__main__':  
  12.     say('Hello','World')  

如今來解釋一下新加入的幾行代碼,首先說明一下加入的新代碼徹底不須要改變原來的代碼。導入celery模塊就不用解釋了,聲明一個celery實例app的參數須要解釋一下。

  1. 第一個參數是這個python文件的名字,注意到已經把.py去掉了。
  2. 第二個參數是用到的rabbitmq隊列。能夠看到其使用的方式很是簡單,由於它是默認的消息隊列端口號都不須要指明。

 

第四步

如今咱們已經使用了celery框架了,咱們須要讓它找幾個工人幫咱們幹活。好如今就讓他們幹活。

[python]  view plain  copy
 
  1. celery -A sample worker --loglevel=info  

這條命令有些長,我來解釋一下吧。

  1. -A 表明的是Application的首字母,咱們的應用就是在 sample 裏面 定義的。
  2. worker 就是咱們的工人了,他們會努力完成咱們的工做的。
  3. -loglevel=info 指明瞭咱們的工做後臺執行狀況,雖然工人們已經向你保證過必定努力完成任務。可是謹慎的你仍是但願看看工做進展狀況。
    回車後你能夠看到相似下面這樣一個輸出,若是是沒有紅色的輸出那麼你應該是沒有遇到什麼錯誤的。

 

第五步

如今咱們的任務已經被加載到了內存中,咱們不能再像以前那樣執行python sample.py來運行程序了。咱們能夠經過終端進入python而後經過下面的方式加載任務。輸入python語句。

[python]  view plain  copy
 
  1. from sample import say  
  2. say.delay('hello','world')  

咱們的函數會當即返回,不須要等待。就那麼簡單celery解決了咱們的問題。能夠發現咱們的say函數不是直接調用了,它被celery 的 task 裝飾器 修飾過了。因此多了一些屬性。目前咱們只須要知道使用delay就好了。

 

簡單案例

確保你以前的RabbitMQ已經啓動。仍是官網的那個例子,在任意目錄新建一個tasks.py的文件,內容以下:

[python]  view plain  copy
 
  1. from celery import Celery  
  2.   
  3. app = Celery('tasks', broker='amqp://guest@localhost//')  
  4.  
  5. @app.task  
  6. def add(x, y):  
  7.     return x + y  

使用redis做爲消息隊列

[python]  view plain  copy
 
  1. app = Celery('task', broker='redis://localhost:6379/4')    
  2. app.conf.update(    
  3.     CELERY_TASK_SERIALIZER='json',    
  4.     CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content    
  5.     CELERY_RESULT_SERIALIZER='json',    
  6.     CELERYD_CONCURRENCY = 8    
  7. )    
  8.   
  9. @app.task    
  10. def add(x, y):    
  11.     return x + y    

在同級目錄執行:

[python]  view plain  copy
 
  1. $ celery -A tasks.app worker --loglevel=info  

該命令的意思是啓動一個worker ( tasks文件中的app實例,默認實例名爲app,-A 參數後也可直接加文件名,不須要 .app),把tasks中的任務(add(x,y))把任務放到隊列中。保持窗口打開,新開一個窗口進入交互模式,python或者ipython:

[python]  view plain  copy
 
  1. >>> from tasks import add  
  2. >>> add.delay(4, 4)  

到此爲止,你已經可使用celery執行任務了,上面的python交互模式下簡單的調用了add任務,並傳遞 4,4 參數。

但此時有一個問題,你忽然想知道這個任務的執行結果和狀態,到底完了沒有。所以就須要設置backend了

修改以前的tasks.py中的代碼爲:

[python]  view plain  copy
 
  1. # coding:utf-8  
  2. import subprocess  
  3. from time import sleep  
  4.   
  5. from celery import Celery  
  6.   
  7. backend = 'db+mysql://root:@192.168.0.102/celery'  
  8. broker = 'amqp://guest@192.168.0.102:5672'  
  9.   
  10. app = Celery('tasks', backend=backend, broker=broker)  
  11.  
  12.  
  13. @app.task  
  14. def add(x, y):  
  15.     sleep(10)  
  16.     return x + y  
  17.  
  18.  
  19. @app.task  
  20. def hostname():  
  21.     return subprocess.check_output(['hostname'])  

除了添加backend以外,上面還添加了一個who的方法用來測試多服務器操做。修改完成以後,還按以前的方式啓動。

一樣進入python的交互模型:

[python]  view plain  copy
 
  1. >>> from tasks import add, hostname  
  2. >>> r = add.delay(4, 4)  
  3. >>> r.ready() # 10s內執行,會輸出False,由於add中sleep了10s  
  4. >>>  
  5. >>> r = hostname.delay()  
  6. >>> r.result  # 輸出你的hostname  

 

測試多服務器

作完上面的測試以後,產生了一個疑惑,Celery叫作分佈式任務管理,那它的分佈式體如今哪?它的任務都是怎麼執行的?在哪一個機器上執行的?在當前服務器上的celery服務不關閉的狀況下,按照一樣的方式在另一臺服務器上安裝Celery,並啓動:

[python]  view plain  copy
 
  1. $ celery -A tasks worker --loglevel=info  

發現前一個服務器的Celery服務中輸出你剛啓動的服務器的hostname,前提是那臺服務器連上了你的rabbitmq。而後再進入python交互模式:

[python]  view plain  copy
 
  1. >>> from tasks import hostname  
  2. >>>  
  3. >>> for i in range(10):  
  4. ...     r = hostname.delay()  
  5. ...     print r.result  # 輸出你的hostname  
  6. >>>  

看你輸入的內容已經觀察兩臺服務器上你啓動celery服務的輸出。

 

Celery的使用技巧(Celery配置文件和發送任務)

在實際的項目中咱們須要明確先後臺的分界線,所以咱們的celery編寫的時候就應該是分紅先後臺兩個部分編寫。在celery簡單入門中的總結部分咱們也提出了另一個問題,就是須要分離celery的配置文件。

第一步

編寫後臺任務tasks.py腳本文件。在這個文件中咱們不須要再聲明celery的實例,咱們只須要導入其task裝飾器來註冊咱們的任務便可。後臺處理業務邏輯徹底獨立於前臺,這裏只是簡單的hello world程序須要多少個參數只須要告訴前臺就能夠了,在實際項目中可能你須要的是後臺執行發送一封郵件的任務或者進行復雜的數據庫查詢任務等。

[python]  view plain  copy
 
  1. import time  
  2. from celery.task import task  
  3.  
  4.  
  5. @task  
  6. def say(x,y):  
  7.         time.sleep(5)  
  8.         return x+y  

第二步

有了那麼完美的後臺,咱們的前臺編寫確定也輕鬆很多。到底簡單到什麼地步呢,來看看前臺的代碼吧!爲了形象的代表其職能,咱們將其命名爲client.py腳本文件。

[python]  view plain  copy
 
  1. from celery import Celery  
  2.   
  3. app = Celery()  
  4.   
  5. app.config_from_object('celeryconfig')  
  6. app.send_task("tasks.say",['hello','world'])  

能夠看到只須要簡單的幾步:1. 聲明一個celery實例。2. 加載配置文件。3. 發送任務。

第三步

繼續完成celery的配置。官方的介紹使用celeryconfig.py做爲配置文件名,這樣能夠防止與你如今的應用的配置同名。

[plain]  view plain  copy
 
  1. CELERY_IMPORTS = ('tasks')  
  2. CELERY_IGNORE_RESULT = False  
  3. BROKER_HOST = '127.0.0.1'  
  4. BROKER_PORT = 5672  
  5. BROKER_URL = 'amqp://'  
  6. CELERY_RESULT_BACKEND = 'amqp'  

能夠看到咱們指定了CELERY_RESULT_BACKEND爲amqp默認的隊列!這樣咱們就能夠查看處理後的運行狀態了,後面將會介紹處理結果的查看。

第四步

啓動celery後臺服務,這裏是測試與學習celery的教程。在實際生產環境中,若是是經過這種方式啓動的後臺進程是不行的。所謂後臺進程一般是須要做爲守護進程運行在後臺的,在python的世界裏老是有一些工具可以知足你的須要。這裏可使用supervisor做爲進程管理工具。在後面的文章中將會介紹如何使用supervisor工具。

[plain]  view plain  copy
 
  1. celery worker -l info --beat  

注意如今運行worker的方式也與前面介紹的不同了,下面簡單介紹各個參數。
    -l info     與--loglevel=info的做用是同樣的。
    --beat    週期性的運行。即設置 心跳。

第五步

前臺的運行就比較簡單了,與平時運行的python腳本同樣。python client.py。

如今前臺的任務是運行了,但是任務是被寫死了。咱們的任務大多數時候是動態的,爲演示動態工做的狀況咱們可使用終端發送任務。

[python]  view plain  copy
 
  1. >>> from celery import Celery  
  2. >>> app = Celery()  
  3. >>> app.config_from_object('celeryconfig')  

在python終端導入celery模塊聲明實例而後加載配置文件,完成了這些步驟後就能夠動態的發送任務而且查看任務狀態了。注意在配置文件celeryconfig.py中咱們已經開啓了處理的結果迴應模式了CELERY_IGNORE_RESULT = False而且在迴應方式配置中咱們設置了CELERY_RESULT_BACKEND = 'amqp'這樣咱們就能夠查看處處理的狀態了。

[python]  view plain  copy
 
  1. >>> x = app.send_task('task.say',['hello', 'lady'])  
  2. >>> x.ready()  
  3. False  
  4. >>> x.status  
  5. 'PENDING'  
  6. >>> x.ready()  
  7. TRUE  
  8. >>> x.status  
  9. u'SUCCESS'  

能夠看到任務發送給celery後立刻查看任務狀態會處於PENDING狀態。稍等片刻就能夠查看到SUCCESS狀態了。這種效果然棒不是嗎?在圖像處理中或者其餘的一些搞耗時的任務中,咱們只須要把任務發送給後臺就不用去管它了。當咱們須要結果的時候只須要查看一些是否成功完成了,若是返回成功咱們就能夠去後臺數據庫去找處理後生成的數據了。

 

celery使用mangodb保存數據

第一步

安裝好mongodb了!就可使用它了,首先讓咱們修改celeryconfig.py文件,使celery知道咱們有一個新成員要加入咱們的項目,它就是mongodb配置的方式以下。

[plain]  view plain  copy
 
  1. ELERY_IMPORTS = ('tasks')  
  2. CELERY_IGNORE_RESULT = False  
  3. BROKER_HOST = '127.0.0.1'  
  4. BROKER_PORT = 5672  
  5. BROKER_URL = 'amqp://'  
  6. #CELERY_RESULT_BACKEND = 'amqp'  
  7. CELERY_RESULT_BACKEND = 'mongodb'  
  8. CELERY_RESULT_BACKEND_SETTINGS = {  
  9.         "host":"127.0.0.1",  
  10.         "port":27017,  
  11.         "database":"jobs",  
  12.         "taskmeta_collection":"stock_taskmeta_collection",  
  13. }  

把#CELERY_RESULT_BACKEND = 'amp'註釋掉了,可是沒有刪除目的是對比先後的改變。爲了使用mongodb咱們有簡單了配置一下主機端口以及數據庫名字等。顯然你能夠按照你喜歡的名字來配置它。

第二步

啓動 mongodb 數據庫:mongod。修改客戶端client.py讓他可以動態的傳人咱們的數據,很是簡單代碼以下。

[python]  view plain  copy
 
  1. import sys  
  2. from celery import Celery  
  3.   
  4. app = Celery()  
  5.   
  6. app.config_from_object('celeryconfig')  
  7. app.send_task("tasks.say",[sys.argv[1],sys.argv[2]])  

任務tasks.py不須要修改!

[python]  view plain  copy
 
  1. import time  
  2. from celery.task import task  
  3.  
  4.  
  5. @task  
  6. def say(x,y):  
  7.         time.sleep(5)  
  8.         return x+y  

第三步

測試代碼,先啓動celery任務。

[python]  view plain  copy
 
  1. celery worker -l info --beat  

再來啓動咱們的客戶端,注意此次啓動的時候須要給兩個參數啦!
mongo

[python]  view plain  copy
 
  1. python client.py welcome landpack  

等上5秒鐘,咱們的後臺處理完成後咱們就能夠去查看數據庫了。

第四步

查看mongodb,須要啓動一個mongodb客戶端,啓動很是簡單直接輸入 mongo 。而後是輸入一些簡單的mongo查詢語句。

最後查到的數據結果多是你不想看到的,由於mongo已經進行了處理。想了解更多能夠查看官方的文檔。

 

原文連接:http://www.javashuo.com/article/p-fqrpploi-de.html

相關文章
相關標籤/搜索