Flink生成Parquet格式文件實戰

1.概述

在流數據應用場景中,每每會經過Flink消費Kafka中的數據,而後將這些數據進行結構化到HDFS上,再經過Hive加載這些文件供後續業務分析。今天筆者爲你們分析如何使用Flink消費Kafka的數據後,將消費後的數據結構化到Hive數據倉庫中。html

2.內容

Hive可以識別不少類型的文件,其中包含Parquet文件格式。所以,咱們只須要將Flink消費Kafka後的數據以Parquet文件格式生成到HDFS上,後續Hive就能夠將這些Parquet文件加載到數據倉庫中。具體流程圖以下所示:apache

 

2.1 Flink On YARN

 實現整個案例,咱們須要Hadoop環境、Kafka環境、Flink環境、Hive環境。這裏,筆者只介紹Flink環境的部署,其餘環境可自行搜索部署方案。關於Flink On YARN的安裝步驟以下:bootstrap

2.1.1 準備安裝包

官方下載地址bash

2.2.2 解壓

解壓命令以下所示:session

# 解壓Flink安裝包並重名名爲flink
tar -zxvf flink-1.7.1-bin-hadoop27-scala_2.12.tgz && mv flink-1.7.1 flink
# 配置環境變量
vi ~/.bash_profile

# 添加以下內容
export FLINK_HOME=/data/soft/new/flink
export PATH=$PATH:$FLINK_HOME/bin

# 保存並退出

Flink On YARN有兩種模式,分別是Flink Session和Flink Job On YARN。數據結構

2.2.3 Flink Session

Flink Session命令以下:app

# 啓動一個Flink Session
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d

各個參數含義以下:maven

參數 含義
-n 2 指定2個容器
-jm 1024 JobManager內存爲1024MB
-tm 1024 TaskManager內存爲1024MB
-d 任務後臺運行

若是你不想讓Flink YARN客戶端一直運行,也能夠啓動分離的YARN Session,經過參數-d來實現。這種狀況下Flink YARN客戶端只會將Flink提交給集羣,而後自行關閉。須要注意的是,這種狀況沒法使用Flink中止YARN會話,須要使用YARN的命令來中止,命令以下:ide

yarn application -kill <appId>

2.2.4 Flink On YARN

命令以下:oop

# yarn-cluster模式提交Flink任務
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 WordCount.jar

各個參數含義以下:

參數 含義
-m yarn-cluster 鏈接指定集羣,如使用標識yarn-cluster
-yn 2 2個容器
-yjm 1024 JobManager內存爲1024MB
-ytm TaskManager內存爲1024MB

若是不知道提交隊列,任務會提交到默認隊列中,若是須要指定提交隊列,可使用參數-yqu queue_01進行提交。

3.消費Kafka並生成Parquet文件

準備一個Topic的Schema類TopicSource,TopicSource類定義以下:

public class TopicSource {

    private long time;
    private String id;

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

}

編寫一個生成Parquet的Flink類FlinkParquetUtils,具體代碼實現以下:

/**
 * Consumer kafka topic & convert data to parquet.
 * 
 * @author smartloli.
 *
 *         Created by Feb 24, 2019
 */
public class FlinkParquetUtils {

    private final static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final static Properties props = new Properties();

    static {
        /** Set flink env info. */
        env.enableCheckpointing(60 * 1000);
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        /** Set kafka broker info. */
        props.setProperty("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092");
        props.setProperty("group.id", "flink_group_parquet");
        props.setProperty("kafka.topic", "flink_parquet_topic_d");

        /** Set hdfs info. */
        props.setProperty("hdfs.path", "hdfs://cluster1/flink/parquet");
        props.setProperty("hdfs.path.date.format", "yyyy-MM-dd");
        props.setProperty("hdfs.path.date.zone", "Asia/Shanghai");
        props.setProperty("window.time.second", "60");

    }

    /** Consumer topic data && parse to hdfs. */
    public static void getTopicToHdfsByParquet(StreamExecutionEnvironment env, Properties props) {
        try {
            String topic = props.getProperty("kafka.topic");
            String path = props.getProperty("hdfs.path");
            String pathFormat = props.getProperty("hdfs.path.date.format");
            String zone = props.getProperty("hdfs.path.date.zone");
            Long windowTime = Long.valueOf(props.getProperty("window.time.second"));
            FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), props);
            KeyedStream<TopicSource, String> KeyedStream = env.addSource(flinkKafkaConsumer010).map(FlinkParquetUtils::transformData).assignTimestampsAndWatermarks(new CustomWatermarks<TopicSource>()).keyBy(TopicSource::getId);

            DataStream<TopicSource> output = KeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(windowTime))).apply(new WindowFunction<TopicSource, TopicSource, String, TimeWindow>() {
                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;

                @Override
                public void apply(String key, TimeWindow timeWindow, Iterable<TopicSource> iterable, Collector<TopicSource> collector) throws Exception {
                    iterable.forEach(collector::collect);
                }
            });

            // Send hdfs by parquet
            DateTimeBucketAssigner<TopicSource> bucketAssigner = new DateTimeBucketAssigner<>(pathFormat, ZoneId.of(zone));
            StreamingFileSink<TopicSource> streamingFileSink = StreamingFileSink.forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(TopicSource.class)).withBucketAssigner(bucketAssigner).build();
            output.addSink(streamingFileSink).name("Sink To HDFS");
            env.execute("TopicData");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static TopicSource transformData(String data) {
        if (data != null && !data.isEmpty()) {
            JSONObject value = JSON.parseObject(data);
            TopicSource topic = new TopicSource();
            topic.setId(value.getString("id"));
            topic.setTime(value.getLong("time"));
            return topic;
        } else {
            return new TopicSource();
        }
    }

    private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<TopicSource> {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;
        private Long cuurentTime = 0L;

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(TopicSource topic, long l) {
            return new Watermark(cuurentTime);
        }

        @Override
        public long extractTimestamp(TopicSource topic, long l) {
            Long time = topic.getTime();
            cuurentTime = Math.max(time, cuurentTime);
            return time;
        }
    }

    public static void main(String[] args) {
        getTopicToHdfsByParquet(env, props);
    }

}

而後將編寫好的應用程序進行打包,這裏咱們能夠利用Maven命令,能夠很方便的進行打包,在pom.xml文件中添加以下插件:

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>org.smartloli.kafka.connector.flink.hdfs.FlinkParquetUtils</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

而後使用以下命令進行編譯打包:

mvn clean && mvn assembly:assembly

最後將打包的JAR上傳到Flink集羣。

4.運行及預覽

將應用程序的JAR上傳到Flink集羣后,執行以下命令進行提交:

flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -yqu hadoop kafka-connector-flink-parquet.jar

查看ResourceManager的頁面,提交任務以下:

 

 在代碼中,咱們在HDFS上以日期yyyy-MM-dd的格式進行生產,結果以下:

 

5.總結

在編寫Flink應用程序的時候,建議使用Maven來管理項目,這樣添加依賴JAR的時候,只需將依賴的信息添加到pom.xml文件便可。打包的時候,一樣使用Maven命令,這樣應用程序所依賴的JAR包均會打包進行,防止遺漏致使提交任務時失敗。

6.結束語

這篇博客就和你們分享到這裏,若是你們在研究學習的過程中有什麼問題,能夠加羣進行討論或發送郵件給我,我會盡我所能爲您解答,與君共勉!

另外,博主出書了《Kafka並不難學》,喜歡的朋友或同窗, 能夠在公告欄那裏點擊購買連接購買博主的書進行學習,在此感謝你們的支持。

相關文章
相關標籤/搜索