1.producer:
消息生產者,發佈消息到 kafka 集羣的終端或服務。
2.broker:
kafka 集羣中包含的服務器。
3.topic:
每條發佈到 kafka 集羣的消息屬於的類別,即 kafka 是面向 topic 的。
4.partition:
partition 是物理上的概念,每一個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。
5.consumer:
從 kafka 集羣中消費消息的終端或服務。
6.Consumer group:
high-level consumer API 中,每一個 consumer 都屬於一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但能夠被多個 consumer group 消費。
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9.follower:
replica 中的一個角色,從 leader 中複製數據。
10.controller:
kafka 集羣中的其中一個服務器,用來進行 leader selection。
12.zookeeper:
kafka 經過 zookeeper 來存儲集羣的信息。java
爲了解決kafka查找效率問題,kafka的message在存儲的時候採起了分段+索引的存儲方式。
Kafka中的Message是以topic爲基本單位組織的,不一樣的topic之間是相互獨立的。每一個topic又能夠分紅幾個不一樣的partition(每一個topic有幾個partition是在建立topic時指定的),每一個partition存儲一部分Message。
partition是以文件的形式存儲在文件系統中,好比,建立了一個名爲iss_fw_state_callback的topic,其有2個partition,那麼在Kafka的數據目錄中(由配置文件中的log.dirs指定的)中就有這樣2個目錄: iss_fw_state_callback-0,iss_email_send_topic-1,其命名規則爲<topic_name>-<partition_id>,裏面存儲的分別就是這2個partition的數據
partition是分段的,每一個段叫LogSegment,包括了一個數據文件和一個索引文件,下圖是某個partition目錄下的文件
能夠看到,這個partition有4個LogSegment。partion全局的第一個segment從0開始,後續每一個segment文件名爲上一個 segment文件最後一條消息的offset值進行遞增算法
在partition中如何經過offset查找message?查找的算法是:
1.索引文件:根據offset的值,查找segment段中的index索引文件。
注:因爲索引文件命名是以上一個文件的最後一個offset進行命名的,因此,使用二分查找算法可以根據offset快速定位到指定的索引文件。
2.近似position:找到索引文件後,根據offset進行定位,找到索引文件中的符合範圍的索引。
注:因爲是稀疏索引,因此可能只能定位到一條相近的消息
3.具體消息:獲得position之後,再到對應的log文件中,從position出開始查找offset對應的消息,將每條消息的offset與目標offset進行比較,直到找到消息。緩存
示例:咱們要查找offset=368776這條消息(參照上面的圖)
那麼先找到00000000000000368769.index,
在索引文件中,找到索引6(對應的offset=368775)的具體位置position=1407(注:此次只能找到近似offset)
到log文件中,根據1407這個position開始查找,比較每條消息的offset是否大於等於368776。最後查找到對應的消息之後返回。服務器
查看kafka生產者源碼:markdown
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { this(topic, partition, timestamp, key, value, (Iterable)null); } public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) { this(topic, partition, (Long)null, key, value, headers); } public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, K key, V value) { this(topic, (Integer)null, (Long)null, key, value, (Iterable)null); } public ProducerRecord(String topic, V value) { this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null); }
partition分爲三種狀況:網絡
1.副本數據同步策略: |
方案 | 優勢 | 缺點 |
---|---|---|---|
半數以上完成同步,就發送ack | 延遲低 | 選舉新的leader時,容忍n臺節點的故障,須要2n+1個副本 | |
所有完成同步,才發送ack | 選舉新的leader時,容忍n臺節點的故障,須要n+1個副本 | 延遲高 |
Kafka採用的是第二種方案,由於網絡延遲對kafka的延遲小。第一種方案會形成大量數據冗餘,浪費資源。架構
ISR(同步副本In-sync replicas):
試想,若是有一個follower故障,長時間沒有從leader同步數據,那麼leader就會一直等下去,直到該follower同步完消息才發送ack。因此,這纔有了ISR。
In-sync replica(ISR)稱之爲同步副本,ISR中的副本都是與Leader進行同步的副本,因此不在該列表的follower會被認爲與Leader是不一樣步的. 那麼,ISR中存在是什麼副本呢?首先能夠明確的是:Leader副本老是存在於ISR中. 而follower副本是否在ISR中,取決於該follower副本是否與Leader副本保持了「同步」。ide
對於"follower副本是否與Leader副本保持了同步"的理解以下:
(1) 上面所說的同步不是指徹底的同步,即並非說一旦follower副本同步滯後與Leader副本,就會被踢出ISR列表.
(2) Kafka的broker端有一個參數replica.lag.time.max.ms, 該參數表示follower副本滯後與Leader副本的最長時間間隔,默認是10秒。這就意味着,只要follower副本落後於leader副本的時間間隔不超過10秒,就能夠認爲該follower副本與leader副本是同步的,因此哪怕當前follower副本落後於Leader副本幾條消息,只要在10秒以內遇上Leader副本,就不會被踢出出局。
(3) 若是follower副本被踢出ISR列表,等到該副本追上了Leader副本的進度,該副本會被再次加入到ISR列表中,因此ISR是一個動態列表,並非靜態不變的.this
ack應答機制:
acks參數指定了必需要有多少個分區副本收到消息,生產者才認爲該消息是寫入成功的,這個參數對於消息是否丟失起着重要做用,該參數的配置具體以下:
1).acks=0,生產者不須要等待leader的響應,leader一接收到還沒寫入磁盤時leader故障,就會形成消息丟失
2).acks=1,leader把消息寫入磁盤後返回ack。若是在follower同步以前leader發生故障,會形成消息丟失
3).acks=-1,leader和全部follower把消息寫入磁盤後才返回ack。在follower同步完成後,leader發送ack以前,leader發生故障,會致使消息重複發送。雖然-1時基本上不會丟失數據,可是在某種極端狀況下仍是會的,即全部isr的副本都宕機,而另外一些由於一些問題沒有宕機也不在isr中的follower恢復了,就從新加入isr了,並從新選出leader,(退化到acks=1的狀況了)。。。此時本來isr中的一些最新數據沒有被同步到新的isr的副本節點中,那麼那些數據就丟失了3d
LEO(Log End Offset) & HW(High Watermark)
LEO:指的是每一個副本中最大的offset
HW:指的是ISR隊列中最小的LEO
(1) follower故障:
follower發生故障後會被臨時踢出ISR,待該follower恢復後,follower會從磁盤中讀取上次的HW,並將log中高於HW的部分截取掉,從HW開始向leader進行同步。等該follower的LEO大於等於該partition的HW,即follower追上leader以後,就能夠從新加入ISR了。
(2) leader故障:
leader發生故障後,會從ISR中選出一個新的leader,以後,爲了保證多個副本之間的數據一致性,其他的follower會先將各自log文件中高於HW的部分截取掉,而後重新的leader同步數據。
注意:這隻能保證副本之間的數據一致性,並不能保證數據不丟失或者不重複
Exactly Once
· at-least-once:若是producer收到來自Kafka broker的確認(ack)或者acks = all,則表示該消息已經寫入到Kafka。但若是producer ack超時或收到錯誤,則可能會重試發送消息,客戶端會認爲該消息未寫入Kafka。若是broker在發送Ack以前失敗,但在消息成功寫入Kafka以後,此重試將致使該消息被寫入兩次,所以消息會被不止一次地傳遞給最終consumer,這種策略可能致使重複的工做和不正確的結果。
· at-most-once:若是在ack超時或返回錯誤時producer不重試,則該消息可能最終不會寫入Kafka,所以不會傳遞給consumer。在大多數狀況下,這樣作是爲了不重複的可能性,業務上必須接收數據傳遞可能的丟失。
· exactly-once:即便producer重試發送消息,消息也會保證最多一次地傳遞給最終consumer。該語義是最理想的,但也難以實現,這是由於它須要消息系統自己與生產和消費消息的應用程序進行協做。例如若是在消費消息成功後,將Kafka consumer的偏移量rollback,咱們將會再次從該偏移量開始接收消息。這代表消息傳遞系統和客戶端應用程序必須配合調整才能實現excactly-once。
at-least-once + 冪等性 = exactly-once要啓用冪等性,只須要將Producer的參數中enable.idempotence設置爲true。開啓冪等性的Producer會在初始化的時候分配一個PID,發往同一個partition的消息會附帶一個sequence number,而broker端會對<PID,Partition,SeqNumber>作緩存,當具備相同主鍵的消息提交時,broker只會持久化一條