(https://github.com/sunjincheng121/flink-study/blob/test/docs/sql/SQL.md)git
做者: 孫金城,淘寶花名 金竹github
本篇核心目標是讓你們概要了解一個完整的Apache Flink SQL Job的組成部分,以及Apache Flink SQL所提供的核心算子的語義,最後會應用Tumble Window編寫一個End-to-End的頁面訪問的統計示例。sql
咱們作任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,固然基於Apache Flink SQL編寫的計算Job也離不開這個三部分,以下所所示:數據庫
如上所示,一個完整的Apache Flink SQL Job 由以下三部分:express
Source Operator - Soruce operator是對外部數據源的抽象, 目前Apache Flink內置了不少經常使用的數據源實現,好比上圖提到的Kafka。apache
Query Operators - 查詢算子主要完成如圖的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多數傳統數據庫支持的操做。bash
Sink Operator - Sink operator 是對外結果表的抽象,目前Apache Flink也內置了不少經常使用的結果表的抽象,好比上圖提到的Kafka。session
SQL是Structured Quevy Language的縮寫,最初是由美國計算機科學家Donald D. Chamberlin和Raymond F. Boyce在20世紀70年代早期從Early History of SQL中瞭解關係模型後在IBM開發的。該版本最初稱爲[SEQUEL: A Structured English Query Language](結構化英語查詢語言),旨在操縱和檢索存儲在IBM原始準關係數據庫管理系統System R中的數據。直到1986年,ANSI和ISO標準組正式採用了標準的"數據庫語言SQL"語言定義。Apache Flink SQL 核心算子的語義設計也參考了1992、2011等ANSI-SQL標準。接下來咱們將簡單爲你們介紹Apache Flink SQL 每個算子的語義。數據結構
SELECT 用於從數據集/流中選擇數據,語法遵循ANSI-SQL標準,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去某些列。jsp
一個使用Select的語句以下:
SELECT ColA, ColC FROME tab ;
複製代碼
WHERE 用於從數據集/流中過濾數據,與SELECT一塊兒使用,語法遵循ANSI-SQL標準,語義是關係代數的Selection,根據某些條件對關係作水平分割,即選擇符合條件的記錄,以下所示:
對應的SQL語句以下:
SELECT * FROM tab WHERE ColA <> 'a2' ;
複製代碼
GROUP BY 是對數據進行分組的操做,好比我須要分別計算一下一個學生表裏面女生和男生的人數分別是多少,以下:
對應的SQL語句以下:
SELECT sex, COUNT(name) AS count FROM tab GROUP BY sex ;
複製代碼
UNION ALL
UNION ALL 將兩個表合併起來,要求兩個表的字段徹底一致,包括字段類型、字段順序,語義對應關係代數的Union,只是關係代數是Set集合操做,會有去重複操做,UNION ALL 不進行去重,以下所示:
對應的SQL語句以下:
SELECT * FROM T1 UNION ALL SELECT * FROM T2
複製代碼
UNION
UNION 將兩個流給合併起來,要求兩個流的字段徹底一致,包括字段類型、字段順序,並其UNION 不一樣於UNION ALL,UNION會對結果數據去重,與關係代數的Union語義一致,以下:
對應的SQL語句以下:
SELECT * FROM T1 UNION SELECT * FROM T2
複製代碼
JOIN
JOIN 用於把來自兩個表的行聯合起來造成一個寬表,Apache Flink支持的JOIN類型:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
JOIN與關係代數的Join語義相同,具體以下:
對應的SQL語句以下(INNER JOIN):
SELECT ColA, ColB, T2.ColC, ColE FROM TI JOIN T2 ON T1.ColC = T2.ColC ;
複製代碼
對應的SQL語句以下(LEFT JOIN):
SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
複製代碼
說明:
細心的讀者可能發現上面T2.ColC是添加了前綴T2了,這裏須要說明一下,當兩張表有字段名字同樣的時候,我須要指定是從那個表裏面投影的。
RIGHT JOIN至關於LEFT JOIN左右兩個表交互一下位置。FULL JOIN至關於RIGHT JOIN和LEFT JOIN以後進行UNION ALL操做。
Window
在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統數據庫的標準開窗,每個元素都對應一個窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行窗口劃分的。
OverWindow
OVER Window 目前支持由以下三個元素組合的8中類型:
時間 - Processing Time 和 EventTime
數據集 - Bounded 和 UnBounded
劃分方式 - ROWS 和 RANGE 咱們以的Bounded ROWS 和 Bounded RANGE 兩種經常使用類型,想你們介紹Over Window的語義
Bounded ROWS Over Window
Bounded ROWS OVER Window 每一行元素都視爲新的計算行,即,每一行都是一個新的窗口。
語法
SELECT
FROM Tab1
- value_expression - 進行分區的字表達式;
- timeCol - 用於元素排序的時間字段;
- rowCount - 是定義根據當前行開始向前追溯幾行元素;
複製代碼
語義
咱們以3個元素(2 PRECEDING)的窗口爲例,以下圖:
上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,可是他們仍然是在不一樣的窗口,這一點有別於RANGE OVER Window.
Bounded RANGE Over Window
Bounded RANGE OVER Window 具備相同時間值的全部元素行視爲同一計算行,即,具備相同時間值的全部行都是同一個窗口;
語法
Bounded RANGE OVER Window的語法以下:
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
RANGE
BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1
複製代碼
value_expression - 進行分區的字表達式;
timeCol - 用於元素排序的時間字段;
timeInterval - 是定義根據當前行開始向前追溯指定時間的元素行;
語義
咱們以3秒中數據(INTERVAL '2' SECOND)的窗口爲例,以下圖:
注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別於ROWS OVER Window.
GroupWindow
根據窗口數據劃分的不一樣,目前Apache Flink有以下3種Bounded Winodw:
Tumble - 滾動窗口,窗口數據有固定的大小,窗口數據無疊加;
Hop - 滑動窗口,窗口數據有固定大小,而且有固定的窗口重建頻率,窗口數據有疊加;
Session - 會話窗口,窗口數據沒有固定的大小,根據窗口數據活躍程度劃分窗口,窗口數據無疊加;
說明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上全部數據都在一個窗口裏面,語義很是簡單,這裏不作詳細介紹了。
GroupWindow的語法以下:
SELECT
[gk],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [WINDOW(definition)], [gk]
複製代碼
Tumble Window
Tumble 滾動窗口有固定size,窗口數據不重疊,具體語義以下:
假設咱們要寫一個2分鐘大小的Tumble,示例SQL以下:
SELECT gk, COUNT(*) AS pv
FROM tab
GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE), gk
複製代碼
Hop Window
Hop 滑動窗口和滾動窗口相似,窗口有固定的size,與滾動窗口不一樣的是滑動窗口能夠經過slide參數控制滑動窗口的新建頻率。所以當slide值小於窗口size的值的時候多個滑動窗口會重疊,具體語義以下:
假設咱們要寫一個每5分鐘統計近10分鐘的頁面訪問量(PV).
SELECT gk, COUNT(*) AS pv
FROM tab
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), gk
複製代碼
Session Window
Session 會話窗口 是沒有固定大小的窗口,經過session的活躍度分組元素。不一樣於滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話窗口 分配器經過配置session gap來指定非活躍週期的時長,具體語義以下:
假設咱們要寫一個統計連續的兩個訪問用戶之間的訪問時間間隔不超過3分鐘的的頁面訪問量(PV).
SELECT gk, COUNT(*) AS pv
FROM pageAccessSession_tab
GROUP BY SESSION(rowtime, INTERVAL '3' MINUTE), gk
複製代碼
說明: 不少場景用戶須要得到Window的開始和結束時間,上面的GroupWindow的SQL示例中沒有體現,那麼窗口的開始和結束時間應該怎樣獲取呢? Apache Flink 咱們提供了以下輔助函數:
TUMBLE_START/TUMBLE_END
HOP_START/HOP_END
SESSION_START/SESSION_END
這些輔助函數如何使用,請參考以下完整示例的使用方式。
完整的 SQL Job 案例
上面咱們介紹了Apache Flink SQL核心算子的語法及語義,這部分將選取Bounded EventTime Tumble Window爲例爲你們編寫一個完整的包括Source和Sink定義的Apache Flink SQL Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,用戶ID和訪問時間。咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV). 具體數據以下:
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
Source 定義
自定義Apache Flink Stream Source須要實現StreamTableSource, StreamTableSource中經過StreamExecutionEnvironment的addSource方法獲取DataStream, 因此咱們須要自定義一個SourceFunction, 而且要支持產生WaterMark,也就是要實現DefinedRowtimeAttributes接口。出於代碼篇幅問題,咱們以下只介紹核心部分,完整代碼 請查看: EventTimeTumbleWindowDemo.scala
Source Function定義
支持接收攜帶EventTime的數據集合,Either的數據結構Right是WaterMark,Left是元數據:
class MySourceFunction[T](dataList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w)) // emit watermark
}
}
}
複製代碼
定義 StreamTableSource
咱們自定義的Source要攜帶咱們測試的數據,以及對應的WaterMark數據,具體以下:
class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
// 頁面訪問表數據 rows with timestamps and watermarks
val data = Seq(
// Data
Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
// Watermark
Right(1510365660000L),
..
)
val fieldNames = Array("accessTime", "region", "userId")
val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// 添加數據源實現
execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
}
...
}
複製代碼
Sink 定義
咱們簡單的將計算結果寫入到Apache Flink內置支持的CSVSink中,定義Sink以下:
def getCsvTableSink: TableSink[Row] = {
val tempFile = ...
new CsvTableSink(tempFile.getAbsolutePath).configure(
Array[String]("region", "winStart", "winEnd", "pv"),
Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
複製代碼
構建主程序
主程序包括執行環境的定義,Source/Sink的註冊以及統計查SQL的執行,具體以下:
def main(args: Array[String]): Unit = {
// Streaming 環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 設置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//方便咱們查出輸出數據
env.setParallelism(1)
val sourceTableName = "mySource"
// 建立自定義source數據結構
val tableSource = new MyTableSource
val sinkTableName = "csvSink"
// 建立CSV sink 數據結構
val tableSink = getCsvTableSink
// 註冊source
tEnv.registerTableSource(sourceTableName, tableSource)
// 註冊sink
tEnv.registerTableSink(sinkTableName, tableSink)
val sql =
"SELECT " +
" region, " +
" TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
" TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
" FROM mySource " +
" GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"
tEnv.sqlQuery(sql).insertInto(sinkTableName);
env.execute()
}
複製代碼
執行並查看運行結果
執行主程序後咱們會在控制檯獲得Sink的文件路徑,以下:
Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
複製代碼
Cat 方式查看計算結果,以下:
jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1
複製代碼
小結
本篇概要的介紹了Apache Flink SQL 的全部核心算子,並以一個End-to-End的示例展現瞭如何編寫Apache Flink SQL的Job. 但願對你們有所幫助,更多Apache Flink SQL的介紹歡迎你們訪問做者的以下專欄或專輯:
51CTO 金竹 專欄 -zhuanlan.51cto.com/columnlist/…
雲棲社區 金竹 專輯 -yq.aliyun.com/album/206
更多資訊請訪問 Apache Flink 中文社區網站