最近在用Apache beam作流上的異常檢測,期間遇到了不少問題,可是發現網上相關的資料不多,基本只能本身啃文檔和瞎嘗試。
因此想把本身踩過的坑記錄下來,但願能對你們有所幫助。
其中若有錯漏,歡迎指出。java
顧名思義,是從kafka上讀取數據到beam上或者將beam上的數據寫入到kafka中。官方文檔中沒有直接的教程,要在GitHub上的源碼中找到相關使用說明。
Github上的Kafka源碼git
這裏僅說明讀數據部分。
maven依賴示例github
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-kafka</artifactId> <version>...</version> </dependency>
讀數據示例shell
PCollection<KV<String,String>> lines = //這裏kV後說明kafka中的key和value均爲String類型 p.apply(KafkaIO.<String, String>read() .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,設置kafka的服務器地址和端口 .withTopic("mytopic")//必需,設置要讀取的kafka的topic名稱 .withKeyDeserializer(StringDeserializer.class)//必需 .withValueDeserializer(StringDeserializer.class)//必需 .withMaxNumRecords(301) .withTimestampFn(new MyTimestampFunction()) .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")) .withoutMetadata() )
如下分別後面非必需的一些設置apache
1.設置最大記錄條數服務器
.withMaxNumRecords(301)
經過這個函數,能夠設置最大讀取的記錄條數。app
2.設置PCollection中元素對應的時間戳maven
.withTimestampFn(new MyTimestampFunction())
當不進行這個設置的時候,beam會根據當前的系統時間爲每一個元素分配一個時間戳。
而有的時候,咱們但願用kafka的數據中自身帶有的時間戳來做爲PCollection中元素的時間戳,從而進行後續的窗口操做。這時就須要經過上面的函數來達到這一目的。
其中MyTimestampFunction()是咱們自定義的一個函數,其要實現SerializableFunction<KV<String, String>, Instant>
這個接口。
即從一條kafka數據中得到時間戳,而後以Instant(org.joda.time.Instant)的格式返回。函數
public class MyTimestampFunction implements SerializableFunction<KV<String, String>, Instant> { public Instant apply(KV<String, String> input){ String[] temps = input.getValue().split(","); DateTime t = new DateTime(Long.valueOf(temps[1])); return t.toInstant(); } }
3.設置讀kafka數據的順序oop
updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
KafkaIO默認的數據讀取順序是從最新的數據開始。當咱們開發測試的時候,若是沒有一個生產者同步向kafka生產數據,那麼這裏就拿不到數據。(在這坑了好久,才發現這個緣由...)
當咱們想實現相似於kafka shell中的--from-beginning
的功能的時候,即從最先的數據開始讀,就須要進行這一設置。
這裏不只能夠改變讀取數據的順序,按照相似的方式,還能夠進行其餘設置。
4.丟棄掉kafka中的附加信息
.withoutMetadata()
使用這一設置時,獲得的PCollection中的元素是kafka的key和value組成的鍵值對。
當不使用其時,獲得的PCollection中的元素是KafkaRecord。會附件不少元數據。
5.其餘設置
// custom function for watermark (default is record timestamp) * .withWatermarkFn(new MyWatermarkFunction()) * * // restrict reader to committed messages on Kafka (see method documentation). * .withReadCommitted() *
在源碼的使用說明中還提到另外的兩個設置,但由於暫時沒用到,這裏就暫且省略了。