Celery 是一個 基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery, 舉幾個實例場景中可用的例子: html
1. 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。 python
2. 你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福 linux
【簡單講:異步 + 定時】 redis
Celery有如下優勢sql
1. 簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的 shell
2. 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務 服務器
3. 快速:一個單進程的celery每分鐘可處理上百萬個任務 app
4. 靈活: 幾乎celery的各個組件均可以被擴展及自定製異步
Celery 在執行任務時須要經過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 通常使用rabbitMQ or Redis async
Celery是一個上層任務,當有用戶請求到來的時候,發送一個任務給celery,這個任務會被中間件Redis/RabbitMQ接收發送給Celery的節點去執行任務,有一個好處就是能夠橫向的擴展機器去執行任務。
Celery安裝使用
Celery的默認broker[可理解爲中間件Redis/RabbitMQ]是RabbitMQ, 僅需配置一行就能夠
broker_url = 'amqp://guest:guest@localhost:5672//'
rabbitMQ 沒裝的話請裝一下,安裝看這裏 http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3
使用Redis作broker也能夠
安裝redis組件
pip install -U "celery[redis]"
配置
Configuration is easy, just configure the location of your Redis database: app.conf.broker_url = 'redis://localhost:6379/0' Where the URL is in the format of:【若是中間件有認證操做】 redis://:password@hostname:port/db_number all fields after the scheme are optional, and will default to localhost on port 6379, using database 0. 若是想獲取每一個任務的執行結果,還須要配置一下把任務結果存在哪 If you also want to store the state and return values of tasks in Redis, you should configure these settings: app.conf.result_backend = 'redis://localhost:6379/0'
Win7下安裝celery模塊
pip3 install celery
[測試發現celery安裝完成後Win7的cmd能夠用celery,不須要添加] 若是須要手動添加,則找到celery的安裝路徑,寫入Win7的path裏便可。 E:\PyCharm 2017.2.4\Python3.2.5\Lib\site-packages\celery\bin\
Ubuntu下安裝
首先安裝pip3
sudo apt-get install python3-pip
安裝Celery
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple celery
Ubuntu下安裝成功
Ubuntu下運行報錯,缺乏Redis
運行Celery任務後報錯
ImportError: Missing redis library (pip install redis)
安裝Redis鏈接模塊:
pip3 install redis
啓動Celery Worker來開始監聽並執行任務
celery -A Celery的Py文件名 worker --loglevel=info
操做前期條件: 安裝並啓動Redis
myCelery.py
from celery import Celery # 定義了一個Celery的App app = Celery('tasks', broker='redis://192.168.2.105', # Celery和用戶請求的中間代理 backend='redis://192.168.2.105') # 接收Celery返回結果的 @app.task # 函數變成一個Celery的任務,調用celery實現異步任務 def add(x, y): print("running...", x, y) return x + y
運行:
celery -A myCelery worker --loglevel=info 【寫文件名稱便可】
運行結果[Win]
運行結果[Linux]
注: celery文件運行起來後只能接收和執行任務[等待任務狀態...],還須要用戶發送任務
Linux下發布任務
進入文件所在的路徑下
omc@omc-virtual-machine:~/Celery$ cd /home/omc/Celery
進入Python環境
omc@omc-virtual-machine:~/Celery$ python
在Python環境下導入文件的函數
Python 3.5.2 (default, Nov 23 2017, 16:37:01) [GCC 5.4.0 20160609] on linux Type "help", "copyright", "credits" or "license" for more information. >>> from myCelery import add >>> add.delay(100,100) <AsyncResult: 22631adf-e54f-4718-99a1-1e76b5c1575f> >>> add(100,300) running... 100 300 400
注:中間件Redis只負責存儲了中間的消息
打開Redis查看存儲的消息內容
omc@omc-virtual-machine:~$ redis-cli
127.0.0.1:6379> get celery-task-meta-ebad12b8-d4ff-461b-b6e5-0dab4eaba855
>>> r = add.delay(20,20000) >>> r.
>>> r.get() 20020
返回對象的其餘方法的使用:
>>> r = cmd_Celery.my_cmd.delay("df -h") The ready() method returns whether the task has finished processing or not: >>> r.ready() False You can wait for the r to complete, but this is rarely used since it turns the asynchronous call into a synchronous one: >>> r.get(timeout=1) # 設置超時時間 8 In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument: >>> r.get(propagate=False) # propagate 擴展,添加propagate後會對報錯進行格式化輸出 If the task raised an exception you can also gain access to the original traceback: >>> r.traceback # 報錯調試用 …
前臺發佈:
後臺有2個Celery的worker
work1:
Work2:
操做前期條件: 安裝並啓動Redis
cmd_Celery.py
from celery import Celery # 定義了一個Celery的App app = Celery('tasks', broker='redis://192.168.2.105', # redis://:password@hostname:port/db_number 有密碼認證的鏈接 # broker='redis://:密碼@192.168.2.105:6379/0', backend='redis://192.168.2.105') # 接收Celery返回結果的 # 函數變成一個Celery的任務,調用celery實現異步任務 import subprocess import time @app.task def my_cmd(cmd): print('Celery of CMD:', cmd) time.sleep(5) # 判斷是不是異步的標誌,會卡5秒後執行cmd的命令 cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return cmd_obj.stdout.read().decode("utf-8") # 返回結果必須是可JSON的,須要編碼
後臺啓動Celery任務:
omc@omc-virtual-machine:~/Celery$ celery -A cmd_Celery worker --loglevel=debug
前臺客戶端發送命令:
>>> python3 >>> import cmd_Celery >>> r = cmd_Celery.my_cmd.delay("df -h") >>> r.get()
後臺Celery服務器端執行任務:
若是返回結果不是可JSON會報錯:
默認返回的是byte類型的,須要進行解碼
2次Ctrl+C
前臺啓動命令: celery -A 項目名worker -loglevel=info 後臺啓動命令: celery multi start w1 -A 項目名 -l info 後臺重啓命令: celery multi start w1 -A 項目名 -l info 後臺中止命令: celery multi stop w1 -A 項目名 -l info 先後臺的區別: 後臺是mult啓動