集羣模式下,ProducerA和ProducerB都是「my topic」的生產者,即2個實例
在集羣模式下,ConsumerA、ConsumerB和ConsumerC都是「my topic」的消費者,即3個實例,它們都在一個ConsumerGroup中
一個主題,如:用戶點擊事件
Partition1、Partition2、Partition3是對My Topic按照特定規則的分解,例如
P1存放: key mod 3 = 0
P2存放: key mod 3 = 1
P3存放: key mod 3 = 2
一個topic,在kafka集羣裏的物理存儲怎樣?-----Partition & Replication
舉例:在一個含有4個broker節點的集羣中,每個主題3個分區,每個分區3個備份
bin/kafka-topics.sh --create --zookeeper 192.168.102.197:2181 --replication-factor 3 --partitions 3 --topic topic1
Step1:創建一個topic
Step2:將這個topic邏輯分3個區
Step3:將這3個區,分到每個broker上(分區數量可以大於broker數量嗎?)
Step4:爲每個分區,創建對應數量的副本(副本數量可以大於broker數量嗎?)
P0存放: key mod 3 = 0(只有3個分區,每個消息必須落在其中一個分區)
P1存放: key mod 3 = 1
P2存放: key mod 3 = 2
主題名稱 |
Key |
Payload負載 |
所在分區 |
目標Broker |
MyTopic |
1 |
Data1 |
Part1 |
Broker2 |
MyTopic |
2 |
Data2 |
Part2 |
Broker4 |
MyTopic |
3 |
Data3 |
Part0 |
Broker1 |
MyTopic |
4 |
Data4 |
Part1 |
Broker2 |
MyTopic |
5 |
Data5 |
Part2 |
Broker4 |
MyTopic |
6 |
Data6 |
Part0 |
Broker1 |
MyTopic |
7 |
Data7 |
Part1 |
Broker2 |
MyTopic |
8 |
Data8 |
Part2 |
Broker4 |
同一個Partition可能會有多個Replica,需要保證同一個Partition的多個Replica之間的數據一致性。而這時需要在這些Replication之間選出一個Leader(紅色背景),Producer和Consumer只與這個Leader交互,其它Replica作爲Follower從Leader中複製數據
我們從創建一個ProducerRecord 對象開始, Producer Record 對象需要包含目標主題和要發送的內容。我們還可以指定鍵或分區。在發送ProducerRecord 對象時,生產者要先把鍵和值對象序列化成字節數組,這樣它們才能夠在網絡上傳輸。
接下來,數據被傳給分區器。如果之前在ProducerRecord 對象裏指定了分區,那麼分區器就不會再做任何事情,直接把指定的分區返回。如果沒有指定分區,那麼分區器會根據ProducerRecord 對象的鍵來選擇一個分區。選好分區以後,生產者就知道該往哪個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裏,這個批次裏的所有消息會被髮送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的broker 上。
服務器在收到這些消息時會返回一個響應。如果消息成功寫入Kafka ,就返回一個RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量。如果寫入失敗, 則會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還
是失敗,就返回錯誤信息。
同步發送消息---通過調用Future.get()方法阻塞,等待結果返回
異步發送消息---回調函數
acks 參數指定了必須要有多少個分區副本收到消息,生產者纔會認爲消息寫入是成功的。這個參數對消息丟失的可能性有重要影響。
如果acks=0 ,生產者在成功寫入消息之前不會等待任何來自服務器的響應。
如果acks=1 ,只要集羣的首領節點收到消息,生產者就會收到一個來自服務器的成功
響應。
如果acks=all ,只有當所有參與複製的節點全部收到消息時,生產者纔會收到一個來自
服務器的成功響應。
一個新的悄費者加入羣組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩憤時,它就離開羣組,原本由它讀取的分區將由羣組裏的其他消費者來讀取。在主題發生變化時, 比如管理員添加了新的分區,會發生分區重分配。當消費者要加入羣組時,它會向羣組協調器發送一個Join Gro up 請求。第一個加入羣組的消費者將成爲「羣主」。羣主從協調器那裏獲得羣組的成員列表(列表中包含了所有最近發送過心跳的消費者,它們被認爲是活躍的),並負責給每一個悄費者分配分區。它使用一個實現了P a 「ti.ti.onAssi. gn o 「接口的類來決定哪些分區應該被分配給哪個消費者。Kafka 內置了兩種分配策略,在後面的配置參數小節我們將深入討論。分配完畢之後,羣主把分配情況列表發送給羣組協調器,協調器再把這些信息發送給所有消費者。每個消費者只能看到自己的分配信息,只有羣主知道羣組裏所有消費者的分配信息。這個過程會在每次再均衡時重複發生。
poll () 方能返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息、記錄所在分
區的信息、記錄在分區裏的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐
條處理這些記錄。
我們知道了如何使用poll () 方告從各個分區的最新偏移量處開始處理消息。不過,有時候我們也需要從特定的偏移量處開始讀取悄息。如果你想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息, 可以使用seekToBeginning(Collection<TopicPartition> tp ) 和seekToEnd (Collection < TopicPartition >
tp ) 這兩個方法。
消費者往一個叫作_consumer_offset 的特殊主題發送消息,消息裏包含每個分區的偏移量。如果消費者一直處於運行狀態,那麼偏移量就沒有什麼用處。不過,如果悄費者發生崩憤或者有新的消費者加入羣組,就會觸發再均衡,完成再均衡之後,每個消費者可能分配到新的分區,而不是之前處理的那個。爲了能夠繼續之前的工作,消費者需要讀取每個分區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。
提交方式:
自動提交consumerProps.put("enable.auto.commit", "true")
手動提交consumer.commitSync();或consumer.commitAsync();
提交特定的偏移量:邊處理邊提交