高性能異步框架Celery入坑指南

在一個應用服務中,對於時效性要求沒那麼高的業務場景,咱們不必等到全部任務執行完才返回結果,例如用戶註冊場景中,保存了用戶帳號密碼以後,就能夠當即返回,後續的帳號激活郵件,能夠用一種異步的形式去處理,這種異步操做能夠用隊列服務來實現。不然,若是等到郵件發送成功可能幾秒過去了。html

Celery 是什麼?

Celery 是Python語言實現的分佈式隊列服務,除了支持即時任務,還支持定時任務,Celery 有5個核心角色。python

Task

任務(Task)就是你要作的事情,例如一個註冊流程裏面有不少任務,給用戶發驗證郵件就是一個任務,這種耗時的任務就能夠交給Celery去處理,還有一種任務是定時任務,好比天天定時統計網站的註冊人數,這個也能夠交給Celery週期性的處理。redis

Broker

Broker 的中文意思是經紀人,指爲市場上買賣雙方提供中介服務的人。在Celery中這個角色至關於數據結構中的隊列,介於生產者和消費者之間經紀人。例如一個Web系統中,生產者是主程序,它生產任務,將任務發送給 Broker,消費者是 Worker,是專門用於執行任務的後臺服務。Celery自己不提供隊列服務,通常用Redis或者RabbitMQ來實現隊列服務。數據庫

Worker

Worker 就是那個一直在後臺執行任務的人,也成爲任務的消費者,它會實時地監控隊列中有沒有任務,若是有就當即取出來執行。bash

Beat

Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。數據結構

Backend

Backend 用於保存任務的執行結果,每一個任務都有返回值,好比發送郵件的服務會告訴咱們有沒有發送成功,這個結果就是存在Backend中,固然咱們並不老是要關心任務的執行結果。app

記住這5個角色後面理解Celery就輕鬆了。異步

快速入門

接觸任何新東西,沒有什麼比實際動手學得更快了。假設咱們選擇Redis做爲broker,你須要安裝redis而且已經啓動了redis服務(這個步驟請自行借用搜索引擎解決)分佈式

pip install -U "celery[redis]"
複製代碼

一、建立Celery實例

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')
複製代碼

二、建立任務

假設這個發送郵件的任務須要5秒鐘才能執行完ide

# tasks.py
@app.task
def send_mail(email):
    print("send mail to ", email)
    import time
    time.sleep(5)
    return "success"
複製代碼

在沒有Celery的狀況下,程序順序執行,每一個步驟都須要等上一步執行完成。

1. 插入記錄到數據庫
2. 發郵件
3. 註冊成功
複製代碼

咱們能夠把2放在一個任務中交給celery去執行,這樣咱們就不須要等待發郵件完成,你只須要安排celery去處理幫我去完成就行了。代碼就變成了

1. 插入記錄到數據庫
2. celery 幫我去發郵件
3. 註冊成功
複製代碼

第二步是很是快的,它只須要把任務放進隊列裏面去,並不會等任務真正執行完。這跟生活是徹底貼切的,例如咱們不少事情都不是本身親歷其爲去作,而是將一個不過重要或即時性沒那麼高的事情轉交給別人處理。

三、啓動Worker

啓動Worker,監聽 Broker 中是否有任務,命令:celery worker,你可能須要指定參數

celery -A tasks worker --loglevel=info
複製代碼

-A: 指定 celery 實例所在哪一個模塊中,例子中,celery實例在tasks.py文件中,啓動成功後,能看到信息

函數用app.task 裝飾器修飾以後,就會成爲Celery中的一個Task。

四、調用任務

在主程序中調用任務,掉任務發送給 Broker, 而不是真正執行該任務

# user.py
from tasks import send_mail

def register():
    import time
    start = time.time()
    print("1. 插入記錄到數據庫")
    print("2. celery 幫我發郵件")
    send_mail.delay("xx@gmail.com")
    print("3. 告訴用戶註冊成功")
    print("耗時:%s 秒 " % (time.time() - start))

if __name__ == '__main__':
    register()

複製代碼

在主程序中,調用函數的.delay方法

目錄結構:

── celery_test
   ├── tasks.py
   └── user.py
複製代碼

運行 python user.py, 啓動應用程序

1. 插入記錄到數據庫
2. celery 幫我發郵件
3. 告訴用戶註冊成功
耗時:0.22688984870910645 秒 
複製代碼

程序花了不到0.23秒就執行完成,若是按照正常的同步邏輯去執行,至少須要5秒鐘,由於發郵件的任務就花了5秒。

在worker服務窗口看日誌信息

注意:

一、celery worker 啓動時,若是是root用戶,須要設置環境變量:

$ export C_FORCE_ROOT='true'
複製代碼

二、 Celery4.x 開始再也不支持Windows平臺,若是須要在Windows開發,請使用3.x的版本。

三、使用 RabbitMQ 或 Redis 做爲 Broker,生產環境永遠不要使用關係數據庫

四、不要使用複雜對象做爲任務函數的參數

# Good
@app.task
def my_task(user_id):
    user = User.objects.get(id=user_id)
    print(user.name)
    # ...
複製代碼
# Bad
@app.task
def my_task(user):
    print(user.name)
    # ...
複製代碼

小結

學習Celery,首先須要知道它的應用場景,而後是Celery中的常見角色,最後按照步驟感覺一下Celery是如何跑起來的。

參考連接:

同步發表博客:foofish.net

公衆號:Python之禪
相關文章
相關標籤/搜索