Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer"); prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { String key = record.key(); String value = record.value(); System.err.println(record.toString()); } }
kafka
消費者是以 組爲基本單位 進行消費的。消費的模型以下
java
1 個 topic
容許被多個 消費組
消費。再次強調,kafka
消費是以組爲單位。正則表達式
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
以上這行代碼設置了消費組。算法
partition
分配topic
爲邏輯上的概念,partition
纔是物理上的概念。那麼看完這個以上的消費模型圖。你可能會很疑惑。當一個組下有多個消費者時,每一個消費者是如何消費的?安全
先說明:partition
的分配爲平均分配多線程
假設一:topic1
下面有 3 個分區。分別以下:p1 - p3。那麼 groupA
下的三個消費者消費的對應 partition
爲以下ide
instance1: p1 instance2: p2 instance3: p3
假設二:topic1
下面有 8 個分區。分別爲 p1 - p8。那麼 groupA
中每一個消費者分配到的 partition
就以下函數
instance1: p1,p2,p3 instance2: p4,p5,p6 instance3: p7,p8
partition
重分配假設三:topic1
下面有 8 個分區:P1 - P8。groupA
有三個消費者:c1,c2,c3。此時分配的 partition
以下fetch
c1: p1,p2,p3 c2: p4,p5,p6 c3: p7,p8
若是此時,又有一個新的消費者加入到 groupA
會發生什麼呢? partition
會被從新分配大數據
c1: p1,p2 c2: p3,p4 c3: p5,p6 c4: p7,p8
API
介紹void subscribe(Collection<String> topics); void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
從方法上看 kafka
容許一個消費者訂閱多個 topic
。spa
void subscribe(Pattern pattern); void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
入參 Pattern
則表示,可使用正則表達式匹配多個 topic
. 實例代碼以下
Pattern pattern = Pattern.compile("test?"); consumer.subscribe(pattern);
能夠訂閱主題,那麼天然也能夠取消訂閱主題
consumer.unsubscribe();
固然,也能夠直接獲取到消費組訂閱的主題
Set<String> topics = consumer.subscription();
一個主題下面有多個 partition
, 那麼是否能夠指定要消費的隊列呢?答案是能夠的
TopicPartition p1 = new TopicPartition("test1", 0); TopicPartition p2 = new TopicPartition("test1", 1); consumer.assign(Arrays.asList(p1, p2));
不過須要注意的是,若是指定了消費的分區,那麼是消費者是沒法自動 rebanlance
的。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
從消費者端的這行代碼,咱們能夠看出,kafka
消息消費採用的是拉取模式。當未拉取到消息時,會阻塞線程。
poll
方法返回的 ConsumerRecords
實現 Iterable
接口,是 ConsumerRecord
的迭代器。ConsumerRecord
屬性相對簡單
public class ConsumerRecord<K, V> { private final String topic; // 主題 private final int partition; // 分區 private final long offset; // 消息所屬分區偏移量 private final long timestamp; // 時間戳 private final TimestampType timestampType; // 二者類型,消息建立時間戳及消息追加到日誌的時間戳 private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; // 發送的header private final K key; // 發送的 key private final V value; // 發送的內容 private volatile Long checksum; // CRC32 校驗值 }
對於分區而言,消息會有一個惟一 offset
, 表示消息在分區中的位置,稱之爲 偏移量
。對於消息消費而言,也有消費進度的 offset
,稱之爲 位移
。kafka
將消息的消費進度存儲在 kafka
內部主題 __onsumer_offset
中。kafka
中默認每隔 5s
保存消息的消費進度。可經過 auto.commit.interval.ms
進行配置。
kafka
提供手動提交的 API
,下面演示一下。
Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer"); prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo"); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println("消費:" + record.toString()); } consumer.commitSync(); }
須要注意的是,須要將 enable.auto.commit
設置爲 true
.
kafka
設置 新消費組 從哪一個位置開始消費的配置爲:auto.offset.reset
該配置有如下 3 個配置項
latest
(默認配置)默認從最新的位置,開始消費。
earliest
從最先的位置開始消費。當配置爲該參數時,kafka
會打印以下日誌:Resetting offset for partition
none
當消費組,沒有對應消費進度時,會直接拋 NoOffsetForPartitionException
異常
kafka
還提供了 seek(TopicPartition partition, long offset)
方法,容許新的消費者,設置從哪一個位置開始消費。
// 由於分配 分區的動做,發生在 pool 中,所以在設置消費偏移量時,須要先拉取消息 Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { consumer.poll(Duration.ofMillis(100)); assignment = consumer.assignment(); } for (TopicPartition tp : assignment) { consumer.seek(tp, 50); } while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.err.println("消費:" + record.toString()); } }
更多狀況下,咱們可能會指定消費組從指定的時間點開始消費
Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); for (TopicPartition tp : assignment) { // 指定從一天前開始消費 timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch); for (TopicPartition tp : assignment) { OffsetAndTimestamp timestamp = offsets.get(tp); if (null != timestamp) { consumer.seek(tp, timestamp.offset()); } }
在分區再均衡期間,消費組內的消費者是沒法讀取消息的。而且若是以前的消費者沒有及時提交消費進度,那麼會形成重複消費。
kafka
在 subscribe
的時候,提供了回調函數,容許咱們在觸發再均衡時,作控制
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
看一下 ConsumerRebalanceListener
定義的接口
// 再均衡開始以前和消費者中止讀取消息以前被調用,可利用該會掉,提交消費位移 void onPartitionsRevoked(Collection<TopicPartition> partitions); // 從新分區後,消費者開始讀取消息以前被調用 void onPartitionsAssigned(Collection<TopicPartition> partitions);
下面演示,如何在再均衡以前,提交消費偏移
consumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() { // 在再均衡開始以前和消費者中止讀取消息以前被調用,可利用該會掉,提交消費位移 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 提交消費偏移 consumer.commitSync(); } // 從新分區後,消費者開始讀取消息以前被調用 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } });
消費者,容許在 消費以前,消費偏移提交以後,關閉以前,進行控制,多個攔截器則組成攔截器鏈, 且多個攔截器以前須要用 ',' 號隔開。
先看攔截器定義的接口
public interface ConsumerInterceptor<K, V> extends Configurable { // 消息消費以前 ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records); // 提交以後調用 void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); // 關閉以前調用 void close(); }
Properties prop = new Properties(); prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName() + "," + MyConsumerInterceptor2.class.getName());
fetch.min.bytes
默認 1B
,poll
時,拉取的最小數據量。
fetch.max.bytes
默認 5242880B
,50MB,poll
時,拉取的最大數據量。
fetch.max.wait.ms
默認 500ms
,若是 kafka
一直沒有觸發 poll
動做,那麼最多等待 fetch.max.wait.ms
。
max.partition.fetch.bytes
默認 1048576B
,1MB,分區拉取時的最大數據量
max.poll.records
默認 500條
,拉取的最大消息條數
connections.max.idle.ms
默認 540000ms
, 9分鐘,多久關閉閒置的鏈接
receive.buffer.bytes
默認 65536B
,64KB
,SOCKET
接受消息的緩衝區(SO_RECBUF
)
request.timeout.ms
默認 30000ms
,配置 consumer
等待請求響應的最長時間
metadata.max.age.ms
默認 300000ms
,5 分鐘,配置元數據過時時間。元數據在限定的時間內,沒有更新,會被強制更新
reconnect.backoff.ms
默認 50ms
,配置嘗試鏈接指定主機以前的等待時間,避免頻繁鏈接主機
retry.backoff.ms
默認 100ms
,發送失敗時,2次的間隔時間
kafka
消費以組爲單位,且容許一個消費組訂閱多個 topic
partition
重分配算法,爲平均算法KafkaConsumer
爲線程不安全。所以 poll()
只有當前線程在拉取消息。kafka
要實現多線程拉取相對麻煩kafka
消費者端,提供的 API
很是靈活,容許從指定的位置消費,容許手動提交某個分區的消費偏移kafka
提供消費者攔截器鏈,容許在 消費以前,提交消費偏移以後 控制。RocketMQ
建議 1 個消費組只消費一個 topic
, 且在實際開發中,若是消費者訂閱多個 topic
會沒法正常工做。kafka
中 1 個消費者能夠訂閱多個 topic
。RocketMQ
能夠確保消費時,消息不丟失,kafka
沒法保證。RocketMQ
在消費者端,實現了多線程消費,kafka
則沒有kafka
默認每 5s
持久化消費進度,RocketMQ
也是。不過 RocketMQ
會提交偏移量最小的消息。好比,線程 A 消費了 20 的消息。線程 B 消費了 10 的消息。當線程 A 提交消費進度的時候,會提交 10,而不會提交20。這也是 RocketMQ
能夠確保消息消費時不丟的緣由。RocketMQ
發生 rebalance
,即 kafka
的再分配。默認和 kafka
一致,採用的是 平均分配算法
。不過 RocketMQ
容許自定義再分配算法,且提供了豐富算法支持。RocketMQ
與 kafka
一致,都存在重複消費問題。API
來看,kafka
客戶端會比 RocketMQ
更加靈活。kafka
設置 新的消費組 從哪一個位置開始消費,沒有額外的條件限制;RocketMQ
只有當舊消息堆積很是多時,纔會有效。