Flink SQL Table — A First Taste of Flink with the 2018 Chinese Super League

Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work, drawing on cases extracted from real commercial environments and offering tuning advice and cluster capacity-planning guidance for commercial deployments. Please keep following this blog series. Reproduction is prohibited; learning from it is welcome. QQ email: 1120746959@qq.com — feel free to get in touch with any questions.

A Few Words Up Front

Flink is a new stream-processing engine. The author is mainly familiar with Spark internals; if interested, see my Spark Core, Spark Streaming, and Spark SQL source-code walkthrough series. Here we merely sample Flink, billed as the fourth-generation big-data processing engine, without digging into Flink's source code. Apologies in advance if you are already a Flink expert! First, take a look at the 2018 Chinese Super League standings:

1 SQL Table (A First Taste)

  • The Table API is a declarative, table-centric DSL in which tables may change dynamically (when representing streaming data). It follows the (extended) relational model: tables have a two-dimensional structure (schema), similar to tables in a relational database, and the API offers comparable operations such as select, project, join, group-by, and aggregate. Table API programs declaratively define what logical operations should be performed rather than spelling out exactly how the code for those operations looks. Although the Table API can be extended with various kinds of user-defined functions (UDFs), it is less expressive than the core APIs but more concise to use (less code). In addition, Table API programs pass through a built-in optimizer before execution.

  • You can switch seamlessly between tables and DataStream/DataSet, allowing programs to mix the Table API with the DataStream and DataSet APIs.

  • The highest-level abstraction Flink offers is SQL. This layer is similar to the Table API in both syntax and expressiveness, but represents programs as SQL query expressions. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly over tables defined in the Table API.

  • Apache Flink's SQL support dates back to the 0.9.0-milestone1 release published a year earlier. That release introduced the Table API to provide SQL-like query capabilities; it operates on distributed datasets and composes freely with Flink's other APIs. From the start, Tables supported both static and streaming data (that is, the DataSet- and DataStream-related APIs). A DataSet or DataStream can be converted into a Table, and a Table can likewise be converted back into a DataSet or DataStream.


  • Users execute SQL queries via the sqlQuery() method of the TableEnvironment class; results are returned as a Table. A Table can feed subsequent SQL and Table API queries, be converted to a DataSet or DataStream, or be written to a TableSink. Queries submitted through SQL and through the Table API compose seamlessly: the system optimizes them as a whole and ultimately translates them into a single Flink program for execution.

  • To use a Table in a SQL query, it must first be registered in the TableEnvironment. A Table can be registered from a TableSource, from an existing Table, or from a DataStream or DataSet. Users can also point to external data sources by registering an external Catalog in the TableEnvironment.

  • For convenience, calling Table.toString() automatically registers the Table under a unique name in its TableEnvironment and returns that name. Tables can therefore be inlined directly into SQL statements (via string concatenation), as in the examples below.

  • Note: Flink's SQL support is still incomplete. Queries that use features the system does not yet support raise a TableException.

2 The Code (aggregating and ranking goals per club)

  • 1 Wrap the raw data in a POJO.
  • 2 Create the environment: BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
  • 3 Create a Table: Table topScore = tableEnv.fromDataSet(topInput)
  • 4 Register the Table: tableEnv.registerTable("topScore", topScore);
  • 5 Query the Table: tableEnv.sqlQuery
  • 6 Convert the Table back to a DataSet: tableEnv.toDataSet
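Step 1 above, the POJO mapping, boils down to splitting a tab-separated line and converting the numeric columns. The sketch below illustrates just that step without any Flink dependency; the `ParseSketch` class and the sample row are made up for illustration, and only four of the eleven columns are kept for brevity:

```java
// Sketch of the POJO-mapping step, free of Flink dependencies.
// Only rank, player, club, and goals are kept to stay short.
public class ParseSketch {
    static class TopScorers {
        Integer rank; String player; String club; Integer jinqiu;
        TopScorers(Integer rank, String player, String club, Integer jinqiu) {
            this.rank = rank; this.player = player; this.club = club; this.jinqiu = jinqiu;
        }
    }

    // Columns (tab-separated): rank, player, club, appearances, goals, ...
    static TopScorers parse(String line) {
        String[] f = line.split("\t");
        return new TopScorers(Integer.valueOf(f[0]), f[1], f[2], Integer.valueOf(f[4]));
    }

    public static void main(String[] args) {
        TopScorers t = parse("1\tWu Lei\tShanghai SIPG\t29\t27");
        System.out.println(t.club + ": " + t.jinqiu + " goals");
    }
}
```

The full program in section 2.1 does the same thing inside a MapFunction, keeping all eleven columns.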

2.1 Full code

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class TableSQL {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        DataSet<String> input = env.readTextFile("C:\\CoreForBigData\\FLINK\\TopCore.csv");
        input.print();
        // Parse each tab-separated line into a TopScorers POJO
        DataSet<TopScorers> topInput = input.map(new MapFunction<String, TopScorers>() {
            @Override
            public TopScorers map(String s) throws Exception {
                String[] splits = s.split("\t");
                return new TopScorers(Integer.valueOf(splits[0]), splits[1], splits[2],
                        Integer.valueOf(splits[3]), Integer.valueOf(splits[4]),
                        Integer.valueOf(splits[5]), Integer.valueOf(splits[6]),
                        Integer.valueOf(splits[7]), Integer.valueOf(splits[8]),
                        Integer.valueOf(splits[9]), Integer.valueOf(splits[10]));
            }
        });

        // Convert the DataSet into a Table
        Table topScore = tableEnv.fromDataSet(topInput);
        // Register topScore as a table
        tableEnv.registerTable("topScore", topScore);

        Table tapiResult = tableEnv.scan("topScore").select("club");
        tapiResult.printSchema();

        // Total goals per club, ranked in descending order
        Table groupedByClub = tableEnv.sqlQuery(
                "select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc");

        // Convert back to a DataSet
        DataSet<Result> result = tableEnv.toDataSet(groupedByClub, Result.class);

        // Map the DataSet to tuples and print
        result.map(new MapFunction<Result, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Result result) throws Exception {
                return Tuple2.of(result.club, result.sum_score);
            }
        }).print();
    }

    /**
     * POJO mapping the source data
     */
    public static class TopScorers {
        /**
         * rank, player, club, appearances, goals, assists,
         * shots on target, free kicks, fouls, yellow cards, red cards
         */
        public Integer rank;
        public String player;
        public String club;
        public Integer chuchang;
        public Integer jinqiu;
        public Integer zhugong;
        public Integer shezheng;
        public Integer renyiqiu;
        public Integer fangui;
        public Integer huangpai;
        public Integer hongpai;

        public TopScorers() {
            super();
        }

        public TopScorers(Integer rank, String player, String club, Integer chuchang, Integer jinqiu,
                          Integer zhugong, Integer shezheng, Integer renyiqiu, Integer fangui,
                          Integer huangpai, Integer hongpai) {
            this.rank = rank;
            this.player = player;
            this.club = club;
            this.chuchang = chuchang;
            this.jinqiu = jinqiu;
            this.zhugong = zhugong;
            this.shezheng = shezheng;
            this.renyiqiu = renyiqiu;
            this.fangui = fangui;
            this.huangpai = huangpai;
            this.hongpai = hongpai;
        }
    }

    /**
     * Result type for the aggregation
     */
    public static class Result {
        public String club;
        public Integer sum_score;

        public Result() {}
    }
}

2.2 Results (Guangzhou Evergrande dominated 2018 with 55 goals)
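The SQL query in section 2 — group by club, sum the goals, order descending — can be mirrored with plain Java streams to make its semantics concrete. This is a sketch over made-up (club, goals) pairs, not the Flink execution path, and the `GroupBySketch` name is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Mirrors "select club, sum(jinqiu) as sum_score from topScore
//          group by club order by sum_score desc" with plain Java streams.
public class GroupBySketch {
    public static LinkedHashMap<String, Integer> totalGoalsByClub(List<Map.Entry<String, Integer>> rows) {
        return rows.stream()
                // group by club, summing goals per club
                .collect(Collectors.groupingBy(e -> e.getKey(),
                        Collectors.summingInt(e -> e.getValue())))
                .entrySet().stream()
                // order by total goals, descending
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                // LinkedHashMap preserves the sorted order
                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue(),
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // Hypothetical sample rows: (club, goals) per player
        List<Map.Entry<String, Integer>> rows = List.of(
                Map.entry("Guangzhou Evergrande", 29),
                Map.entry("Shanghai SIPG", 27),
                Map.entry("Guangzhou Evergrande", 26));
        System.out.println(totalGoalsByClub(rows));
        // prints {Guangzhou Evergrande=55, Shanghai SIPG=27}
    }
}
```

In the Flink version, the same grouping, summing, and sorting are planned and optimized by the engine rather than written by hand.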

3 A Little Theory

3.1 Create a TableEnvironment

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);

3.2 DSL-style usage

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query

3.3 Register a DataStream or DataSet as Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

3.4 Convert a DataStream or DataSet into a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

4 Wrapping Up

Through the 2018 Chinese Super League we have caught a glimpse of the core ideas behind Flink SQL Table. This article is by no means exhaustive, but I hope it brings you something of value. It took effort to write; please treat it kindly. Thank you!


Qin Kaixin, Shenzhen, 2018-11-26 22:52
