小白從零開始學ApacheFlink

1. Apache Flink 介紹

來源: http://www.54tianzhisheng.cn/...

Apache Flink 是近年來愈來愈流行的一款開源大數據計算引擎,它同時支持了批處理和流處理,也能用來作一些基於事件的應用。使用官網的一句話來介紹 Flink 就是 「Stateful Computations Over Streams」java

首先 Flink 是一個純流式的計算引擎,它的基本數據模型是數據流。流能夠是無邊界的無限流,即通常意義上的流處理。也能夠是有邊界的有限流,這樣就是批處理。所以 Flink 用一套架構同時支持了流處理和批處理。其次,Flink 的一個優點是支持有狀態的計算。若是處理一個事件(或一條數據)的結果只跟事件自己的內容有關,稱爲無狀態處理;反之結果還和以前處理過的事件有關,稱爲有狀態處理。稍微複雜一點的數據處理,好比說基本的聚合,數據流之間的關聯都是有狀態處理。mysql

  • 無窮數據集:無窮的持續集成的數據集合
  • 有界數據集:有限不會改變的數據集合

那麼那些常見的無窮數據集有哪些呢?git

  • 用戶與客戶端的實時交互數據
  • 應用實時產生的日誌
  • 金融市場的實時交易記錄

數據運算模型有哪些呢:github

  • 流式:只要數據一直在產生,計算就持續地進行
  • 批處理:在預先定義的時間內運行計算,當完成時釋放計算機資源

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-i3IYaQm9-1595768814163)(https://ws3.sinaimg.cn/large/...]web

2. What is Flink?

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-1KK2dXk1-1595768814167)(https://ws3.sinaimg.cn/large/...]sql

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-8uDNlF7v-1595768814169)(https://ws2.sinaimg.cn/large/...]apache

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-W7z6hQI9-1595768814171)(https://ws4.sinaimg.cn/large/...]編程

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-4mHt3Iq8-1595768814173)(/Applications/Typora.app/Contents/Resources/TypeMark/Docs/img/006tNbRwly1fw6nu5yishj31kw0w04cm.jpg)]api

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-qGWVgGhG-1595768814174)(https://ws2.sinaimg.cn/large/...]緩存

從下至上:

一、部署:Flink 支持本地運行、能在獨立集羣或者在被 YARN 或 Mesos 管理的集羣上運行, 也能部署在雲上。

二、運行:Flink 的核心是分佈式流式數據引擎,意味着數據以一次一個事件的形式被處理。

三、API:DataStream、DataSet、Table、SQL API。

四、擴展庫:Flink 還包括用於復瑣事件處理,機器學習,圖形處理和 Apache Storm 兼容性的專用代碼庫。

3. Flink 數據流編程模型

1. 抽象級別

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-yZWEXSka-1595768814175)(https://ws2.sinaimg.cn/large/...]

  • 最底層提供了有狀態流。它將經過 過程函數(Process Function)嵌入到 DataStream API 中。它容許用戶能夠自由地處理來自一個或多個流數據的事件,並使用一致、容錯的狀態。除此以外,用戶能夠註冊事件時間和處理事件回調,從而使程序能夠實現複雜的計算。
  • DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 處理有界的數據集,DataStream 處理有界或者無界的數據流。用戶能夠經過各類方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)將數據進行轉換 / 計算。
  • Table API 是以 爲中心的聲明式 DSL,其中表可能會動態變化(在表達流數據時)。Table API 提供了例如 select、project、join、group-by、aggregate 等操做,使用起來卻更加簡潔(代碼量更少)。

你能夠在表與 DataStream/DataSet 之間無縫切換,也容許程序將 Table APIDataStream 以及 DataSet 混合使用。

  • Flink 提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 相似,可是是以 SQL查詢表達式的形式表現程序。SQL 抽象與 Table API 交互密切,同時 SQL 查詢能夠直接在 Table API 定義的表上執行。

2. Flink 程序與數據流結構

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-j4U1eMhC-1595768814176)(https://ws1.sinaimg.cn/large/...]

Flink 應用程序結構就是如上圖所示:

一、Source: 數據源,Flink 在流處理和批處理上的 source 大概有 4 類:基於本地集合的 source、基於文件的 source、基於網絡套接字的 source、自定義的 source。自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,固然你也能夠定義本身的 source。

二、Transformation:數據轉換的各類操做,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操做不少,能夠將數據轉換計算成你想要的數據。

三、Sink:接收器,Flink 將轉換計算後的數據發送的地點 ,你可能須要存儲下來,Flink 常見的 Sink 大概有以下幾類:寫入文件、打印出來、寫入 socket 、自定義的 sink 。自定義的 sink 常見的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也能夠定義本身的 sink。

4. 爲何選擇Flink

  1. Flink 在 JVM 中提供了本身的內存管理,使其獨立於 Java 的默認垃圾收集器。 它經過使用散列,索引,緩存和排序有效地進行內存管理。
  2. Flink 擁有豐富的庫來進行機器學習,圖形處理,關係數據處理等。 因爲其架構,很容易執行復雜的事件處理和警報
  3. Flink 能知足高併發和低延遲(計算大量數據很快)。下圖顯示了 Apache Flink 與 Apache Storm 在完成流數據清洗的分佈式任務的性能對比。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-4Tz386WK-1595768814177)(https://ws3.sinaimg.cn/large/...]

  1. Flink 保證狀態化計算強一致性。」狀態化「意味着應用能夠維護隨着時間推移已經產生的數據聚合或者,而且 Filnk 的檢查點機制在一次失敗的事件中一個應用狀態的強一致性。
  2. Flink 支持流式計算和帶有事件時間語義的視窗。事件時間機制使得那些事件無序到達甚至延遲到達的數據流可以計算出精確的結果。
  3. 除了提供數據驅動的視窗外,Flink 還支持基於時間,計數,session 等的靈活視窗。視窗可以用靈活的觸發條件定製化從而達到對複雜的流傳輸模式的支持。Flink 的視窗使得模擬真實的建立數據的環境成爲可能
  4. Flink 的容錯能力是輕量級的,容許系統保持高併發,同時在相同時間內提供強一致性保證。Flink 以零數據丟失的方式從故障中恢復,但沒有考慮可靠性和延遲之間的折衷。
  5. Flink 保存點提供了一個狀態化的版本機制,使得能以無丟失狀態和最短停機時間的方式更新應用或者回退歷史數據。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-6QPM1ZUj-1595768814178)(https://ws1.sinaimg.cn/large/...]

  1. ........

5. 搭建Flink環境並運行簡單程序

1. 安裝Flink

經過 homebrew 來安裝。

brew install apache-flink

2. 檢查是否安裝成功

flink -v

3. 啓動Flink

進入到:/usr/local/Cellar/apache-flink/1.7.1/libexec/bin

./start-cluster.sh

4. 啓動成功

訪問http://localhost:8081/#/overview

5. 新建maven項目

GroupId=org.apache.flink
ArtifactId=flink-quickstart-java
Version=1.6.1

  1. 建立一個 SocketTextStreamWordCount 文件,加入如下代碼:
package study;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author wangjun
 * @date 2019/03/25
 */
public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception {
        //參數檢查
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }

        String hostname = args[0];
        Integer port = Integer.parseInt(args[1]);


        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //獲取數據
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        //計數
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
  1. 接着進入工程目錄,使用如下命令打包
mvn clean package -Dmaven.test.skip=true
  1. 開啓監聽 9000 端口:
nc -l 9000
  1. 進入 flink 安裝目錄 bin 下執行如下命令跑程序
flink run -c study.SocketTextStreamWordCount /Users/wangjun/timwang/codeArea/study/target/original-study-1.0-SNAPSHOT.jar 127.0.0.1 9000
  1. 咱們能夠在 webUI 中看到正在運行的程序

  1. 能夠在 nc 監聽端口中輸入 text,好比

  1. 而後咱們經過 tail 命令看一下輸出的 log 文件,來觀察統計結果。進入目錄 apache-flink/1.6.0/libexec/log,執行如下命令:
tail -f flink-wangjun-taskexecutor-0-wangjundeMBP.out

6. Data Source介紹

數據來源,Flink 作爲一款流式計算框架,它可用來作批處理,即處理靜態的數據集、歷史的數據集;也能夠用來作流處理,即實時的處理些實時數據流,實時的產生數據流結果,只要數據源源不斷的過來,Flink 就可以一直計算下去,這個 Data Sources 就是數據的來源地。

Flink 中你可使用 StreamExecutionEnvironment.addSource(sourceFunction) 來爲你的程序添加數據來源。

Flink 已經提供了若干實現好了的 source functions,固然你也能夠經過實現 SourceFunction 來自定義非並行的 source 或者實現 ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來自定義並行的 source,

1. Flink數據來源

StreamExecutionEnvironment 中可使用如下幾個已實現的 stream sources,

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-9Jk36Myp-1595768814183)(https://ws4.sinaimg.cn/large/...]

a. 基於集合

一、fromCollection(Collection) - 從 Java 的 Java.util.Collection 建立數據流。集合中的全部元素類型必須相同。

二、fromCollection(Iterator, Class) - 從一個迭代器中建立數據流。Class 指定了該迭代器返回元素的類型。

三、fromElements(T …) - 從給定的對象序列中建立數據流。全部對象類型必須相同。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Person> input = env.fromElements(
        new Person(1, "name", 12),
        new Person(2, "name2", 13),
        new Person(3, "name3", 14)
);

四、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中建立並行數據流。Class 指定了該迭代器返回元素的類型。

五、generateSequence(from, to) - 建立一個生成指定區間範圍內的數字序列的並行數據流。

b. 基於文件

一、readTextFile(path) - 讀取文本文件,即符合 TextInputFormat 規範的文件,並將其做爲字符串返回。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

二、readFile(fileInputFormat, path) - 根據指定的文件輸入格式讀取文件(一次)。

三、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內部調用的方法。它根據給定的 fileInputFormat 和讀取路徑讀取文件。根據提供的 watchType,這個 source 能夠按期(每隔 interval 毫秒)監測給定路徑的新數據(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應文件的數據並退出(FileProcessingMode.PROCESS_ONCE)。你能夠經過 pathFilter 進一步排除掉須要處理的文件。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

c. 基於 Socket:

socketTextStream(String hostname, int port) - 從 socket 讀取。元素能夠用分隔符切分。

d. 自定義

addSource - 添加一個新的 source function。例如,你能夠 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取數據

package study;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * @author wangjun
 * @date 2019/3/25
 */
public class SourceFromMySQL extends RichSourceFunction<Student> {
    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from student;";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {

    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/cloud?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}
package study;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author wangjun
 * @date 2019/3/25
 */
public class MySqlSourceMain {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data source");
    }
}

2. 幾種數據源特色

一、基於集合:有界數據集,更偏向於本地測試用

二、基於文件:適合監聽文件修改並讀取其內容

三、基於 Socket:監聽主機的 host port,從 Socket 中獲取數據

四、自定義 addSource:大多數的場景數據都是無界的,會源源不斷的過來。好比去消費 Kafka 某個 topic 上的數據,這時候就須要用到這個 addSource,可能由於用的比較多的緣由吧,Flink 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你能夠去看看 FlinkKafkaConsumerBase 這個基礎類,它是 Flink Kafka 消費的最根本的類。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-SS0BBXqM-1595768814185)(https://ws4.sinaimg.cn/large/...]

7. 實例-計算熱門商品數據

本案例將實現一個「實時熱門商品」的需求,每隔 5 分鐘輸出最近一小時內點擊量最多的前 N 個商品。將這個需求進行分解
咱們大概要作這麼幾件事情:

• 抽取出業務時間戳,告訴 Flink 框架基於業務時間作窗口

• 過濾出點擊行爲數據

• 按一小時的窗口大小,每 5 分鐘統計一次,作滑動窗口聚合( Sliding Window)

• 按每一個窗口聚合,輸出每一個窗口中點擊量前 N 名的商品

1. 數據準備

這裏咱們準備了一份淘寶用戶行爲數據集(來自阿里雲天池公開數據集)。本數據集包含了淘寶上某一天隨機一百萬用戶的全部行爲(包括點擊、購買、加購、收藏)。數據集的組織形式和 MovieLens-20M 相似,即數據集的每一行表示一條用戶行爲,由用戶 ID、商品 ID、商品類目 ID、行爲類型和時間戳組成,並以逗號分隔。關於數據集中每一列的詳細描述以下:

curl https://raw.githubusercontent... UserBehavior.csv

2. 開發

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 爲了打印到控制檯的結果不亂序,咱們配置全局的併發爲 1,這裏改變併發對結果正確性沒有影響
env.setParallelism(1);


// UserBehavior.csv 的本地文件路徑
URL fileUrl = HotItems.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>)
        TypeExtractor.createTypeInfo(UserBehavior.class);
// 因爲 Java 反射抽取出的字段順序是不肯定的,須要顯式指定下文件中字段的順序
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
// 建立 PojoCsvInputFormat
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);

// PojoCsvInputFormat 建立輸入源。
DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);

3. 建立模擬數據源

CsvInputFormat 建立模擬數據源。咱們先建立一個 UserBehavior 的 POJO 類(全部成員變量聲明成 public 即是 POJO 類),強類
型化後能方便後續的處理

package study.goods;

import java.io.Serializable;

/**
 * @author wangjun
 * @date 2019/3/26
 */
public class UserBehavior implements Serializable {
    /**
     * 用戶 ID
     */
    public long userId;
    /**
     * 商品 ID
     */
    public long itemId;
    /**
     * 商品類目 ID
     */
    public int categoryId;
    /**
     * 用戶行爲, 包括("pv", "buy", "cart", "fav")
     */
    public String behavior;
    /**
     * 行爲發生的時間戳,單位秒
     */
    public long timestamp;

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public long getItemId() {
        return itemId;
    }

    public void setItemId(long itemId) {
        this.itemId = itemId;
    }

    public int getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(int categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

​ 當咱們說「統計過去一小時內點擊量」,這裏的「一小時」是指什麼呢? 在 Flink 中它能夠是指 ProcessingTime ,也能夠是 EventTime,由用戶決定。
​ • ProcessingTime:事件被處理的時間。也就是由機器的系統時間來決定。
​ • EventTime:事件發生的時間。通常就是數據自己攜帶的時間。
在本案例中,咱們須要統計業務時間上的每小時的點擊量,因此要基於 EventTime 來處理。那麼若是讓 Flink 按照咱們想要的業務時間來處理呢?這裏主要有兩件事情要作。

​ 第一件是告訴 Flink 咱們如今按照 EventTime 模式進行處理, Flink 默認使用 ProcessingTime處理,因此咱們要顯式設置下。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

​ 第二件事情是指定如何得到業務時間,以及生成 Watermark。 Watermark 是用來追蹤業務事件的概念,能夠理解成 EventTime 世界中的時鐘,用來指示當前處理到什麼時刻的數據了。因爲咱們的數據源的數據已經通過整理,沒有亂序,即事件的時間戳是單調遞增的,因此能夠將每條數據的業務時間就當作 Watermark。這裏咱們用 AscendingTimestampExtractor 來實現時間戳的抽取和 Watermark 的生成。

// 這樣咱們就獲得了一個帶有時間標記的數據流了,後面就能作一些窗口的操做
DataStream<UserBehavior> timedData = dataSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
    @Override
    public long extractAscendingTimestamp(UserBehavior userBehavior) {
        // 原始數據單位秒,將其轉成毫秒
        return userBehavior.timestamp * 1000;
    }
});

在開始窗口操做以前,先回顧下需求「每隔 5 分鐘輸出過去一小時內點擊量最多的前 N 個商品」。因爲原始數據中存在點擊、加購、購買、收藏各類行爲的數據,可是咱們只須要統計點擊量,因此先使用 FilterFunction 將點擊行爲數據過濾出來

DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() {
    @Override
    public boolean filter(UserBehavior userBehavior) throws Exception {
    // 過濾出只有點擊的數據
        return userBehavior.behavior.equals("pv");
    }
});

因爲要每隔 5 分鐘統計一次最近一小時每一個商品的點擊量,因此窗口大小是一小時,每隔 5 分鐘滑動一次。即分別要統計 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)... 等窗口的商品點擊量。是一個常見的滑動窗口需求( Sliding Window)

DataStream<ItemViewCount> windowedData = pvData.keyBy("itemId")
        .timeWindow(Time.minutes(60), Time.minutes(5))
        .aggregate(new CountAgg(), new WindowResultFunction());

咱們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每一個 商 品 作 滑 動 窗 口 (1 小 時 窗 口 , 5 分 鍾 滑 動 一 次 )。 然 後 我 們 使用 .aggregate(AggregateFunction af, WindowFunction wf) 作增量的聚合操做,它能使用AggregateFunction 提早聚合掉數據,減小 state 的存儲壓力。較之.apply(WindowFunction wf)會將窗口中的數據都存儲下來,最後一塊兒計算要高效地多。aggregate()方法的第一個參數用於
這裏的 CountAgg 實現了 AggregateFunction 接口,功能是統計窗口中的條數,即遇到一條數據就加一。

package study.goods;

import org.apache.flink.api.common.functions.AggregateFunction;

/**
 * COUNT 統計的聚合函數實現,每出現一條記錄加一 
 * @author wangjun
 * @date 2019/3/26
 */
public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}

aggregate(AggregateFunction af, WindowFunction wf)的第二個參數 WindowFunction 將每一個 key 每一個窗口聚合後的結果帶上其餘信息進行輸出。咱們這裏實現的 WindowResultFunction將主鍵商品 ID,窗口,點擊量封裝成了 ItemViewCount 進行輸出。

package study.goods;

/**
 * @author wangjun
 * @date 2019/3/26
 */
public class ItemViewCount {
    public long itemId; // 商品 ID
    public long windowEnd; // 窗口結束時間戳
    public long viewCount; // 商品的點擊量
    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
        ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        return result;
    }
}

如今咱們獲得了每一個商品在每一個窗口的點擊量的數據流。

爲了統計每一個窗口下最熱門的商品,咱們須要再次按窗口進行分組,這裏根據 ItemViewCount中的 windowEnd 進行 keyBy()操做。而後使用 ProcessFunction 實現一個自定義的 TopN 函數TopNHotItems來計算點擊量排名前 3名的商品,並將排名結果格式化成字符串,便於後續輸出。

DataStream<String> topItems = windowedData
        .keyBy("windowEnd")
        .process(new TopNHotItems(3)); // 求點擊量前 3 名的商品

ProcessFunction 是 Flink 提供的一個 low-level API,用於實現更高級的功能。它主要提供了定時器 timer 的功能(支持 EventTime 或 ProcessingTime)。本案例中咱們將利用 timer 來判斷什麼時候收齊了某個 window 下全部商品的點擊量數據。因爲 Watermark 的進度是全局的,

在 processElement 方法中,每當收到一條數據( ItemViewCount),咱們就註冊一個 windowEnd+1的定時器( Flink 框架會自動忽略同一時間的重複註冊)。 windowEnd+1 的定時器被觸發時,意味着收到了 windowEnd+1 的 Watermark,即收齊了該 windowEnd 下的全部商品窗口統計值。我
們在 onTimer()中處理將收集的全部商品及點擊量進行排序,選出 TopN,並將排名信息格式化成字符串後進行輸出。

這裏咱們還使用了 ListState<ItemViewCount>來存儲收到的每條 ItemViewCount 消息,保證在發生故障時,狀態數據的不丟失和一致性。 ListState 是 Flink 提供的相似 Java List 接口的State API,它集成了框架的 checkpoint 機制,自動作到了 exactly-once 的語義保證

package study.goods;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


/**
 * @author wangjun
 * @date 2019/3/26
 */
public class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
    private final int topSize;
    // 用於存儲商品與點擊數的狀態,待收齊同一個窗口的數據後,再觸發 TopN 計算
    private ListState<ItemViewCount> itemState;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 狀態的註冊
        ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 獲取收到的全部商品點擊量
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // 提早清除狀態中的數據,釋放空間
        itemState.clear();
        // 按照點擊量從大到小排序
        allItems.sort(new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) (o2.viewCount - o1.viewCount);
            }
        });
        // 將排名信息格式化成 String, 便於打印
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < topSize; i++) {
            ItemViewCount currentItem = allItems.get(i);
            // No1: 商品 ID=12224 瀏覽量=2413
            result.append("No").append(i).append(":")
                    .append(" 商品 ID=").append(currentItem.itemId)
                    .append(" 瀏覽量=").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");
        out.collect(result.toString());
    }

    @Override
    public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception {
        // 每條數據都保存到狀態中
        itemState.add(input);
        // 註冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於 windowEnd 窗口的全部商品數據
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }
}
相關文章
相關標籤/搜索