[譯] 在 Flask 中使用 Redis Queue 實現異步任務

若是你的應用中存在長執行任務,你應當把它們從普通流程中剝離並置於後臺執行。html

可能你的 web 應用會要求用戶在註冊時上傳頭像(圖片可能須要被裁剪)和進行郵箱驗證。若是你直接在請求處理函數中去加工圖片和發送驗證郵件,那麼終端用戶不得不等待這些執行的完成。相反,你更但願把這些任務放到任務隊列中,並由一個 worker 線程來處理,這種狀況下應用就能馬上響應客戶端的請求了。由此一來,終端用戶能夠在客戶端繼續其餘的操做,你的應用也能被釋放去響應其餘用戶的請求。前端

這篇文章講了如何在 Flask 應用中配置 Redis Queue(RQ)來處理長執行任務。python

固然 Celery 也是一個不錯的解決方案。不過相比於 Redis Queue,它會稍顯複雜並引入更多的依賴項。android

目錄

本文目標

閱讀完本文後,你應當學會:ios

  1. 在 Flask 應用中集成 Redis Queue 並建立相應任務。
  2. 使用 Docker 鏡像化包含 Flask 和 Redis 的應用。
  3. 使用獨立的 worker 線程在後臺處理長執行任務。
  4. 配置 RQ Dashboard 用於監控任務隊列、做業和 worker 線程。
  5. 使用 Docker 擴展 worker 線程的數量。

工做流程

在本文中,咱們的目標是藉助 Redis Queue 的能力開發一個能處理長執行任務的 Flask 應用,其中長執行任務的執行獨立於普通請求、響應的執行。git

  1. 終端用戶經過 POST 請求服務端建立一個新任務
  2. 如圖所示,任務隊列會增長一個新任務,以後服務端再把任務 id 返回給客戶端
  3. 建立好的任務會在服務端後臺執行,客戶端只需使用 AJAX 不斷輪詢任務狀態便可

Flask 集成 Redis Queue 的調用時序圖

最終咱們將實現一個以下所示的應用:github

開發完成

項目配置

想要繼續看下去嗎?clone 下面的倉庫來看看裏面的代碼和結構吧:web

$ git clone https://github.com/mjhea0/flask-redis-queue --branch base --single-branch
$ cd flask-redis-queue
複製代碼

由於咱們一共須要管理三個進程(Flask、Redis 和 worker),爲了簡化這一系列工做流,這裏咱們選擇了使用 Docker 來部署,最終咱們僅需在一個終端裏就能夠運行整個應用了。ajax

像這樣就能將應用跑起來:redis

$ docker-compose up -d --build
複製代碼

使用你的瀏覽器訪問 http://localhost:5004,你應該能看到以下頁面:

flask、redis queue 和 docker

任務觸發

project/client/static/main.js 裏的監聽器監聽到按鍵的點擊後,它會獲取按鍵對應的任務類型 — 123,並把獲得的任務類型看成參數經過 AJAX POST 請求發到服務端。

$('.btn').on('click', function() {
  $.ajax({
    url: '/tasks',
    data: { type: $(this).data('type') },
    method: 'POST'
  })
  .done((res) => {
    getStatus(res.data.task_id)
  })
  .fail((err) => {
    console.log(err)
  });
});
複製代碼

在服務端,project/server/main/views.py 會負責處理客戶端發來的請求:

@main_blueprint.route('/tasks', methods=['POST'])
def run_task():
    task_type = request.form['type']
    return jsonify(task_type), 202
複製代碼

下面咱們來裝配 Redis Queue。

Redis Queue

首先咱們須要在 docker-compose.yml 中添加配置以啓動兩個新的進程 — Redis 和 worker:

version: '3.7'

services:

  web:
    build: .
    image: web
    container_name: web
    ports:
      - '5004:5000'
    command: python manage.py run -h 0.0.0.0
    volumes:
      - .:/usr/src/app
    environment:
      - FLASK_DEBUG=1
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  worker:
    image: web
    command: python manage.py run_worker
    volumes:
      - .:/usr/src/app
    environment:
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  redis:
    image: redis:4.0.11-alpine
複製代碼

在 "project/server/main" 目錄中添加一個新的任務 tasks.py

# project/server/main/tasks.py

import time

def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True
複製代碼

更新咱們的視圖代碼,讓它能鏈接 Redis 並把任務放入隊列,最後再把任務的 id 返回給客戶端:

@main_blueprint.route('/tasks', methods=['POST'])
def run_task():
    task_type = request.form['type']
    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
        q = Queue()
        task = q.enqueue(create_task, task_type)
    response_object = {
        'status': 'success',
        'data': {
            'task_id': task.get_id()
        }
    }
    return jsonify(response_object), 202
複製代碼

別忘了正確地引入上面用到的庫:

import redis
from rq import Queue, Connection
from flask import render_template, Blueprint, jsonify, \
    request, current_app

from project.server.main.tasks import create_task
複製代碼

更新 BaseConfig 文件:

class BaseConfig(object):
    """基礎配置"""
    WTF_CSRF_ENABLED = True
    REDIS_URL = 'redis://redis:6379/0'
    QUEUES = ['default']
複製代碼

細心的讀者可能發現了,咱們在引用 redis 服務(在 docker-compose.yml 中引入的)的地址時,使用了 REDIS_URL 而非 localhost 或是某個特定 IP。在 Docker 中如何經過 hostname 鏈接其餘服務,能夠在 Docker Compose 官方文檔 中找到答案。

最終,咱們即可以使用 Redis Queue 的 worker 來處理放在隊首的任務了。

@cli.command('run_worker')
def run_worker():
    redis_url = app.config['REDIS_URL']
    redis_connection = redis.from_url(redis_url)
    with Connection(redis_connection):
        worker = Worker(app.config['QUEUES'])
        worker.work()
複製代碼

在這裏,咱們經過自定義的 CLI 命令來啓動 worker。

須要注意的是,經過裝飾器 @cli.command() 啓動的代碼能夠訪問到應用的上下文,以及訪問到在 project/server/config.py 中定義的配置變量。

一樣須要引入正確的庫:

import redis
from rq import Connection, Worker
複製代碼

在 requirements 文件中添加應用的依賴信息:

redis==2.10.6
rq==0.12.0
複製代碼

構建並啓動新的 Docker 容器:

$ docker-compose up -d --build
複製代碼

讓咱們試試觸發一個任務:

$ curl -F type=0 http://localhost:5004/tasks
複製代碼

你應該會獲得相似的返回:

{
  "data": {
    "task_id": "bdad64d0-3865-430e-9cc3-ec1410ddb0fd"
  },
  "status": "success"
}

複製代碼

任務狀態

讓咱們回頭看看客戶端的按鍵監聽器:

$('.btn').on('click', function() {
  $.ajax({
    url: '/tasks',
    data: { type: $(this).data('type') },
    method: 'POST'
  })
  .done((res) => {
    getStatus(res.data.task_id)
  })
  .fail((err) => {
    console.log(err)
  });
});
複製代碼

每當建立任務的 AJAX 請求返回後,咱們便會取出其中的任務 id 繼續調用 getStatus()。若 getStatus() 也成功返回,那麼咱們便在表格 DOM 中新增一行記錄。

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.data.task_id}</td>
        <td>${res.data.task_status}</td>
        <td>${res.data.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);
    const taskStatus = res.data.task_status;
    if (taskStatus === 'finished' || taskStatus === 'failed') return false;
    setTimeout(function() {
      getStatus(res.data.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err);
  });
}
複製代碼

更新視圖層代碼:

@main_blueprint.route('/tasks/<task_id>', methods=['GET'])
def get_status(task_id):
    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
        q = Queue()
        task = q.fetch_job(task_id)
    if task:
        response_object = {
            'status': 'success',
            'data': {
                'task_id': task.get_id(),
                'task_status': task.get_status(),
                'task_result': task.result,
            }
        }
    else:
        response_object = {'status': 'error'}
    return jsonify(response_object)
複製代碼

調用下面命令在隊列中新增一個任務:

$ curl -F type=1 http://localhost:5004/tasks
複製代碼

而後再用上面返回體中的 task_id 來請求新增的任務詳情接口:

$ curl http://localhost:5004/tasks/5819789f-ebd7-4e67-afc3-5621c28acf02

{
  "data": {
    "task_id": "5819789f-ebd7-4e67-afc3-5621c28acf02",
    "task_result": true,
    "task_status": "finished"
  },
  "status": "success"
}
複製代碼

一樣讓咱們在瀏覽器中試試效果:

flask, redis queue, docker

任務控制檯

RQ Dashboard 是一個 Redis Queue 的輕量級 web 端監控系統。

爲了集成 RQ Dashboard,首先你須要在 "project" 下新建一個 "dashboard" 文件夾,而後再在其中新建一個 Dockerfile

FROM python:3.7.0-alpine

RUN pip install rq-dashboard

EXPOSE 9181

CMD ["rq-dashboard"]
複製代碼

接着把上面的模塊做爲 service 添加到 docker-compose.yml 中:

version: '3.7'

services:

  web:
    build: .
    image: web
    container_name: web
    ports:
      - '5004:5000'
    command: python manage.py run -h 0.0.0.0
    volumes:
      - .:/usr/src/app
    environment:
      - FLASK_DEBUG=1
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  worker:
    image: web
    command: python manage.py run_worker
    volumes:
      - .:/usr/src/app
    environment:
      - APP_SETTINGS=project.server.config.DevelopmentConfig
    depends_on:
      - redis

  redis:
    image: redis:4.0.11-alpine

  dashboard:
    build: ./project/dashboard
    image: dashboard
    container_name: dashboard
    ports:
      - '9181:9181'
    command: rq-dashboard -H redis
複製代碼

構建並啓動新的容器:

$ docker-compose up -d --build
複製代碼

打開 http://localhost:9181 來看看整個控制檯:

rq dashboard

能夠嘗試啓動一些任務來試試控制檯功能:

rq dashboard

你也能夠經過增長 worker 的數量來觀察應用的變化:

$ docker-compose up -d --build --scale worker=3
複製代碼

結語

這是一篇在 Flask 中配置 Redis Queue 用於處理長執行任務的基礎指南。你能夠利用該隊列來執行任何可能阻塞或拖慢用戶體驗的進程。

還想繼續挑戰本身?

  1. 註冊 Digital Ocean 並利用 Docker Swarm 把這個應用部署到多個節點。
  2. 爲接口增長單元測試。(能夠使用 fakeredis 來模擬 Redis 實例)
  3. 利用 Flask-SocketIO 把客戶端的輪詢改成 websocket 鏈接。

能夠在 此倉庫 找到本文代碼。

若是發現譯文存在錯誤或其餘須要改進的地方,歡迎到 掘金翻譯計劃 對譯文進行修改並 PR,也可得到相應獎勵積分。文章開頭的 本文永久連接 即爲本文在 GitHub 上的 MarkDown 連接。


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOS前端後端區塊鏈產品設計人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索