>舒適提示:整個 Kafka 專欄基於 kafka-2.2.1 版本。java
根據 KafkaConsumer 類上的註釋上來看 KafkaConsumer 具備以下特徵:算法
在 Kafka 中 KafkaConsumer 是線程不安全的。apache
2.2.1 版本的KafkaConsumer 兼容 kafka 0.10.0 和 0.11.0 等低版本。bootstrap
消息偏移量與消費偏移量(消息消費進度) Kafka 爲分區中的每一條消息維護一個偏移量,即消息偏移量。這個偏移量充當該分區內記錄的惟一標識符。消費偏移量(消息消費進度)存儲的是消費組當前的處理進度。消息消費進度的提交在 kafka 中能夠定時自動提交也能夠手動提交。手動提交能夠調用 ommitSync() 或 commitAsync 方法。安全
消費組 與 訂閱關係 多個消費這能夠同屬於一個消費組,消費組內的全部消費者共同消費主題下的全部消息。一個消費組能夠訂閱多個主題。服務器
隊列負載機制 既然同一個消費組內的消費者共同承擔主題下全部隊列的消費,那他們如何進行分工呢?默認狀況下采起平均分配,例如一個消費組有兩個消費者c一、c2,一個 topic 的分區數爲6,那 c1 會負責3個分區的消費,一樣 c2 會負責另外3個分區的分配。網絡
那若是其中一個消費者宕機或新增一個消費者,那隊列能動態調整嗎?session
答案是會從新再次平衡,例如若是新增一個消費者 c3,則c1,c2,c3都會負責2個分區的消息消費,分區重平衡會在後續文章中重點介紹。消費者也能夠經過 assign 方法手動指定分區,此時會禁用默認的自動分配機制。架構
消費者故障檢測機制 當經過 subscribe 方法訂閱某些主題時,此時該消費者還未真正加入到訂閱組,只有當 consumeer#poll 方法被調用後,而且會向 broker 定時發送心跳包,若是 broker 在 session.timeout.ms 時間內未收到心跳包,則 broker 會任務該消費者已宕機,會將其剔除,並觸發消費端的分區重平衡。併發
消費者也有可能遇到「活體鎖」的狀況,即它繼續發送心跳,但沒有任何進展。在這種狀況下,爲了防止消費者無限期地佔用它的分區,能夠使用max.poll.interval.ms 設置提供了一個活性檢測機制。基本上,若是您調用輪詢的頻率低於配置的最大間隔,那麼客戶機將主動離開組,以便另外一個消費者能夠接管它的分區。當這種狀況發生時,您可能會看到一個偏移提交失敗(由調用{@link #commitSync()}拋出的{@link CommitFailedException}表示)。
kafka 對 poll loop 行爲的控制參數 Kafka 提供了以下兩個參數來控制 poll 的行爲:
對於消息處理時間不可預測的狀況下上述兩個參數可能不夠用,那將如何是好呢?
一般的建議將消息拉取與消息消費分開,一個線程負責 poll 消息,處理這些消息使用另外的線程,這裏就須要手動提交消費進度。爲了控制消息拉起的過快,您可能會須要用到 Consumer#pause(Collection) 方法,暫時中止向該分區拉起消息。RocketMQ 的推模式就是採用了這種策略。若是你們有興趣的話,能夠從筆者所著的《RocketMQ技術內幕》一書中詳細瞭解。
public static void testConsumer1() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072"); props.setProperty("group.id", "C_ODS_ORDERCONSUME_01"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<string, string> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("TOPIC_ORDER")); while (true) { ConsumerRecords<string, string> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string, string> record : records) { System.out.println("消息消費中"); System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
public static void testConsumer2() { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "false"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<string, string> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<consumerrecord<string, string>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<string, string> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<string, string> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { // insertIntoDb(buffer); // 省略處理邏輯 consumer.commitSync(); buffer.clear(); } } }
要認識 Kafka 的消費者,我的認爲最好的辦法就是從它的類圖着手,下面給出 Consumer 接口的類圖。
接下來對起重點方法進行一個初步的介紹,從下篇文章開始將對其進行詳細設計。
接下來筆者根據其構造函數,對一一介紹其核心屬性的含義,爲接下來說解其核心方法打下基礎。
Kafka Consumer 消費者就介紹到這裏了,從下篇文章開始將開始詳細介紹 Kafka 關於消息消費的方方面面。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。
</topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></topicpartition,></string,></string,></metricname,></topicpartition,></topicpartition,></k,></string,></string,></consumerrecord<string,></string,></string,></string,></string,>