celery源碼解讀

 

Celery啓動的入口:redis

文件:Celery/bin/celery.py併發

 

看下main函數作了什麼事app

能夠看到主要作了幾個事根據-P參數判斷是否須要打patch,若是是gevent或者eventlet則要打對應的補丁。框架

而後執行命令行邏輯async

 

 

能夠看到,這邊取出系統參數函數

而後執行基類Command的execute_from_commandlineoop

文件:celery/bin/base.pyfetch

 

setup_app_from_commandline是核心函數,做用是得到咱們的app對象和得到咱們的配置參數ui

文件:Celery/bin/celery.pyspa

 

這邊主要獲取啓動類別及啓動參數,咱們的類別是worker因此:

 

這邊是開始準備啓動對應類別的對象,worker、beat等。

self.commands是支持的命令:

 

上面咱們知道,咱們的類型是worker,即celery.bin.worker.worker,初始化該類,而後執行run_from_argv函數

文件:celery/bin/worker.py

 

最後一行會執行到父類的__call__函數,

文件:celery/bin/base.py

 

這邊主要執行的是run函數

 

這個函數主要是啓動worker

 

終於進入worker了,如今這裏涉及一些比較關鍵的東西了,

文件:celery/worker/__init__.py  

WorkController類裏,是worker的基類

 

 

這是worker的藍圖,這邊會造成一個依賴圖,是啓動的必要組件,分別負責worker的一部分任務,比較重要的幾個:

Timer用於執行定時任務的 Timer,和 Consumer 那裏的 timer 不一樣

HubEvent loop 的封裝對象

Pool構造各類執行池(線程/進程/協程)的

Beat建立Beat進程,不過是以子進程的形式運行(不一樣於命令行中以beat參數運行)

 

文件:celery/apps/worker.py

 

文件:celery/apps/trace.py

 

文件:celery/app/base.py

 

init_before開始,這邊是最主要的,即綁定全部的task到咱們的app,註冊task在下面

 

 

 

 

每一個task都有delayapply_async函數,這個能夠用來幫咱們啓動任務。

 

文件:celery/worker/__init__.py

 

這邊是設置關注及不關注的隊列,能夠看到,celery支持ampq協議。

 

調用setup_includes安裝一些經過CELERY_INCLUDE配置的模塊,保證全部的任務模塊都導入了

最後初始化藍圖,並進行apply完成藍圖各個step的依賴關係圖的構建,並進行各個組件的初始化,依賴在component中已經標出

 

這個requires就是依賴,說明hub依賴timer,上面藍圖聲明的組件都有互相依賴關係。

回到文件:celery/worker/__init__.py執行start

 

執行的是藍圖的start

 

分別執行各個步驟的start,在apply時,會判斷step是否須要start,不start可是仍要create

 

經過啓動日誌看,worker啓動的stepPool,和Consumer

若是換成prefork方式起,worker會多起hubautoscaler兩個step

 

Hub依賴Timer,咱們用gevent,因此include_iffalse,這個不須要start

Hub建立時候引用的kombuHub組件,Connection會註冊到HubConnection是各類類型鏈接的封裝,對外提供統一接口

Queue依賴Hub,這邊是基於Hub建立任務隊列

下面是咱們的worker啓動的step其中的一個,重點進行說明

初始化線程/協程池,是否彈縮,最大和最小併發數

 

Celery支持的幾種TaskPool

 

咱們是gevent,因此這邊直接找gevent的代碼。

 

這邊直接引用geventPool

 

下面看worker啓動的第二個step

能夠看到,這邊啓動的是celery.worker.consumer.Consumer,這邊就會涉及另外一個重要的藍圖了。

文件:celery/worker/consumerConsumer

 

這是Consumer的藍圖,

 

Consumer啓動的stepConnectioneventsmingleGossipTasksContorlHeartevent loop

 

__init__初始化一些必要的組件,不少都是以前worker建立的。

而後執行blueprintapply,作的事我worker以前是同樣的。

 

執行Consumerstart,也就是執行blueprintstart

啓動的step的基本功能:

Connection:管理和brokerConnection鏈接

Mingle:不一樣worker之間同步狀態用的

Tasks:啓動消息Consumer

Gossip:消費來自其餘worker的事件

Heart:發送心跳事件(consumer的心跳)

Control:遠程命令管理服務

其中ConnectionTasksHeartevent loop是最重要的幾個。

先看Connection

 

使用了consumerconnect()

 

Conn引用了ampqconnectionampqConnection是直接使用的kombuConnection,上面說過,這個Connection是各類支持的類型(如redisrabbitMQ等)的抽象,對外提供統一接口。

若是hub存在,會將鏈接註冊到event loop

再看Tasks

 

這邊引用的ampqTaskConsumerampqTaskConsumer繼承了kombuConsumer

能夠看到,在關鍵的幾個地方,celery都引用了kombuKombu對全部的MQ進行抽象,而後經過接口對外暴露出一致的APIRedis/RabbitMQ/MongoDB),KombuMQ的抽象以下:

Message:生產消費的基本單位,就是一條條消息

Connection:對 MQ 鏈接的抽象,一個 Connection 就對應一個 MQ 的鏈接

Transport:真實的 MQ 鏈接,也是真正鏈接到 MQ(redis/rabbitmq) 的實例

Producers: 發送消息的抽象類

Consumers:接受消息的抽象類

ExchangeMQ 路由,這個和 RabbitMQ 差很少,支持 5種 類型

Queue:對應的 queue 抽象,其實就是一個字符串的封裝

Hub是一個eventloopConnection註冊到Hub,一個Connection對應一個HubConsumer綁定了消息的處理函數,每個Consumer初始化的時候都是和Channel綁定的,也就是說咱們Consumer包含了Queue也就和Connection關聯起來了,Consumer消費消息是經過Queue來消費,而後Queue又轉嫁給Channel再轉給connectionChannelAMQPMQ的操做的封裝,ConnectionAMQP對鏈接的封裝那麼二者的關係就是對MQ的操做必然離不開鏈接,可是,Kombu並不直接讓Channel使用Connection來發送/接受請求,而是引入了一個新的抽象TransportTransport負責具體的MQ的操做,也就是說Channel的操做都會落到Transport上執行

再看下event loop

 

上面咱們有了connection以及綁定connectionconsumer,下面看看消費者怎麼消費消息,若是是帶hub的狀況:

 

先對consumer進行一些設置,

 

而後開始進行循環。loopkombu建立的event loop,啓用事件循環機制,而後next這邊就開始不停的循環獲取消息並執行。

 

這個是kombu裏的部分實現,是對從池裏取到的消息進行處理。

 

看下同步代碼,register_callback將回調註冊consumer,而後執行consume

 

再看消息循環那幾行,

獲取到消息後,調用回調函數進行處理。

 

回調函數使用的是create_task_handler()strategies是在上面的update_strategies裏進行的更新,該函數是在Task裏調用的

 

打印一下strategies裏的信息,只截部分圖:

 

下面看下咱們怎麼啓動任務的,

 

調用到appsend_task

 

再調用到ampqpublish_task

 

最終又交給kombupublish

關於pool的選擇:

 

使用的是apppool,即

 

經過connection又走到了ampq再轉到kombu裏。

 

 

Workerconsumer基本大框架就是上面的流程,下面看下beat是怎麼實現的。

Beat起動的時候是celery beat,根據咱們上面的分析,首先進入的應該是celey/bin/beat.py,而後調用該文件中的Beatrun函數:

 

而後在指向appsBeat

 

在apps裏的Beat調用run

 

主要執行了三個函數,init_loader主要初始化並綁定task,第二步設置一些頭信息之類的,關鍵是第三步,主幹代碼

 

主要是初始化servicestart

 

Start最關鍵的部分是那個while循環體,只要不被shutdown,就會一直調用schedulertick

 

這邊這個self.schedule就是咱們準備調度的任務:

 

下面看對這些任務的處理:

 

這是判斷是否要執行任務的邏輯,若是要執行,則執行apply_async

 

若是發現任務該執行了,則去tasks裏獲取任務,並執行,這邊的apply_asyncworker那邊的沒區別,若是沒找到task,則將task註冊到broker

 

 

 

怎樣將consumerconcurrency聯繫起來

這邊調用了_process_task,調用的是worker裏的

這邊調用各類池的啓動函數:

可是queue裏只是引用,後面還有別的處理

在初始化consumer時候將調用池的操做傳了進去,成爲了Consumer裏的on_task_request

在Tasks調用start的時候會更新strategies

而後在這邊調用start_strategy

而後就進入

而後走入strategy的default

這裏取了consumeron_task_request,就是咱們傳入的池執行的邏輯,_limit_task是這樣的:

作了一些判斷,符合條件再執行。

這個文件是strategydefault的下半個文件,作了一些流量控制,而後執行limit_task或者直接執行handler

這邊由於使用的gevent,因此就走到geventapply_async

這邊是起一個協程處理,這樣就將任務交給了gevent

具體上面是執行流程,具體在哪裏執行的呢?

這邊註冊了callbackcreate_task_handlerstrategy這邊取值取值執行

 

Qosack的處理部分:

Kombutransportredis.py裏的額basic_consume,調用channelbasic_consume

 

在Kombu.transport.virtual.__init__.py文件中

 

這裏維護了一個dictself._delivered,一個setself._dirty和一個intprefetch_count

若是no_ackFalse在執行consume後會向self._delivered中添加一條數據,

ack後會向self._dirty中添加一條數據,而後,後面會將self._dirty逐條刪除,並同時刪除self._delivered中的數據,若是沒有ack,則不會刪除:

 

 

每次拉任務的時候會調用can_consume

 

比較prefetch_countself._deliveredself._dirty的值,若是小於預取限制,則容許,不然不容許。

相關文章
相關標籤/搜索