celery 異步任務 週期任務 定時任務的實現

hello, 小夥伴們, 很久不更新了,這一次帶來的是celery在python中的應用以及設置異步任務週期任務和定時任務的步驟,但願能給入坑的你帶來些許幫助.python

首先是對celery的介紹,Celery實際上是一個專一於實時處理和調度任務的分佈式任務隊列,同時提供操做和維護分佈式系統所須要的所有數據, 所以能夠用它提供的接口快速實現並管理一個分佈式的任務隊列,它自己不是任務隊列,它是封裝了操做常見任務隊列的各類操做, 可使用它快速進行任務隊列的使用與管理.在Python中的組成部分是 1.用戶任務 app    2.管道 broker 用於存儲任務 官方推薦的是 redis rabbitMQ   / backend 用於存儲任務執行結果的   3, 員工 worker  大體流程入下:linux

 最左邊的是用戶, 用戶發起1個請求給服務器, 要服務器執行10個任務,將這10個任務分給10個調度器,即開啓10個線程進行任務處理,worker會一直監聽調度器是否有任務, 一旦發現有新的任務, 就會當即執行新任務,一旦執行完就會返回給調度器, 即backend, backend會將請求發送給服務器, 服務器將結果返回給用戶, 表現的結果就是,這10個任務同時完成,同時返回,,這就是Celery的整個工做流程, 其中的角色分別爲,任務(app_work), 調度器(broker + backend), 將任務緩存的部分, 即將全部任務暫時存在的地方,至關於生產者, 消費者(worker 能夠指定數量, 即在建立worker命令的時候能夠指定數量), 在worker拿到任務後,人就控制不了了, 除非把worker殺死, 否則確定會執行完.redis

也即 任務來了之後, 調度器(broker)去緩存任務, worker去執行任務, 完成後返回backend,接着返回,windows

還有就是關於定時任務和週期任務在linux上爲何不用自身所帶着的去作,是由於linux週期定時任務是不可控的, 很差管理, 返回值保存也是個麻煩事, 而celery只要開啓着調度器, 就能夠隨時把人物結果獲取到,即便用celery控制起來是很是方便的.緩存

接下來就是實例代碼:服務器

from celery import Celery
import time
# 建立一個Celery實例, 就是用戶的應用app 第一個參數是任務名稱, 能夠隨意起 後面的就是配置的broker和backend
diaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379")
# 接下來是爲應用建立任務 ab

@diaoduqi.task
def ab(a,b):
    time.sleep(15)
    return a+b
workers.py
from worker import ab

# 將任務交給Celery的Worker執行
res = ab.delay(2,4)

#返回任務ID
print(res.id)
brokers.py
from celery.result import AsyncResult
from worker import diaoduqi

# 異步獲取任務返回值
async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi)

# 判斷異步任務是否執行成功
if async_task.successful():
    #獲取異步任務的返回值
    result = async_task.get()
    print(result)
else:
    print("任務還未執行完成")
backends.py

爲了方便,如今直接將三個文件表明的部分命名在文件名稱中.首先是啓動workers.py  app

啓動方式是依據系統的不一樣來啓動的, 對於linux下   celery worker -A workers -l INFO 也能夠指定開啓的worker數量 即在後面添加的參數是  -c  5   表示指定5個worker  理論上指定的worker是無上限的,異步

在windows下須要安裝一個eventlet模塊進行運行, 否則不會運行成功 pip install eventlet 能夠開啓線程  不指定數量是默認6個worker, 理論上worker的數量能夠開啓無限個,可是celery worker -A s1 -l INFO -P eventlet -c 5   使用eventlet 開啓5個worker   執行async

該命令後 處於就緒狀態, 須要發佈任務, 即brokers.py進行任務發佈, 方法是使用delay的方式執行異步任務, 返回了一個任務id, 接着去backends.py中取這個任務id, 去查詢任務是否完成,斷定條件即任務.successful 判斷是否執行完,   上面就是celery異步執行任務的用法與解釋分佈式

接下來就是celery在項目中的應用

在實際項目中應用celery是有必定規則的, 即目錄結構應該以下.

結構說明   首先是建立一個CeleryTask的包,接着是在裏面建立一個celery.py,必須是這個文件  關於重名的問題, 找尋模塊的順序是先從當前目錄中去尋找, 根本找不到,接着是從內置模塊中去找, 根本就找不到寫的這個celery這個文件,

from celery import Celery

DDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379",
             include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])
celery.py
import time
from CeleryTask.celery import DDQ

@DDQ.task
def one1(a,b):
    # time.sleep(3)
    return a+b


@DDQ.task
def one2():
    time.sleep(2)
    return "one2"
TaskOne.py
import time
from CeleryTask.celery import DDQ


@DDQ.task
def two1():
    time.sleep(2)
    return "two1"


@DDQ.task
def two2():
    time.sleep(3)
    return "two2"
taskTwo.py
from CeleryTask.TaskOne import one1 as one

# one.delay(10,10)
# two.delay(20,20)

# 定時任務咱們不在使用delay這個方法了,delay是當即交給task 去執行
# 如今咱們使用apply_async定時執行

# 首先咱們要先給task一個執行任務的時間
import datetime, time

# 獲取當前時間 此時間爲東八區時間
ctime = time.time()
# 將當前的東八區時間改成 UTC時間 注意這裏必定是UTC時間,沒有其餘說法
utc_time = datetime.datetime.utcfromtimestamp(ctime)
# 爲當前時間增長 10 秒
add_time = datetime.timedelta(seconds=10)
action_time = utc_time + add_time

# action_time 就是當前時間將來10秒以後的時間
# 如今咱們使用apply_async定時執行
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
res = one.apply_async(args=(6, 10), eta=action_time)
print(res.id)
# 這樣本來延遲5秒執行的One函數如今就要在10秒鐘之後執行了
getR.py

接着是在命令行cd到與CeleryTask同級目錄下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50   這樣 就開啓了worker   接着去 發佈任務, 在定時任務中再也不使用delay這個方法了,

delay是當即交給ttask去執行, 在這裏使用 apply_async定時執行  指的是調度的時候去定時執行

須要設置的是UTC時間, 以及定時的時間(多長時間之後執行)   以後使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令開啓worker, 以後運行 getR.py文件發佈任務, 能夠看到在定義的時間之後執行該任務

週期任務

週期任務 指的是在指定時間去執行任務   須要導入的一個模塊有 crontab

文件結構以下

 

結構同定時任務差很少,只不過須要變更一下文件內容  GetR文件已經不須要了,能夠刪除.

from celery import Celery
from celery.schedules import crontab

DDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379",
             include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"])

# 我要要對beat任務生產作一個配置,這個配置的意思就是每10秒執行一次Celery_task.task_one任務參數是(10,10)
DDQ.conf.beat_schedule = {
    "each10s_task": {
        "task": "CeleryTask.TaskOne.one1",
        "schedule": 10,  # 每10秒鐘執行一次
        "args": (10, 10)
    },
    "each1m_task": {
        "task": "CeleryTask.TaskOne.one2",
        "schedule": crontab(minute=1)  # 每1分鐘執行一次 也能夠替換成 60  即  "schedule": 60
    }
}
celery.py
import time

from CeleryTask.celery import DDQ

@DDQ.task
def one1(a,b):
    # time.sleep(3)
    return a+b


@DDQ.task
def one2():
    time.sleep(2)
    return "one2"
TaskOne.py
import time

from CeleryTask.celery import DDQ

@DDQ.task
def two1():
    time.sleep(2)
    return "two1"


@DDQ.task
def two2():
    time.sleep(3)
    return "two2"
taskTwo.py

以上配置完成之後,這時候就不能直接建立worker了,由於要執行週期任務,須要首先有一個任務的生產方, 即 celery beat -A  CeleryTask, 用來產生建立者, 接着是建立worker worker的建立命令仍是原來的命令, 即  celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 建立完worker以後, 每10秒就會由beat建立一個任務給 worker去執行.至此, celery建立異步任務, 週期任務,定時任務完畢, 夥伴們本身拿去測試吧.

相關文章
相關標籤/搜索