使用Flink批處理完成數據比對(對帳)一

看了幾天flink,剛入門。
簡單說下對flink的感覺,flink有4層(有些說3層,將Table API和SQL當作一層)API,越底層,對數據的操做就越精細,越高層完成功能所須要的代碼就越少,並且代碼越易讀。java


image.png

api使用起來很像java中的stream,這個其實很顯然,都是爲了對流數據進行處理。感受就像flink是java中並行流的分佈式版本,因此對stream熟悉的話,flink上手不難,或者說使用flink編寫代碼並不難。git

Flink的編程模式:輸入(source) -> 處理(轉換transform) -> 輸出(sink),3部分,至關清爽。apache

統一術語

數據比對通常針對兩個數據集A/B,在選定一個基準方A後,定義以下:
F000:A/B兩方數據相同
F113:A中存在,但B中沒有,A比B多
F114:B中存在,但A中沒有,B比A多
F115:A與B的關鍵字段相同,但畢竟字段不一樣,如A與B都有同一筆訂單,但訂單金額不一樣編程

新建工程

這裏咱們使用官方提供的quickstart作模板,若是是比較新版的idea(如2020.1)裏面直接有flink的quickstart模板,舊版的idea的話,須要本身添加一下。api


image.png
image.png

下次使用的時候能夠直接從這裏看到:app


image.png

若是你使用的是scala,ArtifactId則填flink-quickstart-scala。具體的版本信息能夠根據最新版的填寫。分佈式

添加Table API依賴

在pom.xml中添加Table API依賴。ide

<!-- Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Table API須要scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

編寫代碼

利用模板裏的BatchJob來編寫:函數

package com.flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.List;

/**
 * Skeleton for a Flink Batch Job.
 *
 * <p>For a tutorial how to write a Flink batch application, check the
 * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution,
 * change the main class in the POM.xml file to this class (simply search for 'mainClass')
 * and run 'mvn clean package' on the command line.
 */
public class BatchJob {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Table Environment
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);

        /**
         * 構造兩個數據集,實際生產從本身須要的source中獲取便可
         */
        DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115");
        DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114");


        // 轉換成table
        Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique);
        Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique);


        /**
         * 核心比對(對帳)邏輯
         */
        Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集
        Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集
        Table f000_table = tableA_unique.intersect(tableB_unique);// 交集

        // 轉回DataSet用於輸出
        DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class);
        DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class);
        DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class);


        /**
         * 輸出,實際輸出到本身須要的sink便可
         */
        List<String> f000_list = f000.collect();
        List<String> f113_list = f113.collect();
        List<String> f114_list = f114.collect();

        System.out.println("==============================");
        System.out.println("f000 ->" + f000_list);
        System.out.println("==============================");
        System.out.println("f113 ->" + f113_list);
        System.out.println("==============================");
        System.out.println("f114 ->" + f114_list);



        // 批處理不須要顯示調用execute,不然會報錯
        // env.execute("Flink Batch Java API Skeleton");
    }

}

簡單說下幾個關鍵點:ui

  1. 使用Table API須要建立對應的執行環境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
  1. 模板代碼中最後顯式調用env.execute(),其實在批處理中不須要,顯式調用反而會報錯。

源碼

源碼

總結

本質上就是利用Table API中對數據集的處理函數(交集、差集)來完成數據比對。
若是你有更好的想法,歡迎留言,多多指教。
轉載請註明出處

相關文章
相關標籤/搜索