要回答這個問題咱們首先看看在流水線上的案列,若是人的速度很慢,機器的速度比人的速度快不少,就會形成,機器生產的東西沒有及時處理,越積越多,形成阻塞,影響生產。python
打個比方若是出現人的速度跟不上機器速度怎麼辦,這個時候咱們就須要第三方,監管人員(任務隊列)把機器生產的東西,放在一個地方,(隊列),而後分配給每一個用戶,有條不理的執行。linux
python 裏面的celery 模塊是一個簡單,靈活且可靠的,處理大量消息的分佈式系統,而且提供維護這樣一個系統的必需工具。它是一個專一於實時處理的任務隊列,同時也支持任務調度。git
pip install Celery
消息隊列
消息隊列的輸入是工做的一個單元,稱爲任務,獨立的職程(Worker)進程持續監視隊列中是否有須要處理的新任務。
Celery 用消息通訊,一般使用中間人(Broker)在客戶端和職程間斡旋。這個過程從客戶端向隊列添加消息開始,以後中間人把消息派送給職程,職程對消息進行處理。以下圖所示:
Celery 系統可包含多個職程和中間人,以此得到高可用性和橫向擴展能力。
Celery的架構
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。
消息中間件
Celery自己不提供消息服務,可是能夠方便的和第三方提供的消息中間件集成,包括,RabbitMQ,Redis,MongoDB等,這裏我先去了解RabbitMQ,Redis。
任務執行單元
Worker是Celery提供的任務執行的單元,worker併發的運行在分佈式的系統節點中
任務結果存儲
Task result store用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等,這裏我先不去看它是如何存儲的,就先選用Redis來存儲任務執行結果。github
環境web
任務腳本redis
import os import sys import datetime BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASE_DIR) from celery import Celery from celery import chain, group, chord, Task import celeryconfig app = Celery() app.config_from_object(celeryconfig) __all__ = ['add', 'reduce','sum_all', 'other'] #################################### # tas # #################################### @app.task def add(x, y): return x + y @app.task def reduce(x, y): return x - y @app.task def sum(values): return sum([int(value) for value in values]) @app.task def other(x, y): return x * y
!/usr/bin/python #coding:utf-8 from kombu import Queue CELERY_TIMEZONE = 'Asia/Shanghai' #################################### # 通常配置 # #################################### CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True # List of modules to import when celery starts. CELERY_IMPORTS = ('tasks', ) CELERYD_MAX_TASKS_PER_CHILD = 40 # 每一個worker執行了多少任務就會死掉 BROKER_POOL_LIMIT = 10 #默認celery與broker鏈接池鏈接數 CELERY_DEFAULT_QUEUE='default' CELERY_DEFAULT_ROUTING_KEY='task.default' CELERY_RESULT_BACKEND='redis://192.168.29.234:6379/0' BROKER_URL='redis://192.168.29.234:6379/0' #默認隊列 CELERY_DEFAULT_QUEUE = 'celery' CELERY_DEFAULT_ROUTING_KEY = 'celery' CELERYD_LOG_FILE="./logs/celery.log" CELERY_QUEUEs = ( Queue("queue_add", routing_key='queue_add'), Queue('queue_reduce', routing_key='queue_sum'), Queue('celery', routing_key='celery'), ) CELERY_ROUTES = { 'task.add':{'queue':'queue_add', 'routing_key':'queue_add'}, 'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'}, }
flower githubmongodb
在234 上flower.py 的腳本json
#!/usr/bin/env python #coding:utf-8 broker_api = 'redis://127.0.0.1:6379/0' logging = 'DEBUG' address = '0.0.0.0' port = 5555 #外部訪問密碼 #basic_auth=['root:ybl8651073'] persistent=True #持久化celery tasks(若是爲false的話,重啓flower以後,監控的task就消失了) db="/root/flower_db"
celery worker -A tasks --loglevel=info --queues=celery,queue_add --hostname=celery_worker198
1. redis服務 2. celery worker -A tasks --loglevel=info --queues=celery,queue_reduce --hostname=celery_worker234 3. celery flower worker -A tasks --config==/root/flower.py
分析
api
curl -X POST -d '{"args":[1,2]}' http://192.168.29.234:5555/api/task/async-apply/tasks.add