Python—異步任務隊列Celery簡單使用

 

一.Celery簡介

  Celery是一個簡單,靈活,可靠的分佈式系統,用於處理大量消息,同時爲操做提供維護此類系統所需的工具。它是一個任務隊列,專一於實時處理,同時還支持任務調度。python

 中間人boker:mysql

   broker是一個消息傳輸的中間件。每當應用程序調用celery的異步任務的時候,會向broker傳遞消息,然後celery的worker將會取到消息,進行對於的程序執行。其中Broker的中文意思是 經紀人 ,其實就是一開始說的 消息隊列 ,用來發送和接受消息。這個Broker有幾個方案可供選擇:RabbitMQ (消息隊列),Redis(緩存數據庫),數據庫(不推薦),等等。redis

 backend:sql

   一般程序發送的消息,發完就完了,可能都不知道對方時候接受了。爲此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。Backend是在Celery的配置中的一個配置項 CELERY_RESULT_BACKEND ,做用是保存結果和狀態,若是你須要跟蹤任務的狀態,那麼須要設置這一項。可使用數據庫做爲backend。 mongodb

二.特性

  ♦高可用:  假若鏈接丟失或失敗,職程和客戶端會自動重試,而且一些中間人經過 主/主 或 主/從 方式複製來提升可用性。數據庫

  ♦快速:     單個 Celery 進程每分鐘可處理數以百萬計的任務,而保持往返延遲在亞毫秒級(使用 RabbitMQ、py-librabbitmq 和優化過的設置)。windows

  ♦靈活:     Celery 幾乎全部部分均可以擴展或單獨使用。能夠自制鏈接池、 序列化、壓縮模式、日誌、調度器、消費者、生產者、自動擴展、 中間人傳輸或更多。緩存

    ♦簡單:      Celery 易於使用和維護,而且它  不須要配置文件  。  

三.組成

  Celery的架構由三部分組成,消息中間件(message broker)任務執行單元(worker)任務執行結果存儲(task result store)組成。(百度上的圖片)架構

                                       

              

 

   消息中間件併發

     Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成。包括,RabbitMQ, RedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

   任務執行單元

     Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中。

   任務結果存儲

     Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。

 

 四.安裝

  安裝Celery,(這裏使用redis做爲中間件,windows注意安裝對應支持的版本)

pip3 install celery['redis']

五.簡單使用

  使用celery包含三個方面:1. 定義任務函數。2. 運行celery服務。3. 客戶應用程序的調用。

  1.目錄結構:

    CeleryTest

      ¦--tasks.py

      ¦--user.py

#tasks.py
import time
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')


@app.task
def send(msg):
    print(f'send {msg}')
    time.sleep(3)
    return 
#user.py
from tasks import send
import time


def register():
    start = time.time()
    send.delay('666')
    print('耗時:', time.time() - start)


if __name__ == '__main__':
    register()

  2.這裏使用redis做borker,啓動redis,進入redis目錄啓動  

$ redis-server

  3.啓動worker,在CeleryTest的同級目錄終端輸入,—A爲Celery實例所在位置

$ celery -A tasks worker  -l info

  啓動成功會看到以下畫面:

                 

  4.運行user.py文件,輸入以下:

耗時: 0.15261435508728027

  調用 delay 函數便可啓動 add 這個任務。這個函數的效果是發送一條消息到broker中去,這個消息包括要執行的函數、函數的參數以及其餘信息,具體的能夠看 Celery官方文檔。這個時候 worker 會等待 broker 中的消息,一旦收到消息就會馬上執行消息。能夠看到調用send.delay()後,耗時並無受到time.sleep()的影響,成功的完成異步調用。咱們能夠在項目中執行耗時的任務時來使用Celery。這裏簡單演示並無使用backend儲存任務的結果。 

六.使用配置文件

  將Celery封裝成一個項目進行使用,這裏簡單的配置一下,方便演示,更多配置參數能夠參考官方文檔。   

    celery_demo

      ¦--celery_app

         ¦--__init__.py

         ¦--celeryconfig.py

         ¦--task1.py

         ¦--task2.py

       ¦--client.py

  __init__.py

from celery import Celery
app = Celery('demo')                                # 生成實例
app.config_from_object('celery_app.celeryconfig')   # 加載配置

  celeryconfig.py

BROKER_URL = 'redis://127.0.0.1:6379'               # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # 指定 Backend

CELERY_TIMEZONE = 'Asia/Shanghai'                   # 指定時區,默認是 UTC
# CELERY_TIMEZONE='UTC'                             

CELERY_IMPORTS = (                                  # 指定導入的任務模塊
    'celery_app.task1',
    'celery_app.task2'
)

  task1.py

import time
from celery_app import app


@app.task
def add(x, y):
    time.sleep(2)
    return x + y

  task2.py

import time
from celery_app import app


@app.task
def multiply(x, y):
    time.sleep(2)
    return x * y

  client.py

from celery_app import task1
from celery_app import task2

res1 = task1.add.delay(2, 8)       # 或者 task1.add.apply_async(args=[2, 8])
res2 = task2.multiply.delay(3, 7)  # 或者 task2.multiply.apply_async(args=[3, 7])
print('hello world')

 啓動worker,在celery_demo目錄執行下列命令

celery_demo $ celery -A celery_app worker -l info

 接着,運行$ python client.py,它會發送兩個異步任務到 Broker,在 Worker 的窗口咱們能夠看到以下輸出:

 

 

 在前面的例子中,咱們使用 delay()或 apply_async()方法來調用任務。事實上,delay 方法封裝了 apply_async,以下:

def delay(self, *partial_args, **partial_kwargs):
    """Shortcut to :meth:`apply_async` using star arguments."""
    return self.apply_async(partial_args, partial_kwargs)

七.定時任務

  Celery 除了能夠執行異步任務,也支持執行週期性任務(Periodic Tasks),或者說定時任務。Celery Beat 進程經過讀取配置文件的內容,週期性地將定時任務發往任務隊列。在上面的例子中,修改配置文件便可實現。

  celeryconfig.py

from celery.schedules import crontab
from datetime import timedelta

BROKER_URL = 'redis://127.0.0.1:6379'               # 指定 Broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # 指定 Backend

CELERY_TIMEZONE = 'Asia/Shanghai'                   # 指定時區,默認是 UTC
# CELERY_TIMEZONE='UTC'                             

CELERY_IMPORTS = (                                  # 指定導入的任務模塊
    'celery_app.task1',
    'celery_app.task2'
)
# schedules
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'celery_app.task1.add',
        'schedule': timedelta(seconds=30),          # 每 30 秒執行一次
        'args': (5, 8)                              # 任務函數參數
    },
    'multiply-at-some-time': {
        'task': 'celery_app.task2.multiply',
        'schedule': crontab(hour=10, minute=50),    # 天天早上 10 點 50 分執行一次
        'args': (3, 7)                              # 任務函數參數
    }
}

啓動worker,而後定時將任務發送到 Broker,在celery_demo目錄下執行下面兩條命令:

celery -A celery_app worker -l info
celery beat -A celery_app

上面兩條命令也能夠合併爲一條:

celery -B -A celery_app worker -l info
相關文章
相關標籤/搜索