Flink 從 0 到 1 學習 —— Flink Data transformation(轉換)


toc: true
title: Flink 從 0 到 1 學習 —— Flink Data transformation(轉換)
date: 2018-11-04
tags:java

  • Flink
  • 大數據
  • 流式計算

前言

在第一篇介紹 Flink 的文章 《《從0到1學習Flink》—— Apache Flink 介紹》 中就說過 Flink 程序的結構git

Flink 應用程序結構就是如上圖所示:github

一、Source: 數據源,Flink 在流處理和批處理上的 source 大概有 4 類:基於本地集合的 source、基於文件的 source、基於網絡套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,固然你也能夠定義本身的 source。sql

二、Transformation:數據轉換的各類操做,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操做不少,能夠將數據轉換計算成你想要的數據。windows

三、Sink:接收器,Flink 將轉換計算後的數據發送的地點 ,你可能須要存儲下來,Flink 常見的 Sink 大概有以下幾類:寫入文件、打印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也能夠定義本身的 Sink。微信

在上四篇文章介紹了 Source 和 Sink:網絡

一、《從0到1學習Flink》—— Data Source 介紹session

二、《從0到1學習Flink》—— 如何自定義 Data Source ?架構

三、《從0到1學習Flink》—— Data Sink 介紹app

四、《從0到1學習Flink》—— 如何自定義 Data Sink ?

那麼這篇文章咱們就來看下 Flink Data Transformation 吧,數據轉換操做仍是蠻多的,須要好好講講!

Transformation

Map

這是最簡單的轉換之一,其中輸入是一個數據流,輸出的也是一個數據流:

仍是拿上一篇文章的案例來將數據進行 map 轉換操做:

SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
    @Override
    public Student map(Student value) throws Exception {
        Student s1 = new Student();
        s1.id = value.id;
        s1.name = value.name;
        s1.password = value.password;
        s1.age = value.age + 5;
        return s1;
    }
});
map.print();

將每一個人的年齡都增長 5 歲,其餘不變。

FlatMap

FlatMap 採用一條記錄並輸出零個,一個或多個記錄。

SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
    @Override
    public void flatMap(Student value, Collector<Student> out) throws Exception {
        if (value.id % 2 == 0) {
            out.collect(value);
        }
    }
});
flatMap.print();

這裏將 id 爲偶數的彙集出來。

Filter

Filter 函數根據條件判斷出結果。

SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
    @Override
    public boolean filter(Student value) throws Exception {
        if (value.id > 95) {
            return true;
        }
        return false;
    }
});
filter.print();

這裏將 id 大於 95 的過濾出來,而後打印出來。

KeyBy

KeyBy 在邏輯上是基於 key 對流進行分區。在內部,它使用 hash 函數對流進行分區。它返回 KeyedDataStream 數據流。

KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
});
keyBy.print();

上面對 student 的 age 作 KeyBy 操做分區

Reduce

Reduce 返回單個的結果值,而且 reduce 操做每處理一個元素老是建立一個新值。經常使用的方法有 average, sum, min, max, count,使用 reduce 方法均可實現。

SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
}).reduce(new ReduceFunction<Student>() {
    @Override
    public Student reduce(Student value1, Student value2) throws Exception {
        Student student1 = new Student();
        student1.name = value1.name + value2.name;
        student1.id = (value1.id + value2.id) / 2;
        student1.password = value1.password + value2.password;
        student1.age = (value1.age + value2.age) / 2;
        return student1;
    }
});
reduce.print();

上面先將數據流進行 keyby 操做,由於執行 reduce 操做只能是 KeyedStream,而後將 student 對象的 age 作了一個求平均值的操做。

Fold

Fold 經過將最後一個文件夾流與當前記錄組合來推出 KeyedStream。 它會發回數據流。

KeyedStream.fold("1", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String accumulator, Integer value) throws Exception {
        return accumulator + "=" + value;
    }
})

Aggregations

DataStream API 支持各類聚合,例如 min,max,sum 等。 這些函數能夠應用於 KeyedStream 以得到 Aggregations 聚合。

KeyedStream.sum(0) 
KeyedStream.sum("key") 
KeyedStream.min(0) 
KeyedStream.min("key") 
KeyedStream.max(0) 
KeyedStream.max("key") 
KeyedStream.minBy(0) 
KeyedStream.minBy("key") 
KeyedStream.maxBy(0) 
KeyedStream.maxBy("key")

max 和 maxBy 之間的區別在於 max 返回流中的最大值,但 maxBy 返回具備最大值的鍵, min 和 minBy 同理。

Window

Window 函數容許按時間或其餘條件對現有 KeyedStream 進行分組。 如下是以 10 秒的時間窗口聚合:

inputStream.keyBy(0).window(Time.seconds(10));

Flink 定義數據片斷以便(可能)處理無限數據流。 這些切片稱爲窗口。 此切片有助於經過應用轉換處理數據塊。 要對流進行窗口化,咱們須要分配一個能夠進行分發的鍵和一個描述要對窗口化流執行哪些轉換的函數

要將流切片到窗口,咱們可使用 Flink 自帶的窗口分配器。 咱們有選項,如 tumbling windows, sliding windows, global 和 session windows。 Flink 還容許您經過擴展 WindowAssginer 類來編寫自定義窗口分配器。 這裏先預留下篇文章來說解這些不一樣的 windows 是如何工做的。

WindowAll

windowAll 函數容許對常規數據流進行分組。 一般,這是非並行數據轉換,由於它在非分區數據流上運行。

與常規數據流功能相似,咱們也有窗口數據流功能。 惟一的區別是它們處理窗口數據流。 因此窗口縮小就像 Reduce 函數同樣,Window fold 就像 Fold 函數同樣,而且還有聚合。

inputStream.keyBy(0).windowAll(Time.seconds(10));

Union

Union 函數將兩個或多個數據流結合在一塊兒。 這樣就能夠並行地組合數據流。 若是咱們將一個流與自身組合,那麼它會輸出每一個記錄兩次。

inputStream.union(inputStream1, inputStream2, ...);

Window join

咱們能夠經過一些 key 將同一個 window 的兩個數據流 join 起來。

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))     
           .apply (new JoinFunction () {...});

以上示例是在 5 秒的窗口中鏈接兩個流,其中第一個流的第一個屬性的鏈接條件等於另外一個流的第二個屬性。

Split

此功能根據條件將流拆分爲兩個或多個流。 當您得到混合流而且您可能但願單獨處理每一個數據流時,可使用此方法。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>(); 
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select

此功能容許您從拆分流中選擇特定流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");

Project

Project 函數容許您從事件流中選擇屬性子集,並僅將所選元素髮送到下一個處理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函數從給定記錄中選擇屬性號 2 和 3。 如下是示例輸入和輸出記錄:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)

最後

本文主要介紹了 Flink Data 的經常使用轉換方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。並用了點簡單的 demo 介紹瞭如何使用,具體在項目中該如何將數據流轉換成咱們想要的格式,還須要根據實際狀況對待。

關注我

轉載請務必註明原創地址爲:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/

微信公衆號:zhisheng

另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號(zhisheng)了,你能夠回覆關鍵字:Flink 便可無條件獲取到。另外也能夠加我微信 你能夠加個人微信:yuanblog_tzs,探討技術!

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客

博客

一、Flink 從0到1學習 —— Apache Flink 介紹

二、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門

三、Flink 從0到1學習 —— Flink 配置文件詳解

四、Flink 從0到1學習 —— Data Source 介紹

五、Flink 從0到1學習 —— 如何自定義 Data Source ?

六、Flink 從0到1學習 —— Data Sink 介紹

七、Flink 從0到1學習 —— 如何自定義 Data Sink ?

八、Flink 從0到1學習 —— Flink Data transformation(轉換)

九、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows

十、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解

十一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch

十二、Flink 從0到1學習 —— Flink 項目如何運行?

1三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka

1四、Flink 從0到1學習 —— Flink JobManager 高可用性配置

1五、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹

1六、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL

1七、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ

1八、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase

1九、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS

20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis

2一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra

2二、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume

2三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB

2四、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ

2五、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裏去了

2六、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裏去了

2七、阿里巴巴開源的 Blink 實時計算框架真香

2八、Flink 從0到1學習 —— Flink 中如何管理配置?

2九、Flink 從0到1學習—— Flink 不能夠連續 Split(分流)?

30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

3一、Flink 架構、原理與部署測試

3二、爲何說流處理即將來?

3三、OPPO 數據中臺之基石:基於 Flink SQL 構建實時數據倉庫

3四、流計算框架 Flink 與 Storm 的性能對比

3五、Flink狀態管理和容錯機制介紹

3六、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理

3七、360深度實踐:Flink與Storm協議級對比

3八、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了

3九、Apache Flink 1.9 重大特性提早解讀

40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)

4一、Flink 靈魂兩百問,這誰頂得住?

4二、Flink 從0到1學習 —— 如何使用 Side Output 來分流?

4三、你公司到底需不須要引入實時計算引擎?

4四、一文讓你完全瞭解大數據實時計算引擎 Flink

源碼解析

一、Flink 源碼解析 —— 源碼編譯運行

二、Flink 源碼解析 —— 項目結構一覽

三、Flink 源碼解析—— local 模式啓動流程

四、Flink 源碼解析 —— standalone session 模式啓動流程

五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動

六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動

七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程

八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程

九、Flink 源碼解析 —— 如何獲取 JobGraph?

十、Flink 源碼解析 —— 如何獲取 StreamGraph?

十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?

十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?

1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程

1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程

1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制

1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制

1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?

1八、Flink Metrics 源碼解析 —— Flink-metrics-core

1九、Flink Metrics 源碼解析 —— Flink-metrics-datadog

20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard

2一、Flink Metrics 源碼解析 —— Flink-metrics-graphite

2二、Flink Metrics 源碼解析 —— Flink-metrics-influxdb

2三、Flink Metrics 源碼解析 —— Flink-metrics-jmx

2四、Flink Metrics 源碼解析 —— Flink-metrics-slf4j

2五、Flink Metrics 源碼解析 —— Flink-metrics-statsd

2六、Flink Metrics 源碼解析 —— Flink-metrics-prometheus

2六、Flink Annotations 源碼解析

2七、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?

2八、大數據重磅炸彈——實時計算框架 Flink

2九、Flink Checkpoint-輕量級分佈式快照

30、Flink Clients 源碼解析
原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng

相關文章
相關標籤/搜索