關係型API有不少好處:是聲明式的,用戶只須要告訴須要什麼,系統決定如何計算;用戶沒必要特意實現;更方便優化,能夠執行得更高效。自己Flink就是一個統一批和流的分佈式計算平臺,因此社區設計關係型API的目的之一是可讓關係型API做爲統一的一層,兩種查詢擁有一樣的語義和語法。大多數流處理框架的API都是比較low-level的API,學習成本高並且不少邏輯須要寫到UDF中,因此Apache Flink 添加了SQL-like的API處理關係型數據--Table API。這套API中最重要的概念是Table
(能夠在上面進行關係型操做的結構化的DataSet或DataStream)。Table
API 與 DataSet
和DataStream
API 結合緊密,DataSet 和 DataStream均可以很容易地轉換成 Table,一樣轉換回來也很方便:html
val execEnv = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(execEnv) // obtain a DataSet from somewhere val tempData: DataSet[(String, Long, Double)] = // convert the DataSet to a Table val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF) // compute your result val avgTempCTable: Table = tempTable .where('location.like("room%")) .select( ('time / (3600 * 24)) as 'day, 'Location as 'room, (('tempF - 32) * 0.556) as 'tempC ) .groupBy('day, 'room) .select('day, 'room, 'tempC.avg as 'avgTempC) // convert result Table back into a DataSet and print it avgTempCTable.toDataSet[Row].print()
example使用的是Scala的API,Java版API也有一樣的功能。sql
下圖展現了 Table API 的架構:數據庫
從 DataSet 或 DataStream 建立一個 Table,而後在上面進行關係型操做好比 fliter
、join
、select
。對Table的操做將會轉換成邏輯運算符樹。Table 轉換回 DataSet 和 DataStream 的時候將會轉換成DataSet 和 DataStream的算子。有些相似 'location.like("room%")
的表達式將會經過 code generation
編譯成Flink的函數。apache
然而,最初傳統的Table API 有必定的限制。首先,它不能獨立使用。Table API 的 query 必須嵌入到 DataSet 或 DataStream的程序中。對批處理表的查詢不支持outer join
,sorting
和不少SQL中常見的標量函數。對於流處理的查詢只支持filtetr
union
和 projection
,不支持aggregation
和join
。並且,轉換過程當中沒有利用太多查詢優化技術,除了適用於全部DataSet程序的優化。編程
隨着流處理的日益普及和Flink在該領域的增加,Flink社區認爲須要一個更簡單的API使更多的用戶可以分析流數據。一年前Flink社區決定將Table API提高到一個新的層級,擴展Table API中流處理的能力以及支持SQL。社區不想重複造輪子,因而決定在 Apache Calcite (一個比較流行的SQL解析和優化框架)的基礎上構建新的 Table API。Apache Calcite 被用在不少項目中,包括 Apache Hive,Apache Drill,Cascading等等。除此以外,Calcite社區將 SQL on Stream 寫入它的roadmap,因此Flink的SQL很適合和它結合。架構
以Calcite爲核心的新架構圖:框架
新架構提供兩種API進行關係型查詢,Table API 和 SQL。這兩種API的查詢都會用包含註冊過的Table的catalog進行驗證,而後轉換成統一Calcite的logical plan。在這種表示中,stream和batch的查詢看起來徹底同樣。下一步,利用 Calcite的 cost-based 優化器優化轉換規則和logical plan。根據數據源的性質(流式和靜態)使用不一樣的規則進行優化。最終優化後的plan轉傳成常規的Flink DataSet 或 DataStream 程序。這步還涉及code generation(將關係表達式轉換成Flink函數)。分佈式
下面咱們舉一個例子來理解新的架構。表達式轉換成Logical Plan以下圖所示:ide
調用Table API 其實是建立了不少 Table API 的 LogicalNode
,建立的過程當中對會對整個query進行validate。好比table是CalalogNode
,window groupBy以後在select時會建立WindowAggregate
和Project
,where對應Filter
。而後用RelBuilder
翻譯成Calcite LogicalPlan。若是是SQL API 將直接用Calcite的Parser進行解釋而後validate生成Calcite LogicalPlan。函數
利用Calcite內置的一些rule來優化LogicalPlan,也能夠本身添加或者覆蓋這些rule。轉換成Optimized Calcite Plan後,仍然是Calcite的內部表示方式,如今須要transform成DataStream Plan,對應上圖第三列的類,裏面封裝瞭如何translate成普通的DataStream或DataSet程序。隨後調用相應的tanslateToPlan
方法轉換和利用CodeGen元編程成Flink的各類算子。如今就至關於咱們直接利用Flink的DataSet或DataStream API開發的程序。
Table API的新架構除了維持最初的原理還改進了不少。爲流式數據和靜態數據的關係查詢保留統一的接口,並且利用了Calcite的查詢優化框架和SQL parser。該設計是基於Flink已構建好的API構建的,DataStream API 提供低延時高吞吐的流處理能力並且就有exactly-once語義並且能夠基於event-time進行處理。並且DataSet擁有穩定高效的內存算子和流水線式的數據交換。Flink的core API和引擎的全部改進都會自動應用到Table API和SQL上。
新的SQL接口集成到了Table API中。DataSteam, DataSet和外部數據源能夠在TableEnvironment中註冊成表,爲了是他們能夠經過SQL進行查詢。TableEnvironment.sql()
方法用來聲明SQL和將結果做爲Table返回。下面的是一個完整的樣例,從一個JSON編碼的Kafka topic中讀取流表,而後用SQL處理並寫到另外一個Kafka topic。
// get environments val execEnv = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(execEnv) // configure Kafka connection val kafkaProps = ... // define a JSON encoded Kafka topic as external table val sensorSource = new KafkaJsonSource[(String, Long, Double)]( "sensorTopic", kafkaProps, ("location", "time", "tempF")) // register external table tableEnv.registerTableSource("sensorData", sensorSource) // define query in external table val roomSensors: Table = tableEnv.sql( "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " + "FROM sensorData " + "WHERE location LIKE 'room%'" ) // define a JSON encoded Kafka topic as external sink val roomSensorSink = new KafkaJsonSink(...) // define sink for room sensor data and execute query roomSensors.toSink(roomSensorSink) execEnv.execute()
這個樣例中忽略了流處理中最有趣的部分:window aggregate 和 join。這些操做如何用SQL表達呢?Apache Calcite社區提出了一個proposal來討論SQL on streams的語法和語義。社區將Calcite的stream SQL描述爲標準SQL的擴展而不是另外的 SQL-like語言。這有不少好處,首先,熟悉SQL標準的人可以在不學習新語法的狀況下分析流數據。靜態表和流表的查詢幾乎相同,能夠輕鬆地移植。此外,能夠同時在靜態表和流表上進行查詢,這和flink的願景是同樣的,將批處理看作特殊的流處理(批看做是有限的流)。最後,使用標準SQL進行流處理意味着有不少成熟的工具支持。
下面的example展現瞭如何用SQL和Table API進行滑動窗口查詢:
SQL
SELECT STREAM TUMBLE_END(time, INTERVAL '1' DAY) AS day, location AS room, AVG((tempF - 32) * 0.556) AS avgTempC FROM sensorData WHERE location LIKE 'room%' GROUP BY TUMBLE(time, INTERVAL '1' DAY), location
Table API
val avgRoomTemp: Table = tableEnv.ingest("sensorData") .where('location.like("room%")) .partitionBy('location) .window(Tumbling every Days(1) on 'time as 'w) .select('w.end, 'location, , (('tempF - 32) * 0.556).avg as 'avgTempCs)
Selection, Projection, Sort, Inner & Outer Joins, Set operations
Windows for Slide, Tumble, Session
Selection, Projection, Union
Windows for Slide, Tumble, Session
Selection, Projection, Union, Tumble
獲取流式數據,而後轉換這些數據(歸一化,聚合...),將其寫入其餘系統(File,Kafka,DBMS)。這些query的結果一般會存儲到log-style的系統。
獲取流式數據,而後對數據進行聚合來支持在線系統(dashboard,推薦)或者數據分析系統(Tableau)。一般結果被寫到k-v存儲中(Cassandra,Hbase,可查詢的Flink狀態),創建索引(Elasticsearch)或者DBMS(MySQL,PostgreSQL...)。這些查詢一般能夠被更新,改進。
針對流數據的即席查詢,以實時的方式進行分析和瀏覽數據。查詢結果直接顯示在notebook(Apache Zeppelin)中。
Flink社區還提出來和數據庫中Materialized View
很類似的Dynamic table 動態表
概念,將在之後的版本中支持,具體細節將另開文章解釋。