Kafka消息投遞語義-消息不丟失,不重複,不丟不重

介紹

kafka支持3種消息投遞語義:網絡

  • At most once——最多一次,消息可能會丟失,但不會重複
  • At least once——最少一次,消息不會丟失,可能會重複
  • Exactly once——只且一次,消息不丟失不重複,只且消費一次。

可是總體的消息投遞語義須要Producer端和Consumer端二者來保證。session

Producer 消息生產者端

一個場景例子:
當producer向broker發送一條消息,這時網絡出錯了,producer沒法得知broker是否接受到了這條消息。
網絡出錯多是發生在消息傳遞的過程當中,也可能發生在broker已經接受到了消息,並返回ack給producer的過程當中。負載均衡

這時,producer只能進行重發,消息可能會重複,可是保證了at least once。spa

0.11.0的版本經過給每一個producer一個惟一ID,而且在每條消息中生成一個sequence num,
這樣就能對消息去重,達到producer端的exactly once。日誌

這裏還涉及到producer端的acks設置和broker端的副本數量,以及min.insync.replicas的設置。
好比producer端的acks設置以下:
acks=0 //消息發了就發了,不等任何響應就認爲消息發送成功
acks=1 //leader分片寫消息成功就返回響應給producer
acks=all(-1) //當acks=all, min.insync.replicas=2,就要求INSRNC列表中必需要有2個副本都寫成功,才返回響應給producer,
若是INSRNC中已同步副本數量不足2,就會報異常,若是沒有2個副本寫成功,也會報異常,消息就會認爲沒有寫成功。code

Broker 消息接收端

上文說過acks=1,表示當leader分片副本寫消息成功就返回響應給producer,此時認爲消息發送成功。
若是leader寫成功單立刻掛了,尚未將這個寫成功的消息同步給其餘的分片副本,那麼這個分片此時的ISR列表爲空,
若是unclean.leader.election.enable=true,就會發生log truncation(日誌截取),一樣會發生消息丟失。
若是unclean.leader.election.enable=false,那麼這個分片上的服務就不可用了,producer向這個分片發消息就會拋異常。事務

因此咱們設置min.insync.replicas=2,unclean.leader.election.enable=false,producer端的acks=all,這樣發送成功的消息就毫不會丟失。內存

Consumer 消息消費者端

全部分片的副本都有本身的log文件(保存消息)和相同的offset值。當consumer沒掛的時候,offset直接保存在內存中,
若是掛了,就會發生負載均衡,須要consumer group中另外的consumer來接管並繼續消費。kafka

consumer消費消息的方式有如下2種;同步

  1. consumer讀取消息,保存offset,而後處理消息。
    如今假設一個場景:保存offset成功,可是消息處理失敗,consumer又掛了,這時來接管的consumer
    就只能從上次保存的offset繼續消費,這種狀況下就有可能丟消息,可是保證了at most once語義。

  2. consumer讀取消息,處理消息,處理成功,保存offset。
    若是消息處理成功,可是在保存offset時,consumer掛了,這時來接管的consumer也只能
    從上一次保存的offset開始消費,這時消息就會被重複消費,也就是保證了at least once語義。

以上這些機制的保證都不是直接一個配置能夠解決的,而是你的consumer代碼來完成的,只是一個處理順序前後問題。 
第一種對應的代碼:

List<String> messages = consumer.poll();
consumer.commitOffset();
processMsg(messages);

第二種對應的代碼:

List<String> messages = consumer.poll();
processMsg(messages);
consumer.commitOffset();

Exactly Once實現原理

下面詳細說說exactly once的實現原理。

Producer端的消息冪等性保證

每一個Producer在初始化的時候都會被分配一個惟一的PID,
Producer向指定的Topic的特定Partition發送的消息都攜帶一個sequence number(簡稱seqNum),從零開始的單調遞增的。

Broker會將Topic-Partition對應的seqNum在內存中維護,每次接受到Producer的消息都會進行校驗;
只有seqNum比上次提交的seqNum恰好大一,才被認爲是合法的。比它大的,說明消息有丟失;比它小的,說明消息重複發送了。

以上說的這個只是針對單個Producer在一個session內的狀況,假設Producer掛了,又從新啓動一個Producer被並且分配了另一個PID,
這樣就不能達到防重的目的了,因此kafka又引進了Transactional Guarantees(事務性保證)。

Transactional Guarantees 事務性保證

kafka的事務性保證說的是:同時向多個TopicPartitions發送消息,要麼都成功,要麼都失敗。

爲何搞這麼個東西出來?我想了下有多是這種例子:
用戶定了一張機票,付款成功以後,訂單的狀態改了,飛機座位也被佔了,這樣至關因而
2條消息,那麼保證這個事務性就是:向訂單狀態的Topic和飛機座位的Topic分別發送一條消息,
這樣就須要kafka的這種事務性保證。

這種功能可使得consumer offset的提交(也是向broker產生消息)和producer的發送消息綁定在一塊兒。
用戶須要提供一個惟一的全局性TransactionalId,這樣就能將PID和TransactionalId映射起來,就能解決
producer掛掉後跨session的問題,應該是將以前PID的TransactionalId賦值給新的producer。

Consumer端

以上的事務性保證只是針對的producer端,對consumer端沒法保證,有如下緣由:

  1. 壓實類型的topics,有些事務消息可能被新版本的producer重寫
  2. 事務可能跨坐2個log segments,這時舊的segments可能被刪除,就會丟消息
  3. 消費者可能尋址到事務中任意一點,也會丟失一些初始化的消息
  4. 消費者可能不會同時從全部的參與事務的TopicPartitions分片中消費消息

若是是消費kafka中的topic,而且將結果寫回到kafka中另外的topic,
能夠將消息處理後結果的保存和offset的保存綁定爲一個事務,這時就能保證
消息的處理和offset的提交要麼都成功,要麼都失敗。

若是是將處理消息後的結果保存到外部系統,這時就要用到兩階段提交(tow-phase commit), 可是這樣作很麻煩,較好的方式是offset本身管理,將它和消息的結果保存到同一個地方,總體上進行綁定, 

相關文章
相關標籤/搜索