本節主要介紹Kafka從一些topic消費數據的示例。html
使用新版的Consumer,須要先在工程中添加kafka-clients依賴,添加的配置信息以下:java
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>
Consumer的建立過程與以前舊的API建立方法同樣,一個Consumer必備的最小配置項以下所示:apache
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // 經過其中的一臺broker來找到group的coordinator,並不須要列出全部的broker props.put("group.id", "consumer-tutorial"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // consumer實例
Consumer的其餘配置項能夠參考New Consumer Configs,除了上面的這幾個配置以外,其餘的幾個比較經常使用的配置信息以下表所示bootstrap
參數 | 默認值 | 說明 |
---|---|---|
heartbeat.interval.ms | 3000 | 當使用Kafka的group管理機制時,consumer向coordinator發送心跳的間隔,這個值要比session.timeout.ms小,最好不要超過session.timeout.ms的\frac{1}{3} |
session.timeout.ms | 30000 | 當使用Kafka的group管理機制時用於檢測到consumer失敗的時長,若是在這個時間內沒有收到consumer的心跳信息,就認爲Consumer失敗了 |
auto.offset.reset | latest | group首次開始消費數據時的offset,有如下幾個值能夠選擇:earliest、latest、none、anything else. |
enable.auto.commit | true | 設置爲true時,Consumer的offset將會被週期性地自動commit |
auto.commit.interval.ms | 5000 | Consumer的offset自動commit時的週期 |
本例使用Kafka的自動commit機制,每隔一段時間(可經過auto.commit.interval.ms
來設置)就會自動進行commit offset。緩存
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group"); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "true"); // 自動commit props.put("auto.commit.interval.ms", "1000"); // 自動commit的間隔 props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test1", "test2")); // 可消費多個topic,組成一個list while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
這裏有幾點須要注意:session
- 在使用自動commit時,系統是保證at least once,由於offset是在這些messages被應用處理成功後才進行commit的;
- subscribe方法須要傳入全部topic的列表,一個group所消費的topic是不能動態增長的,可是能夠在任什麼時候間改變這個列表,它會把前面的設置覆蓋掉;
- poll中的參數就是設置一個時長,Consumer在進行拉取數據進行block的最大時間限制;
須要注意的:併發
group.id :必須設置
auto.offset.reset:若是想得到消費者啓動前生產者生產的消息,則必須設置爲earliest;若是隻須要得到消費者啓動後生產者生產的消息,則不須要設置該項
enable.auto.commit(默認值爲true):若是使用手動commit offset則須要設置爲false,並再適當的地方調用consumer.commitSync()
,不然每次啓動消費折後都會從頭開始消費信息(在auto.offset.reset=earliest的狀況下);app
手動控制commit異步
要進行手動commit,須要在配置文件中將enable.auto.commit設置爲false,來禁止自動commit,本例以手動同步commit爲例ide
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group"); props.put("enable.auto.commit", "false"); //關閉自動commit props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test1", "test2")); final int minBatchSize = 10; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); int i = 0; for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); i++; } if (i >= minBatchSize) { consumer.commitSync(); //批量完成寫入後,手工同步commit offset } }
消費者手動設置分區
Kafka在進行消費數據時,能夠指定消費某個topic的某個partition,這種使用狀況比較特殊,並不須要coordinator進行rebalance,也就意味着這種模式雖然須要設置group id,可是它跟前面的group的機制並不同,它與舊的Consumer中的Simple Consumer類似,這是Kafka在新的Consumer API中對這種狀況的支持。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group"); props.put("enable.auto.commit", "false"); //關閉自動commit props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); TopicPartition partition0 = new TopicPartition("test", 0); TopicPartition partition1 = new TopicPartition("test", 2); consumer.assign(Arrays.asList(partition0, partition1)); final int minBatchSize = 10; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); int i = 0; for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); i++; } if (i >= minBatchSize) { consumer.commitSync(); //批量完成寫入後,手工sync offset } }
注意:
KafkaStream是在Kafka 0.10.0版中新提出的內容,Kafka官方也說了設計這個feature的緣由——爲了簡單,以前在流處理方面,通常狀況下都會使用Kafka做爲消息隊列,而後再搭建一個流處理環境作流處理,而如今咱們能夠直接在Kafka中進行流處理,不須要再搭建另一個環境(加了這個feature以後會使得Kafka變得更加複雜,不過官網說,在使用時咱們只須要在工程中添加一個外部依賴包便可使用這個功能)。
須要在pom文件中添加以下依賴,KafkaStream在實際運行時也是依賴這個外部的jar包運行。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency>
KafkaStream使用的一個基本初始化部分以下所示(代碼來自Javadoc)
Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start();
完整的配置選項以下表所示,也能夠參考Streams Configs
名稱 | 描述 | 類型 | 默認值 |
---|---|---|---|
application.id | 流處理應用的標識,對同一個應用須要一致,由於它是做爲消費的group_id的 | string | |
bootstrap.servers | host1:port1,host2:port2 這樣的列表,是用來發現全部Kafka節點的種子,所以不須要配上全部的Kafka節點 | list | |
client.id | 應用的一個客戶端的邏輯名稱,設定後能夠區分是哪一個客戶端在請求 | string | 「」 |
zookeeper.connect | zookeeper | string | 「」 |
key.serde | 鍵的序列化/反序列化類 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde |
partition.grouper | 用於分區組織的類,須要實現PartitionGrouper接口 | class | org.apache.kafka.streams.processor.DefaultPartitionGrouper |
replication.factor | 流處理應用會建立change log topic和repartition topic用於管理內部狀態,這個參數設定這些topic的副本數 | int | 1 |
state.dir | 狀態倉庫的存儲路徑 | string | /tmp/kafka-streams |
timestamp.extractor | 時間戳抽取類,須要實現TimestampExtractor接口 | class | org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor |
value.serde | 值的序列化/反序列化類 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde |
buffered.records.per.partition | 每一個分區緩存的最大記錄數 | int | 1000 |
commit.interval.ms | 存儲處理器當前位置的間隔毫秒數 | long | 30000 |
metric.reporters | 用於性能報告的類列表。須要實現MetricReporter接口。JmxReporter會永遠開啓不須要指定 | list | [] |
metric.num.samples | 計算性能須要的採樣數 | int | 2 |
metric.sample.window.ms | 性能採樣的時間間隔 | long | 30000 |
num.standby.replicas | 每一個任務的後備副本數 | int | 0 |
num.stream.threads | 執行流處理的線程數 | int | 1 |
poll.ms | 等待輸入的毫秒數 | long | 100 |
state.cleanup.delay.ms | 一個分區遷移後,在刪除狀態前等待的毫秒數 | long | 60000 |
這是個將一個topic的事件進行過濾的示例,處理很簡單,下面給出了這個例子的完整代碼。
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Predicate; import java.util.Properties; /** * Created by matt on 16/7/22. */ public class EventFilter { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-filter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.232.70:9091,10.4.232.77:2181"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builder.stream("test"); source.filter(new Predicate<String, String>() { @Override public boolean test(String key, String value) { return (value.split(",")[3]).equals("food"); } }).to("food"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); // usually the stream application would be running forever, // in this example we just let it run for some time and stop since the input data is finite. Thread.sleep(5000L); streams.close(); }
1. 若是consumer比partition多,是浪費,由於kafka的設計是在一個partition上是不容許併發的,因此consumer數不要大於partition數
2. 若是consumer比partition少,一個consumer會對應於多個partitions,這裏主要合理分配consumer數和partition數,不然會致使partition裏面的數據被取的不均勻。最好partiton數目是consumer數目的整數倍,因此partition數目很重要,好比取24,就很容易設定consumer數目
3. 若是consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣
4. 增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化
5. High-level接口中獲取不到數據的時候是會block的