看了幾天flink,剛入門。
簡單說下對flink的感覺,flink有4層(有些說3層,將Table API和SQL當作一層)API,越底層,對數據的操做就越精細,越高層完成功能所須要的代碼就越少,並且代碼越易讀。java
image.png
api使用起來很像java中的stream,這個其實很顯然,都是爲了對流數據進行處理。感受就像flink是java中並行流的分佈式版本,因此對stream熟悉的話,flink上手不難,或者說使用flink編寫代碼並不難。git
Flink的編程模式:輸入(source) -> 處理(轉換transform) -> 輸出(sink),3部分,至關清爽。apache
統一術語
數據比對通常針對兩個數據集A/B,在選定一個基準方A後,定義以下:
F000:A/B兩方數據相同
F113:A中存在,但B中沒有,A比B多
F114:B中存在,但A中沒有,B比A多
F115:A與B的關鍵字段相同,但畢竟字段不一樣,如A與B都有同一筆訂單,但訂單金額不一樣編程
新建工程
這裏咱們使用官方提供的quickstart作模板,若是是比較新版的idea(如2020.1)裏面直接有flink的quickstart模板,舊版的idea的話,須要本身添加一下。api
image.png
image.png
下次使用的時候能夠直接從這裏看到:app
image.png
若是你使用的是scala,ArtifactId則填flink-quickstart-scala。具體的版本信息能夠根據最新版的填寫。分佈式
添加Table API依賴
在pom.xml中添加Table API依賴。ide
<!-- Table API --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Table API須要scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
編寫代碼
利用模板裏的BatchJob來編寫:函數
package com.flink; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import java.util.List; /** * Skeleton for a Flink Batch Job. * * <p>For a tutorial how to write a Flink batch application, check the * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>. * * <p>To package your application into a JAR file for execution, * change the main class in the POM.xml file to this class (simply search for 'mainClass') * and run 'mvn clean package' on the command line. */ public class BatchJob { public static void main(String[] args) throws Exception { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // Table Environment BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env); /** * 構造兩個數據集,實際生產從本身須要的source中獲取便可 */ DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115"); DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114"); // 轉換成table Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique); Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique); /** * 核心比對(對帳)邏輯 */ Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集 Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集 Table f000_table = tableA_unique.intersect(tableB_unique);// 交集 // 轉回DataSet用於輸出 DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class); DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class); DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class); /** * 輸出,實際輸出到本身須要的sink便可 */ List<String> f000_list = f000.collect(); List<String> f113_list = f113.collect(); List<String> f114_list = f114.collect(); System.out.println("=============================="); System.out.println("f000 ->" + f000_list); System.out.println("=============================="); System.out.println("f113 ->" + f113_list); System.out.println("=============================="); System.out.println("f114 ->" + f114_list); // 批處理不須要顯示調用execute,不然會報錯 // env.execute("Flink Batch Java API Skeleton"); } }
簡單說下幾個關鍵點:ui
- 使用Table API須要建立對應的執行環境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
- 模板代碼中最後顯式調用
env.execute()
,其實在批處理中不須要,顯式調用反而會報錯。
源碼
總結
本質上就是利用Table API中對數據集的處理函數(交集、差集)來完成數據比對。
若是你有更好的想法,歡迎留言,多多指教。
轉載請註明出處