談談python裏面關於任務隊列

談談python裏面關於任務隊列

  • 爲何要作任務隊列

要回答這個問題咱們首先看看在流水線上的案列,若是人的速度很慢,機器的速度比人的速度快不少,就會形成,機器生產的東西沒有及時處理,越積越多,形成阻塞,影響生產。python

  • 任務隊列的意義:

打個比方若是出現人的速度跟不上機器速度怎麼辦,這個時候咱們就須要第三方,監管人員(任務隊列)把機器生產的東西,放在一個地方,(隊列),而後分配給每一個用戶,有條不理的執行。linux

python 裏面的celery 模塊是一個簡單,靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。它是一個專一於實時處理的任務隊列,同時也支持任務調度。git

  • 關於安裝celery
pip  install Celery

關於celery 的概念介紹

消息隊列
消息隊列的輸入是工做的一個單元,稱爲任務,獨立的職程(Worker)進程持續監視隊列中是否有須要處理的新任務。
Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程,職程對消息進行處理。以下圖所示:
 
Celery 系統可包含多個職程和中間人,以此得到高可用性和橫向擴展能力。
Celery的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這裏我先去了解RabbitMQ,Redis
任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這裏我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。github

實戰

環境web

  • kaillinux 主機兩臺(192.168.29.234,192.168.29.198)
  • redis (192.168.29.234 )
  • flower (192.168.29.234)
  • 任務腳本(兩臺都必須部署)

任務腳本redis

  • tasks.py (計算加減乘除)
import os
import sys
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
from celery import Celery
from celery import chain, group, chord, Task
import celeryconfig
app = Celery()
app.config_from_object(celeryconfig)
__all__ = ['add', 'reduce','sum_all', 'other']
####################################
# tas #
####################################
@app.task
def add(x, y):
    return x + y
@app.task
def reduce(x, y):
    return x - y
@app.task
def sum(values):
    return sum([int(value) for value in values])
@app.task
def other(x, y):
    return x * y
  • celeryconfig.py
!/usr/bin/python
#coding:utf-8
from kombu import Queue
CELERY_TIMEZONE = 'Asia/Shanghai'
####################################
# 通常配置 #
####################################
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# List of modules to import when celery starts.
CELERY_IMPORTS = ('tasks', )
CELERYD_MAX_TASKS_PER_CHILD = 40 #  每一個worker執行了多少任務就會死掉
BROKER_POOL_LIMIT = 10 #默認celery與broker鏈接池鏈接數
CELERY_DEFAULT_QUEUE='default'
CELERY_DEFAULT_ROUTING_KEY='task.default'
CELERY_RESULT_BACKEND='redis://192.168.29.234:6379/0'  
BROKER_URL='redis://192.168.29.234:6379/0'  
#默認隊列
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_DEFAULT_ROUTING_KEY = 'celery'
CELERYD_LOG_FILE="./logs/celery.log"
CELERY_QUEUEs = (
    Queue("queue_add", routing_key='queue_add'),
    Queue('queue_reduce', routing_key='queue_sum'),
    Queue('celery', routing_key='celery'),
    )
CELERY_ROUTES = {
    'task.add':{'queue':'queue_add', 'routing_key':'queue_add'},
    'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'},
}

關於flower 是監控任務信息的web 圖表,默認的配置沒有作驗證,並且當主機重啓時,數據會丟失,因此咱們要自定義一個flower 文件

flower githubmongodb

在234 上flower.py 的腳本json

#!/usr/bin/env python
#coding:utf-8
broker_api = 'redis://127.0.0.1:6379/0'
logging = 'DEBUG'
address = '0.0.0.0'
port = 5555
#外部訪問密碼
#basic_auth=['root:ybl8651073']
persistent=True  #持久化celery tasks(若是爲false的話,重啓flower以後,監控的task就消失了)
db="/root/flower_db"

運行

  • 在198上啓動
celery worker -A  tasks --loglevel=info --queues=celery,queue_add --hostname=celery_worker198
  • 在234 上啓動
1.  redis服務
2.  celery worker -A  tasks --loglevel=info --queues=celery,queue_reduce --hostname=celery_worker234
3.  celery  flower worker -A  tasks  --config==/root/flower.py

服務驗證

  • 在任一臺有celeryservice項目代碼的服務器上,運行add、reduce、-
  • sum、other任務(測試可簡單使用add.delay(1,2)等)
  • add只會在198上運行,
  • sum任務,可能會在198或234服務器的worker節點運行
  • reduce任務,只會在234上運行。
  • other任務可能會在198或者234上運行。

打開監控web 192.168.29.234:5555

兩臺上線workers

  • 隨機運行幾個任務

image.png

  • 分析
    運行結果api

  • 也能夠經過 curl提交任務
curl -X POST -d '{"args":[1,2]}' http://192.168.29.234:5555/api/task/async-apply/tasks.add
相關文章
相關標籤/搜索