Apache-Flink深度解析-TableAPI

您可能感興趣的文章合集:java

什麼是Table API

在《SQL概覽》中咱們概要的向你們介紹了什麼是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,以下圖所示

Apache Flink 針對不一樣的用戶場景提供了三層用戶API,最下層ProcessFunction API能夠對State,Timer等複雜機制進行有效的控制,但用戶使用的便捷性很弱,也就是說即便很簡單統計邏輯,也要較多的代碼開發。第二層DataStream API對窗口,聚合等算子進行了封裝,用戶的便捷性有所加強。最上層是SQL/Table API,Table API是Apache Flink中的聲明式,可被查詢優化器優化的高級分析API。

Table API的特色

Table API和SQL都是Apache Flink中最高層的分析API,SQL所具有的特色Table API也都具備,以下:

  • 聲明式 - 用戶只關心作什麼,不用關心怎麼作;

  • 高性能 - 支持查詢優化,能夠獲取最好的執行性能;

  • 流批統一 - 相同的統計邏輯,既能夠流模式運行,也能夠批模式運行;

  • 標準穩定 - 語義遵循SQL標準,語法語義明確,不易變更。

固然除了SQL的特性,由於Table API是在Flink中專門設計的,因此Table API還具備自身的特色:

  • 表達方式的擴展性 - 在Flink中能夠爲Table API開發不少便捷性功能,如:Row.flatten(), map/flatMap 等

  • 功能的擴展性 - 在Flink中能夠爲Table API擴展更多的功能,如:Iteration,flatAggregate 等新功能

  • 編譯檢查 - Table API支持java和scala語言開發,支持IDE中進行編譯檢查。

說明:上面說的map/flatMap/flatAggregate都是Apache Flink 社區 FLIP-29 中規劃的新功能。

HelloWorld

在介紹Table API全部算子以前咱們先編寫一個簡單的HelloWorld來直觀瞭解如何進行Table API的開發。

Maven 依賴

在pom文件中增長以下配置,本篇以flink-1.7.0功能爲準進行後續介紹。

<properties>
    <table.version>1.7.0</table.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

  </dependencies>複製代碼

程序結構

在編寫第一Flink Table API job以前咱們先簡單瞭解一下Flink Table API job的結構,以下圖所示

  1. 外部數據源,好比Kafka, Rabbitmq, CSV 等等;

  2. 查詢計算邏輯,好比最簡單的數據導入select,雙流Join,Window Aggregate 等;

  3. 外部結果存儲,好比Kafka,Cassandra,CSV等。


說明:1和3 在Apache Flink中統稱爲Connector。

主程序

咱們以一個統計單詞數量的業務場景,編寫第一個HelloWorld程序。
根據上面Flink job基本結構介紹,要Table API完成WordCount的計算需求,咱們須要完成三部分代碼:

  • TableSoruce Code - 用於建立數據源的代碼

  • Table API Query - 用於進行word count統計的Table API 查詢邏輯

  • TableSink Code - 用於保存word count計算結果的結果表代碼

運行模式選擇

一個job咱們要選擇是Stream方式運行仍是Batch模式運行,因此任何統計job的第一步是進行運行模式選擇,以下咱們選擇Stream方式運行。

// Stream運行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)複製代碼

構建測試Source

咱們用最簡單的構建Source方式進行本次測試,代碼以下:

// 測試數據
val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
// 最簡單的獲取Source方式
val source = env.fromCollection(data).toTable(tEnv, 'word)複製代碼

WordCount 統計邏輯

WordCount核心統計邏輯就是按照單詞分組,而後計算每一個單詞的數量,統計邏輯以下:

// 單詞統計核心邏輯
 val result = source
   .groupBy('word) // 單詞分組 .select('word, 'word.count) // 單詞統計複製代碼

定義Sink

將WordCount的統計結果寫入Sink中,代碼以下:

// 自定義Sink
val sink = new RetractSink // 自定義Sink(下面有完整代碼)
// 計算結果寫入sink
result.toRetractStream[(String, Long)].addSink(sink)複製代碼

完整的HelloWord代碼

爲了方便你們運行WordCount查詢統計,將完整的代碼分享你們(基於flink-1.7.0),以下:

import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

import scala.collection.mutable

object HelloWord {

  def main(args: Array[String]): Unit = {
    // 測試數據
    val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")

    // Stream運行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // 最簡單的獲取Source方式
    val source = env.fromCollection(data).toTable(tEnv, 'word) // 單詞統計核心邏輯 val result = source .groupBy('word) // 單詞分組
      .select('word, 'word.count) // 單詞統計

    // 自定義Sink
    val sink = new RetractSink
    // 計算結果寫入sink
    result.toRetractStream[(String, Long)].addSink(sink)

    env.execute
  }
}

class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
  private var resultSet: mutable.Set[(String, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // 初始化內存存儲結構
    resultSet = new mutable.HashSet[(String, Long)]
  }

  override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
    if (v._1) {
      // 計算數據
      resultSet.add(v._2)
    }
    else {
      // 撤回數據
      resultSet.remove(v._2)
    }
  }

  override def close(): Unit = {
    // 打印寫入sink的結果數據
    resultSet.foreach(println)
  }
}複製代碼

運行結果以下:

雖然上面用了較長的紙墨介紹簡單的WordCount統計邏輯,但source和sink部分都是能夠在學習後面算子中被複用的。本例核心的統計邏輯只有一行代碼:
source.groupBy('word).select('word, 'word.count)
因此Table API開發技術任務很是的簡潔高效。

Table API 算子

雖然Table API與SQL的算子語義一致,但在表達方式上面SQL以文本的方式展示,Table API是以java或者scala語言的方式進行開發。爲了你們方便閱讀,即使是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過的算子,在這裏也會再次進行介紹,固然對於Table API和SQL不一樣的地方會進行詳盡介紹。

示例數據及測試類

測試數據

  • customer_tab 表 - 客戶表保存客戶id,客戶姓名和客戶描述信息。字段及測試數據以下:

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
  • order_tab 表 - 訂單表保存客戶購買的訂單信息,包括訂單id,訂單時間和訂單描述信息。 字段節測試數據以下:

o_id c_id o_time o_desc
o_oo1 c_002 2018-11-05 10:01:01 iphone
o_002 c_001 2018-11-05 10:01:55 ipad
o_003 c_001 2018-11-05 10:03:44 flink book
  • Item_tab
    商品表, 攜帶商品id,商品類型,出售時間,價格等信息,具體以下:

itemID itemType onSellTime price
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic 2017-11-11 10:03:00 30
ITEM004 Electronic 2017-11-11 10:03:00 60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20
  • PageAccess_tab
    頁面訪問表,包含用戶ID,訪問時間,用戶所在地域信息,具體數據以下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00
  • PageAccessCount_tab
    頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數據以下:

region userCount accessTime
ShangHai 100 2017.11.11 10:01:00
BeiJing 86 2017.11.11 10:01:00
BeiJing 210 2017.11.11 10:06:00
BeiJing 33 2017.11.11 10:10:00
ShangHai 129 2017.11.11 12:10:00
  • PageAccessSession_tab
    頁面訪問表,訪問量,訪問時間,用戶所在地域信息,具體數據以下:

region userId accessTime
ShangHai U0011 2017-11-11 10:01:00
ShangHai U0012 2017-11-11 10:02:00
ShangHai U0013 2017-11-11 10:03:00
ShangHai U0015 2017-11-11 10:05:00
ShangHai U0011 2017-11-11 10:10:00
BeiJing U0110 2017-11-11 10:10:00
ShangHai U2010 2017-11-11 10:11:00
ShangHai U0410 2017-11-11 12:16:00

測試類

咱們建立一個TableAPIOverviewITCase.scala 用於接下來介紹Flink Table API算子的功能體驗。代碼以下:

import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class Table APIOverviewITCase {

  // 客戶表數據
  val customer_data = new mutable.MutableList[(String, String, String)]
  customer_data.+=(("c_001", "Kevin", "from JinLin"))
  customer_data.+=(("c_002", "Sunny", "from JinLin"))
  customer_data.+=(("c_003", "JinCheng", "from HeBei"))


  // 訂單表數據
  val order_data = new mutable.MutableList[(String, String, String, String)]
  order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))

  // 商品銷售表數據
  val item_data = Seq(
    Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
    Right((1510365720000L)),
    Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
    Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
    Right((1510365780000L)),
    Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
    Right((1510365900000L)),
    Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
    Right((1510365960000L)),
    Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
    Right((1510366020000L)),
    Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
    Right((151036608000L)))

  // 頁面訪問表數據
  val pageAccess_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
    Right((1510365660000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
    Right((1510366260000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
    Right((1510373400000L)))

  // 頁面訪問量表數據2
  val pageAccessCount_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
    Right((1510365660000L)),
    Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
    Right((1510366200000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
    Right((1510373400000L)))

  // 頁面訪問表數據3
  val pageAccessSession_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
    Right((1510365720000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
    Right((1510365720000L)),
    Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
    Right((1510365900000L)),
    Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
    Right((1510366260000L)),
    Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
    Right((1510373760000L)))

  val _tempFolder = new TemporaryFolder

  // Streaming 環境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setParallelism(1)
  env.setStateBackend(getStateBackend)

  def getProcTimeTables(): (Table, Table) = {
    // 將order_tab, customer_tab 註冊到catalog
    val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc) val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc) (customer, order) } def getEventTimeTables(): (Table, Table, Table, Table) = { env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 將item_tab, pageAccess_tab 註冊到catalog val item = env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data)) .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)

    val pageAccess =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    val pageAccessCount =
      env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
      .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)

    val pageAccessSession =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    (item, pageAccess, pageAccessCount, pageAccessSession)
  }


  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  def getStateBackend: StateBackend = {
    new MemoryStateBackend()
  }

  def procTimePrint(result: Table): Unit = {
    val sink = new RetractingSink
    result.toRetractStream[Row].addSink(sink)
    env.execute()
  }

  def rowTimePrint(result: Table): Unit = {
    val sink = new RetractingSink
    result.toRetractStream[Row].addSink(sink)
    env.execute()
  }

  @Test
  def testProc(): Unit = {
    val (customer, order) = getProcTimeTables()
    val result = ...// 測試的查詢邏輯
    procTimePrint(result)
  }

  @Test
  def testEvent(): Unit = {
    val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
    val result = ...// 測試的查詢邏輯
    procTimePrint(result)
  }

}

// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  var retractedResults: ArrayBuffer[String] = null


  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    retractedResults = mutable.ArrayBuffer.empty[String]
  }

  def invoke(v: (Boolean, Row)) {
    retractedResults.synchronized {
      val value = v._2.toString
      if (v._1) {
        retractedResults += value
      } else {
        val idx = retractedResults.indexOf(value)
        if (idx >= 0) {
          retractedResults.remove(idx)
        } else {
          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
                                       "This is probably an incorrectly implemented test. " +
                                       "Try to set the parallelism of the sink to 1.")
        }
      }
    }

  }

  override def close(): Unit = {
    super.close()
    retractedResults.sorted.foreach(println(_))
  }
}

// Water mark 生成器
class EventTimeSourceFunction[T](
  dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }

  override def cancel(): Unit = ???
}複製代碼

SELECT

SELECT 用於從數據集/流中選擇數據,語義是關係代數中的投影(Projection),對關係進行垂直分割,消去或增長某些列, 以下圖所示

Table API 示例

customer_tab選擇用戶姓名,並用內置的CONCAT函數拼接客戶信息,以下:

val result = customer
  .select('c_name, concat_ws('c_name, " come ", 'c_desc))複製代碼

Result

c_name desc
Kevin Kevin come from JinLin
Sunny Sunny come from JinLin
Jincheng Jincheng come from HeBei

特別說明

你們看到在 SELECT 不只可使用普通的字段選擇,還可使用ScalarFunction,固然也包括User-Defined Function,同時還能夠進行字段的alias設置。其實SELECT能夠結合聚合,在GROUPBY部分會進行介紹,一個比較特殊的使用場景是去重的場景,示例以下:

Table API示例

在訂單表查詢全部的客戶id,消除重複客戶id, 以下:

val result = order
  .groupBy('c_id) .select('c_id)複製代碼

Result

c_id
c_001
c_002

WHERE

WHERE 用於從數據集/流中過濾數據,與SELECT一塊兒使用,語義是關係代數的Selection,根據某些條件對關係作水平分割,即選擇符合條件的記錄,以下所示:

Table API 示例

customer_tab查詢客戶id爲c_001c_003的客戶信息,以下:

val result = customer
   .where("c_id = 'c_001' || c_id = 'c_003'")
   .select( 'c_id, 'c_name, 'c_desc)複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

特別說明

咱們發現WHERE是對知足必定條件的數據進行過濾,WHERE支持=, <, >, <>, >=, <=以及&&||等表達式的組合,最終知足過濾條件的數據會被選擇出來。 SQL中的INNOT IN在Table API裏面用intersectminus描述(flink-1.7.0版本)。

Intersect 示例

Intersect只在Batch模式下進行支持,Stream模式下咱們能夠利用雙流JOIN來實現,如:在customer_tab查詢已經下過訂單的客戶信息,以下:

// 計算客戶id,並去重
 val distinct_cids = order
   .groupBy('c_id) // 去重 .select('c_id as 'o_c_id) val result = customer .join(distinct_cids, 'c_id === 'o_c_id) .select('c_id, 'c_name, 'c_desc)複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin

Minus 示例

Minus只在Batch模式下進行支持,Stream模式下咱們能夠利用雙流JOIN來實現,如:在customer_tab查詢沒有下過訂單的客戶信息,以下:

// 查詢下過訂單的客戶id,並去重
 val distinct_cids = order
   .groupBy('c_id) .select('c_id as 'o_c_id) // 查詢沒有下過訂單的客戶信息 val result = customer .leftOuterJoin(distinct_cids, 'c_id === 'o_c_id) .where('o_c_id isNull)
  .select('c_id, 'c_name, 'c_desc) 複製代碼

說明上面實現邏輯比較複雜,咱們後續考慮如何在流上支持更簡潔的方式。

Result

c_id c_name c_desc
c_003 JinCheng from HeBei

Intersect/Minus與關係代數

如上介紹Intersect是關係代數中的Intersection, Minus是關係代數的Difference, 以下圖示意:

  • Intersect(Intersection)

  • Minus(Difference)

GROUP BY

GROUP BY 是對數據進行分組的操做,好比我須要分別計算一下一個學生表裏面女生和男生的人數分別是多少,以下:

Table API 示例

將order_tab信息按c_id分組統計訂單數量,簡單示例以下:

val result = order
  .groupBy('c_id) .select('c_id, 'o_id.count)複製代碼

Result

c_id o_count
c_001 2
c_002 1

特別說明

在實際的業務場景中,GROUP BY除了按業務字段進行分組外,不少時候用戶也能夠用時間來進行分組(至關於劃分窗口),好比統計每分鐘的訂單數量:

Table API 示例

按時間進行分組,查詢每分鐘的訂單數量,以下:

val result = order
  .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
  .groupBy('o_time_min) .select('o_time_min, 'o_id.count)複製代碼

Result

o_time_min o_count
2018-11-05 10:01 2
2018-11-05 10:03 1

說明:若是咱們時間字段是timestamp類型,建議使用內置的 DATE_FORMAT 函數。

UNION ALL

UNION ALL 將兩個表合併起來,要求兩個表的字段徹底一致,包括字段類型、字段順序,語義對應關係代數的Union,只是關係代數是Set集合操做,會有去重複操做,UNION ALL 不進行去重,以下所示:

Table API 示例

咱們簡單的將customer_tab查詢2次,將查詢結果合併起來,以下:

val result = customer.unionAll(customer)複製代碼

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION ALL 對結果數據不進行去重,若是想對結果數據進行去重,傳統數據庫須要進行UNION操做。

UNION

UNION 將兩個流給合併起來,要求兩個流的字段徹底一致,包括字段類型、字段順序,並其UNION 不一樣於UNION ALL,UNION會對結果數據去重,與關係代數的Union語義一致,以下:

Table API 示例

咱們簡單的將customer_tab查詢2次,將查詢結果合併起來,以下:

val result = customer.union(customer)複製代碼

咱們發現徹底同樣的表數據進行 UNION以後,數據是被去重的,UNION以後的數據並無增長。

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION 對結果數據進行去重,在實際的實現過程須要對數據進行排序操做,因此非必要去重狀況請使用UNION ALL操做。

JOIN

JOIN 用於把來自兩個表的行聯合起來造成一個寬表,Apache Flink支持的JOIN類型:

  • JOIN - INNER JOIN

  • LEFT JOIN - LEFT OUTER JOIN

  • RIGHT JOIN - RIGHT OUTER JOIN

  • FULL JOIN - FULL OUTER JOIN
    JOIN與關係代數的Join語義相同,具體以下:

Table API 示例 (JOIN)

INNER JOIN只選擇知足ON條件的記錄,咱們查詢customer_taborder_tab表,將有訂單的客戶和訂單信息選擇出來,以下:

val result = customer
  .join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)複製代碼

Result

c_id c_name c_desc o_id o_c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone

Table API 示例 (LEFT JOIN)

LEFT JOININNER JOIN的區別是當右表沒有與左邊相JOIN的數據時候,右邊對應的字段補NULL輸出,語義以下:

對應的SQL語句以下(LEFT JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ; 複製代碼
  • 細心的讀者可能發現上面T2.ColC是添加了前綴T2了,這裏須要說明一下,當兩張表有字段名字同樣的時候,我須要指定是從那個表裏面投影的。

咱們查詢customer_taborder_tab表,將客戶和訂單信息選擇出來以下:

val result = customer
   .leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)複製代碼

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone
c_003 JinCheng from HeBei NULL NULL NULL NULL

特別說明

RIGHT JOIN 至關於 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN至關於 RIGHT JOINLEFT JOIN 以後進行UNION ALL操做。

Time-Interval JOIN

Time-Interval JOIN 相對於UnBounded的雙流JOIN來講是Bounded JOIN。就是每條流的每一條數據會與另外一條流上的不一樣時間區域的數據進行JOIN。對應Apache Flink官方文檔的 Time-windowed JOIN(release-1.7以前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語義和實現原理詳見《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語法示例,以下:

...
val result = left
  .join(right)
  // 定義Time Interval
  .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
  ...複製代碼

Lateral JOIN

Apache Flink Lateral JOIN 是左邊Table與一個UDTF進行JOIN,詳細的語義和實現原理請參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語法示例,以下:

...
val udtf = new UDTF
val result = source.join(udtf('c) as ('d, 'e)) ...複製代碼

Temporal Table JOIN

Temporal Table JOIN 是左邊表與右邊一個攜帶版本信息的表進行JOIN,詳細的語法,語義和實現原理詳見《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語法示例,以下:

...
val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency) ...複製代碼

Window

在Apache Flink中有2種類型的Window,一種是OverWindow,即傳統數據庫的標準開窗,每個元素都對應一個窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基於時間進行窗口劃分的。

Over Window

Apache Flink中對OVER Window的定義遵循標準SQL的定義語法。
按ROWS和RANGE分類是傳統數據庫的標準分類方法,在Apache Flink中還能夠根據時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類,共計8種類型。爲了不你們對過細分類形成困擾,咱們按照肯定當前行的不一樣方式將OVER Window分紅兩大類進行介紹,以下:

  • ROWS OVER Window - 每一行元素都視爲新的計算行,即,每一行都是一個新的窗口。

  • RANGE OVER Window - 具備相同時間值的全部元素行視爲同一計算行,即,具備相同時間值的全部行都是同一個窗口。

Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都視爲新的計算行,即,每一行都是一個新的窗口。

語義

咱們以3個元素(2 PRECEDING)的窗口爲例,以下圖:

上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時刻到達,可是他們仍然是在不一樣的窗口,這一點有別於RANGE OVER Window。

Table API 示例

利用item_tab測試數據,咱們統計同類商品中當前和當前商品以前2個商品中的最高價格。

val result = item
  .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)複製代碼

Result

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 60
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

Bounded RANGE OVER Window

Bounded RANGE OVER Window 具備相同時間值的全部元素行視爲同一計算行,即,具備相同時間值的全部行都是同一個窗口。

語義

咱們以3秒中數據(INTERVAL '2' SECOND)的窗口爲例,以下圖:

注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時刻到達,他們是在同一個窗口,這一點有別於ROWS OVER Window。

Tabel API 示例

咱們統計同類商品中當前和當前商品以前2分鐘商品中的最高價格。

val result = item
   .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w) .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)複製代碼
Result(Bounded RANGE OVER Window)
itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 60
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 40
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

特別說明

OverWindow最重要是要理解每一行數據都肯定一個窗口,同時目前在Apache Flink中只支持按時間字段排序。而且OverWindow開窗與GroupBy方式數據分組最大的不一樣在於,GroupBy數據分組統計時候,在SELECT中除了GROUP BY的key,不能直接選擇其餘非key的字段,可是OverWindow沒有這個限制,SELECT能夠選擇任何字段。好比一張表table(a,b,c,d)4個字段,若是按d分組求c的最大值,兩種寫完以下:

  • GROUP BY - tab.groupBy('d).select(d, MAX(c))

  • OVER Window = tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)
    如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然能夠選擇 a,b,c字段。但在GROUPBY中,SELECT 只能選擇 d 字段。

Group Window

根據窗口數據劃分的不一樣,目前Apache Flink有以下3種Bounded Winodw:

  • Tumble - 滾動窗口,窗口數據有固定的大小,窗口數據無疊加;

  • Hop - 滑動窗口,窗口數據有固定大小,而且有固定的窗口重建頻率,窗口數據有疊加;

  • Session - 會話窗口,窗口數據沒有固定的大小,根據窗口數據活躍程度劃分窗口,窗口數據無疊加。

說明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上全部數據都在一個窗口裏面,語義很是簡單,這裏不作詳細介紹了。

Tumble

語義

Tumble 滾動窗口有固定size,窗口數據不重疊,具體語義以下:

Table API 示例

利用pageAccess_tab測試數據,咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV)。

val result = pageAccess
   .window(Tumble over 2.minute on 'rowtime as 'w)
   .groupBy('w, 'region)
   .select('region, 'w.start, 'w.end, 'region.count as 'pv)複製代碼
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

Hop

Hop 滑動窗口和滾動窗口相似,窗口有固定的size,與滾動窗口不一樣的是滑動窗口能夠經過slide參數控制滑動窗口的新建頻率。所以當slide值小於窗口size的值的時候多個滑動窗口會重疊。

語義

Hop 滑動窗口語義以下所示:

Table API 示例

利用pageAccessCount_tab測試數據,咱們須要每5分鐘統計近10分鐘的頁面訪問量(PV).

val result = pageAccessCount
  .window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
  .groupBy('w) .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)複製代碼
Result
winStart winEnd accessCount
2017-11-11 01:55:00.0 2017-11-11 02:05:00.0 186
2017-11-11 02:00:00.0 2017-11-11 02:10:00.0 396
2017-11-11 02:05:00.0 2017-11-11 02:15:00.0 243
2017-11-11 02:10:00.0 2017-11-11 02:20:00.0 33
2017-11-11 04:05:00.0 2017-11-11 04:15:00.0 129
2017-11-11 04:10:00.0 2017-11-11 04:20:00.0 129

Session

Seeeion 會話窗口 是沒有固定大小的窗口,經過session的活躍度分組元素。不一樣於滾動窗口和滑動窗口,會話窗口不重疊,也沒有固定的起止時間。一個會話窗口在一段時間內沒有接收到元素時,即當出現非活躍間隙時關閉。一個會話窗口 分配器經過配置session gap來指定非活躍週期的時長.

語義

Session 會話窗口語義以下所示:

val result = pageAccessSession
  .window(Session withGap 3.minute on 'rowtime as 'w)
  .groupBy('w, 'region)
  .select('region, 'w.start, 'w.end, 'region.count as 'pv) 複製代碼
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:13:00.0 1
ShangHai 2017-11-11 02:01:00.0 2017-11-11 02:08:00.0 4
ShangHai 2017-11-11 02:10:00.0 2017-11-11 02:14:00.0 2
ShangHai 2017-11-11 04:16:00.0 2017-11-11 04:19:00.0 1

嵌套Window

在Window以後再進行Window劃分也是比較常見的統計需求,那麼在一個Event-Time的Window以後,如何再寫一個Event-Time的Window呢?一個Window以後再描述一個Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中咱們能夠利用'w.rowtime來傳遞時間屬性,好比:Tumble Window以後再接一個Session Window 示例以下:

... 
 val result = pageAccess
   .window(Tumble over 2.minute on 'rowtime as 'w1)
   .groupBy('w1) .select('w1.rowtime as 'rowtime, 'col1.count as 'cnt) .window(Session withGap 3.minute on 'rowtime as 'w2) .groupBy('w2)
   .select('cnt.sum) ...複製代碼

Source&Sink

上面咱們介紹了Apache Flink Table API核心算子的語義和具體示例,這部分將選取Bounded EventTime Tumble Window爲例爲你們編寫一個完整的包括Source和Sink定義的Apache Flink Table API Job。假設有一張淘寶頁面訪問表(PageAccess_tab),有地域,用戶ID和訪問時間。咱們須要按不一樣地域統計每2分鐘的淘寶首頁的訪問量(PV)。具體數據以下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00

Source 定義

自定義Apache Flink Stream Source須要實現StreamTableSource, StreamTableSource中經過StreamExecutionEnvironmentaddSource方法獲取DataStream, 因此咱們須要自定義一個 SourceFunction, 而且要支持產生WaterMark,也就是要實現DefinedRowtimeAttributes接口。

Source Function定義

支持接收攜帶EventTime的數據集合,Either的數據結構,Right表示WaterMark和Left表示數據:

class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = ???
}複製代碼

定義 StreamTableSource

咱們自定義的Source要攜帶咱們測試的數據,以及對應的WaterMark數據,具體以下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // 頁面訪問表數據 rows with timestamps and watermarks
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
    Right(1510365660000L),
    Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
    Right(1510366200000L),
    Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
    Right(1510366260000L),
    Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
     execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}複製代碼

Sink 定義

咱們簡單的將計算結果寫入到Apache Flink內置支持的CSVSink中,定義Sink以下:

def getCsvTableSink: TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    // 打印sink的文件路徑,方便咱們查看運行結果
    println("Sink path : " + tempFile)
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }複製代碼

構建主程序

主程序包括執行環境的定義,Source/Sink的註冊以及統計查SQL的執行,具體以下:

def main(args: Array[String]): Unit = {
    // Streaming 環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 設置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便咱們查出輸出數據
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 建立自定義source數據結構
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 建立CSV sink 數據結構
    val tableSink = getCsvTableSink

    // 註冊source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 註冊sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val result = tEnv.scan(sourceTableName)
      .window(Tumble over 2.minute on 'accessTime as 'w)
      .groupBy('w, 'region)
      .select('region, 'w.start, 'w.end, 'region.count as 'pv) result.insertInto(sinkTableName) env.execute() } 複製代碼

執行並查看運行結果

執行主程序後咱們會在控制檯獲得Sink的文件路徑,以下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
複製代碼

Cat 方式查看計算結果,以下:

jinchengsunjcdeMacBook-Pro:FlinkTable APIDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1複製代碼

表格化如上結果:

region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

上面這個端到端的完整示例也能夠應用到本篇前面介紹的其餘算子示例中,只是你們根據Source和Sink的Schema不一樣來進行相應的構建便可!

小結

本篇首先向你們介紹了什麼是Table API, Table API的核心特色,而後以此介紹Table API的核心算子功能,並附帶了具體的測試數據和測試程序,最後以一個End-to-End的示例展現瞭如何編寫Apache Flink Table API的Job收尾。但願對你們學習Apache Flink Table API 過程當中有所幫助。

相關文章
相關標籤/搜索