流計算SQL一般是一個類SQL的聲明式語言,主要用於對流式數據(Streams)的持續性查詢,目的是在常見流計算平臺和框架(如Storm、Spark Streaming、Flink、Beam等)的底層API上,mysql
經過使用簡易通用的的SQL語言構建SQL抽象層,下降實時開發的門檻。sql
流計算SQL的原理其實很簡單,就是在SQL和底層的流計算引擎之間架起一座橋樑---流計算SQL被用戶提交,被SQL引擎層翻譯爲底層的API並在底層的流計算引擎上執行。好比對Storm架構
來講,會自動翻譯成Storm的任務拓撲並在Storm集羣上運行。框架
流計算SQL引擎是流計算SQL的核心,主要負責對用戶SQL輸入進行語法分析、語義分析、邏輯計劃生成、邏輯計劃執行、物理執行計劃生成等操做。而真正執行計算的是底層的流計算平臺。運維
不一樣於離線任務,實時的數據是不斷流入的,因此爲了使用SQL來對流處理進行抽象,流計算SQL也引入了「表」的概念,不過這裏的表是動態表。工具
流計算SQL的架構以下:開發工具
SQL層:流計算SQL給用戶的接口,它提供過濾、轉換、關聯、聚合、窗口、select、union、split等各類功能。大數據
SQL引擎層:負責SQL解析/校驗、邏輯計劃生成優化和物理計劃執行等。優化
流計算引擎層:具體執行SQL引擎層生成的執行計劃。阿里雲
目前流計算SQL在各個計算框架的進度和支持力度不一。
Storm SQL還只是一個實驗性的功能。Flink SQL是Flink大力推廣的核心API。Flink是一個原生的開源流計算引擎,並且目前尚未其它開源流計算引擎能提供比Flink 更優秀的流
計算SQL框架和語法等,因此Flink SQL實際上在定義流計算SQL的標註。
阿里雲Stream SQL 的底層就是Flink引擎(實際是Blink,也就是Alibaba Flink),能夠認爲Blink是Flink的企業版本,
阿里雲提供了Stream SQL 開發的完整環境,包括Stream SQL語法、IDE開發工具、調試及運維等。下面具體介紹概念和語法
Stream SQL 一般將源頭數據抽象爲源表,就像一個Storm任務必須至少定義一個spout,一個Stream SQL 任務必須至少定義一個源表。
定義Stream SQL 源表的語法以下:
CREATE TABLE tablename
(columnName dataType [,columnName dataType]*)
[WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];
以下面的例子建立了一個datahub類型的源表
create table datahub_stream(
name varchar,
age BIGINT,
birthday BIGINT)
with (
type ='datahub',
endPoint =‘http://dh-et2.aliyun-inc.com’,
project='blink-datahub_test',
topic ='test_topic_1',
accessId =0i70RRFJD1OBAWAs',
accessKey ='yF60EwURseo1UAn4NinvQPJ2zhCfHU',
startTime='2018-08-20 00:00:00'
);
其中的type表示流式數據的源頭類型,能夠爲datahub,也能夠爲日誌或消息中間件等,type下面的各個參數類型的不一樣而不一樣,它們共同肯定了此type的某個源頭類型。
此外,阿里雲Stream SQL底層流計算引擎是Flink/Blink,所以其支持水位線機制。
定義水位線的語法以下:
WATERMARK [watermarkName] FOR <rowtime_field>
AS withOffset(<rowtime_field>,offset)
好比WATERMARK FOR rowtime AS withOffset(rowtime,4000)就對源頭數據列rowtime定義了固定延遲4s的水位線。
有源表,就是結果表,Stream SQL定義結果表的語法以下:
CREATE TABLE tablename
(columnName dataType [,columnName dataType]*)
[WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];
Stream SQL的結果表支持各類類型,包括相似MySQL的RDS、相似HBase的TableStore、相似消息隊列的MessageQueue的,下面以RDS來介紹Stream SQL 結果表的具體語法:
create table rds_output(
id int,
len int,
content varchar,
primary key(id,len)
) with (
type ='rds',
url='jdbc:mysql:XXXXXX',
tableName='test4',
userName='test',
password='xxxx'
);
在上述代碼中,結果表的type不一樣,相應後面的其它參數也不同,具體可用參考阿里雲幫助文檔。
流計算SQL的維度表數據一類特殊的外部數據,相對流數據來講,他比較穩定且變化緩慢,是靜態或準靜態數據,做爲join / left outer join的右表使用。須要特別注意的是,
維度表在流計算中不容許做爲from 後面的數據存儲。流計算中對於from子句後對接的數據存儲必定是流式數據存儲,即 select * from dim_table是不被容許的。
阿里雲Stream SQL中沒有專門爲維度表設計的DDL語法,使用標準的create table語法便可,可是須要額外增長一行PERIOD FOR SYSTEM_TIME的聲明,這行聲明定義了
維度表的變化週期,即代表該表是一張會變化的表。
一個簡單的維度表定義實例以下,type後面的語法相似源表定義,
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY key(id), --用做維度表,必須有聲明的主鍵
PERIOD FOR SYSTEM_TIME ---定義了維度表的變化週期
) with (
type = 'xxx',
。。。
);
在實際的實時開發中,常常發現業務邏輯的複雜性使得只用一個Stream SQL來完成全部的業務邏輯基本是不可能的,而必須拆分爲多個SQL共同完成,此時就須要定義中間臨時表(
在阿里雲Stream SQL 中也叫view,即視圖)。在Stream SQL中定義臨時表的語法以下:
CREATE VIEW viewName
[ (columnName[,columnName]*])]
AS queryStatement;
但須要注意的是,Stream SQL臨時表僅用於輔助計算邏輯表達的內存邏輯中間狀態,其物理是並不存在,也不會產生數據的物理存儲。固然,臨時表也不佔用系統空間。一個臨時表的例子
以下:
CREATE VIEW largeOrders(r, t, c, u) AS
SELECT rowtime, productId, c, units
FROM Orders;
Stream SQL語法和SQL標準語法絕大部分都是相同的,下面僅着重介紹insert操做
insert操做的語法:
INSERT INTO tableName
[ ( columnName[,columnName]* )]
queryStatement;
流計算不支持單獨SELECT操做,當前在執行SELECT查詢以前必須執行INSERT操做將結果保存起來。同時,須要注意的是,一個SQL文件支持多個源表輸入和多個結果表輸出。
只有result表和tmp表能夠執行INSERT操做,且每張表只能執行一次INSERT操做,dim 表和stream表不能執行insert操做。
普通的select操做是從幾張表中讀數據,但查詢的對象也能夠是另外一個select操做,也就是子查詢,但要注意子查詢必須加別名,實例以下:
insert into result_table
select * from (
select t.a, sum(t.b) AS sum_b, from t1 t
group by t.a
) t1
where t1.sum_b>100;
參考資料:《離線和實時大數據開發實戰》