與rabbitmq消息隊列的區別與聯繫:
- rabbitmq 調度的是消息,而Celery調度的是任務.
- Celery調度任務時,須要傳遞參數信息,傳輸載體能夠選擇rabbitmq.
- 利用rabbitmq的持久化和ack特性,Celery能夠保證任務的可靠性.
優勢:
- 輕鬆構建分佈式的Service Provider。
- 高可擴展性,增長worker也就是增長了隊列的consumer。
- 可靠性,利用消息隊列的durable和ack,能夠儘量下降消息丟失的機率,當worker崩潰後,未處理的消息會從新進入消費隊列。
- 用戶友好,利用flower提供的管理工具能夠輕鬆的管理worker。
flower
- 使用tornado-celery,結合tornado異步非阻塞結構,能夠提升吞吐量,輕鬆建立分佈式服務框架。
- 學習成本低,可快速入門
快速入門
定義一個celery實例main.py:html
1 2 3 4
|
from celery import Celery app = Celery('route_check', include=['check_worker_path'], broker='amqp://user:password@rabbitmq_host:port//') app.config_from_object('celeryconfig')
|
include指的是須要celery掃描是否有任務定義的模塊路徑。例如add_task
就是掃描add_task.py中的任務python
celery的配置文件能夠從文件、模塊中讀取,這裏是從模塊中讀取,celeryconfig.py爲:git
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
from multiprocessing import cpu_count
from celery import platforms from kombu import Exchange, Queue
CELERYD_POOL_RESTARTS = False CELERY_RESULT_BACKEND = 'redis://:password@redis_host:port/db' CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('common_check', Exchange('route_check'), routing_key='common_check'), Queue('route_check', Exchange('route_check'), routing_key='route_check', delivery_mode=2), Queue('route_check_ignore_result', Exchange('route_check'), routing_key='route_check_ignore_result', delivery_mode=2) ) CELERY_ROUTES = { 'route_check_task.check_worker.common_check': {'queue': 'common_check'}, 'route_check_task.check_worker.check': {'queue': 'route_check'}, 'route_check_task.check_worker.check_ignore_result': {'queue': 'route_check_ignore_result'} } CELERY_DEFAULT_QUEUE = 'default' CELERY_DEFAULT_EXCHANGE = 'default' CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' CELERY_DEFAULT_ROUTING_KEY = 'default' # CELERY_MESSAGE_COMPRESSION = 'gzip' CELERY_ACKS_LATE = True CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_DISABLE_RATE_LIMITS = True CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERYD_CONCURRENCY = cpu_count() / 2 CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_PUBLISH_RETRY = True CELERY_TASK_PUBLISH_RETRY_POLICY = { 'max_retries': 3, 'interval_start': 10, 'interval_step': 5, 'interval_max': 20 } platforms.C_FORCE_ROOT = True
|
這裏面是一些celery的配置參數。github
在上面include的add_task.py定義以下:web
1 2 3 4 5 6 7
|
#encoding:utf8
from main import app
@app.task def add(x,y): return x+y
|
啓動celery
celery -A main worker -l info -Ofair
redis
- -A 後面是包含celery定義的模塊,咱們在main.py中定義了
app = Celery...
測試celery:
- -l 日誌打印的級別,這裏是info
- -Ofair 這個參數可讓Celery更好的調度任務
1 2 3 4 5 6 7 8 9 10
|
# encoding:utf8 __author__ = 'brianyang'
import add_task
result = add_task.add.apply_async((1,2)) print type(result) print result.ready() print result.get() print result.ready()
|
輸出是json
1 2 3 4
|
<class 'celery.result.AsyncResult'> False 3 True
|
當調用result.get()時,若是尚未返回結果,將會阻塞直到結果返回。這裏須要注意的是,若是須要返回worker執行的結果,必須在以前的config中配置CELERY_RESULT_BACKEND
這個參數,通常推薦使用Redis來保存執行結果,若是不關心worker執行結果,設置CELERY_IGNORE_RESULT=True
就能夠了,關閉緩存結果能夠提升程序的執行速度。
在上面的測試程序中,若是修改成:瀏覽器
1 2 3 4 5 6 7 8
|
# encoding:utf8 __author__ = 'brianyang'
import add_task
result = add_task.add.(1,2) print type(result) print result
|
輸出結果爲:緩存
至關於直接本地調用了add方法,並無走Celery的調度。
經過flower的dashbord能夠方便的監控任務的執行狀況:
task list
task detail
還能夠對worker進行重啓,關閉之類的操做
taks_op
使用Celery將一個集中式的系統拆分爲分佈式的系統大概步驟就是:服務器
- 根據功能將耗時的模塊拆分出來,經過註解的形式讓Celery管理
- 爲拆分的模塊設置獨立的消息隊列
- 調用者導入須要的模塊或方法,使用apply_async進行異步的調用並根據需求關注結果。
- 根據性能須要能夠添加機器或增長worker數量,方便彈性管理。
須要注意的是:
- 儘可能爲不一樣的task分配不一樣的queue,避免多個功能的請求堆積在同一個queue中。
celery -A main worker -l info -Ofair -Q add_queue
啓動Celery時,能夠經過參數Q加queue_name來指定該worker只接受指定queue中的tasks.這樣能夠使不一樣的worker各司其職。
CELERY_ACKS_LATE
可讓你的Celery更加可靠,只有當worker執行完任務後,纔會告訴MQ,消息被消費。
CELERY_DISABLE_RATE_LIMITS
Celery能夠對任務消費的速率進行限制,若是你沒有這個需求,就關閉掉它吧,有益於會加速你的程序。
tornado-celery
tornado應該是python中最有名的異步非阻塞模型的web框架,它使用的是單進程輪詢的方式處理用戶請求,經過epoll來關注文件狀態的改變,只掃描文件狀態符發生變化的FD(文件描述符)。
因爲tornado是單進程輪詢模型,那麼就不適合在接口請求後進行長時間的耗時操做,而是應該接收到請求後,將請求交給背後的worker去幹,幹完活兒後在經過修改FD告訴tornado我幹完了,結果拿走吧。很明顯,Celery與tornado很般配,而tornado-celery是celery官方推薦的結合二者的一個模塊。
整合二者很容易,首先須要安裝:
- tornado-celery
- tornado-redis
tornado代碼以下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
# encoding:utf8 __author__ = 'brianyang'
import tcelery import tornado.gen import tornado.web
from main import app import add_task
tcelery.setup_nonblocking_producer(celery_app=app)
class CheckHandler(tornado.web.RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self): x = int(self.get_argument('x', '0')) y = int(self.get_argument('y', '0')) response = yield tornado.gen.Task(add_task.add.apply_async, args=[x, y]) self.write({'results': response.result}) self.finish
application = tornado.web.Application([ (r"/add", CheckHandler), ])
if __name__ == "__main__": application.listen(8889) tornado.ioloop.IOLoop.instance().start()
|
在瀏覽器輸入:http://127.0.0.1:8889/add?x=1&y=2
結果爲:
經過tornado+Celery能夠顯著的提升系統的吞吐量。
Benchmark
使用Jmeter進行壓測,60個進程不間斷地的訪問服務器:
接口單獨訪問響應時間通常在200~400ms
- uwsgi + Flask方案:
uwsgi關鍵配置:
1 2
|
processes = 10 threads = 3
|
Flask負責接受並處理請求,壓測結果:
qps是46,吞吐量大概是2700/min
uwsgi+Flask
- tornado+Celery方案:
Celery配置:
CELERYD_CONCURRENCY = 10
也就是10個worker(進程),壓測結果:
qps是139,吞吐量大概是8300/min
tornado+Celery
從吞吐量和接口相應時間各方面來看,使用tornado+Celery都能帶來更好的性能。
Supervisor
- 什麼是supervisor
supervisor俗稱Linux後臺進程管理器
- 適合場景
– 須要長期運行程序,除了nohup,咱們有更好的supervisor
– 程序意外掛掉,須要重啓,讓supervisor來幫忙
– 遠程管理程序,不想登錄服務器,來來來,supervisor提供了高大上(屁~)的操做界面.
以前啓動Celery命令是celery -A main worker -l info -Ofair -Q common_check
,當你有10臺機器的時候,每次更新代碼後,都須要登錄服務器,而後更新代碼,最後再殺掉Celery進程重啓,惡不噁心,簡直噁心死了。
讓supervisor來,首先須要安裝:
pip install supervisor
配置文件示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
[unix_http_server] file=/tmp/supervisor.sock ; path to your socket file chmod=0777 username=admin password=admin
[inet_http_server] port=0.0.0.0:2345 username=admin password=admin
[supervisord] logfile=/var/log/supervisord.log ; supervisord log file logfile_maxbytes=50MB ; maximum size of logfile before rotation logfile_backups=10 ; number of backed up logfiles loglevel=info ; info, debug, warn, trace pidfile=/var/run/supervisord.pid ; pidfile location nodaemon=false ; run supervisord as a daemon minfds=1024 ; number of startup file descriptors minprocs=200 ; number of process descriptors user=root ; default user childlogdir=/var/log/ ; where child log files will live
[rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl] serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets. username=admin password=admin [program:celery] command=celery -A main worker -l info -Ofair
directory=/home/q/celeryTest user=root numprocs=1 stdout_logfile=/var/log/worker.log stderr_logfile=/var/log/worker.log autostart=true autorestart=true startsecs=10
; Need to wait for currently executing tasks to finish at shutdown. ; Increase this if you have very long running tasks. stopwaitsecs = 10
; When resorting to send SIGKILL to the program to terminate it ; send SIGKILL to its whole process group instead, ; taking care of its children as well. killasgroup=true
; Set Celery priority higher than default (999) ; so, if rabbitmq is supervised, it will start first. priority=1000
|
示例文件很長,不要怕,只須要複製下來,改改就能夠
比較關鍵的幾個地方是:
1 2 3 4
|
[inet_http_server] port=0.0.0.0:2345 username=admin password=admin
|
這個可讓你經過訪問http://yourhost:2345
,驗證輸入admin/admin的方式遠程管理supervisor,效果以下:
remote supervisor
[program:flower]
這裏就是你要託管給supervisor的程序的一些配置,其中autorestart=true
能夠在程序崩潰時自動重啓進程,不信你用kill試試看。
剩下的部分就是一些日誌位置的設置,當前工做目錄設置等,so esay~
supervisor優勢:
- 管理進程簡單,不再用nohup & kill了。
- 不再用擔憂程序掛掉了
- web管理很方便
缺點:
- web管理雖然方便,可是每一個頁面只能管理本機的supervisor,若是我有一百臺機器,那就須要打開100個管理頁面,很麻煩.
怎麼辦~
經過rpc調用獲取配置中的每個supervisor程序的狀態並進行管理,能夠分組,分機器進行批量/單個的管理。方便的不要不要的。來兩張截圖:
- 分組管理:
group
- 分機器管理:
server經過簡單的配置,能夠方便的進行管理。