Flink Join

一.簡介

Flink DataStream API中內置有兩個能夠根據實際條件對數據流進行Join算子:基於間隔的Join和基於窗口的Join。java

語義注意事項apache

  • 建立兩個流元素的成對組合的行爲相似內鏈接,若是來自一個流的元素與另外一個流沒有相對應要鏈接的元素,則不會發出該元素。
  • 結合在一塊兒的那些元素將其時間戳設置爲位於各自窗口中最大時間戳。例如:以[5,10]爲邊界的窗口將產生鏈接的元素的時間戳爲9。

二.窗口Join

2.1 翻滾窗口(Tumbling Window Join)

執行滾動窗口鏈接(Tumbling Window Join)時,具備公共Key和公共Tumbling Window的全部元素都以成對組合形式進行鏈接,並傳遞給JoinFunction或FlatJoinFunction。由於這就像一個內鏈接,在滾動窗口中沒有來自另外一個流的元素的流的元素不會被輸出。api

圖片

如圖所示,咱們定義了一個大小爲2毫秒的滾動窗口,其結果爲[0,1],[2,3], …。該圖像顯示了每一個窗口中全部元素的成對組合,這些元素將傳遞給JoinFunction。注意,在翻滾窗口[6,7]中沒有發出任何內容,由於在綠色流中沒有元素與橙色元素⑥、⑦鏈接。微信

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply {  (e1, e2) => e1 + "," + e2 }

2.2 滑動窗口Join(Sliding Window Join)

在執行滑動窗口鏈接(Sliding Window Join)時,具備公共Key和公共滑動窗口(Sliding Window )的全部元素都做爲成對組合進行鏈接,並傳遞給JoinFunction或FlatJoinFunction。當前滑動窗口中沒有來自另外一個流的元素的流的元素不會被髮出。app

注意,有些元素可能會在一個滑動窗口中鏈接,但不會在另外一個窗口中鏈接!socket

圖片

在本例中,咱們使用的滑動窗口大小爲2毫秒,滑動1毫秒,滑動窗口結果[1,0],[0,1],[1,2],[二、3],… x軸如下是每一個滑動窗口的Join結果將被傳遞給JoinFunction的元素。在這裏你還能夠看到橙②與綠色③窗口Join(二、3),但不與任何窗口Join[1,2]。ide

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply {  (e1, e2) => e1 + "," + e2 }

2.3 會話窗口Join(Session Window Join)

在執行會話窗口鏈接時,具備相同鍵的全部元素(當「組合」時知足會話條件)都以成對的組合進行鏈接,並傳遞給JoinFunction或FlatJoinFunction。再次執行內部鏈接,所以若是會話窗口只包含來自一個流的元素,則不會發出任何輸出。測試

圖片

在這裏,定義一個會話窗口鏈接,其中每一個會話被至少1ms的間隔所分割。有三個會話,在前兩個會話中,來自兩個流的鏈接元素被傳遞給JoinFunction。在第三次會話中綠色流沒有元素,因此⑧⑨不會Join。大數據

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply {  (e1, e2) => e1 + "," + e2 }

2.4.小結

除了對窗口中兩條流進行Join,你還能夠對它們進行Cogroup,只需將算子定義開始位置的Join()改成coGroup()便可,Join和Cogroup的整體邏輯相同。spa

兩者區別:Join會爲兩側輸入中每一個事件對調用JoinFunction;而Cogroup中CoGroupFunction會以兩個輸入的元素遍歷器爲參數,只在每一個窗口中被調用一次。

三.間隔Join

interval join用一個公共Key鏈接兩個流的元素(將它們稱爲A & B),其中流B的元素的時間戳具備相對於流A中的元素的時間戳。 這也能夠更正式地表示爲b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B中共享一個公鑰的元素。下界和上界均可以是負的或正的,只要下界小於或等於上界。interval鏈接目前只執行內部鏈接。

當將一對元素傳遞給ProcessJoinFunction時,它們將給兩個元素分配更大的時間戳(能夠經過ProcessJoinFunction.Context訪問)。

注意:間隔鏈接目前只支持事件時間。
圖片

在上面的示例中,咱們將「橙色」和「綠色」兩個流鏈接起來,它們的下界爲-2毫秒,上界爲+1毫秒。默認狀況下,這些是包含邊界的,可是能夠經過.lowerboundexclusive()和. upperboundexclusive()進行設置。

再用更正式的符號來表示angeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 如三角形所示。

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] { 
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = { 
         out.collect(left + "," + right); 
        }
      });
    });

四.示例

4.1 間隔Join

package com.lm.flink.datastream.join
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/** * @Classname IntervalJoin * @Description TODO * @Date 2020/10/27 20:32 * @Created by limeng * 區間關聯當前僅支持EventTime * Interval JOIN 相對於UnBounded的雙流JOIN來講是Bounded JOIN。就是每條流的每一條數據會與另外一條流上的不一樣時間區域的數據進行JOIN。 */
object IntervalJoin { 
  def main(args: Array[String]): Unit = { 
    //設置至少一次或僅此一次語義
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //設置至少一次或僅此一次語義
    env.enableCheckpointing(20000,CheckpointingMode.EXACTLY_ONCE)
    //設置
    env.getCheckpointConfig
      .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //設置重啓策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,50000))
    env.setParallelism(1)
    val dataStream1 = env.socketTextStream("localhost",9999)
    val dataStream2 = env.socketTextStream("localhost",9998)
    import org.apache.flink.api.scala._
    val dataStreamMap1 = dataStream1.map(f=>{ 
      val tokens = f.split(",")
      StockTransaction(tokens(0),tokens(1),tokens(2).toDouble)
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction]{ 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = { 
        val timestamp  = element.txTime.toLong
        currentTimestamp = Math.max(timestamp,currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    val dataStreamMap2 = dataStream2.map(f=>{ 
      val tokens = f.split(",")
      StockSnapshot(tokens(0),tokens(1),tokens(2).toDouble)
    }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot]{ 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = { 
        val timestamp  = element.mdTime.toLong
        currentTimestamp = Math.max(timestamp,currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })
    dataStreamMap1.print("dataStreamMap1")
    dataStreamMap2.print("dataStreamMap2")
    dataStreamMap1.keyBy(_.txCode)
      .intervalJoin(dataStreamMap2.keyBy(_.mdCode))
      .between(Time.minutes(-10),Time.seconds(0))
      .process(new ProcessJoinFunction[StockTransaction,StockSnapshot,String] { 
        override def processElement(left: StockTransaction, right: StockSnapshot, ctx: ProcessJoinFunction[StockTransaction, StockSnapshot, String]#Context, out: Collector[String]): Unit = { 
          out.collect(left.toString +" =Interval Join=> "+right.toString)
        }
      }).print()

    env.execute("IntervalJoin")
  }
  case class StockTransaction(txTime:String,txCode:String,txValue:Double) extends Serializable{ 
    override def toString: String = txTime +"#"+txCode+"#"+txValue
  }
  case class StockSnapshot(mdTime:String,mdCode:String,mdValue:Double) extends Serializable { 
    override def toString: String = mdTime +"#"+mdCode+"#"+mdValue
  }
}

結果

get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap1> 1603708942#000001#10.4
get timestamp is 1603708942 currentMaxTimestamp 1603708942
dataStreamMap2> 1603708942#000001#10.4
1603708942#000001#10.4 =Interval Join=> 1603708942#000001#10.4

4.2 窗口Join

package com.lm.flink.datastream.join
import java.lang
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/** * @Classname InnerLeftRightJoinTest * @Description TODO * @Date 2020/10/26 17:22 * @Created by limeng * window join */
object InnerLeftRightJoinTest { 
  def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //每9秒發出一個watermark
    env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(9000)

    val dataStream1 = env.socketTextStream("localhost", 9999)
    val dataStream2 = env.socketTextStream("localhost", 9998)

    /** * operator操做 * 數據格式: * tx: 2020/10/26 18:42:22,000002,10.2 * md: 2020/10/26 18:42:22,000002,10.2 * * 這裏因爲是測試,固水位線採用升序(即數據的Event Time 自己是升序輸入) */
    import org.apache.flink.api.scala._
    val dataStreamMap1 = dataStream1
      .map(f => { 
        val tokens = f.split(",")
        StockTransaction(tokens(0), tokens(1), tokens(2).toDouble)
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockTransaction] { 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockTransaction, previousElementTimestamp: Long): Long = { 
        val timestamp = element.txTime.toLong
        currentTimestamp = Math.max(timestamp, currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    val dataStreamMap2 = dataStream2
      .map(f => { 
        val tokens = f.split(",")
        StockSnapshot(tokens(0), tokens(1), tokens(2).toDouble)
      }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockSnapshot] { 
      var currentTimestamp = 0L
      val maxOutOfOrderness = 1000L
      override def getCurrentWatermark: Watermark = { 
        val tmpTimestamp = currentTimestamp - maxOutOfOrderness
        println(s"wall clock is ${System.currentTimeMillis()} new watermark ${tmpTimestamp}")
        new Watermark(tmpTimestamp)
      }
      override def extractTimestamp(element: StockSnapshot, previousElementTimestamp: Long): Long = { 
        val timestamp = element.mdTime.toLong
        currentTimestamp = Math.max(timestamp, currentTimestamp)
        println(s"get timestamp is $timestamp currentMaxTimestamp $currentTimestamp")
        currentTimestamp
      }
    })

    dataStreamMap1.print("dataStreamMap1")
    dataStreamMap2.print("dataStreamMap2")

    /** * Join操做 * 限定範圍是3秒鐘的Event Time窗口 */
    val joinedStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.txCode)
      .equalTo(_.mdCode)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    val innerJoinedStream = joinedStream.apply(new InnerJoinFunction)
    val leftJoinedStream = joinedStream.apply(new LeftJoinFunction)
    val rightJoinedStream = joinedStream.apply(new RightJoinFunction)
    innerJoinedStream.name("InnerJoinedStream").print()
    leftJoinedStream.name("LeftJoinedStream").print()
    rightJoinedStream.name("RightJoinedStream").print()
    env.execute("InnerLeftRightJoinTest")
  }

  class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] { 
    override def coGroup(first: lang.Iterable[StockTransaction], second: lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = { 
      import scala.collection.JavaConverters._
      val scalaT1 = first.asScala.toList
      val scalaT2 = second.asScala.toList

      println(scalaT1.size)
      println(scalaT2.size)
      /** * Inner join 要比較的是同一個key下,同一個時間窗口內 */
      if (scalaT1.nonEmpty && scalaT2.nonEmpty) { 
        for (transaction <- scalaT1) { 
          for (snapshot <- scalaT2) { 
            out.collect(transaction.txCode, transaction.txTime, snapshot.mdTime, transaction.txValue, snapshot.mdValue, "Inner Join Test")
          }
        }
      }
    }
  }
  class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] { 
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = { 
      /** * 將Java中的Iterable對象轉換爲Scala的Iterable * scala的集合操做效率高,簡潔 */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Left Join要比較的是同一個key下,同一個時間窗口內的數據 */
      if (scalaT1.nonEmpty && scalaT2.isEmpty) { 
        for (transaction <- scalaT1) { 
          out.collect(transaction.txCode, transaction.txTime, "", transaction.txValue, 0, "Left Join Test")
        }
      }
    }
  }
  class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] { 
    override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = { 
      /** * 將Java中的Iterable對象轉換爲Scala的Iterable * scala的集合操做效率高,簡潔 */
      import scala.collection.JavaConverters._
      val scalaT1 = T1.asScala.toList
      val scalaT2 = T2.asScala.toList
      /** * Right Join要比較的是同一個key下,同一個時間窗口內的數據 */
      if (scalaT1.isEmpty && scalaT2.nonEmpty) { 
        for (snapshot <- scalaT2) { 
          out.collect(snapshot.mdCode, "", snapshot.mdTime, 0, snapshot.mdValue, "Right Join Test")
        }
      }
    }
  }

  case class StockTransaction(txTime: String, txCode: String, txValue: Double)
  case class StockSnapshot(mdTime: String, mdCode: String, mdValue: Double)
}

參考

https://www.jianshu.com/p/ba19e4d1d802

公衆號

在這裏插入圖片描述 名稱:大數據計算 微信號:bigdata_limeng

相關文章
相關標籤/搜索