動手實現一個簡單的Celery

簡介

Celery是一個由Python實現的分佈式任務隊列,任務隊列一般有3個方面的功能。前端

  • 1.減緩高併發壓力,先將任務寫入隊列,有空餘資源再運行
  • 2.執行定時任務,先將任務寫入隊列,指定時間下再執行
  • 3.異步任務,web中存在耗時任務能夠先將其寫入隊列,而後後臺任務進程去執行

已經有不少文章來描述Celery的用法與簡單原理,本篇文章也會簡單說起,但不會費太多筆墨。python

本篇重點在於,利用Python動手實現一個簡單的Celery,並使用本身實現的Celery實現異步任務,與上一篇「Python Web:Flask異步執行任務」同樣,經過Flask構建一個簡單的web,而後執行耗時任務,但願前端能夠經過進度條顯示任務的進度。git

需注意,這裏不會去解讀Celery的源碼,其源碼具備不少工程細節,比較複雜,這裏只是從其本質出發,簡單的實現一個玩具Celery,這個玩具Celery在穩定性、效率等方面固然不能與Celery相比,但能夠很好的理解Celery大致是怎麼實現的。github

本文講究的是「形離神合」,與Celery實現細節不一樣,但本質原理相同。web

那咱們開始吧!redis

Celery的概念與原理

Celery 5 個關鍵的概念,弄明白,就大體理解 Celery 了。json

  • 1.Task(任務) 簡單而言就是你要作的事情,如用戶註冊流程中的發送郵件後端

  • 2.Worker(工做者) 在後臺處理Task的人bash

  • 3.Broker(經紀人) 本質是一種隊列,Task 會交給 Broker ,Worker 會從 Broker 中取 Task ,並處理併發

  • 4.Beat 定時任務調度器,根據定的時間,向 Broker 中添加數據,而後等待 Worker 去處理

  • 5.Backend 用於保存 Worker 執行結果的對象,每一個 Task 都要有返回值,這些返回值,就在 Backend 中

這裏咱們拋開這裏的各類概念,從更本質的角度來看Celery,發現它就一個任務序列化存儲與反序列化獲取的過程。

以Web異步任務爲例,使用方式一般爲:

  • 1.有一個要長時間處理I/O的函數,若是不將其異步執行就會產生的阻塞,這一般是不被容許的
  • 2.啓動一個後臺任務執行進程
  • 3.當要執行耗時函數時,不會馬上同步運行,而是提取函數的關鍵數據,將其序列化存儲到隊列中,隊列可使用Redis或其餘方式實現
  • 4.後臺任務執行進程會從隊列中獲取數據,並將其反序列化還原
  • 5.後臺任務執行進程會使用原來的函數以及還原的數據完成函數的執行,從而實現異步執行的效果。

流程並不複雜,Celery中不一樣的概念分別負責上面流程中的不一樣部分。

實現一個簡單的Celery

接着咱們來實現一個Celery,這裏Celery選擇Redis做爲後端。

先來整理一個大致的框架。

首先咱們須要定義一個Task類來表示要執行的任務,不一樣的任務要執行的具體邏輯由使用者自身編寫。

接着要定義一個任務隊列,即Celery中的Broker,用於存儲要執行的任務。

隨後要定義執行進程Worker,Worker要從Broker中獲取任務並去執行。

最後還須要定義一個用於存儲任務返回數據的類,即Celery中的Backend。

看上去有點複雜,不慌,其實很簡單。

實現任務類

首先來實現task.py,用於定義任務相關的一些邏輯

# task.py
import abc
import json
import uuid
import traceback
import pickle

from broker import Broker
from backend import Backend

class BaseTask(abc.ABC):
    """ Example Usage: class AdderTask(BaseTask): task_name = "AdderTask" def run(self, a, b): result = a + b return result adder = AdderTask() adder.delay(9, 34) """

    task_name = None

    def __init__(self):
        if not self.task_name:
            raise ValueError("task_name should be set")
        self.broker = Broker()
 
 @abc.abstractmethod # abstractmethod 派生類必須重寫實現邏輯
    def run(self, *args, **kwargs):
        # 寫上你具體的邏輯
        raise NotImplementedError("Task `run` method must be implemented.")

    # 更新任務狀態
    def update_state(self, task_id, state, meta={}):
        _task = {"state": state, "meta": meta}
        serialized_task = json.dumps(_task)
        backend = Backend()
        backend.enqueue(queue_name=task_id, item=serialized_task)
        print(f"task info: {task_id} succesfully queued")

    # 異步執行
    def delay(self, *args, **kwargs):
        try:
            self.task_id = str(uuid.uuid4())
            _task = {"task_id": self.task_id, "args": args, "kwargs": kwargs}
            serialized_task = json.dumps(_task)
            # 加入redis中
            self.broker.enqueue(queue_name=self.task_name, item=serialized_task)
            print(f"task: {self.task_id} succesfully queued")
        except Exception:
            # traceback.print_exc()
            raise Exception("Unable to publish task to the broker.")
        return self.task_id

# 獲取數據
def async_result(task_id):
    backend = Backend()
    _dequeued_item = backend.dequeue(queue_name=task_id)
    dequeued_item = json.loads(_dequeued_item)
    state = dequeued_item["state"]
    meta = dequeued_item["meta"]
    class Info():
        def __init__(self, state, meta):
            self.state = state
            self.meta = meta
    info = Info(state, meta)
    return info
複製代碼

上述代碼中,定義了BaseTask類,它繼承自python的abc.ABC成爲一個抽象基類,其中一開始便要求必須定義task_name,這是由於後面咱們須要經過task_name去找對應的任務隊列。

BaseTask類的run()方法被abc.abstractmethod裝飾,該裝飾器會要求BaseTask的派生類必須重寫run()方法,這是爲了讓使用者能夠自定義本身的任務邏輯。

BaseTask類的update_state()方法用於更新任務的狀態,其邏輯很簡單,先將參數進行JSON序列化,而後調用Backend的enqueue()方法將數據存入,這裏的Backend實際上是Redis實例,enqueue()方法會將數據寫入Redis的list中,須要注意,這裏list的key爲task_id,即當前任務的id。

BaseTask類的delay()方法用於異步執行任務,首先經過uuid爲任務建立一個惟一id,而後將方法的參數經過JSON序列化,而後調用Broker的enqueue()將數據存入,這裏的Broker其實也是一個Redis實例,enqueue()方法一樣是將數據寫入到Redis的list中,只是list的key爲task_name,即當前任務的名稱。

此外還實現了async_result()方法,該方法用於異步獲取任務的數據,經過該方法能夠得到任務的執行結果,或任務執行中的各類數據,數據的結構是有簡單約定的,必需要有state表示固然任務的狀態,必需要有meta表示當前任務的一些信息。

實現Broker與Backend

在task.py中使用了Broker與Backend,那接着就來實現一下這兩個,先實現Broker。

# broker.py
import redis # pip install redis

class Broker:
    """ use redis as our broker. This implements a basic FIFO queue using redis. """
    def __init__(self):
        host = "localhost"
        port = 6379
        password = None
        self.redis_instance = redis.StrictRedis(
            host=host, port=port, password=password, db=0, socket_timeout=8.0
        )

    def enqueue(self, item, queue_name):
        self.redis_instance.lpush(queue_name, item)

    def dequeue(self, queue_name):
        dequed_item = self.redis_instance.brpop(queue_name, timeout=3)
        if not dequed_item:
            return None
        dequed_item = dequed_item[1]
        return dequed_item
複製代碼

沒什麼可講的,就是定了兩個方法用於數據的存儲與讀取,存儲使用lpush方法,它會將數據從左邊插入到Redis的list中,讀取數據使用brpop方法,它會從list的右邊取出第一個元素,返回該元素值並從list刪除,左進右出就構成了一個隊列。

爲了簡便,Backend的代碼與Broker如出一轍,只是用來存儲任務的信息而已,代碼就不貼了。

後臺任務執行進程Worker

接着來實現後臺任務執行進程Worker

# worker.py
import json

class Worker:
    """ Example Usage: task = AdderTask() worker = Worker(task=task) worker.start() """
    def __init__(self, task) -> None:
        self.task = task

    def start(self,):
        while True:
            try:
                _dequeued_item = self.task.broker.dequeue(queue_name=self.task.task_name)
                dequeued_item = json.loads(_dequeued_item)
                task_id = dequeued_item["task_id"]
                task_args = dequeued_item["args"]
                task_kwargs = dequeued_item["kwargs"]
                task_kwargs['task_id'] = task_id
                self.task.run(*task_args, **task_kwargs)
                print("succesful run of task: {0}".format(task_id))
            except Exception:
                print("Unable to execute task.")
                continue   
複製代碼

上述代碼中,定義了Worker類,Worker類在初始化時須要指定具體的任務實例,而後從broker中獲取任務相關的數據,接着調用其中的run()方法完成任務的執行,比較簡單。

使用玩具Celery

玩具Celery的關鍵結構都定義好了,接着就來使用一下它,這裏依舊會使用「Python Web:Flask異步執行任務」文章中的部分代碼,如前端代碼,這裏也再也不討論其前端代碼,沒有閱讀能夠先閱讀一下,方便理解下面的內容。

首先定義出一個耗時任務

# app.py
class LongTask(BaseTask):

    task_name = "LongTask"

    def run(self, task_id):
        """Background task that runs a long function with progress reports."""
        verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
        adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
        noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']

        message = ''
        total = random.randint(10, 50)

        for i in range(total):
            if not message or random.random() < 0.25:
                message = '{0} {1} {2}...'.format(random.choice(verb),
                                                  random.choice(adjective),
                                                  random.choice(noun))
            self.update_state(task_id=task_id, state='PROGRESS',
                              meta={'current': i, 'total': total,
                                    'status': message})
            time.sleep(1)
        
        self.update_state(task_id=task_id, state='FINISH', meta={'current':100, 'total': 100,'status': 'Task completed!', 'result':32})
        return
複製代碼

每一個耗時任務都要繼承在BaseTask而且重寫其run()方法,run()方法中的邏輯就是當前這個耗時任務要處理的具體邏輯。

這裏邏輯其實很簡單,就是隨機的從幾個列表中抽取詞彙而已。

在for迭代中,想要前端知道當前任務for迭代的具體狀況,就須要將相應的數據經過BaseTask的update_state()方法將其更新到backend中,使用task_id做爲Redis中list的key。

當邏輯所有執行完後,將狀態爲FINISH的信息存入backend中。

寫一個接口來觸發這個耗時任務

# app.py
@app.route('/longtask', methods=['POST'])
def longtask():
    long_task = LongTask()
    task_id = long_task.delay()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task_id)}
複製代碼

邏輯很是簡單,實例化LongTask(),並調用其中的delay()方法,該方法會將當前任務存入認爲隊列中,當前的請求會將當前任務的task_id經過響應包頭的中的taskstatus字段傳遞給前端。

前端獲取到後,就能夠經過task_id去獲取當前任務執行狀態等信息,從而實現前端的可視化。

接着定義相應的接口來獲取當前任務的信息,調用用async_result()方法,將task_id傳入則可。

# app.py
@app.route('/status/<task_id>')
def taskstatus(task_id):
    info = async_result(task_id)
    print(info)
    if info.state == 'PENDING':
        response = {
            'state': info.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif info.state != 'FAILURE':
        response = {
            'state': info.state,
            'current': info.meta.get('current', 0),
            'total': info.meta.get('total', 1),
            'status': info.meta.get('status', '')
        }
        if 'result' in info.meta:
            response['result'] = info.meta['result']
    else:
        # something went wrong in the background job
        response = {
            'state': info.state,
            'current': 1,
            'total': 1,
            'status': str(info.meta),  # this is the exception raised
        }
    return jsonify(response)
複製代碼

最後,須要定義一個啓動後臺任務執行進程的邏輯

# run_worker.py
from worker import Worker
from app import LongTask

if __name__ == "__main__":
    long_task = LongTask()
    worker = Worker(task=long_task)
    worker.start()
複製代碼

至此,總體結構就構建完了,使用一下。

首先運行redis。

redis-server
複製代碼

而後運行Flask。

python app.py
複製代碼

最後啓動一下後臺任務執行進程,它至關於Celery的celery -A xxx worker --loglevel=info命令。

python run_worker.py
複製代碼

同時執行多個任務,效果以下

對應的一些打印以下:

python run_worker.py
Unable to execute task.
Unable to execute task.
Unable to execute task.
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
task info: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
複製代碼
python app.py
 * Serving Flask app "app" (lazy loading)
 * Environment: production
   WARNING: Do not use the development server in a production environment.
   Use a production WSGI server instead.
 * Debug mode: on
 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
 * Restarting with stat

 * Debugger is active!
 * Debugger PIN: 145-285-706
127.0.0.1 - - [25/Sep/2019 11:14:07] "GET / HTTP/1.1" 200 -
task: 3c7cd8ac-7482-467b-b17c-dba2649b70ee succesfully queued
127.0.0.1 - - [25/Sep/2019 11:14:11] "POST /longtask HTTP/1.1" 202 -
<task.async_result.<locals>.Info object at 0x107f50780>
127.0.0.1 - - [25/Sep/2019 11:14:11] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
<task.async_result.<locals>.Info object at 0x107f50a20>
127.0.0.1 - - [25/Sep/2019 11:14:13] "GET /status/3c7cd8ac-7482-467b-b17c-dba2649b70ee HTTP/1.1" 200 -
複製代碼

須要注意一些,上面的代碼中,使用Worker須要實例化具體的任務,此時任務實例與app.py中經過接口建立的任務實例是不一樣的,Worker利用不一樣的實例,使用相同的參數,從而實現執行效果相同的目的。

代碼已上傳Githu:github.com/ayuLiao/toy…

若是你以爲文章有幫助,請按一下右下角的「在看」小星星,那是能夠按的,謝謝。

相關文章
相關標籤/搜索