在流式處理系統中,Flink和kafka的結合非常經典。咱們能夠經過Flink消費Kafka數據,層層處理後,丟到Kafka另外一個Topic,下游再處理該Topic的數據。而對於OLAP查詢需求,咱們每每須要將數據輸出到 Hive。通常的,咱們使用Parquet格式來存儲(Spark對parquet的支持較好)。html
Flink提供了bucket sink的模式將流式數據寫入到文件中,在官方給的demo中,代碼以下apache
import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; DataStream<String> input = ...; final StreamingFileSink<String> sink = StreamingFileSink .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8")) .build(); input.addSink(sink);
爲了使用Parquet格式,咱們還須要轉換代碼:api
StreamingFileSink<LogTest> streamingFileSink = StreamingFileSink. forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(LogTest.class)) .withBucketAssigner(bucketAssigner) .build();
在測試過程當中,會發現目錄建立了,但文件全爲空且處於inprogress狀態。通過多番搜索未解決該問題。最後在官方文檔中發現了這麼一句:緩存
IMPORTANT: Bulk-encoding formats can only be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every checkpoint.
這說明Flink將一直緩存從Flink消費出來的數據,只有當Checkpoint 觸發的時候,才把數據刷新到目標目錄--即咱們定義的parquet路徑中。 加上啓用CheckPoint以後,從新執行程序,能夠發現文件成功寫入了。測試
env.enableCheckpointing(3000);
其餘思考:消費kafka輸出到Parquet這一個過程,Flink可否保證一致性語義?ui