(1)兩種經常使用的消息模型java
隊列模型(queuing)和發佈-訂閱模型(publish-subscribe)。正則表達式
隊列的處理方式是一組消費者從服務器讀取消息,一條消息只由其中的一個消費者來處理。apache
發佈-訂閱模型中,消息被廣播給全部的消費者,接收到消息的消費者均可以處理此消息。json
(2)Kafka的消費者和消費者組bootstrap
Kafka爲這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記本身。 一個發佈在Topic上消息被分發給此消費者組中的一個消費者。 假如全部的消費者都在一個組中,那麼這就變成了隊列模型。 假如全部的消費者都在不一樣的組中,那麼就徹底變成了發佈-訂閱模型。 一個消費者組中消費者訂閱同一個Topic,每一個消費者接受Topic的一部分分區的消息,從而實現對消費者的橫向擴展,對消息進行分流。緩存
注意:當單個消費者沒法跟上數據生成的速度,就能夠增長更多的消費者分擔負載,每一個消費者只處理部分partition的消息,從而實現單個應用程序的橫向伸縮。可是不要讓消費者的數量多於partition的數量,此時多餘的消費者會空閒。此外,Kafka還容許多個應用程序從同一個Topic讀取全部的消息,此時只要保證每一個應用程序有本身的消費者組便可。安全
消費者組的概念就是:當有多個應用程序都須要從Kafka獲取消息時,讓每一個app對應一個消費者組,從而使每一個應用程序都能獲取一個或多個Topic的所有消息;在每一個消費者組中,往消費者組中添加消費者來伸縮讀取能力和處理能力,消費者組中的每一個消費者只處理每一個Topic的一部分的消息,每一個消費者對應一個線程。服務器
(3)線程安全網絡
在同一個羣組中,沒法讓一個線程運行多個消費者,也沒法讓多線線程安全地共享一個消費者。按照規則,一個消費者使用一個線程,若是要在同一個消費者組中運行多個消費者,須要讓每一個消費者運行在本身的線程中。最好把消費者的邏輯封裝在本身的對象中,而後使用java的ExecutorService啓動多個線程,使每一個消費者運行在本身的線程上,可參考https://www.confluent.io/blogsession
(1)消費者組中新添加消費者讀取到本來是其餘消費者讀取的消息
(2)消費者關閉或崩潰以後離開羣組,本來由他讀取的partition將由羣組裏其餘消費者讀取
(3)當向一個Topic添加新的partition,會發生partition在消費者中的從新分配
以上三種現象會使partition的全部權在消費者之間轉移,這樣的行爲叫做再均衡。
再均衡的優勢:
給消費者組帶來了高可用性和伸縮性
再均衡的缺點:
(1)再均衡期間消費者沒法讀取消息,整個羣組有一小段時間不可用
(2)partition被從新分配給一個消費者時,消費者當前的讀取狀態會丟失,有可能還須要去刷新緩存,在它從新恢復狀態以前會拖慢應用程序。
所以須要進行安全的再均衡和避免沒必要要的再均衡。
Properties props = new Properties(); props.put("bootstrap", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //1.建立消費者 KafkaConsuner<String, String> consumer = new KafkaConsumer<String, String>(props); //2.訂閱Topic //建立一個只包含單個元素的列表,Topic的名字叫做customerCountries consumer.subscribe(Collections.singletonList("customerCountries")); //支持正則表達式,訂閱全部與test相關的Topic //consumer.subscribe("test.*"); //3.輪詢 //消息輪詢是消費者的核心API,經過一個簡單的輪詢向服務器請求數據,一旦消費者訂閱了Topic,輪詢就會處理所欲的細節,包括羣組協調、partition再均衡、發送心跳 //以及獲取數據,開發者只要處理從partition返回的數據便可。 try { while (true) {//消費者是一個長期運行的程序,經過持續輪詢向Kafka請求數據。在其餘線程中調用consumer.wakeup()能夠退出循環 //在100ms內等待Kafka的broker返回數據.超市參數指定poll在多久以後能夠返回,無論有沒有可用的數據都要返回 ConsumerRecord<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { log.debug(record.topic() + record.partition() + record.offset() + record.key() + record.value()); //統計各個地區的客戶數量,即模擬對消息的處理 int updatedCount = 1; updatedCount += custCountryMap.getOrDefault(record.value(), 0) + 1; custCountryMap.put(record.value(), updatedCount); //真實場景中,結果通常會被保存到數據存儲系統中 JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)); } } } finally { //退出應用程序前使用close方法關閉消費者,網絡鏈接和socket也會隨之關閉,並當即觸發一次再均衡 consumer.close(); }
1:fetch.min.bytes,指定消費者從broker獲取消息的最小字節數,即等到有足夠的數據時才把它返回給消費者
2:fetch.max.wait.ms,等待broker返回數據的最大時間,默認是500ms。fetch.min.bytes和fetch.max.wait.ms哪一個條件先獲得知足,就按照哪一種方式返回數據
3:max.partition.fetch.bytes,指定broker從每一個partition中返回給消費者的最大字節數,默認1MB
4:session.timeout.ms,指定消費者被認定死亡以前能夠與服務器斷開鏈接的時間,默認是3s
5:auto.offset.reset,消費者在讀取一個沒有偏移量或者偏移量無效的狀況下(由於消費者長時間失效,包含偏移量的記錄已通過時並被刪除)該做何處理。默認是latest(消費者從最新的記錄開始讀取數據)。另外一個值是 earliest(消費者從起始位置讀取partition的記錄)
6:enable.auto.commit,指定消費者是否自動提交偏移量,默認爲true
7:partition.assignment.strategy,指定partition如何分配給消費者,默認是Range。Range:把Topic的若干個連續的partition分配給消費者。RoundRobin:把Topic的全部partition逐個分配給消費者
8:max.poll.records,單次調用poll方法可以返回的消息數量
當消費者崩潰或者有新的消費者加入,那麼就會觸發再均衡(rebalance),完成再均衡後,每一個消費者可能會分配到新的分區,而不是以前處理那個,爲了可以繼續以前的工做,消費者須要讀取每一個partition最後一次提交的偏移量,而後從偏移量指定的地方繼續處理。
case1:若是提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重複處理。
case2:若是提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。
enable.auto.commit設置成true(默認爲true),那麼每過5s,消費者自動把從poll()方法接收到的最大的偏移量提交。提交的時間間隔由auto.commit.interval.ms控制,默認是5s
自動提交的優勢是方便,可是可能會重複處理消息
將enable.auto.commit設置成false,讓應用程序決定什麼時候提交偏移量。commitSync()提交由poll()方法返回的最新偏移量,因此在處理完全部消息後要確保調用commitSync,不然會有消息丟失的風險。commitSync在提交成功或碰到沒法恢復的錯誤以前,會一直重試。若是發生了再均衡,從最近一批消息到發生再均衡之間的全部消息都會被重複處理。
不足:broker在對提交請求做出迴應以前,應用程序會一直阻塞,會限制應用程序的吞吐量
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync();//處理完當前批次的消息,在輪詢更多的消息以前,調用commitSync方法提交當前批次最新的消息 } catch (CommitFailedException e) { log.error("commit failed", e);//只要沒有發生不可恢復的錯誤,commitSync方法會一直嘗試直至提交成功。若是提交失敗,咱們也只能把異常記錄到錯誤日誌裏 } }
異步提交的commitAsync,只管發送提交請求,無需等待broker響應。commitAsync提交以後不進行重試,假設要提交偏移量2000,這時候發生短暫的通訊問題,服務器接收不到提交請求,所以也就不會做出響應。與此同時,咱們處理了另一批消息,併成功提交了偏移量3000,。若是commitAsync從新嘗試提交2000,那麼它有可能在3000以後提交成功,這個時候若是發生再均衡,就會出現重複消息。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() {//在broker做出響應後執行回調函數,回調常常被用於記錄提交錯誤或生成度量指標 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) { log.error("Commit Failed for offsets {}", offsets, e); } }}); }
通常狀況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,由於若是提交失敗是由於臨時問題致使的,那麼後續的提交總會有成功的。可是若是在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。
所以,在消費者關閉以前通常會組合使用commitAsync和commitSync提交偏移量。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync();//若是一切正常,咱們使用commitAsync來提交,這樣速度更快,並且即便此次提交失敗,下次提交極可能會成功 } catch (CommitFailedException e) { log.error("commit failed", e); } finally { try { consumer.commitSync();//關閉消費者前,使用commitSync,直到提交成成功或者發生沒法恢復的錯誤 } finally { consumer.close(); } }
消費者API容許調用commitSync()和commitAsync()方法時傳入但願提交的partition和offset的map,即提交特定的偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//用於跟蹤偏移量的map int count = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());//模擬對消息的處理
//在讀取每條消息後,使用指望處理的下一個消息的偏移量更新map裏的偏移量。下一次就從這裏開始讀取消息 currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, 「no matadata」)); if (count++ % 1000 == 0) {//每處理1000條消息就提交一次偏移量,在實際應用中,能夠根據時間或者消息的內容進行提交 consumer.commitAsync(currentOffsets, null); }
}
}
在爲消費者分配新的partition或者移除舊的partition時,能夠經過消費者API執行一些應用程序代碼,在使用subscribe()方法時傳入一個ConsumerRebalanceListener實例。
ConsumerRebalanceListener須要實現的兩個方法
1:public void onPartitionRevoked(Collection<TopicPartition> partitions)方法會在再均衡開始以前和消費者中止讀取消息以後被調用。若是在這裏提交偏移量,下一個接管partition的消費者就知道該從哪裏開始讀取了。
2:public void onPartitionAssigned(Collection<TopicPartition> partitions)方法會在從新分配partition以後和消費者開始讀取消息以前被調用。
下面的例子演示如何在失去partition的全部權以前經過onPartitionRevoked()方法來提交偏移量。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//用於跟蹤偏移量的map private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //若是發生再均衡,要在即將失去partition全部權時提交偏移量。 //注意:(1)提交的是最近處理過的偏移量,而不是批次中還在處理的最後一個偏移量。由於partition有可能在咱們還在處理消息時被撤回。 //(2)咱們要提交全部分區的偏移量,而不僅是即將市區全部權的分區的偏移量。由於提交的偏移量是已經處理過的,因此不會有什麼問題。 //(3)調用commitSync方法,確保在再均衡發生以前提交偏移量 consumer.commitSync(currentOffsets); } } try{ consumer.subscribe(topics, new HandleRebalance()); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());//模擬對消息的處理 //在讀取每條消息後,使用指望處理的下一個消息的偏移量更新map裏的偏移量。下一次就從這裏開始讀取消息 currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, 「no matadata」)); } consumer.commitAsync(currentOffsets, null); } catch(WakeupException e) { //忽略異常,正在關閉消費者 } catch (Exception e) { log.error("unexpected error", e); } finally { try{ consumer.commitSync(currentOffsets); } finally { consumer.close(); } }
參考:《Kafka權威指南》