基於Flink的標準SQL操做支持

原文地址:stream-sqlhtml

個人Flink系列實戰文章地址:Github Repogit

近年來,開源的分佈式流計算系統層出不窮,引發了普遍的關注與討論。其中的先行者,譬如 Apache Storm提供了低延遲的流式處理功能,可是受限於at-least-once的投遞保證,背壓等不太良好的處理以及相對而言的開放API的底層化。不過Storm也起到了拋磚引玉的做用,自此以後,不少新的流計算系統在不一樣的維度上大放光彩。今日,Apache Flink或者 Apache Beam的使用者可以使用流式的Scala或者Java APIs來進行流處理任務,同時保證了exactly-once的投遞機制以及高吞吐狀況下的的低延遲響應。與此同時,流處理也在產界獲得了應用,從Apache Kafka與Apache Flink在流處理的基礎設施領域的大規模部署也折射除了流處理在產界的快速發展。與日俱增的實時數據流催生了開發人員或者分析人員對流數據進行分析以及實時展示的需求。不過,流數據分析也須要一些必備的技能與知識儲備,譬如無限流的基本特性、窗口、時間以及狀態等等,這些概念都會在利用Java或者Scala API來完成一個流分析的任務時候起到很大的做用。github

大概六個月以前,Apache Flink社區着手爲流數據分析系統引入一個SQL交互功能。衆所周知,SQL是訪問與處理數據的標準語言,基本上每一個用過數據庫的或者進行或數據分析的都對SQL不陌生。鑑於此,爲流處理系統添加一個SQL交互接口可以有效地擴大該技術的受用面,讓更多的人能夠熟悉而且使用。除此以外,引入SQL的支持還能知足於一些須要實時交互地用戶場景,大大簡化一些須要進行流操做或者轉化的應用代碼。這篇文章中,咱們會從現有的狀態、架構的設計以及將來Apache Flink社區準備添加SQL支持的計劃這幾個方面進行討論。sql

從何開始,先來回顧下已有的Table API

0.9.0-milestone1 發佈以後,Apache Flink添加了所謂的Table API來提供相似於SQL的表達式用於對關係型數據進行處理。這系列API的操做對象就是抽象而言的可以進行關係型操做的結構化數據或者流。Table API通常與DataSet或者DataStream緊密關聯,能夠從DataSet或者DataStream來方便地建立一個Table對象,也能夠用以下的操做將一個Table轉化回一個DataSet或者DataStream對象:數據庫

val execEnv = ExecutionEnvironment.getExecutionEnvironment

val tableEnv = TableEnvironment.getTableEnvironment(execEnv)



// obtain a DataSet from somewhere

val tempData: DataSet[(String, Long, Double)] =



// convert the DataSet to a Table

val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF)

// compute your result

val avgTempCTable: Table = tempTable

 .where('location.like("room%"))

 .select(

   ('time / (3600 * 24)) as 'day, 

   'Location as 'room, 

   (('tempF - 32) * 0.556) as 'tempC

  )

 .groupBy('day, 'room)

 .select('day, 'room, 'tempC.avg as 'avgTempC)

// convert result Table back into a DataSet and print it

avgTempCTable.toDataSet[Row].print()

上面這坨代碼是Scala的,不過你能夠簡單地用Java版本的Table API進行實現,下面這張圖就展現了Table API的原始的結構:apache

在咱們從DataSet或者DataStream建立了Table以後,能夠利用相似於filter, join, 或者 select關係型轉化操做來轉化爲一個新的Table對象。而從內部實現上來講,全部應用於Table的轉化操做都會變成一棵邏輯表操做樹,在Table對象被轉化回DataSet或者DataStream以後,專門的轉化器會將這棵邏輯操做符樹轉化爲對等的DataSet或者DataStream操做符。譬如'location.like("room%")這樣的表達式會經由代碼生成編譯爲Flink中的函數。api

不過,老版本的Table API仍是有不少的限制的。首先,Table API並不能單獨使用,而必須嵌入到DataSet或者DataStream的程序中,對於批處理表的查詢並不支持外鏈接、排序以及其餘不少在SQL中常用的擴展操做。而流處理表中只支持譬如filters、union以及projections,不能支持aggregations以及joins。而且,這個轉化處理過程並不能有查詢優化,你要優化的話仍是要去參考那些對於DataSet操做的優化。架構

Table API joining forces with SQL

關因而否須要添加SQL支持的討論以前就在Flink社區中發生過幾回,Flink 0.9發佈以後,Table API、關係表達式的代碼生成工具以及運行時的操做符等都預示着添加SQL支持的不少基礎已經具有,能夠考慮進行添加了。不過另外一方面,在整個Hadoop生態鏈裏已經存在了大量的所謂「SQL-on-Hadoop」的解決方案,譬如Apache Hive, Apache Drill, Apache Impala, Apache Tajo,在已經有了這麼多的可選方案的狀況下,咱們以爲應該優先提高Flink其餘方面的特性,因而就暫時擱置了SQL-on-Hadoop的開發。框架

不過,隨着流處理系統的日漸火熱以及Flink受到的愈來愈普遍地應用,咱們發現有必要爲用戶提供一個更簡單的能夠用於數據分析的接口。大概半年前,咱們決定要改造下Table API,擴展其對於流處理的能力而且最終完成在流處理上的SQL支持。不過咱們也不打算重複造輪子,所以打算基於Apache Calcite這個很是流行的SQL解析器進行重構操做。Calcite在其餘不少開源項目裏也都應用到了,譬如Apache Hive, Apache Drill, Cascading, and many more。此外,Calcite社區自己也有將SQL on streams列入到它們的路線圖中,所以咱們一拍即合。Calcite 在新的架構設計中的地位大概以下所示:分佈式

新的架構主要是將Table API與SQL集成起來,用這兩貨的API構建的查詢最終都會轉化到Calcite的所謂的logicl plans表述。轉化以後的流查詢與批查詢基本上差很少,而後Calcite的優化器會基於轉化和優化規則來優化這些logical plans,針對數據源(流仍是靜態數據)的不一樣咱們會應用不一樣的規則。最後,通過優化的logical plan會轉化爲一個普通的Flink DataStream或者DataSet對象,即仍是利用代碼生成來將關係型表達式編譯爲Flink的函數。

新的架構繼續提供了Table API而且在此基礎上進行了很大的提高,它爲流數據與關係型數據提供了統一的查詢接口。另外,咱們利用了Calcite的查詢優化框架與SQL解釋器來進行了查詢優化。不過,由於這些設計都仍是基於Flink的已有的API,譬如DataStream API提供的低延遲、高吞吐以及exactly-once投遞的功能,以及DataSet API經過的健壯與高效的內存級別的操做器與管道式的數據交換,任何對於Flink核心API的提高都可以自動地提高Table API或者SQL查詢的效率。

在這些工做以後,Flink就已經具有了同時對於流數據與靜態數據的SQL支持。不過,咱們並不想把這個當成一個高效的SQL-on-Hadoop的解決方案,就像Impala, Drill, 以及 Hive那樣的角色,咱們更願意把它當成爲流處理提供便捷的交互接口的方案。另外,這套機制還能促進同時用了Flink API與SQL的應用的性能。

How will Flink’s SQL on streams look like?

咱們討論了爲啥要重構Flink的流SQL接口的緣由以及大概怎麼去完成這個任務,如今咱們討論下最終的API或者使用方式會是啥樣的。新的SQL接口會集成到Table API中。DataStreams、DataSet以及額外的數據源都會先在TableEnvironment中註冊成一個Table而後再進行SQL操做。TableEnvironment.sql()方法會容許你輸入SQL查詢語句而後執行返回一個新的Table,下面這個例子就展現了一個完整的從JSON編碼的Kafka主題中讀取數據而後利用SQL查詢進行處理最終寫入另外一個Kafka主題的模型。注意,這下面提到的KafkaJsonSource與KafkaJsonSink都還未發佈,將來的話TableSource與TableSinks都會固定提供,這樣能夠減小不少的模板代碼。

// get environments

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment

val tableEnv = TableEnvironment.getTableEnvironment(execEnv)



// configure Kafka connection

val kafkaProps = ...

// define a JSON encoded Kafka topic as external table

val sensorSource = new KafkaJsonSource[(String, Long, Double)](

    "sensorTopic",

    kafkaProps,

    ("location", "time", "tempF"))



// register external table

tableEnv.registerTableSource("sensorData", sensorSource)



// define query in external table

val roomSensors: Table = tableEnv.sql(

    "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +

    "FROM sensorData " +

    "WHERE location LIKE 'room%'"

  )



// define a JSON encoded Kafka topic as external sink

val roomSensorSink = new KafkaJsonSink(...)



// define sink for room sensor data and execute query

roomSensors.toSink(roomSensorSink)

execEnv.execute()

你可能會發現上面這個例子中沒有體現流處理中兩個重要的方面:基於窗口的聚合與關聯。下面我就會解釋下怎麼在SQL中表達關於窗口的操做。 Apache Calcite社區關於這方面已經有所討論:SQL on streams。Calcite的流SQL被認爲是一個標準SQL的擴展,而不是另外一個相似於SQL的語言。這會有幾個方面的好處,首先,已經熟悉了標準SQL語法的同窗就不必花時間再學一波新的語法了,皆大歡喜。如今對於靜態表與流數據的查詢已經基本一致了,能夠很方便地進行轉換。Flink一直主張的是批處理只是流處理的一個特殊狀況,所以用戶也能夠同時在靜態表與流上進行查詢,譬如處理有限的流。最後,將來也會有不少工具支持進行標準的SQL進行數據分析。

儘管咱們尚未徹底地定義好在Flink SQL表達式與Table API中如何進行窗口等設置,下面這個簡單的例子會指明如何在SQL與Table API中進行滾動窗口式查詢:

SQL (following the syntax proposal of Calcite’s streaming SQL document)

SELECT STREAM 

  TUMBLE_END(time, INTERVAL '1' DAY) AS day, 

  location AS room, 

  AVG((tempF - 32) * 0.556) AS avgTempC

FROM sensorData

WHERE location LIKE 'room%'

GROUP BY TUMBLE(time, INTERVAL '1' DAY), location

Table API

val avgRoomTemp: Table = tableEnv.ingest("sensorData")

  .where('location.like("room%"))

  .partitionBy('location)

  .window(Tumbling every Days(1) on 'time as 'w)

  .select('w.end, 'location, , (('tempF - 32) * 0.556).avg as 'avgTempCs)
相關文章
相關標籤/搜索