《從0到1學習Flink》—— Flink Data transformation(轉換)

前言

在第一篇介紹 Flink 的文章 《《從0到1學習Flink》—— Apache Flink 介紹》 中就說過 Flink 程序的結構java

Flink 應用程序結構就是如上圖所示:git

一、Source: 數據源,Flink 在流處理和批處理上的 source 大概有 4 類:基於本地集合的 source、基於文件的 source、基於網絡套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,固然你也能夠定義本身的 source。github

二、Transformation:數據轉換的各類操做,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操做不少,能夠將數據轉換計算成你想要的數據。windows

三、Sink:接收器,Flink 將轉換計算後的數據發送的地點 ,你可能須要存儲下來,Flink 常見的 Sink 大概有以下幾類:寫入文件、打印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也能夠定義本身的 Sink。微信

在上四篇文章介紹了 Source 和 Sink:網絡

一、《從0到1學習Flink》—— Data Source 介紹session

二、《從0到1學習Flink》—— 如何自定義 Data Source ?app

三、《從0到1學習Flink》—— Data Sink 介紹socket

四、《從0到1學習Flink》—— 如何自定義 Data Sink ?ide

那麼這篇文章咱們就來看下 Flink Data Transformation 吧,數據轉換操做仍是蠻多的,須要好好講講!

Transformation

Map

這是最簡單的轉換之一,其中輸入是一個數據流,輸出的也是一個數據流:

仍是拿上一篇文章的案例來將數據進行 map 轉換操做:

SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
    @Override
    public Student map(Student value) throws Exception {
        Student s1 = new Student();
        s1.id = value.id;
        s1.name = value.name;
        s1.password = value.password;
        s1.age = value.age + 5;
        return s1;
    }
});
map.print();

將每一個人的年齡都增長 5 歲,其餘不變。

FlatMap

FlatMap 採用一條記錄並輸出零個,一個或多個記錄。

SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
    @Override
    public void flatMap(Student value, Collector<Student> out) throws Exception {
        if (value.id % 2 == 0) {
            out.collect(value);
        }
    }
});
flatMap.print();

這裏將 id 爲偶數的彙集出來。

Filter

Filter 函數根據條件判斷出結果。

SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
    @Override
    public boolean filter(Student value) throws Exception {
        if (value.id > 95) {
            return true;
        }
        return false;
    }
});
filter.print();

這裏將 id 大於 95 的過濾出來,而後打印出來。

KeyBy

KeyBy 在邏輯上是基於 key 對流進行分區。在內部,它使用 hash 函數對流進行分區。它返回 KeyedDataStream 數據流。

KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
});
keyBy.print();

上面對 student 的 age 作 KeyBy 操做分區

Reduce

Reduce 返回單個的結果值,而且 reduce 操做每處理一個元素老是建立一個新值。經常使用的方法有 average, sum, min, max, count,使用 reduce 方法均可實現。

SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
    @Override
    public Integer getKey(Student value) throws Exception {
        return value.age;
    }
}).reduce(new ReduceFunction<Student>() {
    @Override
    public Student reduce(Student value1, Student value2) throws Exception {
        Student student1 = new Student();
        student1.name = value1.name + value2.name;
        student1.id = (value1.id + value2.id) / 2;
        student1.password = value1.password + value2.password;
        student1.age = (value1.age + value2.age) / 2;
        return student1;
    }
});
reduce.print();

上面先將數據流進行 keyby 操做,由於執行 reduce 操做只能是 KeyedStream,而後將 student 對象的 age 作了一個求平均值的操做。

Fold

Fold 經過將最後一個文件夾流與當前記錄組合來推出 KeyedStream。 它會發回數據流。

KeyedStream.fold("1", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String accumulator, Integer value) throws Exception {
        return accumulator + "=" + value;
    }
})

Aggregations

DataStream API 支持各類聚合,例如 min,max,sum 等。 這些函數能夠應用於 KeyedStream 以得到 Aggregations 聚合。

KeyedStream.sum(0) 
KeyedStream.sum("key") 
KeyedStream.min(0) 
KeyedStream.min("key") 
KeyedStream.max(0) 
KeyedStream.max("key") 
KeyedStream.minBy(0) 
KeyedStream.minBy("key") 
KeyedStream.maxBy(0) 
KeyedStream.maxBy("key")

max 和 maxBy 之間的區別在於 max 返回流中的最大值,但 maxBy 返回具備最大值的鍵, min 和 minBy 同理。

Window

Window 函數容許按時間或其餘條件對現有 KeyedStream 進行分組。 如下是以 10 秒的時間窗口聚合:

inputStream.keyBy(0).window(Time.seconds(10));

Flink 定義數據片斷以便(可能)處理無限數據流。 這些切片稱爲窗口。 此切片有助於經過應用轉換處理數據塊。 要對流進行窗口化,咱們須要分配一個能夠進行分發的鍵和一個描述要對窗口化流執行哪些轉換的函數

要將流切片到窗口,咱們可使用 Flink 自帶的窗口分配器。 咱們有選項,如 tumbling windows, sliding windows, global 和 session windows。 Flink 還容許您經過擴展 WindowAssginer 類來編寫自定義窗口分配器。 這裏先預留下篇文章來說解這些不一樣的 windows 是如何工做的。

WindowAll

windowAll 函數容許對常規數據流進行分組。 一般,這是非並行數據轉換,由於它在非分區數據流上運行。

與常規數據流功能相似,咱們也有窗口數據流功能。 惟一的區別是它們處理窗口數據流。 因此窗口縮小就像 Reduce 函數同樣,Window fold 就像 Fold 函數同樣,而且還有聚合。

inputStream.keyBy(0).windowAll(Time.seconds(10));

Union

Union 函數將兩個或多個數據流結合在一塊兒。 這樣就能夠並行地組合數據流。 若是咱們將一個流與自身組合,那麼它會輸出每一個記錄兩次。

inputStream.union(inputStream1, inputStream2, ...);

Window join

咱們能夠經過一些 key 將同一個 window 的兩個數據流 join 起來。

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))     
           .apply (new JoinFunction () {...});

以上示例是在 5 秒的窗口中鏈接兩個流,其中第一個流的第一個屬性的鏈接條件等於另外一個流的第二個屬性。

Split

此功能根據條件將流拆分爲兩個或多個流。 當您得到混合流而且您可能但願單獨處理每一個數據流時,可使用此方法。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>(); 
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

Select

此功能容許您從拆分流中選擇特定流。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");

Project

Project 函數容許您從事件流中選擇屬性子集,並僅將所選元素髮送到下一個處理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函數從給定記錄中選擇屬性號 2 和 3。 如下是示例輸入和輸出記錄:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)

最後

本文主要介紹了 Flink Data 的經常使用轉換方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。並用了點簡單的 demo 介紹瞭如何使用,具體在項目中該如何將數據流轉換成咱們想要的格式,還須要根據實際狀況對待。

關注我

轉載請務必註明原創地址爲:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/

微信公衆號:zhisheng

另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號了。你能夠加個人微信:zhisheng_tian,而後回覆關鍵字:Flink 便可無條件獲取到。

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客

相關文章

一、《從0到1學習Flink》—— Apache Flink 介紹

二、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門

三、《從0到1學習Flink》—— Flink 配置文件詳解

四、《從0到1學習Flink》—— Data Source 介紹

五、《從0到1學習Flink》—— 如何自定義 Data Source ?

六、《從0到1學習Flink》—— Data Sink 介紹

七、《從0到1學習Flink》—— 如何自定義 Data Sink ?

八、《從0到1學習Flink》—— Flink Data transformation(轉換)

九、《從0到1學習Flink》—— 介紹Flink中的Stream Windows

十、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解

十一、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch

十二、《從0到1學習Flink》—— Flink 項目如何運行?

1三、《從0到1學習Flink》—— Flink 寫入數據到 Kafka

相關文章
相關標籤/搜索