解決KafKa數據存儲與順序一致性保證

「嚴格的順序消費」有多麼困難

下面就從3個方面來分析一下,對於一個消息中間件來講,」嚴格的順序消費」有多麼困難,或者說不可能。安全

發送端

發送端不能異步發送,異步發送在發送失敗的狀況下,就沒辦法保證消息順序。多線程

好比你連續發了1,2,3。 過了一會,返回結果1失敗,2, 3成功。你把1再從新發送1遍,這個時候順序就亂掉了。併發

存儲端

對於存儲端,要保證消息順序,會有如下幾個問題: 
(1)消息不能分區。也就是1個topic,只能有1個隊列。在Kafka中,它叫作partition;在RocketMQ中,它叫作queue。 若是你有多個隊列,那同1個topic的消息,會分散到多個分區裏面,天然不能保證順序。異步

(2)即便只有1個隊列的狀況下,會有第2個問題。該機器掛了以後,可否切換到其餘機器?也就是高可用問題。分佈式

好比你當前的機器掛了,上面還有消息沒有消費完。此時切換到其餘機器,可用性保證了。但消息順序就亂掉了。性能

要想保證,一方面要同步複製,不能異步複製;另1方面得保證,切機器以前,掛掉的機器上面,全部消息必須消費完了,不能有殘留。很明顯,這個很難!!!大數據

接收端

對於接收端,不能並行消費,也即不能開多線程或者多個客戶端消費同1個隊列。ui

總結

從上面的分析能夠看出,要保證消息的嚴格有序,有多麼困難!spa

發送端和接收端的問題,還好解決一點,限制異步發送,限制並行消費。但對於存儲端,機器掛了以後,切換的問題,就很難解決了。線程

你切換了,可能消息就會亂;你不切換,那就暫時不可用。這2者之間,就須要權衡了。

業務須要全局有序嗎?

經過上面分析能夠看出,要保證一個topic內部,消息嚴格的有序,是很困難的,或者說條件是很苛刻的。

那怎麼辦呢?咱們必定要使出全部力氣、用盡全部辦法,來保證消息的嚴格有序嗎?

這裏就須要從另一個角度去考慮這個問題:業務角度。正如在下面這篇博客中所說的: 
http://www.jianshu.com/p/453c6e7ff81c

實際狀況中: 
(1)不關注順序的業務大量存在; 
(2) 隊列無序不表明消息無序。

第(2)條的意思是說:咱們不保證隊列的全局有序,但能夠保證消息的局部有序。

舉個例子:保證來自同1個order id的消息,是有序的!

下面就看一下在Kafka和RocketMQ中,分別是如何對待這個問題的:

Kafka中:發送1條消息的時候,能夠指定(topic, partition, key) 3個參數。partiton和key是可選的。

若是你指定了partition,那就是全部消息發往同1個partition,就是有序的。而且在消費端,Kafka保證,1個partition只能被1個consumer消費。

或者你指定key(好比order id),具備同1個key的全部消息,會發往同1個partition。也是有序的。

RocketMQ: RocketMQ在Kafka的基礎上,把這個限制更放寬了一步。只指定(topic, key),不指定具體發往哪一個隊列。也就是說,它更加不但願業務方,非要去要一個全局的嚴格有序。

Apache Kafka官方保證了partition內部的數據有效性(追加寫、offset讀);爲了提升Topic的併發吞吐能力,能夠提升Topic的partition數,並經過設置partition的replica來保證數據高可靠;

可是在多個Partition時,不能保證Topic級別的數據有序性。

所以,若是大家就像死磕kafka,可是對數據有序性有嚴格要求,那我建議:

  1. 建立Topic只指定1個partition,這樣的壞處就是磨滅了kafka最優秀的特性。

因此能夠思考下是否是技術選型有問題, kafka自己適合與流式大數據量,要求高吞吐,對數據有序性要求不嚴格的場景。

    2. 在Producer往Kafka插入數據時,控制同一Key分發到同一Partition,而且設置參數max.in.flight.requests.per.connection=1,也即同一個連接只能發送一條消息,如此即可嚴格保證Kafka消息的順序

    3. 經過key, 通常會hash(某一屬性)爲key,來作若干個分組,這樣只需在分組內嚴格有序便可,不犧牲併發性能。

再談談數據一致性保證:

一致性定義:若某條消息對client可見,那麼即便Leader掛了,在新Leader上數據依然能夠被讀到
HW-HighWaterMark: client能夠從Leader讀到的最大msg offset,即對外可見的最大offset, HW=max(replica.offset)
對於Leader新收到的msg,client不能馬上消費,Leader會等待該消息被全部ISR中的replica同步後,更新HW,此時該消息才能被client消費,這樣就保證了若是Leader fail,該消息仍然能夠重新選舉的Leader中獲取。
對於來自內部Broker的讀取請求,沒有HW的限制。同時,Follower也會維護一份本身的HW,Folloer.HW = min(Leader.HW, Follower.offset)


數據存儲
Topic
一類消息稱爲一個Topic

 

Topic邏輯結構
Topic可分爲多個Parition;
Parition內部保證數據的有序,按照消息寫入順序給每一個消息賦予一個遞增的offset;
爲保證數據的安全性,每一個Partition有多個Replica

 

多Parition的優勢
併發讀寫,加快讀寫速度
多Partition分佈式存儲,利於集羣數據的均衡
加快數據恢復的速率:當某臺機器掛了,每一個Topic僅需恢復一部分的數據,多機器併發

缺點
Partition間Msg無序,若想保證Msg寫入與讀取的序不變,只能申請一個Partition

Partition


Partition存儲結構
每一個Partition分爲多個Segment
每一個Segment包含兩個文件:log文件和index文件,分別命名爲start_offset.log和start_offset.index
log文件包含具體的msg數據,每條msg會有一個遞增的offset
Index文件是對log文件的索引:每隔必定大小的塊,索引msg在該segment中的相對offset和在log文件中的位置偏移量

根據offset查找msg的過程
根據msg的offset和log文件名中的start_offset,找到最後一個不大於msgoffset的segment,即爲msg所在的segment;
根據對應segment的index文件,進一步查找msg在log文件中的偏移量
從log文件的偏移量開始讀取解析msg,比較msgoffset,找到所要讀取的msg

 

Partition recovery過程
每一個Partition會在磁盤記錄一個RecoveryPoint, 記錄已經flush到磁盤的最大offset。當broker fail 重啓時,會進行loadLogs。首先會讀取該Partition的RecoveryPoint,找到包含RecoveryPoint的segment及之後的segment, 這些segment就是可能沒有徹底flush到磁盤segments。而後調用segment的recover,從新讀取各個segment的msg,並重建索引
優勢
以segment爲單位管理Partition數據,方便數據生命週期的管理,刪除過時數據簡單
在程序崩潰重啓時,加快recovery速度,只需恢復未徹底flush到磁盤的segment
經過命名中offset信息和index文件,大大加快msg查找時間,而且經過分多個Segment,每一個index文件很小,查找速度更快

數據的同步


數據流
Partition的多個replica中一個爲Leader,其他爲follower
Producer只與Leader交互,把數據寫入到Leader中
Followers從Leader中拉取數據進行數據同步
Consumer只從Leader拉取數據

 

ISR:全部不落後的replica集合, 不落後有兩層含義:距離上次FetchRequest的時間不大於某一個值或落後的消息數不大於某一個值,Leader失敗後會從ISR中選取一個Follower作Leader

數據可靠性保證
當Producer向Leader發送數據時,能夠經過acks參數設置數據可靠性的級別
0: 不論寫入是否成功,server不須要給Producer發送Response,若是發生異常,server會終止鏈接,觸發Producer更新meta數據;
1: Leader寫入成功後即發送Response,此種狀況若是Leader fail,會丟失數據
-1: 等待全部ISR接收到消息後再給Producer發送Response,這是最強保證

僅設置acks=-1也不能保證數據不丟失,當Isr列表中只有Leader時,一樣有可能形成數據丟失。要保證數據不丟除了設置acks=-1, 還要保證ISR的大小大於等於2,具體參數設置:
request.required.acks:設置爲-1 等待全部ISR列表中的Replica接收到消息後採算寫成功;
min.insync.replicas: 設置爲大於等於2,保證ISR中至少有兩個Replica

Producer要在吞吐率和數據可靠性之間作一個權衡
數據一致性保證
一致性定義:若某條消息對client可見,那麼即便Leader掛了,在新Leader上數據依然能夠被讀到
HW-HighWaterMark: client能夠從Leader讀到的最大msg offset,即對外可見的最大offset, HW=max(replica.offset)
對於Leader新收到的msg,client不能馬上消費,Leader會等待該消息被全部ISR中的replica同步後,更新HW,此時該消息才能被client消費,這樣就保證了若是Leader fail,該消息仍然能夠重新選舉的Leader中獲取。

對於來自內部Broker的讀取請求,沒有HW的限制。同時,Follower也會維護一份本身的HW,Folloer.HW = min(Leader.HW, Follower.offset)
HDFS數據組織


與Kafka有幾點明顯不一樣: 數據分塊,好比以64M爲一個數據塊; 流水線複製:每一個數據塊沒有Leader和Follower之分,採用流水線的方式進行數據複製; 就近讀取:爲了減小讀取時的網路IO,採用就近讀取,加快讀取速率
相關文章
相關標籤/搜索