Flink 讀取Kafka寫入Hive

官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/streamfile_sink.html

在流式處理系統中,Flink和kafka的結合非常經典。咱們能夠經過Flink消費Kafka數據,層層處理後,丟到Kafka另外一個Topic,下游再處理該Topic的數據。而對於OLAP查詢需求,咱們每每須要將數據輸出到 Hive。通常的,咱們使用Parquet格式來存儲(Spark對parquet的支持較好)。html

Flink提供了bucket sink的模式將流式數據寫入到文件中,在官方給的demo中,代碼以下apache

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

DataStream<String> input = ...;

final StreamingFileSink<String> sink = StreamingFileSink
	.forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
	.build();

input.addSink(sink);

爲了使用Parquet格式,咱們還須要轉換代碼:api

StreamingFileSink<LogTest> streamingFileSink = StreamingFileSink.
        forBulkFormat(new Path(outputPath), ParquetAvroWriters.forReflectRecord(LogTest.class))
        .withBucketAssigner(bucketAssigner)
        .build();

在測試過程當中,會發現目錄建立了,但文件全爲空且處於inprogress狀態。通過多番搜索未解決該問題。最後在官方文檔中發現了這麼一句:緩存

IMPORTANT: Bulk-encoding formats can only be combined with the 
`OnCheckpointRollingPolicy`, which rolls the in-progress part 
file on every checkpoint.

這說明Flink將一直緩存從Flink消費出來的數據,只有當Checkpoint 觸發的時候,才把數據刷新到目標目錄--即咱們定義的parquet路徑中。 加上啓用CheckPoint以後,從新執行程序,能夠發現文件成功寫入了。測試

env.enableCheckpointing(3000);

其餘思考:消費kafka輸出到Parquet這一個過程,Flink可否保證一致性語義?ui

相關文章
相關標籤/搜索