淺析 Flink Table/SQL API

從何而來

關係型API有不少好處:是聲明式的,用戶只須要告訴須要什麼,系統決定如何計算;用戶沒必要特意實現;更方便優化,能夠執行得更高效。自己Flink就是一個統一批和流的分佈式計算平臺,因此社區設計關係型API的目的之一是可讓關係型API做爲統一的一層,兩種查詢擁有一樣的語義和語法。大多數流處理框架的API都是比較low-level的API,學習成本高並且不少邏輯須要寫到UDF中,因此Apache Flink 添加了SQL-like的API處理關係型數據--Table API。這套API中最重要的概念是Table(能夠在上面進行關係型操做的結構化的DataSet或DataStream)。Table API 與 DataSetDataStream API 結合緊密,DataSet 和 DataStream均可以很容易地轉換成 Table,一樣轉換回來也很方便:html

val execEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// obtain a DataSet from somewhere
val tempData: DataSet[(String, Long, Double)] =

// convert the DataSet to a Table
val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF)
// compute your result
val avgTempCTable: Table = tempTable
 .where('location.like("room%"))
 .select(
   ('time / (3600 * 24)) as 'day, 
   'Location as 'room, 
   (('tempF - 32) * 0.556) as 'tempC
  )
 .groupBy('day, 'room)
 .select('day, 'room, 'tempC.avg as 'avgTempC)
// convert result Table back into a DataSet and print it
avgTempCTable.toDataSet[Row].print()

example使用的是Scala的API,Java版API也有一樣的功能。sql

下圖展現了 Table API 的架構:數據庫

從 DataSet 或 DataStream 建立一個 Table,而後在上面進行關係型操做好比 fliterjoinselect。對Table的操做將會轉換成邏輯運算符樹。Table 轉換回 DataSet 和 DataStream 的時候將會轉換成DataSet 和 DataStream的算子。有些相似 'location.like("room%") 的表達式將會經過 code generation 編譯成Flink的函數。apache

然而,最初傳統的Table API 有必定的限制。首先,它不能獨立使用。Table API 的 query 必須嵌入到 DataSet 或 DataStream的程序中。對批處理表的查詢不支持outer joinsorting和不少SQL中常見的標量函數。對於流處理的查詢只支持filtetr unionprojection,不支持aggregationjoin。並且,轉換過程當中沒有利用太多查詢優化技術,除了適用於全部DataSet程序的優化。編程

Table API 和 SQL 緊密結合

隨着流處理的日益普及和Flink在該領域的增加,Flink社區認爲須要一個更簡單的API使更多的用戶可以分析流數據。一年前Flink社區決定將Table API提高到一個新的層級,擴展Table API中流處理的能力以及支持SQL。社區不想重複造輪子,因而決定在 Apache Calcite (一個比較流行的SQL解析和優化框架)的基礎上構建新的 Table API。Apache Calcite 被用在不少項目中,包括 Apache Hive,Apache Drill,Cascading等等。除此以外,Calcite社區將 SQL on Stream 寫入它的roadmap,因此Flink的SQL很適合和它結合。架構

以Calcite爲核心的新架構圖:框架

新架構提供兩種API進行關係型查詢,Table API 和 SQL。這兩種API的查詢都會用包含註冊過的Table的catalog進行驗證,而後轉換成統一Calcite的logical plan。在這種表示中,stream和batch的查詢看起來徹底同樣。下一步,利用 Calcite的 cost-based 優化器優化轉換規則和logical plan。根據數據源的性質(流式和靜態)使用不一樣的規則進行優化。最終優化後的plan轉傳成常規的Flink DataSet 或 DataStream 程序。這步還涉及code generation(將關係表達式轉換成Flink函數)。分佈式

下面咱們舉一個例子來理解新的架構。表達式轉換成Logical Plan以下圖所示:ide

調用Table API 其實是建立了不少 Table API 的 LogicalNode,建立的過程當中對會對整個query進行validate。好比table是CalalogNode,window groupBy以後在select時會建立WindowAggregateProject,where對應Filter。而後用RelBuilder翻譯成Calcite LogicalPlan。若是是SQL API 將直接用Calcite的Parser進行解釋而後validate生成Calcite LogicalPlan。函數

利用Calcite內置的一些rule來優化LogicalPlan,也能夠本身添加或者覆蓋這些rule。轉換成Optimized Calcite Plan後,仍然是Calcite的內部表示方式,如今須要transform成DataStream Plan,對應上圖第三列的類,裏面封裝瞭如何translate成普通的DataStream或DataSet程序。隨後調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各類算子。如今就至關於咱們直接利用Flink的DataSet或DataStream API開發的程序。

Table API的新架構除了維持最初的原理還改進了不少。爲流式數據和靜態數據的關係查詢保留統一的接口,並且利用了Calcite的查詢優化框架和SQL parser。該設計是基於Flink已構建好的API構建的,DataStream API 提供低延時高吞吐的流處理能力並且就有exactly-once語義並且能夠基於event-time進行處理。並且DataSet擁有穩定高效的內存算子和流水線式的數據交換。Flink的core API和引擎的全部改進都會自動應用到Table API和SQL上。

新的SQL接口集成到了Table API中。DataSteam, DataSet和外部數據源能夠在TableEnvironment中註冊成表,爲了是他們能夠經過SQL進行查詢。TableEnvironment.sql()方法用來聲明SQL和將結果做爲Table返回。下面的是一個完整的樣例,從一個JSON編碼的Kafka topic中讀取流表,而後用SQL處理並寫到另外一個Kafka topic。

// get environments
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// configure Kafka connection
val kafkaProps = ...
// define a JSON encoded Kafka topic as external table
val sensorSource = new KafkaJsonSource[(String, Long, Double)](
    "sensorTopic",
    kafkaProps,
    ("location", "time", "tempF"))

// register external table
tableEnv.registerTableSource("sensorData", sensorSource)

// define query in external table
val roomSensors: Table = tableEnv.sql(
    "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +
    "FROM sensorData " +
    "WHERE location LIKE 'room%'"
  )

// define a JSON encoded Kafka topic as external sink
val roomSensorSink = new KafkaJsonSink(...)

// define sink for room sensor data and execute query
roomSensors.toSink(roomSensorSink)
execEnv.execute()

這個樣例中忽略了流處理中最有趣的部分:window aggregate 和 join。這些操做如何用SQL表達呢?Apache Calcite社區提出了一個proposal來討論SQL on streams的語法和語義。社區將Calcite的stream SQL描述爲標準SQL的擴展而不是另外的 SQL-like語言。這有不少好處,首先,熟悉SQL標準的人可以在不學習新語法的狀況下分析流數據。靜態表和流表的查詢幾乎相同,能夠輕鬆地移植。此外,能夠同時在靜態表和流表上進行查詢,這和flink的願景是同樣的,將批處理看作特殊的流處理(批看做是有限的流)。最後,使用標準SQL進行流處理意味着有不少成熟的工具支持。

下面的example展現瞭如何用SQL和Table API進行滑動窗口查詢:

SQL

SELECT STREAM
  TUMBLE_END(time, INTERVAL '1' DAY) AS day,
  location AS room,
  AVG((tempF - 32) * 0.556) AS avgTempC
FROM sensorData
WHERE location LIKE 'room%'
GROUP BY TUMBLE(time, INTERVAL '1' DAY), location

Table API

val avgRoomTemp: Table = tableEnv.ingest("sensorData")
  .where('location.like("room%"))
  .partitionBy('location)
  .window(Tumbling every Days(1) on 'time as 'w)
  .select('w.end, 'location, , (('tempF - 32) * 0.556).avg as 'avgTempCs)

Table API的現狀

Batch SQL & Table API 支持:

  • Selection, Projection, Sort, Inner & Outer Joins, Set operations

  • Windows for Slide, Tumble, Session

Streaming Table API 支持:

  • Selection, Projection, Union

  • Windows for Slide, Tumble, Session

Streaming SQL:

  • Selection, Projection, Union, Tumble

Streaming SQL案例

持續的ETL和數據導入

獲取流式數據,而後轉換這些數據(歸一化,聚合...),將其寫入其餘系統(File,Kafka,DBMS)。這些query的結果一般會存儲到log-style的系統。

實時的Dashboards 和 報表

獲取流式數據,而後對數據進行聚合來支持在線系統(dashboard,推薦)或者數據分析系統(Tableau)。一般結果被寫到k-v存儲中(Cassandra,Hbase,可查詢的Flink狀態),創建索引(Elasticsearch)或者DBMS(MySQL,PostgreSQL...)。這些查詢一般能夠被更新,改進。

即席分析

針對流數據的即席查詢,以實時的方式進行分析和瀏覽數據。查詢結果直接顯示在notebook(Apache Zeppelin)中。

Flink社區還提出來和數據庫中Materialized View很類似的Dynamic table 動態表概念,將在之後的版本中支持,具體細節將另開文章解釋。

相關文章
相關標籤/搜索