Celery和Rabbitmq自學

異步消息隊列,也能用於定時和週期性任務。每次修改的task代碼還要重啓worker,這個有點麻煩html

全部帶task()裝飾器的可調用對象(usertask)都是celery.app.task.Task類的子類,也就是說task()裝飾器會將usertask標識符變成Task子類的引用。node

另外,celery容許用自定義Task類,不過該類要繼承於celery.app.task.Task,Task類在task狀態轉換動做時提供了接口,如任務執行失敗時調用接口on_failure,python

這樣就很是方便咱們在自定義Task類中重定義。見http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classescanvas

task()裝飾器能夠接收不少參數,好比序列化類、是否保存task結果、所使用的back_end等等,見瀏覽器

http://docs.celeryproject.org/en/latest/userguide/tasks.html#list-of-options。好比,咱們能夠設置只在出錯的狀況下保存運行結果。即安全

ignore_result=False且store_errors_even_if_ignored=True,不過,ignore_result能夠在全局配置文件中設置CELERY_IGNORE_RESULTapp

當發異步消息時調用usertask.delay()或usertask.apply_async(),它實際上是將usertask的信息,如名稱,入參,id等序列化後保存在broker中。異步

若是不限制task的處理速度,那應該設置CELERY_DISABLE_RATE_LIMITS = True,這算是celery的優化部分
async

task不要嵌套,若是但願多個task順序執行(同步),那能夠用回調函數,在celery中是用chain()方法實現,見ide

http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks

http://docs.celeryproject.org/en/latest/userguide/canvas.html

celery在兩個地方有retry,一個是在task的代碼執行過程當中出現異常時能夠retry,不過這個須要用戶在task代碼中本身寫,若是想

retry那就要咱們本身捕獲異常,並拋出Retry exception,參見http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying

另一個地方是在調用delay()/apply_async()時,這個retry多是指向rabbitmq發送消息時若是失敗,能夠重試若是想鏈式調用task,那要設置link參數,見

http://docs.celeryproject.org/en/latest/userguide/canvas.html#callbacks

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

開發環境中,work的啓動、結束、重啓能夠用celery multi xxx命令

生產環境中,用http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing

celery中的一個worker實際上是表明一個進程池,一個進程池是由一個父進程和多個子進程組成, 貌似父進程不幹事,只用於分配task,子進程數默認是CPU核數

一臺主機上能夠啓多個worker,但這種方法貌似和一個worker中啓多個子進程區別不大啊,兩種方式都是多進程。

在啓動worker時,能夠設置worker的不少參數,如是否容許自動伸縮pool的容量,最大和最小容量,見

http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling

celery有不少設置,其中在配置文件celeryconfig中的設置是全局的,另外,咱們還能夠單獨設置task(能夠在task定義的地方,也能夠在delay(), apply_async()中設置),

worker啓動時能夠設置參數

celery的broker中的task隊列能夠有多個,用不一樣的名字命名,還能夠有不一樣的優先級。咱們啓動worker時能夠設置該worker只處理指定隊列的task

celery中的log使用的是python的log模塊,是線程安全的,而不是進程安全的。咱們能夠給一個worker中的每一個process定義其logfile,見

http://docs.celeryproject.org/en/latest/userguide/workers.html#variables-in-file-paths

一個node是指一個worker

celery的每一個worker的每一個進程能夠一次從broker中取出多個task

查看全部node中的活動的task: celery -A proj inspect active

查看全部node中註冊的全部task:celery -A proj inspect registered

查看全部node的信息:celery -A proj inspect stats

調用異步消息接口delay() ,apply_async()時,其實只是應用程序與broker通訊而已,發消息給broker,消息中包含調用的task名稱,參數等,broker收到後保存。這個過程當中不會與celery worker打交道

所以,這個時候即便沒有啓動celery worker也沒有任何關係。當celery worker啓動後,會主動鏈接broker,並從broker那裏取數據來consume。同理,咱們能夠在任何平滑中止celery worker,而不用管

broker中是否還有沒consume的消息,當celery worker重啓後,它會繼續處理broker中的消息。這裏的啓動順序貌似只能是先啓動broker,再啓動celery,由於啓動celery時會與broker創建鏈接,broker

是被動鏈接的。

broker本身應該是有辦法保證一個消息只能被一個worker取走,而不會同時被兩個worker取走。

有一個問題:rabbitmq有內存佔用限制嗎?若是有,那當達到這個限制時,新來的消息怎麼處理?像reddis那樣把舊的消息寫到磁盤?仍是直接拋棄舊的消息?或再也不接收新消息?

Rabbitmq有監控,默認的,Rabbitmq_management是關閉的,須要打開,執行sudo rabbitmq-plugins enable rabbitmq_management,打開這個plugin,

而後重啓Rabbitmq,在瀏覽器輸入http://localhost:15672,用戶名和密碼都是guest,就能夠看到Rabbitmq的監控信息了。

Rabbitmq至關於一個郵箱,用於接收和發送消息,並且它裏面有不少隊列,發送消息時能夠指定用哪一個隊列,一個消息中包含一個task

(1) 發送者將消息發給rabbitmq的指定隊列,消息中包含消息處理函數

(2) rabbitmq接收並保存消息(無數量限制)

(3) 接收者聯繫rabbitmq,從隊列中取走消息(能夠指定接收者只消費某個指定隊列的消息)

(4) 接收者取回消息後,調用消息中的處理函數處理消息

其實這個過程當中咱們要考慮消息執行的可靠性。

第一點:若消息被worker接收後,處理過程當中worker死掉了怎麼辦?Rabbitmq考慮到了這點,在默認狀況下(是一個配置項,咱們能夠修改),每一個消息實際上是被worker拷貝了一份取走的,

固然被拷貝的消息會打上標記,當worker處理完消息後會給Rabbitmq一個確認(acknowledgment),Rabbitmq收到確認後才刪除消息,這個過程沒有限定超時時間,只有當worker與

Rabbitmq之間的TCP鏈接斷開後(Rabbitmq應該是會檢測worker的心跳),Rabbitmq纔會將沒有確認的消息從新加入到隊列。咱們可使用命令:rabbitmqctl list_queues name messages_ready

messages_unacknowledged來查看有多少消息沒有收到確認。它是一個默認項,咱們關閉這個可靠性設置,這樣,消息處理完不用再給Rabbitmq發acknowledgment了。在celery配置文件中

對應於CELERY_ACKS_LATE項

第二點:若Rabbitmq掛掉,消息也會丟失。由於要作消息的持久化,Rabbitmq這樣作了,不過它的持久化並不絕對可靠,由於從接收到消息到持久化之間有時間間隔,若是想絕對可靠,見

http://www.rabbitmq.com/tutorials/tutorial-two-python.html中的Note on message persistence部分。

還有些可靠性是worker本身來保證的,如task執行過程當中拋出異常,這種狀況是否要從新執行該task,celery的處理以下:

在celery中,task在被worker執行過程當中若是拋出異常(Exception),celery會捕獲並給Rabbitmq發acknowledgment,若是咱們想讓該task從新執行,也能夠,那就要本身手動捕獲異常,

並拋出Retry exception,這樣worker會再次執行該task,這個過程當中不會與Rabbitmq交互。

一、celery中的基本概念

 

二、Rabbitmq中的基本概念

publish:即生產者將消息發送給Rabbitmq的過程,更準確的說,是發送給Rabbitmq exchange的過程,在Rabbitmq的控制檯網頁中的overview子頁的Message rates中有publish一項,它指

的是指消息進入Rabbitmq的速率。

三、celery中的配置項

CELERYD_PREFETCH_MULTIPLIER

乘子,worker的進程數*CELERYD_PREFETCH_MULTIPLIER=worker每次從Rabbitmq一次性取走多的消息數

四、Rabbitmq配置項和查詢命令

首先幾乎全部的控制命令都經過rabbitmqctl執行,不少東西在網頁管理頁能夠看到

rabbitmqctl list_queues

查看queue相關的信息,後面可加參數name(名稱),messages(消息數量), consumers(消息者數量,該值與worker數不相等,通常是worker數的三倍,貌似每一個worker順帶兩個額外的consumer,這兩

個consumer咱們不用關心)

rabbitmqctl list_exchanges

查看全部exchange,以下,第一列是名稱,第二列是exchang type。Rabbitmq共有四種類型:direct,fanout,headers,topic。

direct是指將消息直接發到指定隊列,fanout是指廣播,發給綁定到該exchange的全部隊列。咱們通常是用direct。

在celery中,當broker是Rabbitmq時,會調用Rabbitmq建立默認的exchange和默認的queue,它們的名稱都是celery,而且該queue是沒有綁定任何exchange的,給Rabbitmq發消息時會指定隊列,exchange

將消息發到該指定的隊列。貌似基本上不會用到綁定功能

C:\Work\hera\src>rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
celery  direct
celery.pidbox   fanout
celeryev        topic
reply.celery.pidbox     direct
...done.

C:\Work\hera\src>

service rabbitmq-server start

啓動rabbitmq

相關文章
相關標籤/搜索