Apache-Flink深度解析-SQL概覽

SQL簡述

SQL是Structured Query Language的縮寫,最初是由美國計算機科學家Donald D. Chamberlin和Raymond F. Boyce在20世紀70年代早期從 Early History of SQL 中瞭解關係模型後在IBM開發的。該版本最初稱爲[SEQUEL: A Structured English Query Language](結構化英語查詢語言),旨在操縱和檢索存儲在IBM原始準關係數據庫管理系統System R中的數據。SEQUEL後來改成SQL,由於「SEQUEL」是英國Hawker Siddeley飛機公司的商標。咱們看看這款用於特技飛行的英國皇家空軍豪客Siddeley Hawk T.1A (Looks great):




第一款SQL數據庫

在20世紀70年代後期,Oracle公司(當時叫 Relational Software,Inc.)開發了基於SQL的RDBMS,並但願將其出售給美國海軍,Central Intelligence代理商和其餘美國政府機構。 1979年6月,Oracle 公司爲VAX計算機推出了第一個商業化的SQL實現,即Oracle V2。

ANSI-SQL標準的採用

直到1986年,ANSI和ISO標準組正式採用了標準的"數據庫語言SQL"語言定義。該標準的新版本發佈於1989,1992,1996,1999,2003,2006,2008,2011,以及最近的2016。Apache Flink SQL 核心算子的語義設計也參考了19922011等ANSI-SQL標準。

SQL操做及擴展

SQL是專爲查詢包含在關係數據庫中的數據而設計的,是一種基於SET操做的聲明性編程語言,而不是像C語言同樣的命令式編程語言。可是,各大關係數據庫廠商在遵循ANSI-SQL標準的同時又對標準SQL進行擴展,由基於SET(無重複元素)的操做擴展到基於BAG(有重複元素)的操做,而且添加了過程編程語言功能,如:Oracle的PL/SQL, DB2的SQL PL,MySQL - SQL/PSM以及SQL Server的T-SQL等等。
隨着時間的推移ANSI-SQL規範不斷完善,所涉及的功能不斷豐富,好比在ANSI-2011中又增長了Temporal Table的標準定義,Temporal Table的標準在結構化關係數據存儲上添加了時間維度信息,這使得關係數據庫中不只能夠對當前數據進行查詢操做,根據時間版本信息也能夠對歷史數據進行操做。這些不斷豐富的功能極大加強了SQL的應用領域。

大數據計算領域對SQL的應用

離線計算(批計算)

說起大數據計算領域不得不說MapReduce計算模型,MapReduce最先是由Google公司研究提出的一種面向大規模數據處理的並行計算模型和方法,併發於2004年發表了論文Simplified Data Processing on Large Clusters
論文發表以後Apache 開源社區參考Google MapReduce,基於Java設計開發了一個稱爲Hadoop的開源MapReduce並行計算框架。很快獲得了全球學術界和工業界的廣泛關注,並獲得推廣和普及應用。
但利用Hadoop進行MapReduce的開發,須要開發人員精通Java語言,並瞭解MapReduce的運行原理,這樣在必定程度上提升了MapReduce的開發門檻,因此在開源社區又不斷涌現了一些爲了簡化MapReduce開發的開源框架,其中Hive就是典型的表明。HSQL可讓用戶以類SQL的方式描述MapReduce計算,好比本來須要幾十行,甚至上百行才能完成的wordCount,用戶一條SQL語句就能完成了,這樣極大的下降了MapReduce的開發門檻,進而也成功的將SQL應用到了大數據計算領域當中來。

實時計算(流計算)

SQL不只僅被成功的應用到了離線計算,SQL的易用性也吸引了流計算產品,目前最熱的Spark,Flink也紛紛支持了SQL,尤爲是Flink支持的更加完全,集成了Calcite,徹底遵循ANSI-SQL標準。Apache Flink在low-level API上面用DataSet支持批計算,用DataStream支持流計算,但在High-Level API上面利用SQL將流與批進行了統一,使得用戶編寫一次SQL既能夠在流計算中使用,又能夠在批計算中使用,爲既有流計算業務,又有批計算業務的用戶節省了大量開發成本。

SQL高性能與簡潔性

性能

SQL通過傳統數據庫領域幾十年的不斷打磨,查詢優化器已經可以極大的優化SQL的查詢性能,Apache Flink 應用Calcite進行查詢優化,複用了大量數據庫查詢優化規則,在性能上不斷追求極致,可以讓用戶關心但不用擔憂性能問題。以下圖(Alibaba 對 Apache Flink 進行架構優化後的組件棧)



相對於DataStream而言,SQL會通過Optimization模塊透明的爲用戶進行查詢優化,用戶專心編寫本身的業務邏輯,不用擔憂性能,卻能獲得最優的查詢性能!

簡潔

就簡潔性而言,SQL與DataSet和DataStream相比具備很大的優越性,咱們先用一個WordCount示例來直觀的查看用戶的代碼量:


  • DataStream/DataSetAPI

... //省略初始化代碼
// 核心邏輯
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);

// flatmap 代碼定義
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public Tokenizer() {
}

public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
String[] var4 = tokens;
int var5 = tokens.length;

for(int var6 = 0; var6 < var5; ++var6) {
String token = var4[var6];
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}

}
}複製代碼


  • SQL


複製代碼
...//省略初始化代碼
SELECT word, COUNT(word) FROM tab GROUP BY word;複製代碼

咱們直觀的體會到相同的統計功能使用SQL的簡潔性。

Flink SQL Job的組成

咱們作任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,固然基於Apache Flink SQL編寫的計算Job也離不開這三個部分,以下所示:



如上所示,一個完整的Apache Flink SQL Job 由以下三部分:


  • Source Operator - Soruce operator是對外部數據源的抽象, 目前Apache Flink內置了不少經常使用的數據源實現,好比上圖提到的Kafka。

  • Query Operators - 查詢算子主要完成如圖的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多數傳統數據庫支持的操做。

  • Sink Operator - Sink operator 是對外結果表的抽象,目前Apache Flink也內置了不少經常使用的結果表的抽象,好比上圖提到的Kafka。


image

相對於DataStream而言,SQL會通過Optimization模塊透明的爲用戶進行查詢優化,用戶專心編寫本身的業務邏輯,不用擔憂性能,卻能獲得最優的查詢性能!

簡潔

就簡潔性而言,SQL與DataSet和DataStream相比具備很大的優越性,咱們先用一個WordCount示例來直觀的查看用戶的代碼量:

  • DataStream/DataSetAPI
... //省略初始化代碼
// 核心邏輯
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);

// flatmap 代碼定義
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public Tokenizer() {
        }

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            String[] var4 = tokens;
            int var5 = tokens.length;

            for(int var6 = 0; var6 < var5; ++var6) {
                String token = var4[var6];
                if (token.length() > 0) {
                    out.collect(new Tuple2(token, 1));
                }
            }

        }
    }複製代碼
  • SQL
...//省略初始化代碼
SELECT word, COUNT(word) FROM tab GROUP BY word;複製代碼

咱們直觀的體會到相同的統計功能使用SQL的簡潔性。

Flink SQL Job的組成

咱們作任何數據計算都離不開讀取原始數據,計算邏輯和寫入計算結果數據三部分,固然基於Apache Flink SQL編寫的計算Job也離不開這三個部分,以下所示:

image

如上所示,一個完整的Apache Flink SQL Job 由以下三部分:

  • Source Operator - Soruce operator是對外部數據源的抽象, 目前Apache Flink內置了不少經常使用的數據源實現,好比上圖提到的Kafka。
  • Query Operators - 查詢算子主要完成如圖的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多數傳統數據庫支持的操做。
  • Sink Operator - Sink operator 是對外結果表的抽象,目前Apache Flink也內置了不少經常使用的結果表的抽象,好比上圖提到的Kafka。

Flink SQL 核心算子

目前Flink SQL支持Union,Join,Projection,Difference, Intersection以及Window等大多數傳統數據庫支持的操做,接下來爲你們分別進行簡單直觀的介紹。

環境

爲了很好的體驗和理解Apache Flink SQL算子咱們須要先準備一下測試環境,咱們選擇IDEA,以ITCase測試方式來進行體驗。IDEA 安裝這裏不佔篇幅介紹了,相信你們能輕鬆搞定!咱們進行功能體驗有兩種方式,具體以下:

源碼方式

對於開源愛好者可能更喜歡源代碼方式理解和體驗Apache Flink SQL功能,那麼咱們須要下載源代碼並導入到IDEA中:

  • 下載源碼:
// 下載源代碼
git clone https://github.com/apache/flink.git study
// 進入源碼目錄
cd study
// 拉取穩定版release-1.6
git fetch origin release-1.6:release-1.6
//切換到穩定版
git checkout release-1.6
//將依賴安裝到本地mvn倉庫,耐心等待須要一段時間
mvn clean install -DskipTests複製代碼
  • 導入到IDEA
    將Flink源碼導入到IDEA過程這裏再也不佔用篇幅,導入後確保在IDEA中能夠運行 org.apache.flink.table.runtime.stream.sql.SqlITCase 並測試所有經過,即證實體驗環境已經完成。以下圖所示:

image

如上圖運行測試後顯示測試經過,咱們就能夠繼續下面的Apache Flink SQL功能體驗了。

依賴Flink包方式

咱們還有一種更簡單直接的方式,就是新建一個mvn項目,並在pom中添加以下依賴:

<properties>
    <table.version>1.6-SNAPSHOT</table.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>JUnit</groupId>
      <artifactId>JUnit</artifactId>
      <version>4.12</version>
    </dependency>

  </dependencies>複製代碼

完成環境準備後,咱們開始準備測試數據和寫一個簡單的測試類。

示例數據及測試類

測試數據

  • customer_tab 表 - 客戶表保存客戶id,客戶姓名和客戶描述信息。字段及測試數據以下:
c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
  • order_tab 表 - 訂單表保存客戶購買的訂單信息,包括訂單id,訂單時間和訂單描述信息。 字段節測試數據以下:
o_id c_id o_time o_desc
o_oo1 c_002 2018-11-05 10:01:01 iphone
o_002 c_001 2018-11-05 10:01:55 ipad
o_003 c_001 2018-11-05 10:03:44 flink book
  • Item_tab
    商品表, 攜帶商品id,商品類型,出售時間,價格等信息,具體以下:
itemID itemType onSellTime price
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic
2017-11-11 10:03:00
30
ITEM004 Electronic
2017-11-11 10:03:00
60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20
  • PageAccess_tab
    頁面訪問表,包含用戶ID,訪問時間,用戶所在地域信息,具體數據以下:
region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00
  • PageAccessCount_tab
    頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數據以下:
region userCount accessTime
ShangHai 100 2017.11.11 10:01:00
BeiJing 86 2017.11.11 10:01:00
BeiJing 210 2017.11.11 10:06:00
BeiJing 33 2017.11.11 10:10:00
ShangHai 129 2017.11.11 12:10:00
  • PageAccessSession_tab
    頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數據以下:
region userId accessTime
ShangHai U0011 2017-11-11 10:01:00
ShangHai U0012 2017-11-11 10:02:00
ShangHai U0013 2017-11-11 10:03:00
ShangHai U0015 2017-11-11 10:05:00
ShangHai U0011 2017-11-11 10:10:00
BeiJing U0110 2017-11-11 10:10:00
ShangHai U2010 2017-11-11 10:11:00
ShangHai U0410 2017-11-11 12:16:00

測試類

咱們建立一個SqlOverviewITCase.scala 用於接下來介紹Flink SQL算子的功能體驗。代碼以下:

import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class SqlOverviewITCase {
  val _tempFolder = new TemporaryFolder

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  def getStateBackend: StateBackend = {
    new MemoryStateBackend()
  }

  // 客戶表數據
  val customer_data = new mutable.MutableList[(String, String, String)]
  customer_data.+=(("c_001", "Kevin", "from JinLin"))
  customer_data.+=(("c_002", "Sunny", "from JinLin"))
  customer_data.+=(("c_003", "JinCheng", "from HeBei"))


  // 訂單表數據
  val order_data = new mutable.MutableList[(String, String, String, String)]
  order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))

  // 商品銷售表數據
  val item_data = Seq(
    Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
    Right((1510365720000L)),
    Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
    Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
    Right((1510365780000L)),
    Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
    Right((1510365900000L)),
    Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
    Right((1510365960000L)),
    Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
    Right((1510366020000L)),
    Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
    Right((151036608000L)))

  // 頁面訪問表數據
  val pageAccess_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
    Right((1510365660000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
    Right((1510366260000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
    Right((1510373400000L)))

  // 頁面訪問量表數據2
  val pageAccessCount_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
    Right((1510365660000L)),
    Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
    Right((1510366200000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
    Right((1510373400000L)))

  // 頁面訪問表數據3
  val pageAccessSession_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
    Right((1510365720000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
    Right((1510365720000L)),
    Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
    Right((1510365900000L)),
    Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
    Right((1510366260000L)),
    Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
    Right((1510373760000L)))

  def procTimePrint(sql: String): Unit = {
    // Streaming 環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 將order_tab, customer_tab 註冊到catalog
    val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc) val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc) tEnv.registerTable("order_tab", order) tEnv.registerTable("customer_tab", customer) val result = tEnv.sqlQuery(sql).toRetractStream[Row] val sink = new RetractingSink result.addSink(sink) env.execute() } def rowTimePrint(sql: String): Unit = { // Streaming 環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStateBackend(getStateBackend) env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) // 將item_tab, pageAccess_tab 註冊到catalog val item = env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data)) .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)

    val pageAccess =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    val pageAccessCount =
      env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
      .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)

    val pageAccessSession =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    tEnv.registerTable("item_tab", item)
    tEnv.registerTable("pageAccess_tab", pageAccess)
    tEnv.registerTable("pageAccessCount_tab", pageAccessCount)
    tEnv.registerTable("pageAccessSession_tab", pageAccessSession)

    val result = tEnv.sqlQuery(sql).toRetractStream[Row]
    val sink = new RetractingSink
    result.addSink(sink)
    env.execute()

  }

  @Test
  def testSelect(): Unit = {
    val sql = "替換想要測試的SQL"
    // 非window 相關用 procTimePrint(sql)
    // Window 相關用 rowTimePrint(sql)
  }

}

// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  def invoke(v: (Boolean, Row)) {
    retractedResults.synchronized {
      val value = v._2.toString
      if (v._1) {
        retractedResults += value
      } else {
        val idx = retractedResults.indexOf(value)
        if (idx >= 0) {
          retractedResults.remove(idx)
        } else {
          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
                                       "This is probably an incorrectly implemented test. " +
                                       "Try to set the parallelism of the sink to 1.")
        }
      }
    }
    retractedResults.sorted.foreach(println(_))
  }
}

// Water mark 生成器
class EventTimeSourceFunction[T](
  dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }

  override def cancel(): Unit = ???
}複製代碼

Select

SELECT 用於從數據集/流中選擇數據,語法遵循ANSI-SQL標準,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去某些列, 以下圖所示:
image

SQL 示例

customer_tab選擇用戶姓名,並用內置的CONCAT函數拼接客戶信息,以下:

SELECT c_name, CONCAT(c_name, ' come ', c_desc) as desc  FROM customer_tab;複製代碼

Result

c_name desc
Kevin Kevin come from JinLin
Sunny Sunny come from JinLin
Jincheng Jincheng come from HeBei

特別說明

你們看到在 SELECT 不只可使用普通的字段選擇,還可使用ScalarFunction,固然也包括User-Defined Function,同時還能夠進行字段的alias設置。其實SELECT能夠結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是攜帶 DISTINCT 關鍵字,示例以下:

SQL 示例

在訂單表查詢全部的客戶id,消除重複客戶id, 以下:

SELECT DISTINCT c_id FROM order_tab;複製代碼

Result

c_id
c_001
c_002

WHERE

WHERE 用於從數據集/流中過濾數據,與SELECT一塊兒使用,語法遵循ANSI-SQL標準,語義是關係代數的Selection,根據某些條件對關係作水平分割,即選擇符合條件的記錄,以下所示:
image

SQL 示例

customer_tab查詢客戶id爲c_001c_003的客戶信息,以下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id = 'c_001' OR c_id = 'c_003';複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

特別說明

咱們發現WHERE是對知足必定條件的數據進行過濾,WHERE支持=, <, >, <>, >=, <=以及ANDOR等表達式的組合,最終知足過濾條件的數據會被選擇出來。而且 WHERE 能夠結合IN,NOT IN聯合使用,具體以下:

SQL 示例 (IN 常量)

使用 INcustomer_tab查詢客戶id爲c_001c_003的客戶信息,以下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN ('c_001', 'c_003');複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

SQL 示例 (IN 子查詢)

使用 IN和 子查詢 在customer_tab查詢已經下過訂單的客戶信息,以下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN (SELECT c_id FROM order_tab);複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin

IN/NOT IN 與關係代數

如上介紹IN是關係代數中的Intersection, NOT IN是關係代數的Difference, 以下圖示意:

  • IN(Intersection)
    image
  • NOT IN(Difference)
    image

GROUP BY

GROUP BY 是對數據進行分組的操做,好比我須要分別計算一下一個學生表裏面女生和男生的人數分別是多少,以下:
image

SQL 示例

將order_tab信息按customer_tab分組統計訂單數量,簡單示例以下:

SELECT c_id, count(o_id) as o_count FROM order_tab GROUP BY c_id;複製代碼

Result

c_id o_count
c_001 2
c_002 1

特別說明

在實際的業務場景中,GROUP BY除了按業務字段進行分組外,不少時候用戶也能夠用時間來進行分組(至關於劃分窗口),好比統計每分鐘的訂單數量:

SQL 示例

按時間進行分組,查詢每分鐘的訂單數量,以下:

SELECT SUBSTRING(o_time, 1, 16) AS o_time_min, count(o_id) AS o_count FROM order_tab GROUP BY SUBSTRING(o_time, 1, 16)複製代碼

Result

o_time_min o_count
2018-11-05 10:01 2
2018-11-05 10:03 1

說明:若是咱們時間字段是timestamp類型,建議使用內置的 DATE_FORMAT 函數。

UNION ALL

UNION ALL 將兩個表合併起來,要求兩個表的字段徹底一致,包括字段類型、字段順序,語義對應關係代數的Union,只是關係代數是Set集合操做,會有去重複操做,UNION ALL 不進行去重,以下所示:
image

SQL 示例

咱們簡單的將customer_tab查詢2次,將查詢結果合併起來,以下:

SELECT c_id, c_name, c_desc  FROM customer_tab 
UNION ALL 
SELECT c_id, c_name, c_desc  FROM customer_tab複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION ALL 對結果數據不進行去重,若是想對結果數據進行去重,傳統數據庫須要進行UNION操做。

UNION

UNION 將兩個流給合併起來,要求兩個流的字段徹底一致,包括字段類型、字段順序,並其UNION 不一樣於UNION ALL,UNION會對結果數據去重,與關係代數的Union語義一致,以下:
image

SQL 示例

咱們簡單的將customer_tab查詢2次,將查詢結果合併起來,以下:

SELECT c_id, c_name, c_desc  FROM customer_tab 
UNION 
SELECT c_id, c_name, c_desc  FROM customer_tab複製代碼

咱們發現徹底同樣的表數據進行 UNION以後,數據是被去重的,UNION以後的數據並無增長。

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION 對結果數據進行去重,在實際的實現過程須要對數據進行排序操做,因此非必要去重狀況請使用UNION ALL操做。

JOIN

JOIN 用於把來自兩個表的行聯合起來造成一個寬表,Apache Flink支持的JOIN類型:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN

JOIN與關係代數的Join語義相同,具體以下:

image

SQL 示例 (JOIN)

INNER JOIN只選擇知足ON條件的記錄,咱們查詢customer_taborder_tab表,將有訂單的客戶和訂單信息選擇出來,以下:

SELECT * FROM customer_tab AS c JOIN order_tab AS o ON o.c_id = c.c_id複製代碼

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone

SQL 示例 (LEFT JOIN)

LEFT JOININNER JOIN的區別是當右表沒有與左邊相JOIN的數據時候,右邊對應的字段補NULL輸出,語義以下:

image

對應的SQL語句以下(LEFT JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ; 複製代碼
  • 細心的讀者可能發現上面T2.ColC是添加了前綴T2了,這裏須要說明一下,當兩張表有字段名字同樣的時候,我須要指定是從那個表裏面投影的。

咱們查詢customer_taborder_tab表,將客戶和訂單信息選擇出來以下:

SELECT * FROM customer_tab AS c LEFT JOIN order_tab AS o ON o.c_id = c.c_id複製代碼

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone
c_003 JinCheng from HeBei NULL NULL NULL NULL

特別說明

RIGHT JOIN 至關於 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN至關於 RIGHT JOINLEFT JOIN 以後進行UNION ALL操做。

Window

在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統數據庫的標準開窗,每個元素都對應一個窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行窗口劃分的。

Over Window

Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。
按ROWS和RANGE分類是傳統數據庫的標準分類方法,在Apache Flink中還能夠根據時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。爲了不你們對過細分類形成困擾,咱們按照肯定當前行的不一樣方式將OVER Window分紅兩大類進行介紹,以下:

  • ROWS OVER Window - 每一行元素都視爲新的計算行,即,每一行都是一個新的窗口。
  • RANGE OVER Window - 具備相同時間值的全部元素行視爲同一計算行,即,具備相同時間值的全部行都是同一個窗口。

Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都視爲新的計算行,即,每一行都是一個新的窗口。

語義

咱們以3個元素(2 PRECEDING)的窗口爲例,以下圖:
image

上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,可是他們仍然是在不一樣的窗口,這一點有別於RANGE OVER Window。

語法

Bounded ROWS OVER Window 語法以下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     ROWS 
     BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1複製代碼
  • value_expression - 進行分區的字表達式;
  • timeCol - 用於元素排序的時間字段;
  • rowCount - 是定義根據當前行開始向前追溯幾行元素。
SQL 示例

利用item_tab測試數據,咱們統計同類商品中當前和當前商品以前2個商品中的最高價格。

SELECT  
    itemID,
    itemType, 
    onSellTime, 
    price,  
    MAX(price) OVER (
        PARTITION BY itemType 
        ORDER BY onSellTime 
        ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab複製代碼

Result

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 60
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

Bounded RANGE OVER Window

Bounded RANGE OVER Window 具備相同時間值的全部元素行視爲同一計算行,即,具備相同時間值的全部行都是同一個窗口。

語義

咱們以3秒中數據(INTERVAL '2' SECOND)的窗口爲例,以下圖:
image

注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別於ROWS OVER Window。

語法

Bounded RANGE OVER Window的語法以下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     RANGE 
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1複製代碼
  • value_expression - 進行分區的字表達式;
  • timeCol - 用於元素排序的時間字段;
  • timeInterval - 是定義根據當前行開始向前追溯指定時間的元素行;
SQL 示例

咱們統計同類商品中當前和當前商品以前2分鐘商品中的最高價格。

SELECT  
    itemID,
    itemType, 
    onSellTime, 
    price,  
    MAX(price) OVER (
        PARTITION BY itemType 
        ORDER BY rowtime 
        RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab複製代碼
Result(Bounded RANGE OVER Window)
itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic
2017-11-11 10:03:00
30 60
ITEM004 Electronic
2017-11-11 10:03:00
60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 40
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

特別說明

OverWindow最重要是要理解每一行數據都肯定一個窗口,同時目前在Apache Flink中只支持按時間字段排序。而且OverWindow開窗與GroupBy方式數據分組最大的不一樣在於,GroupBy數據分組統計時候,在SELECT中除了GROUP BY的key,不能直接選擇其餘非key的字段,可是OverWindow沒有這個限制,SELECT能夠選擇任何字段。好比一張表table(a,b,c,d)4個字段,若是按d分組求c的最大值,兩種寫完以下:

  • GROUP BY - SELECT d, MAX(c) FROM table GROUP BY d
  • OVER Window = SELECT a, b, c, d, MAX(c) OVER(PARTITION BY d, ORDER BY ProcTime())
    如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然能夠選擇 a,b,c字段。但在GROUPBY中,SELECT 只能選擇 d 字段。

Group Window

根據窗口數據劃分的不一樣,目前Apache Flink有以下3種Bounded Winodw:

  • Tumble - 滾動窗口,窗口數據有固定的大小,窗口數據無疊加;
  • Hop - 滑動窗口,窗口數據有固定大小,而且有固定的窗口重建頻率,窗口數據有疊加;
  • Session - 會話窗口,窗口數據沒有固定的大小,根據窗口數據活躍程度劃分窗口,窗口數據無疊加。

說明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上全部數據都在一個窗口裏面,語義很是簡單,這裏不作詳細介紹了。

Tumble

語義

Tumble 滾動窗口有固定size,窗口數據不重疊,具體語義以下:
image

語法

Tumble 滾動窗口對應的語法以下:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)複製代碼
  • [gk] - 決定了流是Keyed仍是/Non-Keyed;
  • TUMBLE_START - 窗口開始時間;
  • TUMBLE_END - 窗口結束時間;
  • timeCol - 是流表中表示時間字段;
  • size - 表示窗口的大小,如 秒,分鐘,小時,天。
SQL 示例

利用pageAccess_tab測試數據,咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV)。

SELECT  
    region,
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,  
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,  
    COUNT(region) AS pv
FROM pageAccess_tab 
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)複製代碼
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

Hop

Hop 滑動窗口和滾動窗口相似,窗口有固定的size,與滾動窗口不一樣的是滑動窗口能夠經過slide參數控制滑動窗口的新建頻率。所以當slide值小於窗口size的值的時候多個滑動窗口會重疊。

語義

Hop 滑動窗口語義以下所示:
image

語法

Hop 滑動窗口對應語法以下:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,  
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)複製代碼
  • [gk] 決定了流是Keyed仍是/Non-Keyed;
  • HOP_START - 窗口開始時間;
  • HOP_END - 窗口結束時間;
  • timeCol - 是流表中表示時間字段;
  • slide - 是滑動步伐的大小;
  • size - 是窗口的大小,如 秒,分鐘,小時,天;
SQL 示例

利用pageAccessCount_tab測試數據,咱們須要每5分鐘統計近10分鐘的頁面訪問量(PV).

SELECT  
  HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,  
  HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,  
  SUM(accessCount) AS accessCount  
FROM pageAccessCount_tab 
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)複製代碼
Result
winStart winEnd accessCount
2017-11-11 01:55:00.0 2017-11-11 02:05:00.0 186
2017-11-11 02:00:00.0 2017-11-11 02:10:00.0 396
2017-11-11 02:05:00.0 2017-11-11 02:15:00.0 243
2017-11-11 02:10:00.0 2017-11-11 02:20:00.0 33
2017-11-11 04:05:00.0 2017-11-11 04:15:00.0 129
2017-11-11 04:10:00.0 2017-11-11 04:20:00.0 129

Session

Seeeion 會話窗口 是沒有固定大小的窗口,經過session的活躍度分組元素。不一樣於滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話窗口 分配器經過配置session gap來指定非活躍週期的時長.

語義

Session 會話窗口語義以下所示:

image

語法

Seeeion 會話窗口對應語法以下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,  
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)複製代碼
  • [gk] 決定了流是Keyed仍是/Non-Keyed;
  • SESSION_START - 窗口開始時間;
  • SESSION_END - 窗口結束時間;
  • timeCol - 是流表中表示時間字段;
  • gap - 是窗口數據非活躍週期的時長;
SQL 示例

利用pageAccessSession_tab測試數據,咱們按地域統計連續的兩個訪問用戶之間的訪問時間間隔不超過3分鐘的的頁面訪問量(PV).

SELECT  
    region, 
    SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,  
    SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd, 
    COUNT(region) AS pv  
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)複製代碼
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:13:00.0 1
ShangHai 2017-11-11 02:01:00.0 2017-11-11 02:08:00.0 4
ShangHai 2017-11-11 02:10:00.0 2017-11-11 02:14:00.0 2
ShangHai 2017-11-11 04:16:00.0 2017-11-11 04:19:00.0 1

UDX

Apache Flink 除了提供了大部分ANSI-SQL的核心算子,也爲用戶提供了本身編寫業務代碼的機會,那就是User-Defined Function,目前支持以下三種 User-Defined Function:

  • UDF - User-Defined Scalar Function
  • UDTF - User-Defined Table Function
  • UDAF - User-Defined Aggregate Funciton

UDX都是用戶自定義的函數,那麼Apache Flink框架爲啥將自定義的函數分紅三類呢?是根據什麼劃分的呢?Apache Flink對自定義函數進行分類的依據是根據函數語義的不一樣,函數的輸入和輸出不一樣來分類的,具體以下:

UDX INPUT OUTPUT INPUT:OUTPUT
UDF 單行中的N(N>=0)列 單行中的1列 1:1
UDTF 單行中的N(N>=0)列 M(M>=0)行 1:N(N>=0)
UDAF M(M>=0)行中的每行的N(N>=0)列 單行中的1列 M:1(M>=0)

UDF

  • 定義
    用戶想本身編寫一個字符串聯接的UDF,咱們只須要實現ScalarFunction#eval()方法便可,簡單實現以下:
object MyConnect extends ScalarFunction {
  @varargs
  def eval(args: String*): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < args.length) {
      if (args(i) == null) {
        return null
      }
      sb.append(args(i))
      i += 1
    }
    sb.toString
  }
}複製代碼
  • 使用
...
 val fun = MyConnect
 tEnv.registerFunction("myConnect", fun)
 val sql = "SELECT myConnect(a, b) as str FROM tab"
...複製代碼

UDTF

  • 定義
    用戶想本身編寫一個字符串切分的UDTF,咱們只須要實現TableFunction#eval()方法便可,簡單實現以下:

ScalarFunction#eval()`

class MySplit extends TableFunction[String] {
  def eval(str: String): Unit = {
    if (str.contains("#")){
      str.split("#").foreach(collect)
    }
  }

  def eval(str: String, prefix: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach(s => collect(prefix + s))
    }
  }
}複製代碼
  • 使用
...
val fun = new MySplit()
tEnv.registerFunction("mySplit", fun)
val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"
...複製代碼

UDAF

  • 定義
    UDAF 要實現的接口比較多,咱們以一個簡單的CountAGG爲例,作簡單實現以下:
/** The initial accumulator for count aggregate function */
class CountAccumulator extends JTuple1[Long] {
  f0 = 0L //count
}

/**
  * User-defined count aggregate function
  */
class MyCount
  extends AggregateFunction[JLong, CountAccumulator] {

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L
  }

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def retract(acc: CountAccumulator): Unit = {
    acc.f0 -= 1L
  }

  def accumulate(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 += 1L
    }
  }

  def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L
    }
  }

  override def getValue(acc: CountAccumulator): JLong = {
    acc.f0
  }

  def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      acc.f0 += iter.next().f0
    }
  }

  override def createAccumulator(): CountAccumulator = {
    new CountAccumulator
  }

  def resetAccumulator(acc: CountAccumulator): Unit = {
    acc.f0 = 0L
  }

  override def getAccumulatorType: TypeInformation[CountAccumulator] = {
    new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
  }

  override def getResultType: TypeInformation[JLong] =
    BasicTypeInfo.LONG_TYPE_INFO
}複製代碼
  • 使用
...
val fun = new MyCount()
tEnv.registerFunction("myCount", fun)
val sql = "SELECT myCount(c) FROM MyTable GROUP BY a"
...複製代碼

Source&Sink

上面咱們介紹了Apache Flink SQL核心算子的語法及語義,這部分將選取Bounded EventTime Tumble Window爲例爲你們編寫一個完整的包括Source和Sink定義的Apache Flink SQL Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,用戶ID和訪問時間。咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV). 具體數據以下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00

Source 定義

自定義Apache Flink Stream Source須要實現StreamTableSource, StreamTableSource中經過StreamExecutionEnvironmentaddSource方法獲取DataStream, 因此咱們須要自定義一個 SourceFunction, 而且要支持產生WaterMark,也就是要實現DefinedRowtimeAttributes接口。

Source Function定義

支持接收攜帶EventTime的數據集合,Either的數據結構,Right表示WaterMark和Left表示數據:

class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = ???
}複製代碼

定義 StreamTableSource

咱們自定義的Source要攜帶咱們測試的數據,以及對應的WaterMark數據,具體以下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // 頁面訪問表數據 rows with timestamps and watermarks
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
    Right(1510365660000L),
    Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
    Right(1510366200000L),
    Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
    Right(1510366260000L),
    Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}複製代碼

Sink 定義

咱們簡單的將計算結果寫入到Apache Flink內置支持的CSVSink中,定義Sink以下:

def getCsvTableSink: TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    // 打印sink的文件路徑,方便咱們查看運行結果
    println("Sink path : " + tempFile)
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }複製代碼

構建主程序

主程序包括執行環境的定義,Source/Sink的註冊以及統計查SQL的執行,具體以下:

def main(args: Array[String]): Unit = {
    // Streaming 環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 設置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便咱們查出輸出數據
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 建立自定義source數據結構
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 建立CSV sink 數據結構
    val tableSink = getCsvTableSink

    // 註冊source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 註冊sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val sql =
      "SELECT " +
      " region, " +
      " TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
      " TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
      " FROM mySource " +
      " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

    tEnv.sqlQuery(sql).insertInto(sinkTableName);
    env.execute()
  }複製代碼

執行並查看運行結果

執行主程序後咱們會在控制檯獲得Sink的文件路徑,以下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem複製代碼

Cat 方式查看計算結果,以下:

jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1複製代碼

表格化如上結果:

region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

上面這個端到端的完整示例也能夠應用到本篇前面介紹的其餘算子示例中,只是你們根據Source和Sink的Schema不一樣來進行相應的構建便可!

總結

本篇概要的向你們介紹了SQL的由來,Apache Flink SQL 大部分核心功能,並附帶了具體的測試數據和測試程序,最後以一個End-to-End的示例展現瞭如何編寫Apache Flink SQL的Job收尾。本篇着重向你們介紹Apache Flink SQL的使用,後續咱們再繼續探究每一個算子的實現原理。

相關文章
相關標籤/搜索