分佈式隊列神器 Celery,你瞭解多少?

咱們在web開發中會常常遇到異步任務,對於一些消耗資源和時間的操做,若是不從應用中單獨抽出來的話,體驗是很是很差的,例如:一個手機驗證碼登陸的過程,當用戶輸入手機號點擊發送後,若是若是直接扔給後端應用去執行的話,就會引發網絡IO的阻塞,那整個應用就很是不友好了,那如何優雅的解決這個問題呢?php

咱們可使用異步任務,當接收到請求後,咱們能夠在業務邏輯的處理時觸發一個異步任務,前端當即返回讀秒讓用戶接收驗證碼,同時因爲是異步執行的任務,後端也能夠處理其餘的請求,這就很是的完美了。html

實現異步任務的工具備不少,其原理也都是去實現一個消息隊列,這裏咱們主要來了解一下Celery。前端

Celery 是什麼?

Celery簡介

Celery是一個由Python編寫的簡單,靈活且可靠的分佈式系統,它能夠處理大量消息,同時也提供了操做、維護該分佈式系統所需的工具。node

說白點就是,Celery 是一個異步任務的調度工具,它專一於實時任務處理,支持任務調度。有了Celery,咱們能夠快速創建一個分佈式任務隊列並可以簡單的管理。Celery雖然是由python編寫, 但協議能夠用任何語言實現。迄今,已有 Ruby 實現的 RCelery 、node.js 實現的 node-celery 以及一個 PHP 客戶端 。python

Celery架構

此處借鑑一張網圖,這張圖很是明瞭把Celery的組成以及工做方式描述出來了。git

這裏寫圖片描述

Celery的架構由下面三個部分組成:github

Brokers

意爲中間件/中間人,在這裏指的是任務隊列, 咱們要注意Celery自己不是任務隊列,它是管理分佈式任務隊列的工具,換一句話說,用Celery能夠快速進行任務隊列的使用與管理, Celery能夠方便的和第三方提供的任務隊列集成,例如RabbitMQ, Redis等。web

Worker

任務執行單元,咱們能夠理解爲工人,Worker是Celery提供的任務執行的單元,簡單來講,它就是Celery的工人,相似於消費者,它shi'shi監控着任務隊列,當有新的任務入隊時,它會從任務隊列中取出任務並執行。redis

backend/Task result store

任務結構存儲,顧名思義,它就是用來存儲Worker執行的任務的結果的地方,Celery支持以不一樣方式存儲任務的結果,有redis,Memcached等。後端

簡單來講,當用戶、或者咱們的應用中的觸發器將任務入Brokers隊列以後,Celery的Worker就會取出任務並執行,而後將結構保存到Task result store中

使用Celery

簡單實現

Celery及消息隊列(redis/RabbitMQ)的安裝過程在這裏就再也不贅述了,出於方便,咱們這裏使用redis,點擊這裏查看官網給出的更多的Brokers和backend支持。

首先,咱們新建一個tasks.py文件。

import time
from celery import Celery

brokers = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'


app = Celery('tasks', broker=brokers, backend=backend)

@app.task
def add(x, y):
    time.sleep(2)
    return x + y

上述代碼,咱們導入了celery庫,新建了一個celery實例,傳入了broker和backend,而後建立了任務函數add,咱們用time.sleep(2)來模擬耗時操做。

接下來咱們要啓動Celery服務,在當前命令行終端運行:

celery -A tasks worker  --loglevel=info

注意:若是在Windows中要運行以下命令:

celery -A celery_app worker --loglevel=info -P eventlet

否則會報錯。。。。。。

咱們會看到下面的輸出結果:

D:\use_Celery>celery -A tasks worker --loglevel=info -P eventlet

 -------------- celery@DESKTOP-8E96VUV v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2020-03-18 15:49:22
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x3ed95f0
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . tasks.add

[2020-03-18 15:49:22,264: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-03-18 15:49:22,294: INFO/MainProcess] mingle: searching for neighbors
[2020-03-18 15:49:23,338: INFO/MainProcess] mingle: all alone
[2020-03-18 15:49:23,364: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.
[2020-03-18 15:49:23,371: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.

這些輸出包括指定的啓動Celer應用的一些信息,還有註冊的任務等等。 

此時worker已經處於待命狀態,而 broker中尚未任務 ,咱們須要觸發任務進入broker中,worker才能去取出任務執行。

咱們新建一個add_task.py文件:

from tasks import add

result = add.delay(5, 6)  # 使用celery提供的接口delay進行調用任務函數

while not result.ready():
    pass
print("完成:", result.get())

咱們能夠看到命令窗口的輸出的celery執行的日誌

[2020-03-18 15:53:15,967: INFO/MainProcess] Received task: tasks.add[8da270cb-7f07-4202-ad6a-51cc7f559107]
[2020-03-18 15:53:17,981: INFO/MainProcess] Task tasks.add[8da270cb-7f07-4202-ad6a-51cc7f559107] succeeded in 2.015999999974156s: 11

固然咱們在backend的redis中也能夠看到執行任務的相關信息。

至此,一個簡單的 celery 應用就完成啦。

週期/定時任務

Celery 也能夠實現定時或者週期性任務,實現也很簡單,只須要配置好週期任務,而後再啓動要啓動一個 beat 服務便可。

新建Celery配置文件celery_conf.py:

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16)
    }
}

而後在 tasks.py 中經過app.config_from_object('celery_config') 讀取Celery配置:

# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')

而後從新運行 worker,接着再運行 beat:

celery -A tasks beat

咱們能夠看到如下信息:

D:\use_Celery>celery -A tasks beat
celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-03-18 17:07:54
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)

而後咱們就能夠看到啓動worker的命令行在週期性的執行任務:

[2020-03-18 17:07:57,998: INFO/MainProcess] Received task: tasks.add[f5dab8ac-0809-415f-84e7-cba488ea2495]
[2020-03-18 17:07:59,995: INFO/MainProcess] Task tasks.add[f5dab8ac-0809-415f-84e7-cba488ea2495] succeeded in 2.0s: 32
[2020-03-18 17:08:00,933: INFO/MainProcess] Received task: tasks.add[b49a4c92-e007-46ef-9b5d-f93f451a6c1b]
[2020-03-18 17:08:02,946: INFO/MainProcess] Task tasks.add[b49a4c92-e007-46ef-9b5d-f93f451a6c1b] succeeded in 2.0160000000032596s: 32
[2020-03-18 17:08:03,934: INFO/MainProcess] Received task: tasks.add[1bdfe4d8-76c1-44cc-b1fa-dbbe242692ae]
[2020-03-18 17:08:05,940: INFO/MainProcess] Task tasks.add[1bdfe4d8-76c1-44cc-b1fa-dbbe242692ae] succeeded in 2.0s: 32

能夠看出每3秒就有一個任務被加入隊列中去執行。

那定時任務又怎樣去實現呢?

也很簡單,咱們只須要更改一下配置文件便可:

CELERYBEAT_SCHEDULE = {
    'add-crontab-func': {
        'task': 'tasks.add',
        'schedule': crontab(hour=8, minute=50, day_of_week=4),
        'args': (30, 20),
    },
}
CELERY_TIMEZONE = 'Asia/Shanghai'  # 配置時區信息

其中crontab(hour=8, minute=50, day_of_week=4)表明的是每週四的8點50執行一次,只要咱們的Celery服務一直開着,定時任務就會按時執行;在這裏我也在配置里加入了時區信息。

我在這裏是8點45啓動的Celery服務、運行的beat,從下面的輸出能夠看出,50的時候咱們的定時任務就執行了。

[2020-03-19 08:45:19,934: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.
[2020-03-19 08:50:00,086: INFO/MainProcess] Received task: tasks.add[45aa794d-a4ef-40e0-9480-80c7004318d5]
[2020-03-19 08:50:02,091: INFO/MainProcess] Task tasks.add[45aa794d-a4ef-40e0-9480-80c7004318d5] succeeded in 2.0s: 50

由此咱們能夠看出,利用 Celery 進行分佈式隊列管理將會大大的提升咱們的開發效率,我這裏也僅僅是關於Celery的簡單介紹和使用,若是你們感興趣,能夠去官方文檔 學習更高級更系統的用法。

最後,感謝女友在生活中,工做上的包容、理解與支持 !

相關文章
相關標籤/搜索