Celery是一個由Python實現的分佈式任務隊列,任務隊列一般有3個方面的功能。前端
已經有不少文章來描述Celery的用法與簡單原理,本篇文章也會簡單說起,但不會費太多筆墨。python
本篇重點在於,利用Python動手實現一個簡單的Celery,並使用本身實現的Celery實現異步任務,與上一篇「Python Web:Flask異步執行任務」同樣,經過Flask構建一個簡單的web,而後執行耗時任務,但願前端能夠經過進度條顯示任務的進度。git
需注意,這裏不會去解讀Celery的源碼,其源碼具備不少工程細節,比較複雜,這裏只是從其本質出發,簡單的實現一個玩具Celery,這個玩具Celery在穩定性、效率等方面固然不能與Celery相比,但能夠很好的理解Celery大致是怎麼實現的。github
本文講究的是「形離神合」,與Celery實現細節不一樣,但本質原理相同。web
那咱們開始吧!redis
Celery 5 個關鍵的概念,弄明白,就大體理解 Celery 了。json
1.Task(任務) 簡單而言就是你要作的事情,如用戶註冊流程中的發送郵件後端
2.Worker(工做者) 在後臺處理Task的人bash
3.Broker(經紀人) 本質是一種隊列,Task 會交給 Broker ,Worker 會從 Broker 中取 Task ,並處理併發
4.Beat 定時任務調度器,根據定的時間,向 Broker 中添加數據,而後等待 Worker 去處理
5.Backend 用於保存 Worker 執行結果的對象,每一個 Task 都要有返回值,這些返回值,就在 Backend 中
這裏咱們拋開這裏的各類概念,從更本質的角度來看Celery,發現它就一個任務序列化存儲與反序列化獲取的過程。
以Web異步任務爲例,使用方式一般爲:
流程並不複雜,Celery中不一樣的概念分別負責上面流程中的不一樣部分。
接着咱們來實現一個Celery,這裏Celery選擇Redis做爲後端。
先來整理一個大致的框架。
首先咱們須要定義一個Task類來表示要執行的任務,不一樣的任務要執行的具體邏輯由使用者自身編寫。
接着要定義一個任務隊列,即Celery中的Broker,用於存儲要執行的任務。
隨後要定義執行進程Worker,Worker要從Broker中獲取任務並去執行。
最後還須要定義一個用於存儲任務返回數據的類,即Celery中的Backend。
看上去有點複雜,不慌,其實很簡單。
首先來實現task.py,用於定義任務相關的一些邏輯
# task.py
import abc
import json
import uuid
import traceback
import pickle
from broker import Broker
from backend import Backend
class BaseTask(abc.ABC):
""" Example Usage: class AdderTask(BaseTask): task_name = "AdderTask" def run(self, a, b): result = a + b return result adder = AdderTask() adder.delay(9, 34) """
task_name = None
def __init__(self):
if not self.task_name:
raise ValueError("task_name should be set")
self.broker = Broker()
@abc.abstractmethod # abstractmethod 派生類必須重寫實現邏輯
def run(self, *args, **kwargs):
# 寫上你具體的邏輯
raise NotImplementedError("Task `run` method must be implemented.")
# 更新任務狀態
def update_state(self, task_id, state, meta={}):
_task = {"state": state, "meta": meta}
serialized_task = json.dumps(_task)
backend = Backend()
backend.enqueue(queue_name=task_id, item=serialized_task)
print(f"task info: {task_id} succesfully queued")
# 異步執行
def delay(self, *args, **kwargs):
try:
self.task_id = str(uuid.uuid4())
_task = {"task_id": self.task_id, "args": args, "kwargs": kwargs}
serialized_task = json.dumps(_task)
# 加入redis中
self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
print(f"task: {self.task_id} succesfully queued")
except Exception:
# traceback.print_exc()
raise Exception("Unable to publish task to the broker.")
return self.task_id
# 獲取數據
def async_result(task_id):
backend = Backend()
_dequeued_item = backend.dequeue(queue_name=task_id)
dequeued_item = json.loads(_dequeued_item)
state = dequeued_item["state"]
meta = dequeued_item["meta"]
class Info():
def __init__(self, state, meta):
self.state = state
self.meta = meta
info = Info(state, meta)
return info
複製代碼
上述代碼中,定義了BaseTask類,它繼承自python的abc.ABC成爲一個抽象基類,其中一開始便要求必須定義task_name,這是由於後面咱們須要經過task_name去找對應的任務隊列。
BaseTask類的run()方法被abc.abstractmethod裝飾,該裝飾器會要求BaseTask的派生類必須重寫run()方法,這是爲了讓使用者能夠自定義本身的任務邏輯。
BaseTask類的update_state()方法用於更新任務的狀態,其邏輯很簡單,先將參數進行JSON序列化,而後調用Backend的enqueue()方法將數據存入,這裏的Backend實際上是Redis實例,enqueue()方法會將數據寫入Redis的list中,須要注意,這裏list的key爲task_id,即當前任務的id。
BaseTask類的delay()方法用於異步執行任務,首先經過uuid爲任務建立一個惟一id,而後將方法的參數經過JSON序列化,而後調用Broker的enqueue()將數據存入,這裏的Broker其實也是一個Redis實例,enqueue()方法一樣是將數據寫入到Redis的list中,只是list的key爲task_name,即當前任務的名稱。
此外還實現了async_result()方法,該方法用於異步獲取任務的數據,經過該方法能夠得到任務的執行結果,或任務執行中的各類數據,數據的結構是有簡單約定的,必需要有state表示固然任務的狀態,必需要有meta表示當前任務的一些信息。
在task.py中使用了Broker與Backend,那接着就來實現一下這兩個,先實現Broker。
# broker.py
import redis # pip install redis
class Broker:
""" use redis as our broker. This implements a basic FIFO queue using redis. """
def __init__(self):
host = "localhost"
port = 6379
password = None
self.redis_instance = redis.StrictRedis(
host=host, port=port, password=password, db=0, socket_timeout=8.0
)
def enqueue(self, item, queue_name):
self.redis_instance.lpush(queue_name, item)
def dequeue(self, queue_name):
dequed_item = self.redis_instance.brpop(queue_name, timeout=3)
if not dequed_item:
return None
dequed_item = dequed_item[1]
return dequed_item
複製代碼
沒什麼可講的,就是定了兩個方法用於數據的存儲與讀取,存儲使用lpush方法,它會將數據從左邊插入到Redis的list中,讀取數據使用brpop方法,它會從list的右邊取出第一個元素,返回該元素值並從list刪除,左進右出就構成了一個隊列。
爲了簡便,Backend的代碼與Broker如出一轍,只是用來存儲任務的信息而已,代碼就不貼了。
接着來實現後臺任務執行進程Worker
# worker.py
import json
class Worker:
""" Example Usage: task = AdderTask() worker = Worker(task=task) worker.start() """
def __init__(self, task) -> None:
self.task = task
def start(self,):
while True:
try:
_dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
dequeued_item = json.loads(_dequeued_item)
task_id = dequeued_item["task_id"]
task_args = dequeued_item["args"]
task_kwargs = dequeued_item["kwargs"]
task_kwargs['task_id'] = task_id
self.task.run(*task_args, **task_kwargs)
print("succesful run of task: {0}".format(task_id))
except Exception:
print("Unable to execute task.")
continue
複製代碼
上述代碼中,定義了Worker類,Worker類在初始化時須要指定具體的任務實例,而後從broker中獲取任務相關的數據,接着調用其中的run()方法完成任務的執行,比較簡單。
玩具Celery的關鍵結構都定義好了,接着就來使用一下它,這裏依舊會使用「Python Web:Flask異步執行任務」文章中的部分代碼,如前端代碼,這裏也再也不討論其前端代碼,沒有閱讀能夠先閱讀一下,方便理解下面的內容。
首先定義出一個耗時任務
# app.py
class LongTask(BaseTask):
task_name = "LongTask"
def run(self, task_id):
"""Background task that runs a long function with progress reports."""
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
self.update_state(task_id=task_id, state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
self.update_state(task_id=task_id, state='FINISH', meta={'current':100, 'total': 100,'status': 'Task completed!', 'result':32})
return
複製代碼
每一個耗時任務都要繼承在BaseTask而且重寫其run()方法,run()方法中的邏輯就是當前這個耗時任務要處理的具體邏輯。
這裏邏輯其實很簡單,就是隨機的從幾個列表中抽取詞彙而已。
在for迭代中,想要前端知道當前任務for迭代的具體狀況,就須要將相應的數據經過BaseTask的update_state()方法將其更新到backend中,使用task_id做爲Redis中list的key。
當邏輯所有執行完後,將狀態爲FINISH的信息存入backend中。
寫一個接口來觸發這個耗時任務
# app.py
@app.route('/longtask', methods=['POST'])
def longtask():
long_task = LongTask()
task_id = long_task.delay()
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task_id)}
複製代碼
邏輯很是簡單,實例化LongTask(),並調用其中的delay()方法,該方法會將當前任務存入認爲隊列中,當前的請求會將當前任務的task_id經過響應包頭的中的taskstatus字段傳遞給前端。
前端獲取到後,就能夠經過task_id去獲取當前任務執行狀態等信息,從而實現前端的可視化。
接着定義相應的接口來獲取當前任務的信息,調用用async_result()方法,將task_id傳入則可。
# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
info = async_result(task_id)
print(info)
if info.state == 'PENDING':
response = {
'state': info.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif info.state != 'FAILURE':
response = {
'state': info.state,
'current': info.meta.get('current', 0),
'total': info.meta.get('total', 1),
'status': info.meta.get('status', '')
}
if 'result' in info.meta:
response['result'] = info.meta['result']
else:
# something went wrong in the background job
response = {
'state': info.state,
'current': 1,
'total': 1,
'status': str(info.meta), # this is the exception raised
}
return jsonify(response)
複製代碼
最後,須要定義一個啓動後臺任務執行進程的邏輯
# run_worker.py
from worker import Worker
from app import LongTask
if __name__ == "__main__":
long_task = LongTask()
worker = Worker(task=long_task)
worker.start()
複製代碼
至此,總體結構就構建完了,使用一下。
首先運行redis。
redis-server
複製代碼
而後運行Flask。
python app.py
複製代碼
最後啓動一下後臺任務執行進程,它至關於Celery的celery -A xxx worker --loglevel=info
命令。
python run_worker.py
複製代碼
同時執行多個任務,效果以下
對應的一些打印以下:
python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
複製代碼
python app.py
* Serving Flask app "app" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: on
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
* Restarting with stat
* Debugger is active!
* Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
複製代碼
須要注意一些,上面的代碼中,使用Worker須要實例化具體的任務,此時任務實例與app.py中經過接口建立的任務實例是不一樣的,Worker利用不一樣的實例,使用相同的參數,從而實現執行效果相同的目的。
代碼已上傳Githu:github.com/ayuLiao/toy…
若是你以爲文章有幫助,請按一下右下角的「在看」小星星,那是能夠按的,謝謝。