消息隊列之推仍是拉,RocketMQ 和 Kafka是如何作的?

每一個時代,都不會虧待會學習的人java

你們好,我是 yes。面試

今天咱們就來談一談消息隊列的推拉模式,這也是一個面試熱點,例如你在簡歷裏面寫了 RocketMQ ,基本上會問你 RocketMQ 採用的是推模式仍是拉模式啊?是拉模式?不是有 PushConsumer 嗎?算法

今天咱們就來談談推拉模式,而且再來看看 RocketMQ 和 Kafka 是如何作的。緩存

推拉模式

首先明確一下推拉模式究竟是在討論消息隊列的哪個步驟,通常而言咱們在談論推拉模式的時候指的是 Comsumer 和 Broker 之間的交互微信

默認的認爲 Producer 與 Broker 之間就是推的方式,即 Producer 將消息推送給 Broker,而不是 Broker 主動去拉取消息。網絡

想象一下,若是須要 Broker 去拉取消息,那麼 Producer 就必須在本地經過日誌的形式保存消息來等待 Broker 的拉取,若是有不少生產者的話,那麼消息的可靠性不只僅靠 Broker 自身,還須要靠成百上千的 Producer。app

Broker 還能靠多副本等機制來保證消息的存儲可靠,而成百上千的 Producer 可靠性就有點難辦了,因此默認的 Producer 都是推消息給 Broker。負載均衡

因此說有些狀況分佈式好,而有些時候仍是集中管理好。分佈式

推模式

推模式指的是消息從 Broker 推向 Consumer,即 Consumer 被動的接收消息,由 Broker 來主導消息的發送。工具

咱們來想一下推模式有什麼好處?

消息實時性高, Broker 接受完消息以後能夠立馬推送給 Consumer。

對於消費者使用來講更簡單,簡單啊就等着,反正有消息來了就會推過來。

推模式有什麼缺點?

推送速率難以適應消費速率,推模式的目標就是以最快的速度推送消息,當生產者往 Broker 發送消息的速率大於消費者消費消息的速率時,隨着時間的增加消費者那邊可能就「爆倉」了,由於根本消費不過來啊。當推送速率過快就像 DDos 攻擊同樣消費者就傻了。

而且不一樣的消費者的消費速率還不同,身爲 Broker 很難平衡每一個消費者的推送速率,若是要實現自適應的推送速率那就須要在推送的時候消費者告訴 Broker ,我不行了你推慢點吧,而後 Broker 須要維護每一個消費者的狀態進行推送速率的變動。

這其實就增長了 Broker 自身的複雜度。

因此說推模式難以根據消費者的狀態控制推送速率,適用於消息量不大、消費能力強要求實時性高的狀況下。

拉模式

拉模式指的是 Consumer 主動向 Broker 請求拉取消息,即 Broker 被動的發送消息給 Consumer。

咱們來想一下拉模式有什麼好處?

拉模式主動權就在消費者身上了,消費者能夠根據自身的狀況來發起拉取消息的請求。假設當前消費者以爲本身消費不過來了,它能夠根據必定的策略中止拉取,或者間隔拉取都行。

拉模式下 Broker 就相對輕鬆了,它只管存生產者發來的消息,至於消費的時候天然由消費者主動發起,來一個請求就給它消息唄,從哪開始拿消息,拿多少消費者都告訴它,它就是一個沒有感情的工具人,消費者要是沒來取也不關它的事。

拉模式能夠更合適的進行消息的批量發送,基於推模式能夠來一個消息就推送,也能夠緩存一些消息以後再推送,可是推送的時候其實不知道消費者到底能不能一次性處理這麼多消息。而拉模式就更加合理,它能夠參考消費者請求的信息來決定緩存多少消息以後批量發送。

拉模式有什麼缺點?

消息延遲,畢竟是消費者去拉取消息,可是消費者怎麼知道消息到了呢?因此它只能不斷地拉取,可是又不能很頻繁地請求,太頻繁了就變成消費者在攻擊 Broker 了。所以須要下降請求的頻率,好比隔個 2 秒請求一次,你看着消息就頗有可能延遲 2 秒了。

消息忙請求,忙請求就是好比消息隔了幾個小時纔有,那麼在幾個小時以內消費者的請求都是無效的,在作無用功。

那究竟是推仍是拉

能夠看到推模式和拉模式各有優缺點,到底該如何選擇呢?

RocketMQ 和 Kafka 都選擇了拉模式,固然業界也有基於推模式的消息隊列如 ActiveMQ。

我我的以爲拉模式更加的合適,由於如今的消息隊列都有持久化消息的需求,也就是說自己它就有個存儲功能,它的使命就是接受消息,保存好消息使得消費者能夠消費消息便可。

而消費者各類各樣,身爲 Broker 不該該有依賴於消費者的傾向,我已經爲你保存好消息了,你要就來拿好了。

雖然說通常而言 Broker 不會成爲瓶頸,由於消費端有業務消耗比較慢,可是 Broker 畢竟是一箇中心點,能輕量就儘可能輕量。

那麼居然 RocketMQ 和 Kafka 都選擇了拉模式,它們就不怕拉模式的缺點麼?怕,因此它們操做了一波,減輕了拉模式的缺點。

長輪詢

RocketMQ 和 Kafka 都是利用「長輪詢」來實現拉模式,咱們就來看看它們是如何操做的。

爲了簡單化,下面我把消息不知足本次拉取的條數啊、總大小啊等等都統一描述成尚未消息,反正都是不知足條件。

RocketMQ 中的長輪詢

RocketMQ 中的 PushConsumer 實際上是披着推模式其實是拉模式的方法,只是看起來像推模式而已

由於 RocketMQ 在被背後偷偷的幫咱們去 Broker 請求數據了。

後臺會有個 RebalanceService 線程,這個線程會根據 topic 的隊列數量和當前消費組的消費者個數作負載均衡,每一個隊列產生的 pullRequest 放入阻塞隊列 pullRequestQueue 中。而後又有個 PullMessageService 線程不斷的從阻塞隊列 pullRequestQueue 中獲取 pullRequest,而後經過網絡請求 broker,這樣實現的準實時拉取消息。

這一部分代碼我不截了,就是這麼個事兒,稍後會用圖來展現。

而後 Broker 的 PullMessageProcessor 裏面的 processRequest 方法是用來處理拉消息請求的,有消息就直接返回,若是沒有消息怎麼辦呢?咱們來看一下代碼。

咱們再來看下 suspendPullRequest 方法作了什麼。

而 PullRequestHoldService 這個線程會每 5 秒從 pullRequestTable 取PullRequest請求,而後看看待拉取消息請求的偏移量是否小於當前消費隊列最大偏移量,若是條件成立則說明有新消息了,則會調用 notifyMessageArriving ,最終調用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法從新嘗試處理這個消息的請求,也就是再來一次,整個長輪詢的時間默認 30 秒。

簡單的說就是 5 秒會檢查一次消息時候到了,若是到了則調用 processRequest 再處理一次。這好像不太實時啊?5秒?

別急,還有個 ReputMessageService 線程,這個線程用來不斷地從 commitLog 中解析數據並分發請求,構建出 ConsumeQueue 和 IndexFile 兩種類型的數據,而且也會有喚醒請求的操做,來彌補每 5s 一次這麼慢的延遲

代碼我就不截了,就是消息寫入而且會調用 pullRequestHoldService#notifyMessageArriving。

最後我再來畫個圖,描述一下整個流程。

Kafka 中的長輪詢

像 Kafka 在拉請求中有參數,可使得消費者請求在 「長輪詢」 中阻塞等待。

簡單的說就是消費者去 Broker 拉消息,定義了一個超時時間,也就是說消費者去請求消息,若是有的話立刻返回消息,若是沒有的話消費者等着直到超時,而後再次發起拉消息請求。

而且 Broker 也得配合,若是消費者請求過來,有消息確定立刻返回,沒有消息那就創建一個延遲操做,等條件知足了再返回。

咱們來簡單的看一下源碼,爲了突出重點,我會刪減一些代碼。

先來看消費者端的代碼。

上面那個 poll 接口想必你們都很熟悉,其實從註解直接就知道了確實是等待數據的到來或者超時,咱們再簡單的往下看。

咱們再來看下最終 client.poll 調用的是什麼。

最後調用的就是 Kafka 包裝過的 selector,而最終會調用 Java nio 的 select(timeout)

如今消費者端的代碼已經清晰了,咱們再來看看 Broker 如何作的

Broker 處理全部請求的入口其實我在以前的文章介紹過,就在 KafkaApis.scala 文件的 handle 方法下,此次的主角就是 handleFetchRequest 。

這個方法進來,我截取最重要的部分。

下面的圖片就是 fetchMessages 方法內部實現,源碼給的註釋已經很清晰了,你們放大圖片看下便可。

這個煉獄名字取得頗有趣,簡單的說就是利用我以前文章提到的時間輪,來執行定時任務,例如這裏是delayedFetchPurgatory,專門用來處理延遲拉取操做。

咱們先簡單想一下,這個延遲操做都須要實現哪些方法,首先構建的延遲操做須要有檢查機制,來查看消息是否已經到了,而後呢還得有個消息到了以後該執行的方法,還須要有執行完畢以後該幹啥的方法,固然還得有個超時以後得幹啥的方法。

這幾個方法其實對應的就是代碼裏的 DelayedFetch ,這個類繼承了 DelayedOperation 內部有:

  • isCompleted 檢查條件是否知足的方法

  • tryComplete 條件知足以後執行的方法

  • onComplete 執行完畢以後調用的方法

  • onExpiration 過時以後須要執行的方法

判斷是否過時就是由時間輪來推進判斷的,可是總不能等過時的時候再去看消息到了沒吧?

這裏 Kafka 和 RocketMQ 的機制同樣,也會在消息寫入的時候提醒這些延遲請求消息來了,具體代碼我不貼了, 在 ReplicaManager#appendRecords 方法內部再深刻個兩方法能夠看到。

不過雖然說代碼不貼,圖仍是要畫一下的。

小結一下

能夠看到 RocketMQ  和 Kafka 都是採用「長輪詢」的機制,具體的作法都是經過消費者等待消息,當有消息的時候 Broker 會直接返回消息,若是沒有消息都會採起延遲處理的策略,而且爲了保證消息的及時性,在對應隊列或者分區有新消息到來的時候都會提醒消息來了,及時返回消息。

一句話說就是消費者和 Broker 相互配合,拉取消息請求不知足條件的時候 hold 住,避免了屢次頻繁的拉取動做,當消息一到就提醒返回。

最後

總的而言推拉模式各有優劣,而我我的以爲通常狀況下拉模式更適合於消息隊列。

看了這篇文章相信以後面試官問你推仍是拉?建議給他個歪嘴笑。


我是 yes,從一點點到億點點,咱們下篇見

往期推薦:

消息隊列面試連環問:如何保證消息不丟失?處理重複消息?消息有序性?消息堆積處理?

圖解+代碼|常見限流算法以及限流在單機分佈式場景下的思考

面試官:說說Kafka處理請求的全流程

 

本文分享自微信公衆號 - yes的練級攻略(yes_java)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索