4-kafka0.10 新消費者使用

Consumer Client

本節主要介紹Kafka從一些topic消費數據的示例。html

配置

使用新版的Consumer,須要先在工程中添加kafka-clients依賴,添加的配置信息以下:java

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.10.0.0</version>
</dependency>

初始化與配置

Consumer的建立過程與以前舊的API建立方法同樣,一個Consumer必備的最小配置項以下所示:apache

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // 經過其中的一臺broker來找到group的coordinator,並不須要列出全部的broker
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // consumer實例

Consumer的其餘配置項能夠參考New Consumer Configs,除了上面的這幾個配置以外,其餘的幾個比較經常使用的配置信息以下表所示bootstrap

參數 默認值 說明
heartbeat.interval.ms 3000 當使用Kafka的group管理機制時,consumer向coordinator發送心跳的間隔,這個值要比session.timeout.ms小,最好不要超過session.timeout.ms的\frac{1}{3}
session.timeout.ms 30000 當使用Kafka的group管理機制時用於檢測到consumer失敗的時長,若是在這個時間內沒有收到consumer的心跳信息,就認爲Consumer失敗了
auto.offset.reset latest group首次開始消費數據時的offset,有如下幾個值能夠選擇:earliest、latest、none、anything else.
enable.auto.commit true 設置爲true時,Consumer的offset將會被週期性地自動commit
auto.commit.interval.ms 5000 Consumer的offset自動commit時的週期

Consumer Auto Offset Commit

本例使用Kafka的自動commit機制,每隔一段時間(可經過auto.commit.interval.ms來設置)就會自動進行commit offset。緩存

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true"); // 自動commit
props.put("auto.commit.interval.ms", "1000"); // 自動commit的間隔
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1", "test2")); // 可消費多個topic,組成一個list
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records) {
       System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
       try {
           Thread.sleep(1000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

這裏有幾點須要注意:session

  1. 在使用自動commit時,系統是保證at least once,由於offset是在這些messages被應用處理成功後才進行commit的;
  2. subscribe方法須要傳入全部topic的列表,一個group所消費的topic是不能動態增長的,可是能夠在任什麼時候間改變這個列表,它會把前面的設置覆蓋掉;
  3. poll中的參數就是設置一個時長,Consumer在進行拉取數據進行block的最大時間限制;

須要注意的:併發

group.id :必須設置 
auto.offset.reset:若是想得到消費者啓動前生產者生產的消息,則必須設置爲earliest;若是隻須要得到消費者啓動後生產者生產的消息,則不須要設置該項 
enable.auto.commit(默認值爲true):若是使用手動commit offset則須要設置爲false,並再適當的地方調用consumer.commitSync(),不然每次啓動消費折後都會從頭開始消費信息(在auto.offset.reset=earliest的狀況下);app

Consumer Manual Offset Control

手動控制commit異步

要進行手動commit,須要在配置文件中將enable.auto.commit設置爲false,來禁止自動commit,本例以手動同步commit爲例ide

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("enable.auto.commit", "false"); //關閉自動commit
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1", "test2"));
final int minBatchSize = 10;
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   int i = 0;
   for (ConsumerRecord<String, String> record : records) {
       System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
       i++;
   }
   if (i >= minBatchSize) {
       consumer.commitSync(); //批量完成寫入後,手工同步commit offset
   }
}
  1. 在本例中,咱們調用了commitSync方法,這是同步commit的方式,同時Kafka還提供了commitAsync方法,它們的區別是:使用同步提交時,consumer會進行block知道commit的結果返回,這樣的話若是commit失敗就能夠今早地發現錯誤,而當使用異步commit時,commit的結果還未返回,Consumer就會開始拉取下一批的數據,可是使用異步commit能夠系統的吞吐量,具體使用哪一種方式須要開發者本身權衡;
  2. 本例中的實現依然是保證at least once,可是若是每次拉取到數據以後,就進行commit,最後再處理數據,就能夠保證at last once。

 

Consumer Manual Partition Assign

消費者手動設置分區

Kafka在進行消費數據時,能夠指定消費某個topic的某個partition,這種使用狀況比較特殊,並不須要coordinator進行rebalance,也就意味着這種模式雖然須要設置group id,可是它跟前面的group的機制並不同,它與舊的Consumer中的Simple Consumer類似,這是Kafka在新的Consumer API中對這種狀況的支持。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("enable.auto.commit", "false"); //關閉自動commit
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);
TopicPartition partition0 = new TopicPartition("test", 0);
TopicPartition partition1 = new TopicPartition("test", 2);
consumer.assign(Arrays.asList(partition0, partition1));

final int minBatchSize = 10;
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   int i = 0;
   for (ConsumerRecord<String, String> record : records) {
       System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
       i++;
   }
   if (i >= minBatchSize) {
       consumer.commitSync(); //批量完成寫入後,手工sync offset
   }
}

注意:

  1. 與前面的subscribe方法同樣,在調用assign方法時,須要傳入這個Consumer要消費的全部TopicPartition的列表;
  2. 無論對於simple consumer仍是consumer group,全部offset的commit都必須通過group coordinator;
  3. 在進行commit時,必須設置一個合適的group.id,避免與其餘的group產生衝突。若是一個simple consumer試圖使用一個與一個active group相同的id進行commit offset,coordinator將會拒絕這個commit請求,會返回一個CommitFailedException異常,可是,若是一個simple consumer與另外一個simple consumer使用同一個id,系統就不會報任何錯誤。

 

KafkaStream使用

KafkaStream是在Kafka 0.10.0版中新提出的內容,Kafka官方也說了設計這個feature的緣由——爲了簡單,以前在流處理方面,通常狀況下都會使用Kafka做爲消息隊列,而後再搭建一個流處理環境作流處理,而如今咱們能夠直接在Kafka中進行流處理,不須要再搭建另一個環境(加了這個feature以後會使得Kafka變得更加複雜,不過官網說,在使用時咱們只須要在工程中添加一個外部依賴包便可使用這個功能)。

 

配置

須要在pom文件中添加以下依賴,KafkaStream在實際運行時也是依賴這個外部的jar包運行。

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>0.10.0.0</version>
</dependency>

初始化與配置

KafkaStream使用的一個基本初始化部分以下所示(代碼來自Javadoc

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

完整的配置選項以下表所示,也能夠參考Streams Configs

 

名稱 描述 類型 默認值
application.id 流處理應用的標識,對同一個應用須要一致,由於它是做爲消費的group_id的 string  
bootstrap.servers host1:port1,host2:port2 這樣的列表,是用來發現全部Kafka節點的種子,所以不須要配上全部的Kafka節點 list  
client.id 應用的一個客戶端的邏輯名稱,設定後能夠區分是哪一個客戶端在請求 string 「」
zookeeper.connect zookeeper string 「」
key.serde 鍵的序列化/反序列化類 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
partition.grouper 用於分區組織的類,須要實現PartitionGrouper接口 class org.apache.kafka.streams.processor.DefaultPartitionGrouper
replication.factor 流處理應用會建立change log topic和repartition topic用於管理內部狀態,這個參數設定這些topic的副本數 int 1
state.dir 狀態倉庫的存儲路徑 string /tmp/kafka-streams
timestamp.extractor 時間戳抽取類,須要實現TimestampExtractor接口 class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
value.serde 值的序列化/反序列化類 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
buffered.records.per.partition 每一個分區緩存的最大記錄數 int 1000
commit.interval.ms 存儲處理器當前位置的間隔毫秒數 long 30000
metric.reporters 用於性能報告的類列表。須要實現MetricReporter接口。JmxReporter會永遠開啓不須要指定 list []
metric.num.samples 計算性能須要的採樣數 int 2
metric.sample.window.ms 性能採樣的時間間隔 long 30000
num.standby.replicas 每一個任務的後備副本數 int 0
num.stream.threads 執行流處理的線程數 int 1
poll.ms 等待輸入的毫秒數 long 100
state.cleanup.delay.ms 一個分區遷移後,在刪除狀態前等待的毫秒數 long 60000

小示例

這是個將一個topic的事件進行過濾的示例,處理很簡單,下面給出了這個例子的完整代碼。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Predicate;

import java.util.Properties;

/**
* Created by matt on 16/7/22.
*/
public class EventFilter {
   public static void main(String[] args) throws Exception {
       Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-filter");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.232.70:9091,10.4.232.77:2181");
       props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
       props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

       // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

       KStreamBuilder builder = new KStreamBuilder();

       KStream<String, String> source = builder.stream("test");

       source.filter(new Predicate<String, String>() {
           @Override
           public boolean test(String key, String value) {
               return (value.split(",")[3]).equals("food");
           }
       }).to("food");

       KafkaStreams streams = new KafkaStreams(builder, props);
       streams.start();

       // usually the stream application would be running forever,
       // in this example we just let it run for some time and stop since the input data is finite.
       Thread.sleep(5000L);

       streams.close();
   }

官方對於consumer與partition的建議

1. 若是consumer比partition多,是浪費,由於kafka的設計是在一個partition上是不容許併發的,因此consumer數不要大於partition數

2. 若是consumer比partition少,一個consumer會對應於多個partitions,這裏主要合理分配consumer數和partition數,不然會致使partition裏面的數據被取的不均勻。最好partiton數目是consumer數目的整數倍,因此partition數目很重要,好比取24,就很容易設定consumer數目

3. 若是consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣

4. 增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化

5. High-level接口中獲取不到數據的時候是會block的

相關文章
相關標籤/搜索