深刻理解 Kafka 副本機制

1、Kafka集羣

Kafka使用Zookeeper來維護集羣成員(brokers)的信息。每一個broker都有一個惟一標識broker.id,用於標識本身在集羣中的身份,能夠在配置文件server.properties中進行配置,或者由程序自動生成。下面是Kafka brokers集羣自動建立的過程:java

  • 每個broker啓動的時候,它會在Zookeeper的/brokers/ids路徑下建立一個臨時節點,並將本身的broker.id寫入,從而將自身註冊到集羣;
  • 當有多個broker時,全部broker會競爭性地在Zookeeper上建立/controller節點,因爲Zookeeper上的節點不會重複,因此必然只會有一個broker建立成功,此時該broker稱爲controller broker。它除了具有其餘broker的功能外,還負責管理主題分區及其副本的狀態
  • 當broker出現宕機或者主動退出從而致使其持有的Zookeeper會話超時時,會觸發註冊在Zookeeper上的watcher事件,此時Kafka會進行相應的容錯處理;若是宕機的是controller broker時,還會觸發新的controller選舉。

2、副本機制

爲了保證高可用,kafka的分區是多副本的,若是一個副本丟失了,那麼還能夠從其餘副本中獲取分區數據。可是這要求對應副本的數據必須是完整的,這是Kafka數據一致性的基礎,因此才須要使用controller broker來進行專門的管理。下面將詳解介紹Kafka的副本機制。git

2.1 分區和副本

Kafka 的主題被分爲多個分區 ,分區是Kafka最基本的存儲單位。每一個分區能夠有多個副本(能夠在建立主題時使用replication-factor參數進行指定)。其中一個副本是首領副本(Leader replica),全部的事件都直接發送給首領副本;其餘副本是跟隨者副本(Follower replica),須要經過複製來保持與首領副本數據一致,當首領副本不可用時,其中一個跟隨者副本將成爲新首領。github

2.2 ISR機制

每一個分區都有一個ISR(in-sync Replica)列表,用於維護全部同步的、可用的副本。首領副本必然是同步副本,而對於跟隨者副原本說,它須要知足如下條件才能被認爲是同步副本:apache

  • 與Zookeeper之間有一個活躍的會話,即必須定時向Zookeeper發送心跳;
  • 在規定的時間內從首領副本那裏低延遲地獲取過消息。

若是副本不知足上面條件的話,就會被從ISR列表中移除,直到知足條件纔會被再次加入。api

這裏給出一個主題建立的示例:使用--replication-factor指定副本系數爲3,建立成功後使用--describe命令能夠看到分區0的有0,1,2三個副本,且三個副本都在ISR列表中,其中1爲首領副本。緩存

2.3 不徹底的首領選舉

對於副本機制,在broker級別有一個可選的配置參數unclean.leader.election.enable,默認值爲fasle,表明禁止不徹底的首領選舉。這是針對當首領副本掛掉且ISR中沒有其餘可用副本時,是否容許某個不徹底同步的副本成爲首領副本,這可能會致使數據丟失或者數據不一致,在某些對數據一致性要求較高的場景(如金融領域),這可能沒法容忍的,因此其默認值爲false,若是你可以容許部分數據不一致的話,能夠配置爲true。服務器

2.4 最少同步副本

ISR機制的另一個相關參數是min.insync.replicas , 能夠在broker或者主題級別進行配置,表明ISR列表中至少要有幾個可用副本。這裏假設設置爲2,那麼當可用副本數量小於該值時,就認爲整個分區處於不可用狀態。此時客戶端再向分區寫入數據時候就會拋出異常org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。網絡

2.5 發送確認

Kafka在生產者上有一個可選的參數ack,該參數指定了必需要有多少個分區副本收到消息,生產者纔會認爲消息寫入成功:架構

  • acks=0 :消息發送出去就認爲已經成功了,不會等待任何來自服務器的響應;
  • acks=1 : 只要集羣的首領節點收到消息,生產者就會收到一個來自服務器成功響應;
  • acks=all :只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。

3、數據請求

3.1 元數據請求機制

在全部副本中,只有領導副本才能進行消息的讀寫處理。因爲不一樣分區的領導副本可能在不一樣的broker上,若是某個broker收到了一個分區請求,可是該分區的領導副本並不在該broker上,那麼它就會向客戶端返回一個Not a Leader for Partition的錯誤響應。 爲了解決這個問題,Kafka提供了元數據請求機制。socket

首先集羣中的每一個broker都會緩存全部主題的分區副本信息,客戶端會按期發送發送元數據請求,而後將獲取的元數據進行緩存。定時刷新元數據的時間間隔能夠經過爲客戶端配置metadata.max.age.ms來進行指定。有了元數據信息後,客戶端就知道了領導副本所在的broker,以後直接將讀寫請求發送給對應的broker便可。

若是在定時請求的時間間隔內發生的分區副本的選舉,則意味着原來緩存的信息可能已通過時了,此時還有可能會收到Not a Leader for Partition的錯誤響應,這種狀況下客戶端會再次求發出元數據請求,而後刷新本地緩存,以後再去正確的broker上執行對應的操做,過程以下圖:

3.2 數據可見性

須要注意的是,並非全部保存在分區首領上的數據均可以被客戶端讀取到,爲了保證數據一致性,只有被全部同步副本(ISR中全部副本)都保存了的數據才能被客戶端讀取到。

3.3 零拷貝

Kafka全部數據的寫入和讀取都是經過零拷貝來實現的。傳統拷貝與零拷貝的區別以下:

傳統模式下的四次拷貝與四次上下文切換

以將磁盤文件經過網絡發送爲例。傳統模式下,通常使用以下僞代碼所示的方法先將文件數據讀入內存,而後經過Socket將內存中的數據發送出去。

buffer = File.read
Socket.send(buffer)

這一過程實際上發生了四次數據拷貝。首先經過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),而後應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接着用戶程序經過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最後經過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨着四次上下文切換,以下圖所示:

sendfile和transferTo實現零拷貝

Linux 2.4+內核經過sendfile系統調用,提供了零拷貝。數據經過DMA拷貝到內核態Buffer後,直接經過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減小數據拷貝外,由於整個讀文件到網絡發送由一個sendfile調用完成,整個過程只有兩次上下文切換,所以大大提升了性能。零拷貝過程以下圖所示:

從具體實現來看,Kafka的數據傳輸經過TransportLayer來完成,其子類PlaintextTransportLayertransferFrom方法經過調用Java NIO中FileChannel的transferTo方法實現零拷貝,以下所示:

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}

注: transferTotransferFrom並不保證必定能使用零拷貝。其實是否能使用零拷貝與操做系統相關,若是操做系統提供sendfile這樣的零拷貝系統調用,則這兩個方法會經過這樣的系統調用充分利用零拷貝的優點,不然並不能經過這兩個方法自己實現零拷貝。

4、物理存儲

4.1 分區分配

在建立主題時,Kafka會首先決定如何在broker間分配分區副本,它遵循如下原則:

  • 在全部broker上均勻地分配分區副本;
  • 確保分區的每一個副本分佈在不一樣的broker上;
  • 若是使用了broker.rack參數爲broker指定了機架信息,那麼會盡量的把每一個分區的副本分配到不一樣機架的broker上,以免一個機架不可用而致使整個分區不可用。

基於以上緣由,若是你在一個單節點上建立一個3副本的主題,一般會拋出下面的異常:

Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor   
Exception: Replication factor: 3 larger than available brokers: 1.

4.2 分區數據保留規則

保留數據是 Kafka 的一個基本特性, 可是Kafka不會一直保留數據,也不會等到全部消費者都讀取了消息以後才刪除消息。相反, Kafka爲每一個主題配置了數據保留期限,規定數據被刪除以前能夠保留多長時間,或者清理數據以前能夠保留的數據量大小。分別對應如下四個參數:

  • log.retention.bytes :刪除數據前容許的最大數據量;默認值-1,表明沒有限制;
  • log.retention.ms:保存數據文件的毫秒數,若是未設置,則使用log.retention.minutes中的值,默認爲null;
  • log.retention.minutes:保留數據文件的分鐘數,若是未設置,則使用log.retention.hours中的值,默認爲null;
  • log.retention.hours:保留數據文件的小時數,默認值爲168,也就是一週。

由於在一個大文件裏查找和刪除消息是很費時的,也很容易出錯,因此Kafka把分區分紅若干個片斷,當前正在寫入數據的片斷叫做活躍片斷。活動片斷永遠不會被刪除。若是按照默認值保留數據一週,並且天天使用一個新片斷,那麼你就會看到,在天天使用一個新片斷的同時會刪除一個最老的片斷,因此大部分時間該分區會有7個片斷存在。

4.3 文件格式

一般保存在磁盤上的數據格式與生產者發送過來消息格式是同樣的。 若是生產者發送的是壓縮過的消息,那麼同一個批次的消息會被壓縮在一塊兒,被看成「包裝消息」進行發送(格式以下所示) ,而後保存到磁盤上。以後消費者讀取後再本身解壓這個包裝消息,獲取每條消息的具體信息。

參考資料

  1. Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命燈(譯) . Kafka權威指南 . 人民郵電出版社 . 2017-12-26
  2. Kafka高性能架構之道

更多大數據系列文章能夠參見我的 GitHub 開源項目: 大數據入門指南

相關文章
相關標籤/搜索