使用說明-Kafka

  • Kafka名詞解釋
  1. Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集羣。
  2. Topic:一類消息,消息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka集羣能夠同時負責多個topic的分發。
  3. Partition:topic物理上的分組,一個topic可以分爲多個partition,每個partition是一個有序的隊列
  4. Producer : 生產message發送到topic
  5. Consumer : 訂閱topic消費message, consumer作爲一個線程來消費
  6. Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那麼這些consumer必須在不同的組。
  • Kafka集羣概覽

  • 剖析一個Topic的生產與消費

  1. 生產者

集羣模式下,ProducerA和ProducerB都是「my topic」的生產者,即2個實例

  1. 消費者

在集羣模式下,ConsumerA、ConsumerB和ConsumerC都是「my topic」的消費者,即3個實例,它們都在一個ConsumerGroup中

  1. My Topic

一個主題,如:用戶點擊事件

Partition1、Partition2、Partition3是對My Topic按照特定規則的分解,例如

P1存放: key mod 3 = 0

P2存放: key mod 3 = 1

P3存放: key mod 3 = 2

一個topic,在kafka集羣裏的物理存儲怎樣?-----Partition & Replication

  1. Partition在broker中的分佈

舉例:在一個含有4個broker節點的集羣中,每個主題3個分區,每個分區3個備份

bin/kafka-topics.sh --create --zookeeper 192.168.102.197:2181 --replication-factor 3 --partitions 3 --topic topic1

 

 

  1. 創建過程

Step1:創建一個topic

Step2:將這個topic邏輯分3個區

Step3:將這3個區,分到每個broker上(分區數量可以大於broker數量嗎?)

Step4:爲每個分區,創建對應數量的副本(副本數量可以大於broker數量嗎?)

  1. 消息分配規則:

P0存放: key mod 3 = 0(只有3個分區,每個消息必須落在其中一個分區)

P1存放: key mod 3 = 1

P2存放: key mod 3 = 2

 

主題名稱

Key

Payload負載

所在分區

目標Broker

MyTopic

1

Data1

Part1

Broker2

MyTopic

2

Data2

Part2

Broker4

MyTopic

3

Data3

Part0

Broker1

MyTopic

4

Data4

Part1

Broker2

MyTopic

5

Data5

Part2

Broker4

MyTopic

6

Data6

Part0

Broker1

MyTopic

7

Data7

Part1

Broker2

MyTopic

8

Data8

Part2

Broker4

  1. 結論:

同一個Partition可能會有多個Replica,需要保證同一個Partition的多個Replica之間的數據一致性。而這時需要在這些Replication之間選出一個Leader(紅色背景),Producer和Consumer只與這個Leader交互,其它Replica作爲Follower從Leader中複製數據

  1. Producer發送消息到Kafka集羣的過程分析

選擇分區

我們從創建一個ProducerRecord 對象開始, Producer Record 對象需要包含目標主題和要發送的內容。我們還可以指定分區。在發送ProducerRecord 對象時,生產者要先把鍵和值對象序列化成字節數組,這樣它們才能夠在網絡上傳輸。

接下來,數據被傳給分區器。如果之前在ProducerRecord 對象裏指定了分區,那麼分區器就不會再做任何事情,直接把指定的分區返回。如果沒有指定分區,那麼分區器會根據ProducerRecord 對象的鍵來選擇一個分區。選好分區以後,生產者就知道該往哪個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的所有消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的broker 上。

接收寫入結果

服務器在收到這些消息時會返回一個響應。如果消息成功寫入Kafka ,就返回一個RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量。如果寫入失敗, 則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還

是失敗,就返回錯誤信息。

同步發送消息---通過調用Future.get()方法阻塞,等待結果返回

異步發送消息---回調函數

生產者如何保證消息寫入的高可用?

acks 參數指定了必須要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。

如果acks=0 ,生產者在成功寫入消息之前不會等待任何來自服務器的響應。

如果acks=1 ,只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功

響應。

如果acks=all ,只有當所有參與複製的節點全部收到消息時,生產者纔會收到一個來自

服務器的成功響應。

  1. Consumer從Kafka集羣消費消息的過程分析

主題分區和消費者羣組的多種分配方式

消費者羣組和分區再均衡

一個新的悄費者加入羣組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩憤時,它就離開羣組,原本由它讀取的分區將由羣組裏的其他消費者來讀取。在主題發生變化時, 比如管理員添加了新的分區,會發生分區重分配。當消費者要加入羣組時,它會向羣組協調器發送一個Join Gro up 請求。第一個加入羣組的消費者將成爲「羣主」。羣主從協調器那裏獲得羣組的成員列表(列表中包含了所有最近發送過心跳的消費者,它們被認爲是活躍的),並負責給每一個悄費者分配分區。它使用一個實現了P a 「ti.ti.onAssi. gn o 「接口的類來決定哪些分區應該被分配給哪個消費者。Kafka 內置了兩種分配策略,在後面的配置參數小節我們將深入討論。分配完畢之後,羣主把分配情況列表發送給羣組協調器,協調器再把這些信息發送給所有消費者。每個消費者只能看到自己的分配信息,只有羣主知道羣組裏所有消費者的分配信息。這個過程會在每次再均衡時重複發生。

消費者讀取消息

poll () 方能返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息、記錄所在分

區的信息、記錄在分區裏的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐

條處理這些記錄。

消費者指定偏移量進行消費

我們知道了如何使用poll () 方告從各個分區的最新偏移量處開始處理消息。不過,有時候我們也需要從特定的偏移量處開始讀取悄息。如果你想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息, 可以使用seekToBeginning(Collection<TopicPartition> tp ) 和seekToEnd (Collection < TopicPartition >

tp ) 這兩個方法。

消費者提交分區消息偏移量

消費者往一個叫作_consumer_offset 的特殊主題發送消息,消息裏包含每個分區的偏移量。如果消費者一直處於運行狀態,那麼偏移量就沒有什麼用處。不過,如果悄費者發生崩憤或者有新的消費者加入羣組,就會觸發再均衡,完成再均衡之後,每個消費者可能分配到新的分區,而不是之前處理的那個。爲了能夠繼續之前的工作,消費者需要讀取每個分區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。

提交方式:

自動提交consumerProps.put("enable.auto.commit", "true")

手動提交consumer.commitSync();或consumer.commitAsync();

提交特定的偏移量:邊處理邊提交

  • 安裝kafka
  • Kafka常用命令