在 Kafka 中,消費者一般是消費者羣組的一部分,多個消費者羣組共同讀取同一個主題時,彼此之間互不影響。Kafka 之因此要引入消費者羣組這個概念是由於 Kafka 消費者常常會作一些高延遲的操做,好比把數據寫到數據庫或 HDFS ,或者進行耗時的計算,在這些狀況下,單個消費者沒法跟上數據生成的速度。此時能夠增長更多的消費者,讓它們分擔負載,分別處理部分分區的消息,這就是 Kafka 實現橫向伸縮的主要手段。java
須要注意的是:同一個分區只能被同一個消費者羣組裏面的一個消費者讀取,不可能存在同一個分區被同一個消費者羣裏多個消費者共同讀取的狀況,如圖:git
能夠看到即使消費者 Consumer5 空閒了,可是也不會去讀取任何一個分區的數據,這同時也提醒咱們在使用時應該合理設置消費者的數量,以避免形成閒置和額外開銷。github
由於羣組裏的消費者共同讀取主題的分區,因此當一個消費者被關閉或發生崩潰時,它就離開了羣組,本來由它讀取的分區將由羣組裏的其餘消費者來讀取。同時在主題發生變化時 , 好比添加了新的分區,也會發生分區與消費者的從新分配,分區的全部權從一個消費者轉移到另外一個消費者,這樣的行爲被稱爲再均衡。正是由於再均衡,因此消費費者羣組才能保證高可用性和伸縮性。數據庫
消費者經過向羣組協調器所在的 broker 發送心跳來維持它們和羣組的從屬關係以及它們對分區的全部權。只要消費者以正常的時間間隔發送心跳,就被認爲是活躍的,說明它還在讀取分區裏的消息。消費者會在輪詢消息或提交偏移量時發送心跳。若是消費者中止發送心跳的時間足夠長,會話就會過時,羣組協調器認爲它已經死亡,就會觸發再均衡。apache
在建立消費者的時候如下如下三個選項是必選的:bootstrap
除此以外你還須要指明你須要想訂閱的主題,可使用以下兩個 API :api
最後只須要經過輪詢 API(poll
) 向服務器定時請求數據。一旦消費者訂閱了主題,輪詢就會處理全部的細節,包括羣組協調、分區再均衡、發送心跳和獲取數據,這使得開發者只須要關注從分區返回的數據,而後進行業務處理。 示例以下:服務器
String topic = "Hello-Kafka"; String group = "group1"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); /*指定分組 ID*/ props.put("group.id", group); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /*訂閱主題 (s)*/ consumer.subscribe(Collections.singletonList(topic)); try { while (true) { /*輪詢獲取數據*/ ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", record.topic(), record.partition(), record.key(), record.value(), record.offset()); } } } finally { consumer.close(); }
本篇文章的全部示例代碼能夠從 Github 上進行下載:kafka-basissession
Kafka 的每一條消息都有一個偏移量屬性,記錄了其在分區中的位置,偏移量是一個單調遞增的整數。消費者經過往一個叫做 _consumer_offset
的特殊主題發送消息,消息裏包含每一個分區的偏移量。 若是消費者一直處於運行狀態,那麼偏移量就沒有
什麼用處。不過,若是有消費者退出或者新分區加入,此時就會觸發再均衡。完成再均衡以後,每一個消費者可能分配到新的分區,而不是以前處理的那個。爲了可以繼續以前的工做,消費者須要讀取每一個分區最後一次提交的偏移量,而後從偏移量指定的地方繼續處理。 由於這個緣由,因此若是不能正確提交偏移量,就可能會致使數據丟失或者重複出現消費,好比下面狀況:異步
Kafka 支持自動提交和手動提交偏移量兩種方式。這裏先介紹比較簡單的自動提交:
只須要將消費者的 enable.auto.commit
屬性配置爲 true
便可完成自動提交的配置。 此時每隔固定的時間,消費者就會把 poll()
方法接收到的最大偏移量進行提交,提交間隔由 auto.commit.interval.ms
屬性進行配置,默認值是 5s。
使用自動提交是存在隱患的,假設咱們使用默認的 5s 提交時間間隔,在最近一次提交以後的 3s 發生了再均衡,再均衡以後,消費者從最後一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落後了 3s ,因此在這 3s 內到達的消息會被重複處理。能夠經過修改提交時間間隔來更頻繁地提交偏移量,減少可能出現重複消息的時間窗,不過這種狀況是沒法徹底避免的。基於這個緣由,Kafka 也提供了手動提交偏移量的 API,使得用戶能夠更爲靈活的提交偏移量。
用戶能夠經過將 enable.auto.commit
設爲 false
,而後手動提交偏移量。基於用戶需求手動提交偏移量能夠分爲兩大類:
而按照 Kafka API,手動提交偏移量又能夠分爲同步提交和異步提交。
經過調用 consumer.commitSync()
來進行同步提交,不傳遞任何參數時提交的是當前輪詢的最大偏移量。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*同步提交*/ consumer.commitSync(); }
若是某個提交失敗,同步提交還會進行重試,這能夠保證數據可以最大限度提交成功,可是同時也會下降程序的吞吐量。基於這個緣由,Kafka 還提供了異步提交的 API。
異步提交能夠提升程序的吞吐量,由於此時你能夠儘管請求數據,而不用等待 Broker 的響應。代碼以下:
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*異步提交併定義回調*/ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.out.println("錯誤處理"); offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset())); } } }); }
異步提交存在的問題是,在提交失敗的時候不會進行自動重試,實際上也不能進行自動重試。假設程序同時提交了 200 和 300 的偏移量,此時 200 的偏移量失敗的,可是緊隨其後的 300 的偏移量成功了,此時若是重試就會存在 200 覆蓋 300 偏移量的可能。同步提交就不存在這個問題,由於在同步提交的狀況下,300 的提交請求必須等待服務器返回 200 提交請求的成功反饋後纔會發出。基於這個緣由,某些狀況下,須要同時組合同步和異步兩種提交方式。
注:雖然程序不能在失敗時候進行自動重試,可是咱們是能夠手動進行重試的,你能夠經過一個 Map<TopicPartition, Integer> offsets 來維護你提交的每一個分區的偏移量,而後當失敗時候,你能夠判斷失敗的偏移量是否小於你維護的同主題同分區的最後提交的偏移量,若是小於則表明你已經提交了更大的偏移量請求,此時不須要重試,不然就能夠進行手動重試。
下面這種狀況,在正常的輪詢中使用異步提交來保證吞吐量,可是由於在最後即將要關閉消費者了,因此此時須要用同步提交來保證最大限度的提交成功。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } // 異步提交 consumer.commitAsync(); } } catch (Exception e) { e.printStackTrace(); } finally { try { // 由於即將要關閉消費者,因此要用同步提交保證提交成功 consumer.commitSync(); } finally { consumer.close(); } }
在上面同步和異步提交的 API 中,實際上咱們都沒有對 commit 方法傳遞參數,此時默認提交的是當前輪詢的最大偏移量,若是你須要提交特定的偏移量,能夠調用它們的重載方法。
/*同步提交特定偏移量*/ commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) /*異步提交特定偏移量*/ commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
須要注意的是,由於你能夠訂閱多個主題,因此 offsets
中必需要包含全部主題的每一個分區的偏移量,示例代碼以下:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); /*記錄每一個主題的每一個分區的偏移量*/ TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData"); /*TopicPartition 重寫過 hashCode 和 equals 方法,因此可以保證同一主題和分區的實例不會被重複添加*/ offsets.put(topicPartition, offsetAndMetadata); } /*提交特定偏移量*/ consumer.commitAsync(offsets, null); } } finally { consumer.close(); }
由於分區再均衡會致使分區與消費者的從新劃分,有時候你可能但願在再均衡前執行一些操做:好比提交已經處理可是還沒有提交的偏移量,關閉數據庫鏈接等。此時能夠在訂閱主題時候,調用 subscribe
的重載方法傳入自定義的分區再均衡監聽器。
/*訂閱指定集合內的全部主題*/ subscribe(Collection<String> topics, ConsumerRebalanceListener listener) /*使用正則匹配須要訂閱的主題*/ subscribe(Pattern pattern, ConsumerRebalanceListener listener)
代碼示例以下:
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { /*該方法會在消費者中止讀取消息以後,再均衡開始以前就調用*/ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("再均衡即將觸發"); // 提交已經處理的偏移量 consumer.commitSync(offsets); } /*該方法會在從新分配分區以後,消費者開始讀取消息以前被調用*/ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } }); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData"); /*TopicPartition 重寫過 hashCode 和 equals 方法,因此可以保證同一主題和分區的實例不會被重複添加*/ offsets.put(topicPartition, offsetAndMetadata); } consumer.commitAsync(offsets, null); } } finally { consumer.close(); }
Kafka 提供了 consumer.wakeup()
方法用於退出輪詢,它經過拋出 WakeupException
異常來跳出循環。須要注意的是,在退出線程時最好顯示的調用 consumer.close()
, 此時消費者會提交任何尚未提交的東西,並向羣組協調器發送消息,告知本身要離開羣組,接下來就會觸發再均衡 ,而不須要等待會話超時。
下面的示例代碼爲監聽控制檯輸出,當輸入 exit
時結束輪詢,關閉消費者並退出程序:
/*調用 wakeup 優雅的退出*/ final Thread mainThread = Thread.currentThread(); new Thread(() -> { Scanner sc = new Scanner(System.in); while (sc.hasNext()) { if ("exit".equals(sc.next())) { consumer.wakeup(); try { /*等待主線程完成提交偏移量、關閉消費者等操做*/ mainThread.join(); break; } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> rd : records) { System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset()); } } } catch (WakeupException e) { //對於 wakeup() 調用引發的 WakeupException 異常能夠沒必要處理 } finally { consumer.close(); System.out.println("consumer 關閉"); }
由於 Kafka 的設計目標是高吞吐和低延遲,因此在 Kafka 中,消費者一般都是從屬於某個羣組的,這是由於單個消費者的處理能力是有限的。可是某些時候你的需求可能很簡單,好比可能只須要一個消費者從一個主題的全部分區或者某個特定的分區讀取數據,這個時候就不須要消費者羣組和再均衡了, 只須要把主題或者分區分配給消費者,而後開始讀取消息井提交偏移量便可。
在這種狀況下,就不須要訂閱主題, 取而代之的是消費者爲本身分配分區。 一個消費者能夠訂閱主題(井加入消費者羣組),或者爲本身分配分區,但不能同時作這兩件事情。 分配分區的示例代碼以下:
List<TopicPartition> partitions = new ArrayList<>(); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); /*能夠指定讀取哪些分區 如這裏假設只讀取主題的 0 分區*/ for (PartitionInfo partition : partitionInfos) { if (partition.partition()==0){ partitions.add(new TopicPartition(partition.topic(), partition.partition())); } } // 爲消費者指定分區 consumer.assign(partitions); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<Integer, String> record : records) { System.out.printf("partition = %s, key = %d, value = %s\n", record.partition(), record.key(), record.value()); } consumer.commitSync(); }
消費者從服務器獲取記錄的最小字節數。若是可用的數據量小於設置值,broker 會等待有足夠的可用數據時纔會把它返回給消費者。
broker 返回給消費者數據的等待時間,默認是 500ms。
該屬性指定了服務器從每一個分區返回給消費者的最大字節數,默認爲 1MB。
消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間,默認是 3s。
該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下該做何處理:
是否自動提交偏移量,默認值是 true。爲了不出現重複消費和數據丟失,能夠把它設置爲 false。
客戶端 id,服務器用來識別消息的來源。
單次調用 poll()
方法可以返回的記錄數量。
這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 表明使用操做系統的默認值。
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南