SQL是Structured Query Language的縮寫,最初是由美國計算機科學家Donald D. Chamberlin和Raymond F. Boyce在20世紀70年代早期從 Early History of SQL 中瞭解關係模型後在IBM開發的。該版本最初稱爲[SEQUEL: A Structured English Query Language](結構化英語查詢語言),旨在操縱和檢索存儲在IBM原始準關係數據庫管理系統System R中的數據。SEQUEL後來改成SQL,由於「SEQUEL」是英國Hawker Siddeley飛機公司的商標。咱們看看這款用於特技飛行的英國皇家空軍豪客Siddeley Hawk T.1A (Looks great):
在20世紀70年代後期,Oracle公司(當時叫 Relational Software,Inc.)開發了基於SQL的RDBMS,並但願將其出售給美國海軍,Central Intelligence代理商和其餘美國政府機構。 1979年6月,Oracle 公司爲VAX計算機推出了第一個商業化的SQL實現,即Oracle V2。
直到1986年,ANSI和ISO標準組正式採用了標準的"數據庫語言SQL"語言定義。該標準的新版本發佈於1989,1992,1996,1999,2003,2006,2008,2011,以及最近的2016。Apache Flink SQL 核心算子的語義設計也參考了1992 、2011等ANSI-SQL標準。
SQL是專爲查詢包含在關係數據庫中的數據而設計的,是一種基於SET操做的聲明性編程語言,而不是像C語言同樣的命令式編程語言。可是,各大關係數據庫廠商在遵循ANSI-SQL標準的同時又對標準SQL進行擴展,由基於SET(無重複元素)的操做擴展到基於BAG(有重複元素)的操做,而且添加了過程編程語言功能,如:Oracle的PL/SQL, DB2的SQL PL,MySQL - SQL/PSM以及SQL Server的T-SQL等等。
隨着時間的推移ANSI-SQL規範不斷完善,所涉及的功能不斷豐富,好比在ANSI-2011中又增長了Temporal Table的標準定義,Temporal Table的標準在結構化關係數據存儲上添加了時間維度信息,這使得關係數據庫中不只能夠對當前數據進行查詢操做,根據時間版本信息也能夠對歷史數據進行操做。這些不斷豐富的功能極大加強了SQL的應用領域。
說起大數據計算領域不得不說MapReduce計算模型,MapReduce最先是由Google公司研究提出的一種面向大規模數據處理的並行計算模型和方法,併發於2004年發表了論文Simplified Data Processing on Large Clusters。
論文發表以後Apache 開源社區參考Google MapReduce,基於Java設計開發了一個稱爲Hadoop的開源MapReduce並行計算框架。很快獲得了全球學術界和工業界的廣泛關注,並獲得推廣和普及應用。
但利用Hadoop進行MapReduce的開發,須要開發人員精通Java語言,並瞭解MapReduce的運行原理,這樣在必定程度上提升了MapReduce的開發門檻,因此在開源社區又不斷涌現了一些爲了簡化MapReduce開發的開源框架,其中Hive就是典型的表明。HSQL可讓用戶以類SQL的方式描述MapReduce計算,好比本來須要幾十行,甚至上百行才能完成的wordCount,用戶一條SQL語句就能完成了,這樣極大的下降了MapReduce的開發門檻,進而也成功的將SQL應用到了大數據計算領域當中來。
SQL不只僅被成功的應用到了離線計算,SQL的易用性也吸引了流計算產品,目前最熱的Spark,Flink也紛紛支持了SQL,尤爲是Flink支持的更加完全,集成了Calcite,徹底遵循ANSI-SQL標準。Apache Flink在low-level API上面用DataSet支持批計算,用DataStream支持流計算,但在High-Level API上面利用SQL將流與批進行了統一,使得用戶編寫一次SQL既能夠在流計算中使用,又能夠在批計算中使用,爲既有流計算業務,又有批計算業務的用戶節省了大量開發成本。
SQL通過傳統數據庫領域幾十年的不斷打磨,查詢優化器已經可以極大的優化SQL的查詢性能,Apache Flink 應用Calcite進行查詢優化,複用了大量數據庫查詢優化規則,在性能上不斷追求極致,可以讓用戶關心但不用擔憂性能問題。以下圖(Alibaba 對 Apache Flink 進行架構優化後的組件棧)
相對於DataStream而言,SQL會通過Optimization模塊透明的爲用戶進行查詢優化,用戶專心編寫本身的業務邏輯,不用擔憂性能,卻能獲得最優的查詢性能!
就簡潔性而言,SQL與DataSet和DataStream相比具備很大的優越性,咱們先用一個WordCount示例來直觀的查看用戶的代碼量:
DataStream/DataSetAPI
... //省略初始化代碼
// 核心邏輯
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);
// flatmap 代碼定義
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public Tokenizer() {
}
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
String[] var4 = tokens;
int var5 = tokens.length;
for(int var6 = 0; var6 < var5; ++var6) {
String token = var4[var6];
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}複製代碼
SQL
複製代碼
...//省略初始化代碼
SELECT word, COUNT(word) FROM tab GROUP BY word;複製代碼
咱們直觀的體會到相同的統計功能使用SQL的簡潔性。
咱們作任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,固然基於Apache Flink SQL編寫的計算Job也離不開這三個部分,以下所示:
如上所示,一個完整的Apache Flink SQL Job 由以下三部分:
Source Operator - Soruce operator是對外部數據源的抽象, 目前Apache Flink內置了不少經常使用的數據源實現,好比上圖提到的Kafka。
Query Operators - 查詢算子主要完成如圖的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多數傳統數據庫支持的操做。
Sink Operator - Sink operator 是對外結果表的抽象,目前Apache Flink也內置了不少經常使用的結果表的抽象,好比上圖提到的Kafka。
相對於DataStream而言,SQL會通過Optimization模塊透明的爲用戶進行查詢優化,用戶專心編寫本身的業務邏輯,不用擔憂性能,卻能獲得最優的查詢性能!
就簡潔性而言,SQL與DataSet和DataStream相比具備很大的優越性,咱們先用一個WordCount示例來直觀的查看用戶的代碼量:
... //省略初始化代碼
// 核心邏輯
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);
// flatmap 代碼定義
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public Tokenizer() {
}
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
String[] var4 = tokens;
int var5 = tokens.length;
for(int var6 = 0; var6 < var5; ++var6) {
String token = var4[var6];
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}複製代碼
...//省略初始化代碼
SELECT word, COUNT(word) FROM tab GROUP BY word;複製代碼
咱們直觀的體會到相同的統計功能使用SQL的簡潔性。
咱們作任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,固然基於Apache Flink SQL編寫的計算Job也離不開這三個部分,以下所示:
如上所示,一個完整的Apache Flink SQL Job 由以下三部分:
目前Flink SQL支持Union,Join,Projection,Difference, Intersection以及Window等大多數傳統數據庫支持的操做,接下來爲你們分別進行簡單直觀的介紹。
爲了很好的體驗和理解Apache Flink SQL算子咱們須要先準備一下測試環境,咱們選擇IDEA,以ITCase測試方式來進行體驗。IDEA 安裝這裏不佔篇幅介紹了,相信你們能輕鬆搞定!咱們進行功能體驗有兩種方式,具體以下:
對於開源愛好者可能更喜歡源代碼方式理解和體驗Apache Flink SQL功能,那麼咱們須要下載源代碼並導入到IDEA中:
// 下載源代碼
git clone https://github.com/apache/flink.git study
// 進入源碼目錄
cd study
// 拉取穩定版release-1.6
git fetch origin release-1.6:release-1.6
//切換到穩定版
git checkout release-1.6
//將依賴安裝到本地mvn倉庫,耐心等待須要一段時間
mvn clean install -DskipTests複製代碼
org.apache.flink.table.runtime.stream.sql.SqlITCase
並測試所有經過,即證實體驗環境已經完成。以下圖所示:如上圖運行測試後顯示測試經過,咱們就能夠繼續下面的Apache Flink SQL功能體驗了。
咱們還有一種更簡單直接的方式,就是新建一個mvn項目,並在pom中添加以下依賴:
<properties>
<table.version>1.6-SNAPSHOT</table.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>JUnit</groupId>
<artifactId>JUnit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>複製代碼
完成環境準備後,咱們開始準備測試數據和寫一個簡單的測試類。
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
o_id | c_id | o_time | o_desc |
---|---|---|---|
o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
itemID | itemType | onSellTime | price |
---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 |
ITEM003 | Electronic |
2017-11-11 10:03:00
|
30 |
ITEM004 | Electronic |
2017-11-11 10:03:00
|
60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 |
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
region | userCount | accessTime |
---|---|---|
ShangHai | 100 | 2017.11.11 10:01:00 |
BeiJing | 86 | 2017.11.11 10:01:00 |
BeiJing | 210 | 2017.11.11 10:06:00 |
BeiJing | 33 | 2017.11.11 10:10:00 |
ShangHai | 129 | 2017.11.11 12:10:00 |
region | userId | accessTime |
---|---|---|
ShangHai | U0011 | 2017-11-11 10:01:00 |
ShangHai | U0012 | 2017-11-11 10:02:00 |
ShangHai | U0013 | 2017-11-11 10:03:00 |
ShangHai | U0015 | 2017-11-11 10:05:00 |
ShangHai | U0011 | 2017-11-11 10:10:00 |
BeiJing | U0110 | 2017-11-11 10:10:00 |
ShangHai | U2010 | 2017-11-11 10:11:00 |
ShangHai | U0410 | 2017-11-11 12:16:00 |
咱們建立一個SqlOverviewITCase.scala
用於接下來介紹Flink SQL算子的功能體驗。代碼以下:
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class SqlOverviewITCase {
val _tempFolder = new TemporaryFolder
@Rule
def tempFolder: TemporaryFolder = _tempFolder
def getStateBackend: StateBackend = {
new MemoryStateBackend()
}
// 客戶表數據
val customer_data = new mutable.MutableList[(String, String, String)]
customer_data.+=(("c_001", "Kevin", "from JinLin"))
customer_data.+=(("c_002", "Sunny", "from JinLin"))
customer_data.+=(("c_003", "JinCheng", "from HeBei"))
// 訂單表數據
val order_data = new mutable.MutableList[(String, String, String, String)]
order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
// 商品銷售表數據
val item_data = Seq(
Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
Right((1510365720000L)),
Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
Right((1510365780000L)),
Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
Right((1510365900000L)),
Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
Right((1510365960000L)),
Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
Right((1510366020000L)),
Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
Right((151036608000L)))
// 頁面訪問表數據
val pageAccess_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
Right((1510365660000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
Right((1510366260000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
Right((1510373400000L)))
// 頁面訪問量表數據2
val pageAccessCount_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
Right((1510365660000L)),
Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
Right((1510366200000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
Right((1510373400000L)))
// 頁面訪問表數據3
val pageAccessSession_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
Right((1510365720000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
Right((1510365720000L)),
Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
Right((1510365900000L)),
Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
Right((1510366260000L)),
Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
Right((1510373760000L)))
def procTimePrint(sql: String): Unit = {
// Streaming 環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 將order_tab, customer_tab 註冊到catalog
val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc) val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc) tEnv.registerTable("order_tab", order) tEnv.registerTable("customer_tab", customer) val result = tEnv.sqlQuery(sql).toRetractStream[Row] val sink = new RetractingSink result.addSink(sink) env.execute() } def rowTimePrint(sql: String): Unit = { // Streaming 環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) // 將item_tab, pageAccess_tab 註冊到catalog val item = env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data)) .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
val pageAccess =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
val pageAccessCount =
env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
.toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
val pageAccessSession =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
tEnv.registerTable("item_tab", item)
tEnv.registerTable("pageAccess_tab", pageAccess)
tEnv.registerTable("pageAccessCount_tab", pageAccessCount)
tEnv.registerTable("pageAccessSession_tab", pageAccessSession)
val result = tEnv.sqlQuery(sql).toRetractStream[Row]
val sink = new RetractingSink
result.addSink(sink)
env.execute()
}
@Test
def testSelect(): Unit = {
val sql = "替換想要測試的SQL"
// 非window 相關用 procTimePrint(sql)
// Window 相關用 rowTimePrint(sql)
}
}
// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
def invoke(v: (Boolean, Row)) {
retractedResults.synchronized {
val value = v._2.toString
if (v._1) {
retractedResults += value
} else {
val idx = retractedResults.indexOf(value)
if (idx >= 0) {
retractedResults.remove(idx)
} else {
throw new RuntimeException("Tried to retract a value that wasn't added first. " +
"This is probably an incorrectly implemented test. " +
"Try to set the parallelism of the sink to 1.")
}
}
}
retractedResults.sorted.foreach(println(_))
}
}
// Water mark 生成器
class EventTimeSourceFunction[T](
dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}複製代碼
SELECT 用於從數據集/流中選擇數據,語法遵循ANSI-SQL標準,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去某些列, 以下圖所示:
從customer_tab
選擇用戶姓名,並用內置的CONCAT函數拼接客戶信息,以下:
SELECT c_name, CONCAT(c_name, ' come ', c_desc) as desc FROM customer_tab;複製代碼
c_name | desc |
---|---|
Kevin | Kevin come from JinLin |
Sunny | Sunny come from JinLin |
Jincheng | Jincheng come from HeBei |
你們看到在 SELECT
不只可使用普通的字段選擇,還可使用ScalarFunction
,固然也包括User-Defined Function
,同時還能夠進行字段的alias
設置。其實SELECT
能夠結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是攜帶 DISTINCT
關鍵字,示例以下:
在訂單表查詢全部的客戶id,消除重複客戶id, 以下:
SELECT DISTINCT c_id FROM order_tab;複製代碼
c_id |
---|
c_001 |
c_002 |
WHERE 用於從數據集/流中過濾數據,與SELECT一塊兒使用,語法遵循ANSI-SQL標準,語義是關係代數的Selection,根據某些條件對關係作水平分割,即選擇符合條件的記錄,以下所示:
在customer_tab
查詢客戶id爲c_001
和c_003
的客戶信息,以下:
SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id = 'c_001' OR c_id = 'c_003';複製代碼
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_003 | JinCheng | from HeBei |
咱們發現WHERE
是對知足必定條件的數據進行過濾,WHERE
支持=, <, >, <>, >=, <=以及AND
, OR
等表達式的組合,最終知足過濾條件的數據會被選擇出來。而且 WHERE
能夠結合IN
,NOT IN
聯合使用,具體以下:
使用 IN
在customer_tab
查詢客戶id爲c_001
和c_003
的客戶信息,以下:
SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN ('c_001', 'c_003');複製代碼
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_003 | JinCheng | from HeBei |
使用 IN
和 子查詢 在customer_tab
查詢已經下過訂單的客戶信息,以下:
SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN (SELECT c_id FROM order_tab);複製代碼
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
如上介紹IN是關係代數中的Intersection, NOT IN是關係代數的Difference, 以下圖示意:
GROUP BY 是對數據進行分組的操做,好比我須要分別計算一下一個學生表裏面女生和男生的人數分別是多少,以下:
將order_tab信息按customer_tab分組統計訂單數量,簡單示例以下:
SELECT c_id, count(o_id) as o_count FROM order_tab GROUP BY c_id;複製代碼
c_id | o_count |
---|---|
c_001 | 2 |
c_002 | 1 |
在實際的業務場景中,GROUP BY除了按業務字段進行分組外,不少時候用戶也能夠用時間來進行分組(至關於劃分窗口),好比統計每分鐘的訂單數量:
按時間進行分組,查詢每分鐘的訂單數量,以下:
SELECT SUBSTRING(o_time, 1, 16) AS o_time_min, count(o_id) AS o_count FROM order_tab GROUP BY SUBSTRING(o_time, 1, 16)複製代碼
o_time_min | o_count |
---|---|
2018-11-05 10:01 | 2 |
2018-11-05 10:03 | 1 |
說明:若是咱們時間字段是timestamp類型,建議使用內置的 DATE_FORMAT
函數。
UNION ALL 將兩個表合併起來,要求兩個表的字段徹底一致,包括字段類型、字段順序,語義對應關係代數的Union,只是關係代數是Set集合操做,會有去重複操做,UNION ALL 不進行去重,以下所示:
咱們簡單的將customer_tab
查詢2次,將查詢結果合併起來,以下:
SELECT c_id, c_name, c_desc FROM customer_tab
UNION ALL
SELECT c_id, c_name, c_desc FROM customer_tab複製代碼
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
UNION ALL 對結果數據不進行去重,若是想對結果數據進行去重,傳統數據庫須要進行UNION操做。
UNION 將兩個流給合併起來,要求兩個流的字段徹底一致,包括字段類型、字段順序,並其UNION 不一樣於UNION ALL,UNION會對結果數據去重,與關係代數的Union語義一致,以下:
咱們簡單的將customer_tab
查詢2次,將查詢結果合併起來,以下:
SELECT c_id, c_name, c_desc FROM customer_tab
UNION
SELECT c_id, c_name, c_desc FROM customer_tab複製代碼
咱們發現徹底同樣的表數據進行 UNION
以後,數據是被去重的,UNION
以後的數據並無增長。
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
UNION 對結果數據進行去重,在實際的實現過程須要對數據進行排序操做,因此非必要去重狀況請使用UNION ALL操做。
JOIN 用於把來自兩個表的行聯合起來造成一個寬表,Apache Flink支持的JOIN類型:
JOIN與關係代數的Join語義相同,具體以下:
INNER JOIN
只選擇知足ON
條件的記錄,咱們查詢customer_tab
和 order_tab
表,將有訂單的客戶和訂單信息選擇出來,以下:
SELECT * FROM customer_tab AS c JOIN order_tab AS o ON o.c_id = c.c_id複製代碼
c_id | c_name | c_desc | o_id | c_id | o_time | o_desc |
---|---|---|---|---|---|---|
c_001 | Kevin | from JinLin | o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
c_001 | Kevin | from JinLin | o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
c_002 | Sunny | from JinLin | o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
LEFT JOIN
與INNER JOIN
的區別是當右表沒有與左邊相JOIN的數據時候,右邊對應的字段補NULL
輸出,語義以下:
對應的SQL語句以下(LEFT JOIN):
SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ; 複製代碼
咱們查詢customer_tab
和 order_tab
表,將客戶和訂單信息選擇出來以下:
SELECT * FROM customer_tab AS c LEFT JOIN order_tab AS o ON o.c_id = c.c_id複製代碼
c_id | c_name | c_desc | o_id | c_id | o_time | o_desc |
---|---|---|---|---|---|---|
c_001 | Kevin | from JinLin | o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
c_001 | Kevin | from JinLin | o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
c_002 | Sunny | from JinLin | o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
c_003 | JinCheng | from HeBei | NULL | NULL | NULL | NULL |
RIGHT JOIN
至關於 LEFT JOIN
左右兩個表交互一下位置。FULL JOIN
至關於 RIGHT JOIN
和 LEFT JOIN
以後進行UNION ALL
操做。
在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統數據庫的標準開窗,每個元素都對應一個窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行窗口劃分的。
Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。
按ROWS和RANGE分類是傳統數據庫的標準分類方法,在Apache Flink中還能夠根據時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。爲了不你們對過細分類形成困擾,咱們按照肯定當前行的不一樣方式將OVER Window分紅兩大類進行介紹,以下:
Bounded ROWS OVER Window 每一行元素都視爲新的計算行,即,每一行都是一個新的窗口。
咱們以3個元素(2 PRECEDING)的窗口爲例,以下圖:
上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,可是他們仍然是在不一樣的窗口,這一點有別於RANGE OVER Window。
Bounded ROWS OVER Window 語法以下:
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
ROWS
BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1複製代碼
利用item_tab
測試數據,咱們統計同類商品中當前和當前商品以前2個商品中的最高價格。
SELECT
itemID,
itemType,
onSellTime,
price,
MAX(price) OVER (
PARTITION BY itemType
ORDER BY onSellTime
ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
FROM item_tab複製代碼
itemID | itemType | onSellTime | price | maxPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 | 50 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 | 60 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 | 60 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 | 20 |
Bounded RANGE OVER Window 具備相同時間值的全部元素行視爲同一計算行,即,具備相同時間值的全部行都是同一個窗口。
咱們以3秒中數據(INTERVAL '2' SECOND)的窗口爲例,以下圖:
注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別於ROWS OVER Window。
Bounded RANGE OVER Window的語法以下:
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
RANGE
BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1複製代碼
咱們統計同類商品中當前和當前商品以前2分鐘商品中的最高價格。
SELECT
itemID,
itemType,
onSellTime,
price,
MAX(price) OVER (
PARTITION BY itemType
ORDER BY rowtime
RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
FROM item_tab複製代碼
itemID | itemType | onSellTime | price | maxPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 | 50 |
ITEM003 | Electronic |
2017-11-11 10:03:00
|
30 | 60 |
ITEM004 | Electronic |
2017-11-11 10:03:00
|
60 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 | 60 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 | 40 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 | 20 |
OverWindow最重要是要理解每一行數據都肯定一個窗口,同時目前在Apache Flink中只支持按時間字段排序。而且OverWindow開窗與GroupBy方式數據分組最大的不一樣在於,GroupBy數據分組統計時候,在SELECT
中除了GROUP BY的key,不能直接選擇其餘非key的字段,可是OverWindow沒有這個限制,SELECT
能夠選擇任何字段。好比一張表table(a,b,c,d)4個字段,若是按d分組求c的最大值,兩種寫完以下:
SELECT d, MAX(c) FROM table GROUP BY d
SELECT a, b, c, d, MAX(c) OVER(PARTITION BY d, ORDER BY ProcTime())
根據窗口數據劃分的不一樣,目前Apache Flink有以下3種Bounded Winodw:
說明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上全部數據都在一個窗口裏面,語義很是簡單,這裏不作詳細介紹了。
Tumble 滾動窗口有固定size,窗口數據不重疊,具體語義以下:
Tumble 滾動窗口對應的語法以下:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)複製代碼
利用pageAccess_tab
測試數據,咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV)。
SELECT
region,
TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,
TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,
COUNT(region) AS pv
FROM pageAccess_tab
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)複製代碼
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:12:00.0 | 2 |
ShangHai | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
ShangHai | 2017-11-11 04:10:00.0 | 2017-11-11 04:12:00.0 | 1 |
Hop 滑動窗口和滾動窗口相似,窗口有固定的size,與滾動窗口不一樣的是滑動窗口能夠經過slide參數控制滑動窗口的新建頻率。所以當slide值小於窗口size的值的時候多個滑動窗口會重疊。
Hop 滑動窗口語義以下所示:
Hop 滑動窗口對應語法以下:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)複製代碼
利用pageAccessCount_tab
測試數據,咱們須要每5分鐘統計近10分鐘的頁面訪問量(PV).
SELECT
HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,
HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,
SUM(accessCount) AS accessCount
FROM pageAccessCount_tab
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)複製代碼
winStart | winEnd | accessCount |
---|---|---|
2017-11-11 01:55:00.0 | 2017-11-11 02:05:00.0 | 186 |
2017-11-11 02:00:00.0 | 2017-11-11 02:10:00.0 | 396 |
2017-11-11 02:05:00.0 | 2017-11-11 02:15:00.0 | 243 |
2017-11-11 02:10:00.0 | 2017-11-11 02:20:00.0 | 33 |
2017-11-11 04:05:00.0 | 2017-11-11 04:15:00.0 | 129 |
2017-11-11 04:10:00.0 | 2017-11-11 04:20:00.0 | 129 |
Seeeion 會話窗口 是沒有固定大小的窗口,經過session的活躍度分組元素。不一樣於滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話窗口 分配器經過配置session gap來指定非活躍週期的時長.
Session 會話窗口語義以下所示:
Seeeion 會話窗口對應語法以下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)複製代碼
利用pageAccessSession_tab
測試數據,咱們按地域統計連續的兩個訪問用戶之間的訪問時間間隔不超過3分鐘的的頁面訪問量(PV).
SELECT
region,
SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,
SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd,
COUNT(region) AS pv
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)複製代碼
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:13:00.0 | 1 |
ShangHai | 2017-11-11 02:01:00.0 | 2017-11-11 02:08:00.0 | 4 |
ShangHai | 2017-11-11 02:10:00.0 | 2017-11-11 02:14:00.0 | 2 |
ShangHai | 2017-11-11 04:16:00.0 | 2017-11-11 04:19:00.0 | 1 |
Apache Flink 除了提供了大部分ANSI-SQL的核心算子,也爲用戶提供了本身編寫業務代碼的機會,那就是User-Defined Function,目前支持以下三種 User-Defined Function:
UDX都是用戶自定義的函數,那麼Apache Flink框架爲啥將自定義的函數分紅三類呢?是根據什麼劃分的呢?Apache Flink對自定義函數進行分類的依據是根據函數語義的不一樣,函數的輸入和輸出不一樣來分類的,具體以下:
UDX | INPUT | OUTPUT | INPUT:OUTPUT |
---|---|---|---|
UDF | 單行中的N(N>=0)列 | 單行中的1列 | 1:1 |
UDTF | 單行中的N(N>=0)列 | M(M>=0)行 | 1:N(N>=0) |
UDAF | M(M>=0)行中的每行的N(N>=0)列 | 單行中的1列 | M:1(M>=0) |
ScalarFunction#eval()
方法便可,簡單實現以下:object MyConnect extends ScalarFunction {
@varargs
def eval(args: String*): String = {
val sb = new StringBuilder
var i = 0
while (i < args.length) {
if (args(i) == null) {
return null
}
sb.append(args(i))
i += 1
}
sb.toString
}
}複製代碼
...
val fun = MyConnect
tEnv.registerFunction("myConnect", fun)
val sql = "SELECT myConnect(a, b) as str FROM tab"
...複製代碼
TableFunction#eval()
方法便可,簡單實現以下:ScalarFunction#eval()`
class MySplit extends TableFunction[String] {
def eval(str: String): Unit = {
if (str.contains("#")){
str.split("#").foreach(collect)
}
}
def eval(str: String, prefix: String): Unit = {
if (str.contains("#")) {
str.split("#").foreach(s => collect(prefix + s))
}
}
}複製代碼
...
val fun = new MySplit()
tEnv.registerFunction("mySplit", fun)
val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"
...複製代碼
/** The initial accumulator for count aggregate function */
class CountAccumulator extends JTuple1[Long] {
f0 = 0L //count
}
/**
* User-defined count aggregate function
*/
class MyCount
extends AggregateFunction[JLong, CountAccumulator] {
// process argument is optimized by Calcite.
// For instance count(42) or count(*) will be optimized to count().
def accumulate(acc: CountAccumulator): Unit = {
acc.f0 += 1L
}
// process argument is optimized by Calcite.
// For instance count(42) or count(*) will be optimized to count().
def retract(acc: CountAccumulator): Unit = {
acc.f0 -= 1L
}
def accumulate(acc: CountAccumulator, value: Any): Unit = {
if (value != null) {
acc.f0 += 1L
}
}
def retract(acc: CountAccumulator, value: Any): Unit = {
if (value != null) {
acc.f0 -= 1L
}
}
override def getValue(acc: CountAccumulator): JLong = {
acc.f0
}
def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
val iter = its.iterator()
while (iter.hasNext) {
acc.f0 += iter.next().f0
}
}
override def createAccumulator(): CountAccumulator = {
new CountAccumulator
}
def resetAccumulator(acc: CountAccumulator): Unit = {
acc.f0 = 0L
}
override def getAccumulatorType: TypeInformation[CountAccumulator] = {
new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
}
override def getResultType: TypeInformation[JLong] =
BasicTypeInfo.LONG_TYPE_INFO
}複製代碼
...
val fun = new MyCount()
tEnv.registerFunction("myCount", fun)
val sql = "SELECT myCount(c) FROM MyTable GROUP BY a"
...複製代碼
上面咱們介紹了Apache Flink SQL核心算子的語法及語義,這部分將選取Bounded EventTime Tumble Window爲例爲你們編寫一個完整的包括Source和Sink定義的Apache Flink SQL Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,用戶ID和訪問時間。咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV). 具體數據以下:
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
自定義Apache Flink Stream Source須要實現StreamTableSource
, StreamTableSource
中經過StreamExecutionEnvironment
的addSource
方法獲取DataStream
, 因此咱們須要自定義一個 SourceFunction
, 而且要支持產生WaterMark,也就是要實現DefinedRowtimeAttributes
接口。
支持接收攜帶EventTime的數據集合,Either的數據結構,Right表示WaterMark和Left表示數據:
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}複製代碼
咱們自定義的Source要攜帶咱們測試的數據,以及對應的WaterMark數據,具體以下:
class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
val fieldNames = Array("accessTime", "region", "userId")
val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)
// 頁面訪問表數據 rows with timestamps and watermarks
val data = Seq(
Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
Right(1510365660000L),
Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
Right(1510365660000L),
Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
Right(1510366200000L),
Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
Right(1510366260000L),
Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
Right(1510373400000L)
)
override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
Collections.singletonList(new RowtimeAttributeDescriptor(
"accessTime",
new ExistingField("accessTime"),
PreserveWatermarks.INSTANCE))
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
}
override def getReturnType: TypeInformation[Row] = rowType
override def getTableSchema: TableSchema = schema
}複製代碼
咱們簡單的將計算結果寫入到Apache Flink內置支持的CSVSink中,定義Sink以下:
def getCsvTableSink: TableSink[Row] = {
val tempFile = File.createTempFile("csv_sink_", "tem")
// 打印sink的文件路徑,方便咱們查看運行結果
println("Sink path : " + tempFile)
if (tempFile.exists()) {
tempFile.delete()
}
new CsvTableSink(tempFile.getAbsolutePath).configure(
Array[String]("region", "winStart", "winEnd", "pv"),
Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}複製代碼
主程序包括執行環境的定義,Source/Sink的註冊以及統計查SQL的執行,具體以下:
def main(args: Array[String]): Unit = {
// Streaming 環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 設置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//方便咱們查出輸出數據
env.setParallelism(1)
val sourceTableName = "mySource"
// 建立自定義source數據結構
val tableSource = new MyTableSource
val sinkTableName = "csvSink"
// 建立CSV sink 數據結構
val tableSink = getCsvTableSink
// 註冊source
tEnv.registerTableSource(sourceTableName, tableSource)
// 註冊sink
tEnv.registerTableSink(sinkTableName, tableSink)
val sql =
"SELECT " +
" region, " +
" TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
" TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
" FROM mySource " +
" GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"
tEnv.sqlQuery(sql).insertInto(sinkTableName);
env.execute()
}複製代碼
執行主程序後咱們會在控制檯獲得Sink的文件路徑,以下:
Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem複製代碼
Cat 方式查看計算結果,以下:
jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1複製代碼
表格化如上結果:
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:12:00.0 | 2 |
ShangHai | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
ShangHai | 2017-11-11 04:10:00.0 | 2017-11-11 04:12:00.0 | 1 |
上面這個端到端的完整示例也能夠應用到本篇前面介紹的其餘算子示例中,只是你們根據Source和Sink的Schema不一樣來進行相應的構建便可!
本篇概要的向你們介紹了SQL的由來,Apache Flink SQL 大部分核心功能,並附帶了具體的測試數據和測試程序,最後以一個End-to-End的示例展現瞭如何編寫Apache Flink SQL的Job收尾。本篇着重向你們介紹Apache Flink SQL的使用,後續咱們再繼續探究每一個算子的實現原理。