模板
模板
模板
Map:處理所有元素
輸入數據
你好 發送數據
程序
package com.opensourceteams.module.bigdata.flink.example.stream.operator.map

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

/**
  * Map example: appends a fixed suffix to every line read from a local socket.
  *
  * Start the input first with `nc -lk 1234`, then type lines into it.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // One record per '\n'-terminated line arriving on localhost:1234.
    val lines = env.socketTextStream("localhost", 1234, '\n')

    // One-to-one transformation: every line produces exactly one output string.
    lines.map(line => line + " 增長的數據").print()

    // Job name: the first command-line argument when present, otherwise the default name.
    val jobName = Option(args).flatMap(_.headOption).getOrElse("默認做業")
    env.execute(jobName)
    println("結束")
  }
}
輸出數據
1> 你好 增長的數據 2> 發送數據 增長的數據
FlatMap:處理所有元素,並且把每行拆分出的子集合彙總成一個大集合
輸入數據
a b c e f g
程序
package com.opensourceteams.module.bigdata.flink.example.stream.operator.flatmap

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

/**
  * FlatMap example: splits every socket line on single spaces and emits each token as its
  * own record.
  *
  * Start the input first with `nc -lk 1234`, then type lines into it.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val words = env
      .socketTextStream("localhost", port, '\n')
      .flatMap(_.split(" ")) // one line -> zero or more words

    words.print()

    env.execute(if (args == null || args.isEmpty) "默認做業" else args(0))
    println("結束")
  }
}
輸出數據
a b c e f g
Filter:過濾數據
輸入數據
a b c a c b b d d
程序
package com.opensourceteams.module.bigdata.flink.example.stream.operator.filter

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}

/**
  * Filter example: forwards only the socket lines that contain the letter "a".
  *
  * Start the input first with `nc -lk 1234`, then type lines into it.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.socketTextStream("localhost", 1234, '\n')

    // Lines without an "a" are dropped; matching lines pass through unchanged.
    val linesWithA = source.filter(_.contains("a"))
    linesWithA.print()

    val jobName =
      if (args != null && args.nonEmpty) args(0)
      else "默認做業"
    env.execute(jobName)
    println("結束")
  }
}
輸出數據
3> a b c 4> a c
KeyBy:指定某列爲key,一般在按key分組時使用
輸入數據
c a b a
程序
package com.opensourceteams.module.bigdata.flink.example.stream.operator.sum

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * KeyBy example: counts each word per key in 2-second tumbling windows.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("localhost", 1234, '\n')

    // Each word becomes the pair (word, 1).
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

    // keyBy(0) groups by tuple field 0 (the word); alternatives are e.g. keyBy("someKey")
    // for a named field.
    val counts = pairs
      .keyBy(0)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .sum(1)                      // add up field 1 per key and window

    counts.print()

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }
}
輸出數據:多線程下數據的輸出順序不固定,但取值規則是一樣的
默認並行度
6> (a,2) 4> (c,1) 2> (b,1)
並行度爲1時,相同的key會先合併(c a b a 合併後爲 c、a、b 三個key),輸出順序爲 c b a;該順序由窗口內部的觸發順序決定,不應依賴
(c,1) (b,1) (a,2)
Sum:keyBy指定某列爲key,一般在按key分組時使用;sum在按key分組後求和
輸入數據
c a b a
程序
package com.opensourceteams.module.bigdata.flink.example.stream.operator.sum

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * KeyBy + sum example: per word, totals the count of occurrences inside each 2-second
  * tumbling window.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  /** Job name used when none is passed on the command line. */
  private val DefaultJobName = "默認做業"

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val wordCounts =
      env.socketTextStream("localhost", 1234, '\n')
        .flatMap(line => line.split(" "))
        .map((_, 1))
        .keyBy(0)                    // key = tuple field 0, i.e. the word
        .timeWindow(Time.seconds(2)) // 2-second tumbling window
        .sum(1)                      // per key and window: total of field 1

    wordCounts.print()

    val jobName = args match {
      case null           => DefaultJobName
      case a if a.isEmpty => DefaultJobName
      case a              => a(0)
    }
    env.execute(jobName)
    println("結束")
  }
}
輸出數據:多線程下數據的輸出順序不固定,但取值規則是一樣的
默認並行度
6> (a,2) 4> (c,1) 2> (b,1)
並行度爲1時,相同的key會先合併(c a b a 合併後爲 c、a、b 三個key),輸出順序爲 c b a;該順序由窗口內部的觸發順序決定,不應依賴
(c,1) (b,1) (a,2)
Reduce:keyBy指定某列爲key,一般在按key分組時使用;reduce對相同key的元素兩兩進行函數運算
輸入數據
a b b c
程序
package com.opensourceteams.module.bigdata.flink.example.stream.operator.reduce

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Reduce example: every word starts with value 2; within each key and 2-second window the
  * values are combined pairwise by multiplication.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single parallel instance for every operator

    val products = env
      .socketTextStream("localhost", 1234, '\n')
      .flatMap(_.split(" "))
      .map(word => (word, 2))
      .keyBy(0)                    // group by field 0 (the word)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .reduce { (acc, next) => (acc._1, acc._2 * next._2) } // keep the word, multiply values

    products.print()

    printPlan(env)

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }

  /** Prints the stream graph as JSON so it can be pasted into the Flink plan visualizer. */
  private def printPlan(env: StreamExecutionEnvironment): Unit = {
    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")
  }
}
輸出數據:多線程下數據的輸出順序不固定,但取值規則是一樣的
默認並行度(註:下一行輸出沿用自上文sum示例,與本程序不符;本程序已設置並行度爲1,實際輸出爲再下一行)
6> (a,2) 4> (c,1) 2> (b,1)
(a,2) (c,2) (b,4)
a a b c c
package com.opensourceteams.module.bigdata.flink.example.stream.operator.fold

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Fold example: per word and 2-second window, starts from a seed string and appends
  * "-" followed by each (word, 1) tuple of the window.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single parallel instance for every operator

    val source = env.socketTextStream("localhost", port, '\n')
    val pairs = source.flatMap(line => line.split(" ")).map(word => (word, 1))

    val folded = pairs
      .keyBy(0)                    // group by field 0 (the word)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .fold("開始字符串") { (acc, pair) => acc + "-" + pair }

    folded.print()

    printPlan(env)

    val jobName = if (args == null || args.isEmpty) "默認做業" else args(0)
    env.execute(jobName)
    println("結束")
  }

  /** Prints the stream graph as JSON so it can be pasted into the Flink plan visualizer. */
  private def printPlan(env: StreamExecutionEnvironment): Unit = {
    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")
  }
}
開始字符串-(a,1)-(a,1) 開始字符串-(c,1)-(c,1) 開始字符串-(b,1)
a a c b c
package com.opensourceteams.module.bigdata.flink.example.stream.operator.aggregations.sum

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Aggregation example (`sum`): word counts per key in 2-second tumbling windows, using the
  * default parallelism.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism is intentionally not set, so the environment default is used (the author
    // observed 8 on a 4-core / 8-thread machine).
    //env.setParallelism(1)

    val text = env.socketTextStream("localhost", 1234, '\n')
    val ones = text.flatMap(line => line.split(" ")).map(token => (token, 1))
    val windowedSums = ones
      .keyBy(0)                    // group by field 0 (the word)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .sum(1)

    windowedSums.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    val jobName = Option(args).filter(_.nonEmpty).map(_(0)).getOrElse("默認做業")
    env.execute(jobName)
    println("結束")
  }
}
4> (c,2) 2> (b,1) 6> (a,2)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.aggregations.sum

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
//import org.apache.flink.streaming.api.scala._

/**
  * Aggregation example (`min`): tags every word with a running sequence number and keeps,
  * per word and 2-second window, the record with the smallest number.
  *
  * NOTE(review): the package name ends in `sum` although this job computes `min`.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism left at the environment default.
    //env.setParallelism(1)

    val lines = env.socketTextStream("localhost", port, '\n')

    // NOTE(review): `seq` is a local var captured by the map closure. Flink ships a serialized
    // copy of the closure to each parallel map instance, so this is presumably a
    // per-instance counter rather than a global sequence — confirm before relying on it.
    var seq = 0
    val minima = lines
      .flatMap(_.split(" "))
      .map { word =>
        seq = seq + 1
        (word, seq)
      }
      .keyBy(0)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .min(1)

    minima.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }
}
輸出數據
最終輸出數據,順序取決於線程的調度
2> (b,1) 6> (a,2)
6> (a,2) 2> (b,1) 6> (a,4) 2> (b,3) 6> (a,5) 2> (b,6)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.aggregations.sum

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
//import org.apache.flink.streaming.api.scala._

/**
  * Aggregation example (`max`): tags every word with a running sequence number and keeps,
  * per word and 2-second window, the record with the largest number.
  *
  * NOTE(review): the package name ends in `sum` although this job computes `max`.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism left at the environment default.
    //env.setParallelism(1)

    val lines = env.socketTextStream("localhost", 1234, '\n')

    // NOTE(review): `counter` is captured by the map closure; presumably each parallel map
    // instance gets its own serialized copy, so numbers are per instance — confirm.
    var counter = 0
    val numbered = lines.flatMap(line => line.split(" ")).map { word =>
      counter += 1
      (word, counter)
    }

    val maxima = numbered
      .keyBy(0)                    // group by field 0 (the word)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .max(1)

    maxima.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    val noName = args == null || args.isEmpty
    env.execute(if (noName) "默認做業" else args(0))
    println("結束")
  }
}
輸出數據
最終輸出數據,順序取決於線程的調度
2> (b,6) 6> (a,5)
6> (a,2) 2> (b,1) 6> (a,4) 2> (b,3) 6> (a,5) 2> (b,6)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.window.window

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Keyed window example: word counts per key in explicit 2-second tumbling
  * processing-time windows.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single parallel instance for every operator

    val words = env.socketTextStream("localhost", port, '\n').flatMap(_.split(" "))

    // window() is defined on the already keyed stream together with the assigner that places
    // elements into windows; each key's data is grouped by that assigner (here: 2-second
    // tumbling windows in processing time).
    val counts = words
      .map(w => (w, 1))
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .sum(1)

    counts.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }
}
(b,3) (a,3)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.window.windowAll

import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * WindowAll example: gathers ALL (word, 1) records, regardless of word, into 2-second
  * tumbling processing-time windows and forwards each window's elements unchanged.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setParallelism(1)

    val dataStream = env.socketTextStream("localhost", port, '\n')

    // No keyBy here: windowAll windows the whole stream rather than individual keys, so a
    // keyBy before it would only suggest per-key windows that never exist.
    dataStream.flatMap(x => x.split(" ")).map((_, 1))
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .process(new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
        /** Sees every element of the current window; here it forwards them all to the sink. */
        override def process(context: Context,
                             elements: Iterable[(String, Int)],
                             out: Collector[(String, Int)]): Unit =
          elements.foreach(element => out.collect(element))
      })
      .print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    // getExecutionPlan renders the same plan JSON without consuming the registered
    // transformations (on newer Flink versions getStreamGraph may clear them before execute()).
    println(env.getExecutionPlan)
    println("==================================================================\n")

    val jobName = if (args == null || args.isEmpty) "默認做業" else args(0)
    env.execute(jobName)
    println("結束")
  }
}
8> (a,1) 7> (b,1) 3> (a,1) 2> (a,1) 1> (b,1) 4> (b,1)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.window.window.apply

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector

/**
  * Window apply example: per word, collects the (word, 1) records of each 2-second tumbling
  * processing-time window and hands them to a [[WindowFunction]] that forwards them as-is.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)

    val source = env.socketTextStream("localhost", 1234, '\n')

    // WindowFunction type parameters: input element, output element, key (a Java Tuple
    // because keyBy uses a field index), and the window type.
    val passThrough = new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow] {
      override def apply(key: Tuple,
                         window: TimeWindow,
                         input: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit =
        input.foreach(element => out.collect(element)) // process every element of the window
    }

    val windowed = source
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 2-second windows per key
      .apply(passThrough)

    windowed.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }
}
2> (a,1) 3> (a,1) 7> (b,1) 4> (b,1) 1> (b,1) 8> (a,1)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.window.window.reduce

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * Window reduce example: per word, adds up the counts of all (word, 1) records in each
  * 2-second tumbling processing-time window.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)

    val pairs = env
      .socketTextStream("localhost", port, '\n')
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))

    val totals = pairs
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 2-second windows per key
      .reduce((left, right) => (left._1, left._2 + right._2))   // keep the word, add counts

    totals.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    val jobName = if (args != null && args.nonEmpty) args(0) else "默認做業"
    env.execute(jobName)
    println("結束")
  }
}
2> (b,3) 6> (a,3)
b a b a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.window.window.fold

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * Window fold example: per word and 2-second tumbling processing-time window, builds a
  * string that starts with a seed and appends "-" plus every (word, 1) record in turn.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)

    val source = env.socketTextStream("localhost", 1234, '\n')

    // fold(seed)(f): the seed string is the initial accumulator for each key's window;
    // f receives the accumulator so far and the next element of that window.
    val trail = source
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .fold("字符串開始") { (acc, record) => acc + "-" + record }

    trail.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }
}
6> 字符串開始-(a,1)-(a,1)-(a,1) 2> 字符串開始-(b,1)-(b,1)-(b,1)
a a b
c c a
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.union

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

/**
  * Union example: windowed word counts from two sockets (ports 1234 and 12345) merged into
  * a single stream.
  *
  * Start both inputs first with `nc -lk 1234` and `nc -lk 12345`, then type words into them.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream1 = getDataStream(env, 1234, "localhost")
    val dataStream2 = getDataStream(env, 12345, "localhost")

    // union merges streams of the same element type into one DataStream; the result is an
    // ordinary DataStream and can be transformed further like any other stream.
    val dataStream3 = dataStream1.union(dataStream2)
    dataStream3.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    // getExecutionPlan renders the same plan JSON without consuming the registered
    // transformations (on newer Flink versions getStreamGraph may clear them before execute()).
    println(env.getExecutionPlan)
    println("==================================================================\n")

    val jobName = if (args == null || args.isEmpty) "默認做業" else args(0)
    env.execute(jobName)
    println("結束")
  }

  /**
    * Reads space-separated words from `host:port` and counts each word per tumbling window.
    *
    * @param env        execution environment the source is registered on
    * @param port       socket port to read from
    * @param host       socket host to read from
    * @param windowSize tumbling window length; defaults to 5 seconds (the previous
    *                   hard-coded value). Time semantics follow the environment's
    *                   time characteristic.
    * @return stream of (word, count) per key and window
    */
  def getDataStream(env: StreamExecutionEnvironment,
                    port: Int,
                    host: String,
                    windowSize: Time = Time.seconds(5)): DataStream[(String, Int)] = {
    //env.setParallelism(1)
    val dataStream = env.socketTextStream(host, port, '\n')
    dataStream.flatMap(x => x.split(" ")).map((_, 1))
      .keyBy(0)
      .timeWindow(windowSize) // tumbling window, 5 seconds unless overridden
      .sum(1)
  }
}
6> (a,2) 4> (c,2) 2> (b,1) 6> (a,1)
a a b
c c c a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.join

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

/**
  * Window join example: joins the windowed word counts of two sockets (ports 1234 and
  * 12345) on the word and sums the two counts.
  *
  * Start both inputs first with `nc -lk 1234` and `nc -lk 12345`, then type words into them.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream1 = getDataStream(env, 1234, "localhost")
    val dataStream2 = getDataStream(env, 12345, "localhost")

    // join pairs elements of the two streams whose keys are equal (`where` selects the key on
    // stream 1, `equalTo` on stream 2) and that fall into the same 2-second processing-time
    // window; each matching pair becomes (word, count1 + count2).
    val dataStream3 = dataStream1.join(dataStream2)
    dataStream3.where(x => x._1).equalTo(x => x._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .apply((a, b) => (a._1, a._2 + b._2))
      .print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    // getExecutionPlan renders the same plan JSON without consuming the registered
    // transformations (on newer Flink versions getStreamGraph may clear them before execute()).
    println(env.getExecutionPlan)
    println("==================================================================\n")

    val jobName = if (args == null || args.isEmpty) "默認做業" else args(0)
    env.execute(jobName)
    println("結束")
  }

  /**
    * Reads space-separated words from `host:port` and counts each word per tumbling window.
    *
    * @param env        execution environment the source is registered on
    * @param port       socket port to read from
    * @param host       socket host to read from
    * @param windowSize tumbling window length; defaults to 5 seconds (the previous
    *                   hard-coded value). Time semantics follow the environment's
    *                   time characteristic.
    * @return stream of (word, count) per key and window
    */
  def getDataStream(env: StreamExecutionEnvironment,
                    port: Int,
                    host: String,
                    windowSize: Time = Time.seconds(5)): DataStream[(String, Int)] = {
    //env.setParallelism(1)
    val dataStream = env.socketTextStream(host, port, '\n')
    dataStream.flatMap(x => x.split(" ")).map((_, 1))
      .keyBy(0)
      .timeWindow(windowSize) // tumbling window, 5 seconds unless overridden
      .sum(1)
  }
}
6> (a,3) 2> (b,2)
c c a
a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.intervaljoin

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Interval join example: joins (word, 1) records from two sockets (ports 1234 and 12345)
  * that share the same word and whose timestamps are at most 5 seconds apart.
  *
  * Start both inputs first with `nc -lk 1234` and `nc -lk 12345`, then type words into them.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Records are timestamped on ingestion so the interval join has times to compare.
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val leftInput = getDataStream(env, 1234, "localhost")
    val rightInput = getDataStream(env, 12345, "localhost")

    // Both sides are keyed by field 0 (the word); each left element is paired with every
    // right element of the same key whose timestamp lies within [-5 s, +5 s] of its own.
    leftInput.keyBy(0)
      .intervalJoin(rightInput.keyBy(0))
      .between(Time.seconds(-5), Time.seconds(5))
      //.upperBoundExclusive(true) // optional
      //.lowerBoundExclusive(true) // optional
      .process(new ProcessJoinFunction[(String, Int), (String, Int), String] {
        override def processElement(left: (String, Int),
                                    right: (String, Int),
                                    ctx: ProcessJoinFunction[(String, Int), (String, Int), String]#Context,
                                    out: Collector[String]): Unit = {
          val joined = s"$left,$right"
          // NOTE(review): the joined stream has no sink attached, so this println is what
          // makes the result visible.
          println(joined)
          out.collect(joined)
        }
      })

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    env.execute(Option(args).flatMap(_.headOption).getOrElse("默認做業"))
    println("結束")
  }

  /** Reads space-separated words from `host:port` and maps each word to (word, 1). */
  def getDataStream(env: StreamExecutionEnvironment, port: Int, host: String): DataStream[(String, Int)] = {
    //env.setParallelism(1)
    env.socketTextStream(host, port, '\n')
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
  }
}
(a,1),(a,1) (a,1),(a,1)
c c a
a a b
模板
==============開始 first [(a,1)] second [(a,1), (a,1)] ==============結束 ==============開始 first [(c,1), (c,1)] second [] ==============結束 ==============開始 first [] second [(b,1)] ==============結束
c c a
a a b
模板
(a,1) (a,1) (b,1) (c,1) (c,1) (a,1)
c c a
a a b
模板
(a,1) (a,1) (b,1) (c,1) (c,1) (a,1)
c c a
a a b
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.coFlatMap

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

/**
  * Connect + CoFlatMap example: connects two socket streams (ports 1234 and 12345) and
  * splits the lines of each into words with one function per input.
  *
  * Start both inputs first with `nc -lk 1234` and `nc -lk 12345`, then type lines into them.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    // Parallelism 1 so the printed output is easy to inspect.
    env.setParallelism(1)

    val first = getDataStream(env, 1234, "localhost")
    val second = getDataStream(env, 12345, "localhost")

    // connect keeps both inputs in one operator; flatMap takes one function per input side.
    // Both sides here split a line into its space-separated words.
    first.connect(second)
      .flatMap(line => line.split(" "), line => line.split(" "))
      .print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================================================\n")

    val jobName = if (args == null || args.isEmpty) "默認做業" else args(0)
    env.execute(jobName)
    println("結束")
  }

  /** Opens a text socket on `host:port`; every '\n'-terminated line is one record. */
  def getDataStream(env: StreamExecutionEnvironment, port: Int, host: String): DataStream[String] =
    env.socketTextStream(host, port, '\n')
}
a a b c c a
c c a
package com.opensourceteams.module.bigdata.flink.example.datastream.operator.assignTimestamps

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Timestamp assignment example: attaches ascending timestamps to socket lines, then counts
  * words per key in 2-second tumbling windows.
  *
  * Start the input first with `nc -lk 1234`, then type space-separated words.
  */
object Run {
  def main(args: Array[String]): Unit = {
    val port = 1234
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single parallel instance for every operator

    val dataStream = env.socketTextStream("localhost", port, '\n')

    // assignAscendingTimestamps returns a NEW stream carrying the timestamps; the pipeline
    // below must consume that stream, otherwise the timestamps never reach the window.
    // NOTE(review): the wall clock is used as the "event" timestamp, which makes this
    // effectively ingestion time, and timeWindow only uses these timestamps when the
    // environment runs on event time — confirm the intended time characteristic.
    val timestamped = dataStream.assignAscendingTimestamps(_ => System.currentTimeMillis())

    val dataStream2 = timestamped.flatMap(x => x.split(" ")).map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(2)) // 2-second tumbling window
      .sum(1)

    dataStream2.print()

    println("=======================打印StreamPlanAsJSON=======================\n")
    println("JSON轉圖在線工具: https://flink.apache.org/visualizer")
    // getExecutionPlan renders the same plan JSON without consuming the registered
    // transformations (on newer Flink versions getStreamGraph may clear them before execute()).
    println(env.getExecutionPlan)
    println("==================================================================\n")

    val jobName = if (args == null || args.isEmpty) "默認做業" else args(0)
    env.execute(jobName)
    println("結束")
  }
}
(c,2) (a,1)