Celery實現週期任務

  這個翻譯以後竟然叫芹菜~~最近Django框架須要涉及到執行週期任務~~上網搜了下其實還挺多的(django_crontab:這個學習週期短,可是發現不只麻煩還很差用啊)、(apscheduler,簡單還行在沒徹底掌握Celery時先用它頂了一段時間,可是須要注意不一樣版本的使用方法差異還挺大的),最後仍是決定花時間來學學,Celery是Python語言實現的分佈式隊列服務,除了支持即時任務,還支持定時任務。python

Celery中的五個核心角色

  • Task

  任務(Task)就是你要作的事情,例如一個註冊流程裏面有不少任務,給用戶發驗證郵件就是一個任務,這種耗時任務能夠交給Celery去處理,還有一種任務是定時任務,好比天天定時統計網站的註冊人數,這個也能夠交給Celery週期性的處理。redis

  • Broker

  Broker 的中文意思是經紀人,指爲市場上買賣雙方提供中介服務的人。在Celery中它介於生產者和消費者之間經紀人,這個角色至關於數據結構中的隊列。例如一個Web系統中,生產者是處理核心業務的Web程序,業務中可能會產生一些耗時的任務,好比短信,生產者會將任務發送給 Broker,就是把這個任務暫時放到隊列中,等待消費者來處理。消費者是 Worker,是專門用於執行任務的後臺服務。Worker 將實時監控隊列中是否有新的任務,若是有就拿出來進行處理。Celery 自己不提供隊列服務,通常用 Redis 或者 RabbitMQ 來扮演 Broker 的角色django

  • Worker

  Worker 就是那個一直在後臺執行任務的人,也稱爲任務的消費者,它會實時地監控隊列中有沒有任務,若是有就當即取出來執行。數據結構

  • Beat

  Beat 是一個定時任務調度器,它會根據配置定時將任務發送給 Broker,等待 Worker 來消費。app

  • Backend

  Backend 用於保存任務的執行結果,每一個任務都有返回值,好比發送郵件的服務會告訴咱們有沒有發送成功,這個結果就是存在Backend中,固然咱們並不老是要關心任務的執行結果。框架

                                                 

入門操做

一、安裝(Celery4.x 開始再也不支持Windows平臺,若是須要在Windows開發,請使用3.x的版本)異步

pip install celery==3.1.15

二、建立一個實例async

# 建立一個Celery實例,broker是管道用於儲存任務,官方推薦Redis、RabbitMQ;backend用於存儲任務執行結果
app = Celery("tasks", broker="redis://127.0.0.1:6379", backend='redis://127.0.0.1:6379')

 三、直接上代碼(新建一個celery_task1目錄,裏面建下面兩個py文件)分佈式

#!/usr/bin/env python3
# -*- encoding:utf-8 -*-
"""
author:Barret
mail:barret_vip@163.com
"""
from celery import Celery
import time

# 建立Celery實例
app = Celery('tasks', broker='redis://127.0.0.1:6379',
                      backend='redis://127.0.0.1:6379',
             )
app.conf.task_protocol = 1

# 建立任務
@app.task
def add(x):
    time.sleep(1)
    print("開始任務了:%s" %x)
    return x
celerys.py
#!/usr/bin/env python3
# -*- encoding:utf-8 -*-
"""
author:Barret
mail:barret_vip@163.com
"""
from celery.result import AsyncResult
import sys

dir = r"D:\today\celery_task1"
sys.path.append(dir)  # 個人任務文件不在環境變量裏,IDLE找不到
from celerys import add
reslut = add.delay(1)

# 判斷是否有值,get調用會阻塞
print(reslut) # 能夠查看id,也可使用reslut.id獲取id。

# 異步獲取任務返回值
async_task = AsyncResult(id="45cf4a22-82d5-43f2-a0c9-8fd3af6b1136", app=add)

# 判斷異步任務是否執行成功
if async_task.successful():
    r = reslut.get(propagate=False)  # 出現異常返回異常不觸發異常
    print(r)
else:
    print("任務還沒執行完畢")
cmd

四、執行步驟(不建議直接在Django項目中建立,會出現報錯)ide

# 一、打開CMD或者Pycharm下的Terminal執行下面命令:
celery -A celerys  worker -l info -P eventlet

# 二、接着運行cmd.py便可打印結果

 結果出現這樣的即運行成功:

入門操做報錯

  • BUG1:若是出現:AttributeError: 'str' object has no attribute 'items',則是redis版本太高(我安裝的是3.x),安裝2.10.6便可~(可是在我安裝2.10以後又說我版本過低,可是當我從新裝回3.x的時候又能用了~~~我只能說玄學好吧)

  •  BUG_2:若是出現下面這個報錯解決辦法是,在實例化Celery時添加下面配置項,這個好像是新協議問題,使用指定到舊協議。

# 在app建立的時候指定
CELERY_TASK_PROTOCOL = 1  # Django中使用這個
app.conf.task_protocol = 1
BUG_2:解決辦法
  • BUG_3:若是想獲取返回結果直接用get方式會報錯,須要指定保存任務結果的位置。
reslut.get()  # 會報錯這裏是實例化的時候,沒有定義backend,就是保存任務結果的位置。

  • BUG_3解決:指定輸出到,我這裏仍是指定到redis中。

好吧我算是把各類報錯都玩了個遍~~~~

  • BUG_4:在執行時又出現下面這個報錯了
Traceback (most recent call last):
  File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
pip install eventlet # 安裝一下
celery -A task1 worker -l info -P eventlet # 將啓動命令改爲這個
再去執行cmd.py就沒問題了~~
BUG_4:解決方法
  • 哇~~~~想哭一波三折終於看到輸出了(這一步就表明初步使用沒問題了,接下來我終於能夠作週期任務了)

在Django中使用Celery

更新中..................

相關文章
相關標籤/搜索