kafka consumer 配置詳解

一、Consumer Group 與 topic 訂閱

每一個Consumer 進程都會劃歸到一個邏輯的Consumer Group中,邏輯的訂閱者是Consumer Group。因此一條message能夠被多個訂閱message 所在的topic的每個Consumer Group,也就好像是這條message被廣播到每一個Consumer Group同樣。而每一個Consumer Group中,相似於一個Queue(JMS中的Queue)的概念差很少,即一條消息只會被Consumer Group中的一個Consumer消費。apache

 

1.1 Consumer 與 partition

    其實上面所說的訂閱關係還不夠明確,其實topic中的partition被分配到某個consumer上,也就是某個consumer訂閱了某個partition。 再重複一下:consumer訂閱的是partition,而不是message。因此在同一時間點上,訂閱到同一個partition的consumer必然屬於不一樣的Consumer Group。bootstrap

 

    在官方網站上,給出了這樣一張圖:緩存

 

一個kafka cluster中的某個topic,有4個partition。有兩個consumer group (A and B)訂閱了該topic。 Consumer Group A有2個partition:p0、p1,Consumer Group B有4個partition:c3,c4,c5,c6。通過分區分配後,consumer與partition的訂閱關係以下:安全

複製代碼
Topic 中的4partitionConsumer Group A中的分配狀況以下: C1 訂閱p0,p3 C2 訂閱p1,p2 Topic 中的4partitionConsumer Group B中的分配狀況以下: C3 訂閱p0 C4 訂閱p3 C5 訂閱p1 C6 訂閱p2
複製代碼

 

 另外要知道的是,partition分配的工做實際上是在consumer leader中完成的。網絡

1.2 Consumer 與Consumer Group

Consumer Group與Consumer的關係是動態維護的:session

當一個Consumer 進程掛掉 或者是卡住時,該consumer所訂閱的partition會被從新分配到該group內的其它的consumer上。當一個consumer加入到一個consumer group中時,一樣會從其它的consumer中分配出一個或者多個partition 到這個新加入的consumer。多線程

    當啓動一個Consumer時,會指定它要加入的group,使用的是配置項:group.id。fetch

爲了維持Consumer 與 Consumer Group的關係,須要Consumer週期性的發送heartbeat到coordinator(協調者,在早期版本,以zookeeper做爲協調者。後期版本則以某個broker做爲協調者)。當Consumer因爲某種緣由不能發Heartbeat到coordinator時,而且時間超過session.timeout.ms時,就會認爲該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。而這個過程,被稱爲rebalance。網站

 

那麼如今有這樣一個問題:若是一個consumer 進程一直在週期性的發送heartbeat,可是它就是不消費消息,這種狀態稱爲livelock狀態。那麼在這種狀態下,它所訂閱的partition不消息是否就一直不能被消費呢?spa

 

1.3 Coordinator

    Coordinator 協調者,協調consumer、broker。早期版本中Coordinator,使用zookeeper實現,可是這樣作,rebalance的負擔過重。爲了解決scalable的問題,再也不使用zookeeper,而是讓每一個broker來負責一些group的管理,這樣consumer就徹底再也不依賴zookeeper了。

 

1.3.1 Consumer鏈接到coordinator

    從Consumer的實現來看,在執行poll或者是join group以前,都要保證已鏈接到Coordinator。鏈接到coordinator的過程是:

    1)鏈接到最後一次鏈接的broker(若是是剛啓動的consumer,則要根據配置中的borker)。它會響應一個包含coordinator信息(host, port等)的response。

    2)鏈接到coordinator。

 

1.4 Consumer Group Management

    Consumer Group 管理中,也是須要coordinator的參與。一個Consumer要join到一個group中,或者一個consumer退出時,都要進行rebalance。進行rebalance的流程是:

1)會給一個coordinator發起Join請求(請求中要包括本身的一些元數據,例如本身感興趣的topics)

2)Coordinator 根據這些consumer的join請求,選擇出一個leader,並通知給各個consumer。這裏的leader是consumer group 內的leader,是由某個consumer擔任,不要與partition的leader混淆。

3)Consumer leader 根據這些consumer的metadata,從新爲每一個consumer member從新分配partition。分配完畢經過coordinator把最新分配狀況同步給每一個consumer。

4)Consumer拿到最新的分配後,繼續工做。

 

 

二、Consumer Fetch Message

   

在Kafka partition中,每一個消息有一個惟一標識,即partition內的offset。每一個consumer group中的訂閱到某個partition的consumer在從partition中讀取數據時,是依次讀取的。

   

    上圖中,Consumer A、B分屬於不用的Consumer Group。Consumer B讀取到offset =11,Consumer A讀取到offset=9 。這個值表示Consumer Group中的某個Consumer 在下次讀取該partition時會從哪一個offset的 message開始讀取,即 Consumer Group A 中的Consumer下次會從offset = 9 的message 讀取, Consumer Group B 中的Consumer下次會從offset = 11 的message 讀取。

    這裏並無說是Consumer A 下次會從offset = 9 的message讀取,緣由是Consumer A可能會退出Group ,而後Group A 進行rebalance,即從新分配分區。

 

2.1 poll 方法

 

Consumer讀取partition中的數據是經過調用發起一個fetch請求來執行的。而從KafkaConsumer來看,它有一個poll方法。可是這個poll方法只是可能會發起fetch請求。緣由是:Consumer每次發起fetch請求時,讀取到的數據是有限制的,經過配置項max.partition.fetch.bytes來限制的。而在執行poll方法時,會根據配置項個max.poll.records來限制一次最多pool多少個record。

那麼就可能出現這樣的狀況: 在知足max.partition.fetch.bytes限制的狀況下,假如fetch到了100個record,放到本地緩存後,因爲max.poll.records限制每次只能poll出15個record。那麼KafkaConsumer就須要執行7次才能將這一次經過網絡發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次pool中15個record,最後一次是poll出10個record。

 

    在consumer中,還有另一個配置項:max.poll.interval.ms ,它表示最大的poll數據間隔,若是超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認爲該consumer處於 livelock狀態。就會將該consumer退出consumer group。因此爲了避免使Consumer 本身被退出,Consumer 應該不停的發起poll(timeout)操做。而這個動做 KafkaConsumer Client是不會幫咱們作的,這就須要本身在程序中不停的調用poll方法了。

 

2.2 commit offset

    當一個consumer因某種緣由退出Group時,進行從新分配partition後,同一group中的另外一個consumer在讀取該partition時,怎麼可以知道上一個consumer該從哪一個offset的message讀取呢?也是是如何保證同一個group內的consumer不重複消費消息呢?上面說了一次走網絡的fetch請求會拉取到必定量的數據,可是這些數據尚未被消息完畢,Consumer就掛掉了,下一次進行數據fetch時,是否會從上次讀到的數據開始讀取,而致使Consumer消費的數據丟失嗎?

    爲了作到這一點,當使用完poll從本地緩存拉取到數據以後,須要client調用commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪個offset的message。

    而這個commit方法會經過走網絡的commit請求將offset在coordinator中保留,這樣就可以保證下一次讀取(不論進行了rebalance)時,既不會重複消費消息,也不會遺漏消息。

 

    對於offset的commit,Kafka Consumer Java Client支持兩種模式:由KafkaConsumer自動提交,或者是用戶經過調用commitSync、commitAsync方法的方式完成offset的提交。

 

自動提交的例子:

 

複製代碼
   Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
複製代碼

手動提交的例子: 

複製代碼
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }
複製代碼

在手動提交時,須要注意的一點是:要提交的是下一次要讀取的offset,例如: 

複製代碼
try { while(running) { // 取得消息 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 根據分區來遍歷數據: for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); // 數據處理 for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } // 取得當前讀取到的最後一條記錄的offset long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 提交offset,記得要 + 1 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }
複製代碼

 

 

三、Consumer的線程安全性

KafkaProducer是線程安全的,上一節已經瞭解到。但Consumer卻沒有設計成線程安全的。當用戶想要在在多線程環境下使用kafkaConsumer時,須要本身來保證synchronized。若是沒有這樣的保證,就會拋出ConcurrentModificatinException的。

當你想要關閉Consumer或者爲也其它的目的想要中斷Consumer的處理時,能夠調用consumer的wakeup方法。這個方法會拋出WakeupException。

 

例如:

複製代碼
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records  } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }
複製代碼

  

四、Consumer Configuration

    在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置以下:

·bootstrap.servers

在啓動consumer時配置的broker地址的。不須要將cluster中全部的broker都配置上,由於啓動後會自動的發現cluster全部的broker。

    它配置的格式是:host1:port1;host2:port2…

·key.descrializervalue.descrializer

Message record 的key, value的反序列化類。

·group.id

用於表示該consumer想要加入到哪一個group中。默認值是 「」。

·heartbeat.interval.ms

心跳間隔。心跳是在consumer與coordinator之間進行的。心跳是肯定consumer存活,加入或者退出group的有效手段。

    這個值必須設置的小於session.timeout.ms,由於:

當Consumer因爲某種緣由不能發Heartbeat到coordinator時,而且時間超過session.timeout.ms時,就會認爲該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。

    一般設置的值要低於session.timeout.ms的1/3。

    默認值是:3000 (3s)

·session.timeout.ms

Consumer session 過時時間。這個值必須設置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。

其默認值是:10000 (10 s)

 

·enable.auto.commit

Consumer 在commit offset時有兩種模式:自動提交,手動提交。手動提交在前面已經說過。自動提交:是Kafka Consumer會在後臺週期性的去commit。

默認值是true。

·auto.commit.interval.ms

    自動提交間隔。範圍:[0,Integer.MAX],默認值是 5000 (5 s)

 

·auto.offset.reset

    這個配置項,是告訴Kafka Broker在發現kafka在沒有初始offset,或者當前的offset是一個不存在的值(若是一個record被刪除,就確定不存在了)時,該如何處理。它有4種處理方式:

1) earliest:自動重置到最先的offset。

2) latest:看上去重置到最晚的offset。

3) none:若是邊更早的offset也沒有的話,就拋出異常給consumer,告訴consumer在整個consumer group中都沒有發現有這樣的offset。

4) 若是不是上述3種,只拋出異常給consumer。

 

默認值是latest。

 

·connections.max.idle.ms

鏈接空閒超時時間。由於consumer只與broker有鏈接(coordinator也是一個broker),因此這個配置的是consumer到broker之間的。

默認值是:540000 (9 min)

 

·fetch.max.wait.ms

Fetch請求發給broker後,在broker中可能會被阻塞的(當topic中records的總size小於fetch.min.bytes時),此時這個fetch請求耗時就會比較長。這個配置就是來配置consumer最多等待response多久。

 

·fetch.min.bytes

當consumer向一個broker發起fetch請求時,broker返回的records的大小最小值。若是broker中數據量不夠的話會wait,直到數據大小知足這個條件。

取值範圍是:[0, Integer.Max],默認值是1。

默認值設置爲1的目的是:使得consumer的請求可以儘快的返回。

 

·fetch.max.bytes

一次fetch請求,從一個broker中取得的records最大大小。若是在從topic中第一個非空的partition取消息時,若是取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片狀況下,只會返回這一條record。

    broker、topic都會對producer發給它的message size作限制。因此在配置這值時,能夠參考broker的message.max.bytes 和 topic的max.message.bytes的配置。

取值範圍是:[0, Integer.Max],默認值是:52428800 (5 MB)

 

 

·max.partition.fetch.bytes

一次fetch請求,從一個partition中取得的records最大大小。若是在從topic中第一個非空的partition取消息時,若是取到的第一個record的大小就超過這個配置時,仍然會讀取這個record,也就是說在這片狀況下,只會返回這一條record。

    broker、topic都會對producer發給它的message size作限制。因此在配置這值時,能夠參考broker的message.max.bytes 和 topic的max.message.bytes的配置。

 

·max.poll.interval.ms

前面說過要求程序中不間斷的調用poll()。若是長時間沒有調用poll,且間隔超過這個值時,就會認爲這個consumer失敗了。

 

·max.poll.records

    Consumer每次調用poll()時取到的records的最大數。

 

·receive.buffer.byte

Consumer receiver buffer (SO_RCVBUF)的大小。這個值在建立Socket鏈接時會用到。

取值範圍是:[-1, Integer.MAX]。默認值是:65536 (64 KB)

若是值設置爲-1,則會使用操做系統默認的值。

 

·request.timeout.ms

請求發起後,並不必定會很快接收到響應信息。這個配置就是來配置請求超時時間的。默認值是:305000 (305 s)

 

·client.id

Consumer進程的標識。若是設置一我的爲可讀的值,跟蹤問題會比較方便。

 

·interceptor.classes

    用戶自定義interceptor。

·metadata.max.age.ms

Metadata數據的刷新間隔。即使沒有任何的partition訂閱關係變動也行執行。

範圍是:[0, Integer.MAX],默認值是:300000 (5 min)

相關文章
相關標籤/搜索