Celery啓動的入口:redis
文件:Celery/bin/celery.py併發
看下main函數作了什麼事app
能夠看到主要作了幾個事根據-P參數判斷是否須要打patch,若是是gevent或者eventlet則要打對應的補丁。框架
而後執行命令行邏輯async
能夠看到,這邊取出系統參數函數
而後執行基類Command的execute_from_commandline,oop
文件:celery/bin/base.pyfetch
setup_app_from_commandline是核心函數,做用是得到咱們的app對象和得到咱們的配置參數ui
文件:Celery/bin/celery.pyspa
這邊主要獲取啓動類別及啓動參數,咱們的類別是worker因此:
這邊是開始準備啓動對應類別的對象,worker、beat等。
self.commands是支持的命令:
上面咱們知道,咱們的類型是worker,即celery.bin.worker.worker,初始化該類,而後執行run_from_argv函數
文件:celery/bin/worker.py
最後一行會執行到父類的__call__函數,
文件:celery/bin/base.py
這邊主要執行的是run函數
這個函數主要是啓動worker
終於進入worker了,如今這裏涉及一些比較關鍵的東西了,
文件:celery/worker/__init__.py
在WorkController類裏,是worker的基類
這是worker的藍圖,這邊會造成一個依賴圖,是啓動的必要組件,分別負責worker的一部分任務,比較重要的幾個:
Timer:用於執行定時任務的 Timer,和 Consumer 那裏的 timer 不一樣
Hub:Event loop 的封裝對象
Pool:構造各類執行池(線程/進程/協程)的
Beat:建立Beat進程,不過是以子進程的形式運行(不一樣於命令行中以beat參數運行)
文件:celery/apps/worker.py
文件:celery/apps/trace.py
文件:celery/app/base.py
從init_before開始,這邊是最主要的,即綁定全部的task到咱們的app,註冊task在下面
每一個task都有delay和apply_async函數,這個能夠用來幫咱們啓動任務。
文件:celery/worker/__init__.py
這邊是設置關注及不關注的隊列,能夠看到,celery支持ampq協議。
調用setup_includes安裝一些經過CELERY_INCLUDE配置的模塊,保證全部的任務模塊都導入了
最後初始化藍圖,並進行apply完成藍圖各個step的依賴關係圖的構建,並進行各個組件的初始化,依賴在component中已經標出
這個requires就是依賴,說明hub依賴timer,上面藍圖聲明的組件都有互相依賴關係。
回到文件:celery/worker/__init__.py執行start
執行的是藍圖的start。
分別執行各個步驟的start,在apply時,會判斷step是否須要start,不start可是仍要create。
經過啓動日誌看,worker啓動的step爲Pool,和Consumer;
若是換成prefork方式起,worker會多起hub和autoscaler兩個step:
Hub依賴Timer,咱們用gevent,因此include_if是false,這個不須要start。
Hub建立時候引用的kombu的Hub組件,Connection會註冊到Hub,Connection是各類類型鏈接的封裝,對外提供統一接口
Queue依賴Hub,這邊是基於Hub建立任務隊列
下面是咱們的worker啓動的step其中的一個,重點進行說明
初始化線程/協程池,是否彈縮,最大和最小併發數
Celery支持的幾種TaskPool,
咱們是gevent,因此這邊直接找gevent的代碼。
這邊直接引用gevent的Pool
下面看worker啓動的第二個step
能夠看到,這邊啓動的是celery.worker.consumer.Consumer,這邊就會涉及另外一個重要的藍圖了。
文件:celery/worker/consumer,Consumer類
這是Consumer的藍圖,
Consumer啓動的step爲Connection,events,mingle,Gossip,Tasks,Contorl,Heart和event loop。
__init__初始化一些必要的組件,不少都是以前worker建立的。
而後執行blueprint的apply,作的事我worker以前是同樣的。
執行Consumer的start,也就是執行blueprint的start。
啓動的step的基本功能:
Connection:管理和broker的Connection鏈接
Mingle:不一樣worker之間同步狀態用的
Tasks:啓動消息Consumer
Gossip:消費來自其餘worker的事件
Heart:發送心跳事件(consumer的心跳)
Control:遠程命令管理服務
其中Connection,Tasks,Heart和event loop是最重要的幾個。
先看Connection。
使用了consumer的connect()
Conn引用了ampq的connection,ampq的Connection是直接使用的kombu的Connection,上面說過,這個Connection是各類支持的類型(如redis,rabbitMQ等)的抽象,對外提供統一接口。
若是hub存在,會將鏈接註冊到event loop。
再看Tasks:
這邊引用的ampq的TaskConsumer,ampq的TaskConsumer繼承了kombu的Consumer。
能夠看到,在關鍵的幾個地方,celery都引用了kombu,Kombu對全部的MQ進行抽象,而後經過接口對外暴露出一致的API(Redis/RabbitMQ/MongoDB),Kombu對MQ的抽象以下:
Message:生產消費的基本單位,就是一條條消息
Connection:對 MQ 鏈接的抽象,一個 Connection 就對應一個 MQ 的鏈接
Transport:真實的 MQ 鏈接,也是真正鏈接到 MQ(redis/rabbitmq) 的實例
Producers: 發送消息的抽象類
Consumers:接受消息的抽象類
Exchange:MQ 路由,這個和 RabbitMQ 差很少,支持 5種 類型
Queue:對應的 queue 抽象,其實就是一個字符串的封裝
Hub是一個eventloop,Connection註冊到Hub,一個Connection對應一個Hub。Consumer綁定了消息的處理函數,每個Consumer初始化的時候都是和Channel綁定的,也就是說咱們Consumer包含了Queue也就和Connection關聯起來了,Consumer消費消息是經過Queue來消費,而後Queue又轉嫁給Channel,再轉給connection,Channel是AMQP對MQ的操做的封裝,Connection是AMQP對鏈接的封裝,那麼二者的關係就是對MQ的操做必然離不開鏈接,可是,Kombu並不直接讓Channel使用Connection來發送/接受請求,而是引入了一個新的抽象Transport,Transport負責具體的MQ的操做,也就是說Channel的操做都會落到Transport上執行。
再看下event loop:
上面咱們有了connection以及綁定connection的consumer,下面看看消費者怎麼消費消息,若是是帶hub的狀況:
先對consumer進行一些設置,
而後開始進行循環。loop是kombu建立的event loop,啓用事件循環機制,而後next這邊就開始不停的循環獲取消息並執行。
這個是kombu裏的部分實現,是對從池裏取到的消息進行處理。
看下同步代碼,register_callback將回調註冊consumer,而後執行consume:
再看消息循環那幾行,
獲取到消息後,調用回調函數進行處理。
回調函數使用的是create_task_handler(),strategies是在上面的update_strategies裏進行的更新,該函數是在Task裏調用的
打印一下strategies裏的信息,只截部分圖:
下面看下咱們怎麼啓動任務的,
調用到app的send_task
再調用到ampq的publish_task,
最終又交給kombu的publish。
關於pool的選擇:
使用的是app的pool,即
經過connection又走到了ampq再轉到kombu裏。
Worker和consumer基本大框架就是上面的流程,下面看下beat是怎麼實現的。
Beat起動的時候是celery beat,根據咱們上面的分析,首先進入的應該是celey/bin/beat.py,而後調用該文件中的Beat的run函數:
而後在指向apps的Beat:
在apps裏的Beat調用run:
主要執行了三個函數,init_loader主要初始化並綁定task,第二步設置一些頭信息之類的,關鍵是第三步,主幹代碼
主要是初始化service並start。
Start最關鍵的部分是那個while循環體,只要不被shutdown,就會一直調用scheduler的tick
這邊這個self.schedule就是咱們準備調度的任務:
下面看對這些任務的處理:
這是判斷是否要執行任務的邏輯,若是要執行,則執行apply_async。
若是發現任務該執行了,則去tasks裏獲取任務,並執行,這邊的apply_async和worker那邊的沒區別,若是沒找到task,則將task註冊到broker。
怎樣將consumer和concurrency聯繫起來
這邊調用了_process_task,調用的是worker裏的
這邊調用各類池的啓動函數:
可是queue裏只是引用,後面還有別的處理
在初始化consumer時候將調用池的操做傳了進去,成爲了Consumer裏的on_task_request
在Tasks調用start的時候會更新strategies
而後在這邊調用start_strategy
而後就進入
而後走入strategy的default
這裏取了consumer的on_task_request,就是咱們傳入的池執行的邏輯,_limit_task是這樣的:
作了一些判斷,符合條件再執行。
這個文件是strategy的default的下半個文件,作了一些流量控制,而後執行limit_task或者直接執行handler。
這邊由於使用的gevent,因此就走到gevent的apply_async,
這邊是起一個協程處理,這樣就將任務交給了gevent。
具體上面是執行流程,具體在哪裏執行的呢?
這邊註冊了callback,create_task_handler從strategy這邊取值取值執行
Qos對ack的處理部分:
Kombu的transport的redis.py裏的額basic_consume,調用channel的basic_consume;
在Kombu.transport.virtual.__init__.py文件中
這裏維護了一個dict:self._delivered,一個set:self._dirty和一個int:prefetch_count,
若是no_ack爲False在執行consume後會向self._delivered中添加一條數據,
ack後會向self._dirty中添加一條數據,而後,後面會將self._dirty逐條刪除,並同時刪除self._delivered中的數據,若是沒有ack,則不會刪除:
每次拉任務的時候會調用can_consume:
比較prefetch_count和self._delivered減self._dirty的值,若是小於預取限制,則容許,不然不容許。