Apache Beam採坑系列——KafkaIO

最近在用Apache beam作流上的異常檢測,期間遇到了不少問題,可是發現網上相關的資料不多,基本只能本身啃文檔和瞎嘗試。
因此想把本身踩過的坑記錄下來,但願能對你們有所幫助。
其中若有錯漏,歡迎指出。java

KafkaIO

顧名思義,是從kafka上讀取數據到beam上或者將beam上的數據寫入到kafka中。官方文檔中沒有直接的教程,要在GitHub上的源碼中找到相關使用說明。
Github上的Kafka源碼git

這裏僅說明讀數據部分。
maven依賴示例github

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>...</version>
</dependency>

讀數據示例shell

PCollection<KV<String,String>> lines = //這裏kV後說明kafka中的key和value均爲String類型
                p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,設置kafka的服務器地址和端口
                .withTopic("mytopic")//必需,設置要讀取的kafka的topic名稱
                .withKeyDeserializer(StringDeserializer.class)//必需
                .withValueDeserializer(StringDeserializer.class)//必需
                .withMaxNumRecords(301)
                .withTimestampFn(new MyTimestampFunction())
                .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
                .withoutMetadata()
        )

如下分別後面非必需的一些設置apache

1.設置最大記錄條數服務器

.withMaxNumRecords(301)

經過這個函數,能夠設置最大讀取的記錄條數。app

2.設置PCollection中元素對應的時間戳maven

.withTimestampFn(new MyTimestampFunction())

當不進行這個設置的時候,beam會根據當前的系統時間爲每一個元素分配一個時間戳。
而有的時候,咱們但願用kafka的數據中自身帶有的時間戳來做爲PCollection中元素的時間戳,從而進行後續的窗口操做。這時就須要經過上面的函數來達到這一目的。
其中MyTimestampFunction()是咱們自定義的一個函數,其要實現SerializableFunction<KV<String, String>, Instant>這個接口。
即從一條kafka數據中得到時間戳,而後以Instant(org.joda.time.Instant)的格式返回。函數

public class MyTimestampFunction implements SerializableFunction<KV<String, String>, Instant> {

    public Instant apply(KV<String, String> input){
        String[] temps = input.getValue().split(",");
        DateTime t = new DateTime(Long.valueOf(temps[1]));
        return t.toInstant();
    }
}

3.設置讀kafka數據的順序oop

updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))

KafkaIO默認的數據讀取順序是從最新的數據開始。當咱們開發測試的時候,若是沒有一個生產者同步向kafka生產數據,那麼這裏就拿不到數據。(在這坑了好久,才發現這個緣由...)
當咱們想實現相似於kafka shell中的--from-beginning的功能的時候,即從最先的數據開始讀,就須要進行這一設置。
這裏不只能夠改變讀取數據的順序,按照相似的方式,還能夠進行其餘設置。

4.丟棄掉kafka中的附加信息

.withoutMetadata()

使用這一設置時,獲得的PCollection中的元素是kafka的key和value組成的鍵值對。
當不使用其時,獲得的PCollection中的元素是KafkaRecord。會附件不少元數據。

5.其餘設置

// custom function for watermark (default is record timestamp)
 *       .withWatermarkFn(new MyWatermarkFunction())
 *
 *       // restrict reader to committed messages on Kafka (see method documentation).
 *       .withReadCommitted()
 *

在源碼的使用說明中還提到另外的兩個設置,但由於暫時沒用到,這裏就暫且省略了。

相關文章
相關標籤/搜索