請使用0.9之後的版本:apache
Properties props = new Properties(); props.put("bootstrap.servers", "kafka01:9092,kafka02:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset","earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); try{ while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }finally{ consumer.close(); }
一、只須要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必須指定);bootstrap
二、用這些Properties構建consumer對象(KafkaConsumer還有其餘構造,能夠把序列化傳進去);數組
三、subscribe訂閱topic列表(能夠用正則訂閱Pattern.compile("kafka.*")session
使用正則必須指定一個listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 能夠重寫這個接口來實現 分區變動時的邏輯。若是設置了enable.auto.commit = true 就不用理會這個邏輯。fetch
四、而後循環poll消息(這裏的1000是超時設定,若是沒有不少數據,也就等一秒);code
五、處理消息(打印了offset key value 這裏寫處理邏輯)。server
六、關閉KafkaConsumer(能夠傳一個timeout值 等待秒數 默認是30)。對象
bootstrap.server(最好用主機名不用ip kafka內部用的主機名 除非本身配置了ip)接口
deserializer 反序列化consumer從broker端獲取的是字節數組,還原回對象類型。ip
默認有十幾種:StringDeserializer LongDeserializer DoubleDeserializer。。
也能夠自定義:定義serializer格式 建立自定義deserializer類實現Deserializer 接口 重寫邏輯
除了四個必傳的 bootstrap.server group.id key.deserializer value.deserializer
還有session.timeout.ms "coordinator檢測失敗的時間"
是檢測consumer掛掉的時間 爲了能夠及時的rebalance 默認是10秒 能夠設置更小的值避免消息延遲。
max.poll.interval.ms "consumer處理邏輯最大時間"
處理邏輯比較複雜的時候 能夠設置這個值 避免形成沒必要要的 rebalance ,由於兩次poll時間超過了這個參數,kafka認爲這個consumer已經跟不上了,會踢出組,並且不能提交offset,就會重複消費。默認是5分鐘。
auto.offset.reset "無位移或者位移越界時kafka的應對策略"
因此若是啓動了一個group從頭消費 成功提交位移後 重啓後仍是接着消費 這個參數無效
因此3個值的解釋是:
earliset 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從最先的位移消費
latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
(注意kafka-0.10.1.X版本以前: auto.offset.reset 的值爲smallest,和,largest.(offest保存在zk中) 、
咱們這是說的是新版本:kafka-0.10.1.X版本以後: auto.offset.reset 的值更改成:earliest,latest,和none (offest保存在kafka的一個特殊的topic名爲:__consumer_offsets裏面))
enable.auto.commit 是否自動提交位移
true 自動提交 false須要用戶手動提交 有隻處理一次須要的 最近設置爲false本身控制。
fetch.max.bytes consumer單次獲取最大字節數
max.poll.records 單次poll返回的最大消息數
默認500條 若是消費很輕量 能夠適當提升這個值 增長消費速度。
hearbeat.interval.ms consumer其餘組員感知rabalance的時間
該值必須小於 session.timeout.ms 若是檢測到 consumer掛掉 也就根本沒法感知rabalance了
connections.max.idle.ms 按期關閉鏈接的時間
默認是9分鐘 能夠設置爲-1 永不關閉
更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算