大數據開發實戰:Stream SQL實時開發一

  一、流計算SQL原理和架構

    流計算SQL一般是一個類SQL的聲明式語言,主要用於對流式數據(Streams)的持續性查詢,目的是在常見流計算平臺和框架(如Storm、Spark Streaming、Flink、Beam等)的底層API上,mysql

  經過使用簡易通用的的SQL語言構建SQL抽象層,下降實時開發的門檻。sql

    流計算SQL的原理其實很簡單,就是在SQL和底層的流計算引擎之間架起一座橋樑---流計算SQL被用戶提交,被SQL引擎層翻譯爲底層的API並在底層的流計算引擎上執行。好比對Storm架構

  來講,會自動翻譯成Storm的任務拓撲並在Storm集羣上運行。框架

    流計算SQL引擎是流計算SQL的核心,主要負責對用戶SQL輸入進行語法分析、語義分析、邏輯計劃生成、邏輯計劃執行、物理執行計劃生成等操做。而真正執行計算的是底層的流計算平臺。運維

    不一樣於離線任務,實時的數據是不斷流入的,因此爲了使用SQL來對流處理進行抽象,流計算SQL也引入了「表」的概念,不過這裏的表是動態表。工具

    流計算SQL的架構以下:開發工具

    

    SQL層:流計算SQL給用戶的接口,它提供過濾、轉換、關聯、聚合、窗口、select、union、split等各類功能。大數據

    SQL引擎層:負責SQL解析/校驗、邏輯計劃生成優化和物理計劃執行等。優化

    流計算引擎層:具體執行SQL引擎層生成的執行計劃。阿里雲

  二、流計算SQL:將來主要的實時開發技術

    目前流計算SQL在各個計算框架的進度和支持力度不一。

    Storm SQL還只是一個實驗性的功能。Flink SQL是Flink大力推廣的核心API。Flink是一個原生的開源流計算引擎,並且目前尚未其它開源流計算引擎能提供比Flink 更優秀的流

    計算SQL框架和語法等,因此Flink SQL實際上在定義流計算SQL的標註。

    阿里雲Stream SQL 的底層就是Flink引擎(實際是Blink,也就是Alibaba Flink),能夠認爲Blink是Flink的企業版本,

  三、Stream SQL

    阿里雲提供了Stream SQL 開發的完整環境,包括Stream SQL語法、IDE開發工具、調試及運維等。下面具體介紹概念和語法

    3.一、Stream SQL 源表

      Stream SQL 一般將源頭數據抽象爲源表,就像一個Storm任務必須至少定義一個spout,一個Stream SQL 任務必須至少定義一個源表。

      定義Stream SQL 源表的語法以下:

      CREATE TABLE tablename

      (columnName dataType [,columnName dataType]*)

      [WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];

      以下面的例子建立了一個datahub類型的源表

      create table datahub_stream(

        name varchar,

        age BIGINT,

        birthday BIGINT)

        with (

          type ='datahub',

          endPoint =‘http://dh-et2.aliyun-inc.com’,

          project='blink-datahub_test',

          topic ='test_topic_1',

          accessId =0i70RRFJD1OBAWAs',

          accessKey ='yF60EwURseo1UAn4NinvQPJ2zhCfHU',

          startTime='2018-08-20 00:00:00'

          );

      其中的type表示流式數據的源頭類型,能夠爲datahub,也能夠爲日誌或消息中間件等,type下面的各個參數類型的不一樣而不一樣,它們共同肯定了此type的某個源頭類型。

      此外,阿里雲Stream SQL底層流計算引擎是Flink/Blink,所以其支持水位線機制。

      定義水位線的語法以下:

      WATERMARK  [watermarkName] FOR <rowtime_field>

      AS withOffset(<rowtime_field>,offset)

      好比WATERMARK FOR rowtime AS withOffset(rowtime,4000)就對源頭數據列rowtime定義了固定延遲4s的水位線。

 

    3.2 、Stream SQL 結果表

      有源表,就是結果表,Stream SQL定義結果表的語法以下:

      CREATE TABLE tablename

      (columnName dataType [,columnName dataType]*)

      [WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];

      Stream SQL的結果表支持各類類型,包括相似MySQL的RDS、相似HBase的TableStore、相似消息隊列的MessageQueue的,下面以RDS來介紹Stream SQL 結果表的具體語法:

      create table rds_output(

        id int,

        len int,

        content varchar,

        primary key(id,len)

      ) with (

         type ='rds',

         url='jdbc:mysql:XXXXXX',

         tableName='test4',

         userName='test',

         password='xxxx'

      );

      在上述代碼中,結果表的type不一樣,相應後面的其它參數也不同,具體可用參考阿里雲幫助文檔。

     3.三、Stream SQL維度表

        流計算SQL的維度表數據一類特殊的外部數據,相對流數據來講,他比較穩定且變化緩慢,是靜態或準靜態數據,做爲join / left outer join的右表使用。須要特別注意的是,

      維度表在流計算中不容許做爲from 後面的數據存儲。流計算中對於from子句後對接的數據存儲必定是流式數據存儲,即 select * from dim_table是不被容許的。

        阿里雲Stream SQL中沒有專門爲維度表設計的DDL語法,使用標準的create table語法便可,可是須要額外增長一行PERIOD FOR SYSTEM_TIME的聲明,這行聲明定義了

      維度表的變化週期,即代表該表是一張會變化的表。

        一個簡單的維度表定義實例以下,type後面的語法相似源表定義,

        CREATE TABLE white_list (

          id varchar,

          name varchar,

          age int,

          PRIMARY key(id),  --用做維度表,必須有聲明的主鍵

          PERIOD FOR SYSTEM_TIME ---定義了維度表的變化週期

          ) with (

            type = 'xxx',

            。。。

          );

      

      3.四、Stream SQL 臨時表

      在實際的實時開發中,常常發現業務邏輯的複雜性使得只用一個Stream SQL來完成全部的業務邏輯基本是不可能的,而必須拆分爲多個SQL共同完成,此時就須要定義中間臨時表(

      在阿里雲Stream SQL 中也叫view,即視圖)。在Stream SQL中定義臨時表的語法以下:

      CREATE VIEW viewName

      [ (columnName[,columnName]*])]

      AS queryStatement;

      但須要注意的是,Stream SQL臨時表僅用於輔助計算邏輯表達的內存邏輯中間狀態,其物理是並不存在,也不會產生數據的物理存儲。固然,臨時表也不佔用系統空間。一個臨時表的例子

      以下:

        CREATE VIEW largeOrders(r, t, c, u) AS

        SELECT rowtime, productId, c, units

        FROM Orders;

      

    3.五、Stream SQL DML

       Stream SQL語法和SQL標準語法絕大部分都是相同的,下面僅着重介紹insert操做

      insert操做的語法:

      INSERT INTO tableName

      [ ( columnName[,columnName]* )]

      queryStatement;

      流計算不支持單獨SELECT操做,當前在執行SELECT查詢以前必須執行INSERT操做將結果保存起來。同時,須要注意的是,一個SQL文件支持多個源表輸入和多個結果表輸出。

      只有result表和tmp表能夠執行INSERT操做,且每張表只能執行一次INSERT操做,dim 表和stream表不能執行insert操做。

      普通的select操做是從幾張表中讀數據,但查詢的對象也能夠是另外一個select操做,也就是子查詢,但要注意子查詢必須加別名,實例以下:

      insert into result_table

      select * from (

            select t.a,   sum(t.b) AS sum_b,   from t1 t   

            group by t.a

            ) t1

      where t1.sum_b>100;

 

   參考資料:《離線和實時大數據開發實戰》

相關文章
相關標籤/搜索