Flink 系列博客
Flink QuickStart
Flink雙流操做
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任務提交操做
Flink配置Prometheus監控
Flink in docker 部署
Flink HA 部署
Flink 常見調優參數總結
Flink 源碼之任務提交流程分析
Flink 源碼之基本算子
Flink 源碼之Trigger
Flink 源碼之Evictorjava
簡介
Flink 雙數據流轉換爲單數據流操做的運算有cogroup
, join
和coflatmap
。下面爲你們對比介紹下這3個運算的功能和用法。docker
Join
:只輸出條件匹配的元素對。CoGroup
: 除了輸出匹配的元素對之外,未能匹配的元素也會輸出。CoFlatMap
:沒有匹配條件,不進行匹配,分別處理兩個流的元素。在此基礎上徹底能夠實現join和cogroup的功能,比他們使用上更加自由。
對於join和cogroup來講,代碼結構大體以下:session
val stream1 = ... val stream2 = ... stream1.join(stream2) .where(_._1).equalTo(_._1) //join的條件stream1中的某個字段和stream2中的字段值相等 .window(...) // 指定window,stream1和stream2中的數據會進入到該window中。只有該window中的數據纔會被後續操做join .apply((t1, t2, out: Collector[String]) => { out.collect(...) // 捕獲到匹配的數據t1和t2,在這裏能夠進行組裝等操做 }) .print()
下面咱們以實際例子來講明這些運算的功能和用法。app
Join操做
Flink中的Join操做相似於SQL中的join,按照必定條件分別取出兩個流中匹配的元素,返回給下游處理。
示例代碼以下:socket
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.socketTextStream("127.0.0.1", 9000).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 1 val stream2 = env.socketTextStream("127.0.0.1", 9001).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 2 stream1.join(stream2) .where(_._1).equalTo(_._1) // 3 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) // 4 .trigger(CountTrigger.of(1)) // 5 .apply((t1, t2, out: Collector[String]) => { out.collect(t1._2 + "<=>" + t2._2) // 6 }) .print() env.execute("Join Demo")
代碼中有些部分須要解釋,以下:ide
- 建立一個socket stream。本機9000端口。輸入的字符串以空格爲界分割成Array[String]。而後再取出其中前兩個元素組成(String, String)類型的tuple。
- 同上。端口變爲9001。
- join條件爲兩個流中的數據(
(String, String)
類型)第一個元素相同。- 爲測試方便,這裏使用session window。只有兩個元素到來時間先後相差不大於30秒之時纔會被匹配。(Session window的特色爲,沒有固定的開始和結束時間,只要兩個元素之間的時間間隔不大於設定值,就會分配到同一個window中,不然後來的元素會進入新的window)。
- 將window默認的trigger修改成count trigger。這裏的含義爲每到來一個元素,都會馬上觸發計算。
- 處理匹配到的兩個數據,例如到來的數據爲(1, "a")和(1, "b"),輸出到下游則爲"a<=>b"
下面咱們測試下程序。測試
打開兩個terminal,分別輸入 nc -lk 127.0.0.1 9000
和 nc -lk 127.0.0.1 9001
。ui
在terminal1中輸入,1 a
,而後在terminal2中輸入2 b
。觀察程序console,發現沒有輸出。這兩條數據不知足匹配條件,所以沒有輸出。url
在30秒以內輸入1 c
,發現程序控制臺輸出告終果a<=>c
。再輸入1 d
,控制檯輸出a<=>c
和a<=>d
兩個結果。.net
等待30秒以後,在terminal2中輸入1 e
,發現控制檯無輸出。因爲session window的效果,該數據和以前stream1中的數據不在同一個window中。所以沒有匹配結果,控制檯不會有輸出。
綜上咱們得出結論:
- join只返回匹配到的數據對。若在window中沒有可以與之匹配的數據,則不會有輸出。
- join會輸出window中全部的匹配數據對。
- 不在window內的數據不會被匹配到。
CoGroup操做
因爲測試代碼基本相同,直接貼出代碼:
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream1 = env.socketTextStream("127.0.0.1", 9000).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 1 val stream2 = env.socketTextStream("127.0.0.1", 9001).map(s => s.split(" ")).map(arr => (arr(0), arr(1))) // 2 stream1.coGroup(stream2) .where(_._1).equalTo(_._1) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) .trigger(CountTrigger.of(1)) .apply((t1, t2, out: Collector[String]) => { val stringBuilder = new StringBuilder("Data in stream1: \n") for (i1 <- t1) { stringBuilder.append(i1._1 + "<=>" + i1._2 + "\n") } stringBuilder.append("Data in stream2: \n") for (i2 <- t2) { stringBuilder.append(i2._1 + "<=>" + i2._2 + "\n") } out.collect(stringBuilder.toString) }) .print() env.execute()
通過一樣的測試咱們得出結論:
CoGroup的做用和join基本相同,但有一點不同的是,若是未能找到新到來的數據與另外一個流在window中存在的匹配數據,仍會將其輸出。
CoFlatMap操做
相比之下CoFlatMap操做就比以上兩個簡單多了。CoFlatMap操做主要在CoFlatMapFunction中進行。
如下是CoFlatMapFunction的代碼:
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable { /** * This method is called for each element in the first of the connected streams. * * @param value The stream element * @param out The collector to emit resulting elements to * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ void flatMap1(IN1 value, Collector<OUT> out) throws Exception; /** * This method is called for each element in the second of the connected streams. * * @param value The stream element * @param out The collector to emit resulting elements to * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ void flatMap2(IN2 value, Collector<OUT> out) throws Exception; }
簡單理解就是當stream1數據到來時,會調用flatMap1方法,stream2收到數據之時,會調用flatMap2方法。
stream1.connect(stream2).flatMap(new CoFlatMapFunction[(String, String), (String, String), String] { override def flatMap1(value: (String, String), out: Collector[String]): Unit = { println("stream1 value: " + value) } override def flatMap2(value: (String, String), out: Collector[String]): Unit = { println("stream2 value: " + value) } }).print()
因爲結果不難驗證,這裏就不在贅述驗證過程了。
總結
Join、CoGroup和CoFlatMap這三個運算符都可以將雙數據流轉換爲單個數據流。Join和CoGroup會根據指定的條件進行數據配對操做,不一樣的是Join只輸出匹配成功的數據對,CoGroup不管是否有匹配都會輸出。CoFlatMap沒有匹配操做,只是分別去接收兩個流的輸入。你們能夠根據具體的業務需求,選擇不一樣的雙流操做。