Celery是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時爲操做提供維護此類系統所需的工具。它是一個任務隊列,專一於實時處理,同時還支持任務調度。python
中間人boker:mysql
broker是一個消息傳輸的中間件。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,然後celery的worker將會取到消息,進行對於的程序執行。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 消息隊列 ,用來發送和接受消息。這個Broker有幾個方案可供選擇:RabbitMQ (消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等。redis
backend:sql
一般程序發送的消息,發完就完了,可能都不知道對方時候接受了。爲此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。Backend是在Celery的配置中的一個配置項 CELERY_RESULT_BACKEND ,做用是保存結果和狀態,若是你須要跟蹤任務的狀態,那麼須要設置這一項。可使用數據庫做爲backend。 mongodb
♦高可用: 假若鏈接丟失或失敗,職程和客戶端會自動重試,而且一些中間人經過 主/主 或 主/從 方式複製來提升可用性。數據庫
♦快速: 單個 Celery 進程每分鐘可處理數以百萬計的任務,而保持往返延遲在亞毫秒級(使用 RabbitMQ、py-librabbitmq 和優化過的設置)。windows
♦靈活: Celery 幾乎全部部分均可以擴展或單獨使用。能夠自制鏈接池、 序列化、壓縮模式、日誌、調度器、消費者、生產者、自動擴展、 中間人傳輸或更多。緩存
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。(百度上的圖片)架構
消息中間件併發
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。
安裝Celery,(這裏使用redis做爲中間件,windows注意安裝對應支持的版本)
pip3 install celery['redis']
使用celery包含三個方面:1. 定義任務函數。2. 運行celery服務。3. 客戶應用程序的調用。
1.目錄結構:
CeleryTest
¦--tasks.py
¦--user.py
#tasks.py import time from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def send(msg): print(f'send {msg}') time.sleep(3) return
#user.py from tasks import send import time def register(): start = time.time() send.delay('666') print('耗時:', time.time() - start) if __name__ == '__main__': register()
2.這裏使用redis做borker,啓動redis,進入redis目錄啓動
$ redis-server
3.啓動worker,在CeleryTest的同級目錄終端輸入,—A爲Celery實例所在位置
$ celery -A tasks worker -l info
啓動成功會看到以下畫面:
4.運行user.py文件,輸入以下:
耗時: 0.15261435508728027
調用 delay 函數便可啓動 add 這個任務。這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數、函數的參數以及其餘信息,具體的能夠看 Celery官方文檔。這個時候 worker 會等待 broker 中的消息,一旦收到消息就會馬上執行消息。能夠看到調用send.delay()後,耗時並無受到time.sleep()的影響,成功的完成異步調用。咱們能夠在項目中執行耗時的任務時來使用Celery。這裏簡單演示並無使用backend儲存任務的結果。
將Celery封裝成一個項目進行使用,這裏簡單的配置一下,方便演示,更多配置參數能夠參考官方文檔。
celery_demo
¦--celery_app
¦--__init__.py
¦--celeryconfig.py
¦--task1.py
¦--task2.py
¦--client.py
__init__.py
from celery import Celery app = Celery('demo') # 生成實例 app.config_from_object('celery_app.celeryconfig') # 加載配置
celeryconfig.py
BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區,默認是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定導入的任務模塊 'celery_app.task1', 'celery_app.task2' )
task1.py
import time from celery_app import app @app.task def add(x, y): time.sleep(2) return x + y
task2.py
import time from celery_app import app @app.task def multiply(x, y): time.sleep(2) return x * y
client.py
from celery_app import task1 from celery_app import task2 res1 = task1.add.delay(2, 8) # 或者 task1.add.apply_async(args=[2, 8]) res2 = task2.multiply.delay(3, 7) # 或者 task2.multiply.apply_async(args=[3, 7]) print('hello world')
啓動worker,在celery_demo目錄執行下列命令
celery_demo $ celery -A celery_app worker -l info
接着,運行$ python client.py
,它會發送兩個異步任務到 Broker,在 Worker 的窗口咱們能夠看到以下輸出:
在前面的例子中,咱們使用 delay()
或 apply_async()
方法來調用任務。事實上,delay 方法封裝了 apply_async
,以下:
def delay(self, *partial_args, **partial_kwargs): """Shortcut to :meth:`apply_async` using star arguments.""" return self.apply_async(partial_args, partial_kwargs)
Celery 除了能夠執行異步任務
,也支持執行週期性任務(Periodic Tasks)
,或者說定時任務。Celery Beat 進程經過讀取配置文件的內容,週期性地將定時任務發往任務隊列。在上面的例子中,修改配置文件便可實現。
celeryconfig.py
from celery.schedules import crontab from datetime import timedelta BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend CELERY_TIMEZONE = 'Asia/Shanghai' # 指定時區,默認是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定導入的任務模塊 'celery_app.task1', 'celery_app.task2' ) # schedules CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_app.task1.add', 'schedule': timedelta(seconds=30), # 每 30 秒執行一次 'args': (5, 8) # 任務函數參數 }, 'multiply-at-some-time': { 'task': 'celery_app.task2.multiply', 'schedule': crontab(hour=10, minute=50), # 天天早上 10 點 50 分執行一次 'args': (3, 7) # 任務函數參數 } }
啓動worker,而後定時將任務發送到 Broker,在celery_demo目錄下執行下面兩條命令:
celery -A celery_app worker -l info
celery beat -A celery_app
上面兩條命令也能夠合併爲一條:
celery -B -A celery_app worker -l info