Kafka單線程Consumer及參數詳解

請使用0.9之後的版本:apache

示例代碼

Properties props = new Properties();
	    props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
	    props.put("group.id", "test");
	    props.put("enable.auto.commit", "true");
	    props.put("auto.commit.interval.ms", "1000");
	    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    
	    props.put("auto.offset.reset","earliest");
	    
	    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
	    consumer.subscribe(Arrays.asList("foo", "bar"));
	  try{  
        while (true) {
	        ConsumerRecords<String, String> records = consumer.poll(1000);
	        for (ConsumerRecord<String, String> record : records) {
	        	System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
	        }
	     }
        }finally{
          consumer.close();
        }

一、只須要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);bootstrap

二、用這些Properties構建consumer對象(KafkaConsumer還有其餘構造,能夠把序列化傳進去);數組

三、subscribe訂閱topic列表(能夠用正則訂閱Pattern.compile("kafka.*")session

使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 能夠重寫這個接口來實現 分區變動時的邏輯。若是設置了enable.auto.commit = true 就不用理會這個邏輯。fetch

四、而後循環poll消息(這裏的1000是超時設定,若是沒有不少數據,也就等一秒);code

五、處理消息(打印了offset key value 這裏寫處理邏輯)。server

六、關閉KafkaConsumer(能夠傳一個timeout值 等待秒數 默認是30)。對象

參數詳解

bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非本身配置了ip)接口

deserializer 反序列化consumer從broker端獲取的是字節數組,還原回對象類型。ip

默認有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。

也能夠自定義:定義serializer格式 建立自定義deserializer類實現Deserializer 接口 重寫邏輯

除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer

還有session.timeout.ms "coordinator檢測失敗的時間"

是檢測consumer掛掉的時間 爲了能夠及時的rebalance 默認是10秒 能夠設置更小的值避免消息延遲。

max.poll.interval.ms "consumer處理邏輯最大時間"

處理邏輯比較複雜的時候 能夠設置這個值 避免形成沒必要要的 rebalance ,由於兩次poll時間超過了這個參數,kafka認爲這個consumer已經跟不上了,會踢出組,並且不能提交offset,就會重複消費。默認是5分鐘。

auto.offset.reset "無位移或者位移越界時kafka的應對策略"

因此若是啓動了一個group從頭消費 成功提交位移後 重啓後仍是接着消費 這個參數無效

因此3個值的解釋是:

earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最先的位移消費

latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

(注意kafka-0.10.1.X版本以前: auto.offset.reset 的值爲smallest,和,largest.(offest保存在zk中) 、

咱們這是說的是新版本:kafka-0.10.1.X版本以後: auto.offset.reset 的值更改成:earliest,latest,和none (offest保存在kafka的一個特殊的topic名爲:__consumer_offsets裏面))

enable.auto.commit 是否自動提交位移

true 自動提交 false須要用戶手動提交 有隻處理一次須要的 最近設置爲false本身控制。

fetch.max.bytes consumer單次獲取最大字節數

max.poll.records 單次poll返回的最大消息數

默認500條 若是消費很輕量 能夠適當提升這個值 增長消費速度。

hearbeat.interval.ms consumer其餘組員感知rabalance的時間

該值必須小於 session.timeout.ms 若是檢測到 consumer掛掉 也就根本沒法感知rabalance了

connections.max.idle.ms 按期關閉鏈接的時間

默認是9分鐘 能夠設置爲-1 永不關閉

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

相關文章
相關標籤/搜索