package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime

import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

/**
  * Event-time demo job: reads newline-delimited JSON records from a local socket,
  * assigns timestamps and periodic watermarks, groups the records into 3-second
  * non-keyed time windows and prints every record of a window when it fires.
  */
object SockWordCountRun {

  /**
    * Builds the pipeline, prints its execution plan as JSON and then runs the job.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    // Local environment with parallelism 1, using the debug-friendly configuration.
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val configuration: Configuration = ConfigurationUtil.getConfiguration(true)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    import org.apache.flink.streaming.api.scala._

    val dataStream = env.socketTextStream("localhost", 1234, '\n')

    dataStream
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {

        // The watermark trails the largest timestamp seen so far by 2 seconds.
        private val maxOutOfOrderness = 2 * 1000L

        // Largest extracted timestamp so far; starts at 0 until the first record arrives.
        private var currentMaxTimestamp: Long = _

        override def getCurrentWatermark: Watermark =
          new Watermark(currentMaxTimestamp - maxOutOfOrderness)

        override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
          val jsonObject = JSON.parseObject(element)
          // NOTE(review): the sample records in this file carry "extract_data_time" as a
          // "yyyy-MM-dd HH:mm:ss" string and the epoch milliseconds in "data_time" —
          // confirm that this is the intended field and how the JSON library converts it.
          val timestamp = jsonObject.getLongValue("extract_data_time")
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
          timestamp
        }
      })
      .timeWindowAll(Time.seconds(3))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow]() {
        // Forwards every element of the fired window unchanged, framed by log output.
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          println()
          println("開始提交window")
          println(new Date())
          elements.foreach(e => out.collect(e))
          println("結束提交window")
          println(new Date())
          println()
        }
      })
      .print()

    println("==================================如下爲執行計劃==================================")
    println("執行地址(firefox效果更好):https://flink.apache.org/visualizer")
    // execution plan
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("==================================以上爲執行計劃 JSON串==================================\n")

    env.execute("Socket 水印做業")

    println("結束")
  }

  /**
    * Data type for words with count.
    *
    * @param word  the word
    * @param count how often the word occurred
    */
  case class WordWithCount(word: String, count: Long) {
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }

  /**
    * Creates a Flink [[Configuration]].
    *
    * @param isDebug when true, sets very large akka and heartbeat timeouts on the
    *                configuration; when false the configuration is returned empty
    * @return the configuration
    */
  def getConfiguration(isDebug: Boolean = false): Configuration = {
    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 10000000)
      configuration.setInteger("heartbeat.timeout", 50000000)
    }

    configuration
  }
}
{"no":0,"extract_data_time":"2019-03-06 11:40:38","data_time":1551843638000,"message":"0 2019-03-06 11:40:38","send_data_string":"2019-03-06 21:03:43"} {"no":1,"extract_data_time":"2019-03-06 11:40:39","data_time":1551843639000,"message":"1 2019-03-06 11:40:39","send_data_string":"2019-03-06 21:03:43"} {"no":2,"extract_data_time":"2019-03-06 11:40:40","data_time":1551843640000,"message":"2 2019-03-06 11:40:40","send_data_string":"2019-03-06 21:03:43"} {"no":3,"extract_data_time":"2019-03-06 11:40:41","data_time":1551843641000,"message":"3 2019-03-06 11:40:41","send_data_string":"2019-03-06 21:03:43"} {"no":4,"extract_data_time":"2019-03-06 11:40:42","data_time":1551843642000,"message":"4 2019-03-06 11:40:42","send_data_string":"2019-03-06 21:03:43"} {"no":5,"extract_data_time":"2019-03-06 11:40:43","data_time":1551843643000,"message":"5 2019-03-06 11:40:43","send_data_string":"2019-03-06 21:03:43"} {"no":6,"extract_data_time":"2019-03-06 11:40:44","data_time":1551843644000,"message":"6 2019-03-06 11:40:44","send_data_string":"2019-03-06 21:03:43"} {"no":7,"extract_data_time":"2019-03-06 11:40:45","data_time":1551843645000,"message":"7 2019-03-06 11:40:45","send_data_string":"2019-03-06 21:03:43"} {"no":8,"extract_data_time":"2019-03-06 11:40:46","data_time":1551843646000,"message":"8 2019-03-06 11:40:46","send_data_string":"2019-03-06 21:03:43"} {"no":9,"extract_data_time":"2019-03-06 11:40:47","data_time":1551843647000,"message":"9 2019-03-06 11:40:47","send_data_string":"2019-03-06 21:03:43"} {"no":10,"extract_data_time":"2019-03-06 11:40:48","data_time":1551843648000,"message":"10 2019-03-06 11:40:48","send_data_string":"2019-03-06 21:03:43"} {"no":11,"extract_data_time":"2019-03-06 11:40:49","data_time":1551843649000,"message":"11 2019-03-06 11:40:49","send_data_string":"2019-03-06 21:03:43"} {"no":12,"extract_data_time":"2019-03-06 11:40:50","data_time":1551843650000,"message":"12 2019-03-06 11:40:50","send_data_string":"2019-03-06 21:03:43"} 
{"no":13,"extract_data_time":"2019-03-06 11:40:51","data_time":1551843651000,"message":"13 2019-03-06 11:40:51","send_data_string":"2019-03-06 21:03:43"} {"no":14,"extract_data_time":"2019-03-06 11:40:52","data_time":1551843652000,"message":"14 2019-03-06 11:40:52","send_data_string":"2019-03-06 21:03:43"} {"no":15,"extract_data_time":"2019-03-06 11:40:53","data_time":1551843653000,"message":"15 2019-03-06 11:40:53","send_data_string":"2019-03-06 21:03:43"} {"no":16,"extract_data_time":"2019-03-06 11:40:54","data_time":1551843654000,"message":"16 2019-03-06 11:40:54","send_data_string":"2019-03-06 21:03:43"} {"no":17,"extract_data_time":"2019-03-06 11:40:55","data_time":1551843655000,"message":"17 2019-03-06 11:40:55","send_data_string":"2019-03-06 21:03:43"} {"no":18,"extract_data_time":"2019-03-06 11:40:56","data_time":1551843656000,"message":"18 2019-03-06 11:40:56","send_data_string":"2019-03-06 21:03:43"} {"no":19,"extract_data_time":"2019-03-06 11:40:57","data_time":1551843657000,"message":"19 2019-03-06 11:40:57","send_data_string":"2019-03-06 21:03:43"} {"no":20,"extract_data_time":"2019-03-06 11:40:58","data_time":1551843658000,"message":"20 2019-03-06 11:40:58","send_data_string":"2019-03-06 21:03:43"} {"no":21,"extract_data_time":"2019-03-06 11:40:59","data_time":1551843659000,"message":"21 2019-03-06 11:40:59","send_data_string":"2019-03-06 21:03:43"} {"no":22,"extract_data_time":"2019-03-06 11:41:00","data_time":1551843660000,"message":"22 2019-03-06 11:41:00","send_data_string":"2019-03-06 21:03:43"} {"no":23,"extract_data_time":"2019-03-06 11:41:01","data_time":1551843661000,"message":"23 2019-03-06 11:41:01","send_data_string":"2019-03-06 21:03:43"} {"no":24,"extract_data_time":"2019-03-06 11:41:02","data_time":1551843662000,"message":"24 2019-03-06 11:41:02","send_data_string":"2019-03-06 21:03:43"} {"no":25,"extract_data_time":"2019-03-06 11:41:03","data_time":1551843663000,"message":"25 2019-03-06 11:41:03","send_data_string":"2019-03-06 
21:03:43"} {"no":26,"extract_data_time":"2019-03-06 11:41:04","data_time":1551843664000,"message":"26 2019-03-06 11:41:04","send_data_string":"2019-03-06 21:03:43"} {"no":27,"extract_data_time":"2019-03-06 11:41:05","data_time":1551843665000,"message":"27 2019-03-06 11:41:05","send_data_string":"2019-03-06 21:03:43"} {"no":28,"extract_data_time":"2019-03-06 11:41:06","data_time":1551843666000,"message":"28 2019-03-06 11:41:06","send_data_string":"2019-03-06 21:03:43"} {"no":29,"extract_data_time":"2019-03-06 11:41:07","data_time":1551843667000,"message":"29 2019-03-06 11:41:07","send_data_string":"2019-03-06 21:03:43"} {"no":30,"extract_data_time":"2019-03-06 11:41:08","data_time":1551843668000,"message":"30 2019-03-06 11:41:08","send_data_string":"2019-03-06 21:03:43"} {"no":31,"extract_data_time":"2019-03-06 11:41:09","data_time":1551843669000,"message":"31 2019-03-06 11:41:09","send_data_string":"2019-03-06 21:03:43"} {"no":32,"extract_data_time":"2019-03-06 11:41:10","data_time":1551843670000,"message":"32 2019-03-06 11:41:10","send_data_string":"2019-03-06 21:03:43"} {"no":33,"extract_data_time":"2019-03-06 11:41:11","data_time":1551843671000,"message":"33 2019-03-06 11:41:11","send_data_string":"2019-03-06 21:03:43"} {"no":34,"extract_data_time":"2019-03-06 11:41:12","data_time":1551843672000,"message":"34 2019-03-06 11:41:12","send_data_string":"2019-03-06 21:03:43"} {"no":35,"extract_data_time":"2019-03-06 11:41:13","data_time":1551843673000,"message":"35 2019-03-06 11:41:13","send_data_string":"2019-03-06 21:03:43"} {"no":36,"extract_data_time":"2019-03-06 11:41:14","data_time":1551843674000,"message":"36 2019-03-06 11:41:14","send_data_string":"2019-03-06 21:03:43"} {"no":37,"extract_data_time":"2019-03-06 11:41:15","data_time":1551843675000,"message":"37 2019-03-06 11:41:15","send_data_string":"2019-03-06 21:03:43"} {"no":38,"extract_data_time":"2019-03-06 11:41:16","data_time":1551843676000,"message":"38 2019-03-06 
11:41:16","send_data_string":"2019-03-06 21:03:43"} {"no":39,"extract_data_time":"2019-03-06 11:41:17","data_time":1551843677000,"message":"39 2019-03-06 11:41:17","send_data_string":"2019-03-06 21:03:43"} {"no":40,"extract_data_time":"2019-03-06 11:41:18","data_time":1551843678000,"message":"40 2019-03-06 11:41:18","send_data_string":"2019-03-06 21:03:43"} {"no":41,"extract_data_time":"2019-03-06 11:41:19","data_time":1551843679000,"message":"41 2019-03-06 11:41:19","send_data_string":"2019-03-06 21:03:43"} {"no":42,"extract_data_time":"2019-03-06 11:41:20","data_time":1551843680000,"message":"42 2019-03-06 11:41:20","send_data_string":"2019-03-06 21:03:43"} {"no":43,"extract_data_time":"2019-03-06 11:41:21","data_time":1551843681000,"message":"43 2019-03-06 11:41:21","send_data_string":"2019-03-06 21:03:43"} {"no":44,"extract_data_time":"2019-03-06 11:41:22","data_time":1551843682000,"message":"44 2019-03-06 11:41:22","send_data_string":"2019-03-06 21:03:43"} {"no":45,"extract_data_time":"2019-03-06 11:41:23","data_time":1551843683000,"message":"45 2019-03-06 11:41:23","send_data_string":"2019-03-06 21:03:43"} {"no":46,"extract_data_time":"2019-03-06 11:41:24","data_time":1551843684000,"message":"46 2019-03-06 11:41:24","send_data_string":"2019-03-06 21:03:43"} {"no":47,"extract_data_time":"2019-03-06 11:41:25","data_time":1551843685000,"message":"47 2019-03-06 11:41:25","send_data_string":"2019-03-06 21:03:43"} {"no":48,"extract_data_time":"2019-03-06 11:41:26","data_time":1551843686000,"message":"48 2019-03-06 11:41:26","send_data_string":"2019-03-06 21:03:43"} {"no":49,"extract_data_time":"2019-03-06 11:41:27","data_time":1551843687000,"message":"49 2019-03-06 11:41:27","send_data_string":"2019-03-06 21:03:43"} {"no":50,"extract_data_time":"2019-03-06 11:41:28","data_time":1551843688000,"message":"50 2019-03-06 11:41:28","send_data_string":"2019-03-06 21:03:43"} {"no":51,"extract_data_time":"2019-03-06 11:41:29","data_time":1551843689000,"message":"51 
2019-03-06 11:41:29","send_data_string":"2019-03-06 21:03:43"} {"no":52,"extract_data_time":"2019-03-06 11:41:30","data_time":1551843690000,"message":"52 2019-03-06 11:41:30","send_data_string":"2019-03-06 21:03:43"} {"no":53,"extract_data_time":"2019-03-06 11:41:31","data_time":1551843691000,"message":"53 2019-03-06 11:41:31","send_data_string":"2019-03-06 21:03:43"} {"no":54,"extract_data_time":"2019-03-06 11:41:32","data_time":1551843692000,"message":"54 2019-03-06 11:41:32","send_data_string":"2019-03-06 21:03:43"} {"no":55,"extract_data_time":"2019-03-06 11:41:33","data_time":1551843693000,"message":"55 2019-03-06 11:41:33","send_data_string":"2019-03-06 21:03:43"} {"no":56,"extract_data_time":"2019-03-06 11:41:34","data_time":1551843694000,"message":"56 2019-03-06 11:41:34","send_data_string":"2019-03-06 21:03:43"} {"no":57,"extract_data_time":"2019-03-06 11:41:35","data_time":1551843695000,"message":"57 2019-03-06 11:41:35","send_data_string":"2019-03-06 21:03:43"} {"no":58,"extract_data_time":"2019-03-06 11:41:36","data_time":1551843696000,"message":"58 2019-03-06 11:41:36","send_data_string":"2019-03-06 21:03:43"} {"no":59,"extract_data_time":"2019-03-06 11:41:37","data_time":1551843697000,"message":"59 2019-03-06 11:41:37","send_data_string":"2019-03-06 21:03:43"} {"no":60,"extract_data_time":"2019-03-06 11:41:38","data_time":1551843698000,"message":"60 2019-03-06 11:41:38","send_data_string":"2019-03-06 21:03:43"} {"no":61,"extract_data_time":"2019-03-06 11:41:39","data_time":1551843699000,"message":"61 2019-03-06 11:41:39","send_data_string":"2019-03-06 21:03:43"} {"no":62,"extract_data_time":"2019-03-06 11:41:40","data_time":1551843700000,"message":"62 2019-03-06 11:41:40","send_data_string":"2019-03-06 21:03:43"} {"no":63,"extract_data_time":"2019-03-06 11:41:41","data_time":1551843701000,"message":"63 2019-03-06 11:41:41","send_data_string":"2019-03-06 21:03:43"} {"no":64,"extract_data_time":"2019-03-06 
11:41:42","data_time":1551843702000,"message":"64 2019-03-06 11:41:42","send_data_string":"2019-03-06 21:03:43"} {"no":65,"extract_data_time":"2019-03-06 11:41:43","data_time":1551843703000,"message":"65 2019-03-06 11:41:43","send_data_string":"2019-03-06 21:03:43"} {"no":66,"extract_data_time":"2019-03-06 11:41:44","data_time":1551843704000,"message":"66 2019-03-06 11:41:44","send_data_string":"2019-03-06 21:03:43"} {"no":67,"extract_data_time":"2019-03-06 11:41:45","data_time":1551843705000,"message":"67 2019-03-06 11:41:45","send_data_string":"2019-03-06 21:03:43"} {"no":68,"extract_data_time":"2019-03-06 11:41:46","data_time":1551843706000,"message":"68 2019-03-06 11:41:46","send_data_string":"2019-03-06 21:03:43"} {"no":69,"extract_data_time":"2019-03-06 11:41:47","data_time":1551843707000,"message":"69 2019-03-06 11:41:47","send_data_string":"2019-03-06 21:03:43"} {"no":70,"extract_data_time":"2019-03-06 11:41:48","data_time":1551843708000,"message":"70 2019-03-06 11:41:48","send_data_string":"2019-03-06 21:03:43"} {"no":71,"extract_data_time":"2019-03-06 11:41:49","data_time":1551843709000,"message":"71 2019-03-06 11:41:49","send_data_string":"2019-03-06 21:03:43"} {"no":72,"extract_data_time":"2019-03-06 11:41:50","data_time":1551843710000,"message":"72 2019-03-06 11:41:50","send_data_string":"2019-03-06 21:03:43"} {"no":73,"extract_data_time":"2019-03-06 11:41:51","data_time":1551843711000,"message":"73 2019-03-06 11:41:51","send_data_string":"2019-03-06 21:03:43"} {"no":74,"extract_data_time":"2019-03-06 11:41:52","data_time":1551843712000,"message":"74 2019-03-06 11:41:52","send_data_string":"2019-03-06 21:03:43"} {"no":75,"extract_data_time":"2019-03-06 11:41:53","data_time":1551843713000,"message":"75 2019-03-06 11:41:53","send_data_string":"2019-03-06 21:03:43"} {"no":76,"extract_data_time":"2019-03-06 11:41:54","data_time":1551843714000,"message":"76 2019-03-06 11:41:54","send_data_string":"2019-03-06 21:03:43"} 
{"no":77,"extract_data_time":"2019-03-06 11:41:55","data_time":1551843715000,"message":"77 2019-03-06 11:41:55","send_data_string":"2019-03-06 21:03:43"} {"no":78,"extract_data_time":"2019-03-06 11:41:56","data_time":1551843716000,"message":"78 2019-03-06 11:41:56","send_data_string":"2019-03-06 21:03:43"} {"no":79,"extract_data_time":"2019-03-06 11:41:57","data_time":1551843717000,"message":"79 2019-03-06 11:41:57","send_data_string":"2019-03-06 21:03:43"} {"no":80,"extract_data_time":"2019-03-06 11:41:58","data_time":1551843718000,"message":"80 2019-03-06 11:41:58","send_data_string":"2019-03-06 21:03:43"} {"no":81,"extract_data_time":"2019-03-06 11:41:59","data_time":1551843719000,"message":"81 2019-03-06 11:41:59","send_data_string":"2019-03-06 21:03:43"} {"no":82,"extract_data_time":"2019-03-06 11:42:00","data_time":1551843720000,"message":"82 2019-03-06 11:42:00","send_data_string":"2019-03-06 21:03:43"} {"no":83,"extract_data_time":"2019-03-06 11:42:01","data_time":1551843721000,"message":"83 2019-03-06 11:42:01","send_data_string":"2019-03-06 21:03:43"} {"no":84,"extract_data_time":"2019-03-06 11:42:02","data_time":1551843722000,"message":"84 2019-03-06 11:42:02","send_data_string":"2019-03-06 21:03:43"} {"no":85,"extract_data_time":"2019-03-06 11:42:03","data_time":1551843723000,"message":"85 2019-03-06 11:42:03","send_data_string":"2019-03-06 21:03:43"} {"no":86,"extract_data_time":"2019-03-06 11:42:04","data_time":1551843724000,"message":"86 2019-03-06 11:42:04","send_data_string":"2019-03-06 21:03:43"} {"no":87,"extract_data_time":"2019-03-06 11:42:05","data_time":1551843725000,"message":"87 2019-03-06 11:42:05","send_data_string":"2019-03-06 21:03:43"} {"no":88,"extract_data_time":"2019-03-06 11:42:06","data_time":1551843726000,"message":"88 2019-03-06 11:42:06","send_data_string":"2019-03-06 21:03:43"} {"no":89,"extract_data_time":"2019-03-06 11:42:07","data_time":1551843727000,"message":"89 2019-03-06 11:42:07","send_data_string":"2019-03-06 
21:03:43"} {"no":90,"extract_data_time":"2019-03-06 11:42:08","data_time":1551843728000,"message":"90 2019-03-06 11:42:08","send_data_string":"2019-03-06 21:03:43"} {"no":91,"extract_data_time":"2019-03-06 11:42:09","data_time":1551843729000,"message":"91 2019-03-06 11:42:09","send_data_string":"2019-03-06 21:03:43"} {"no":92,"extract_data_time":"2019-03-06 11:42:10","data_time":1551843730000,"message":"92 2019-03-06 11:42:10","send_data_string":"2019-03-06 21:03:43"} {"no":93,"extract_data_time":"2019-03-06 11:42:11","data_time":1551843731000,"message":"93 2019-03-06 11:42:11","send_data_string":"2019-03-06 21:03:43"} {"no":94,"extract_data_time":"2019-03-06 11:42:12","data_time":1551843732000,"message":"94 2019-03-06 11:42:12","send_data_string":"2019-03-06 21:03:43"} {"no":95,"extract_data_time":"2019-03-06 11:42:13","data_time":1551843733000,"message":"95 2019-03-06 11:42:13","send_data_string":"2019-03-06 21:03:43"} {"no":96,"extract_data_time":"2019-03-06 11:42:14","data_time":1551843734000,"message":"96 2019-03-06 11:42:14","send_data_string":"2019-03-06 21:03:43"} {"no":97,"extract_data_time":"2019-03-06 11:42:15","data_time":1551843735000,"message":"97 2019-03-06 11:42:15","send_data_string":"2019-03-06 21:03:43"} {"no":98,"extract_data_time":"2019-03-06 11:42:16","data_time":1551843736000,"message":"98 2019-03-06 11:42:16","send_data_string":"2019-03-06 21:03:43"} {"no":99,"extract_data_time":"2019-03-06 11:42:17","data_time":1551843737000,"message":"99 2019-03-06 11:42:17","send_data_string":"2019-03-06 21:03:43"} {"no":100,"extract_data_time":"2019-03-06 11:42:18","data_time":1551843738000,"message":"100 2019-03-06 11:42:18","send_data_string":"2019-03-06 21:03:43"}
0 = {Execution@5366} "Attempt #0 (Source: Socket Stream -> Timestamps/Watermarks (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@5987f5d2 - [SCHEDULED]" 1 = {Execution@5371} "Attempt #0 (TriggerWindow(TumblingEventTimeWindows(100000000000), ListStateDescriptor{name=window-contents, defaultValue=null, serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@1e4a7dd4}, EventTimeTrigger(), AllWindowedStream.process(AllWindowedStream.scala:593)) -> Sink: Print to Std. Out (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4cb70b36 - [SCHEDULED]"
0 = {TimestampsAndPeriodicWatermarksOperator@7820} 1 = {StreamSource@7785}
/**
 * Runs the head operator of this task, handing it the checkpoint lock and the
 * stream status maintainer.
 */
protected void run() throws Exception {
	this.headOperator.run(
		this.getCheckpointLock(),
		this.getStreamStatusMaintainer());
}
/**
 * Convenience overload: delegates to the three-argument {@code run} and passes
 * this operator's own {@code output} as the collector.
 *
 * @param lockingObject lock object forwarded to the three-argument overload
 * @param streamStatusMaintainer status maintainer forwarded to the three-argument overload
 */
public void run(
		final Object lockingObject,
		final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
	this.run(lockingObject, streamStatusMaintainer, this.output);
}
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer, final Output<StreamRecord<OUT>> collector) throws Exception { final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(); final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured() ? getExecutionConfig().getLatencyTrackingInterval() : configuration.getLong(MetricOptions.LATENCY_INTERVAL); LatencyMarksEmitter<OUT> latencyEmitter = null; if (latencyTrackingInterval > 0) { latencyEmitter = new LatencyMarksEmitter<>( getProcessingTimeService(), collector, latencyTrackingInterval, this.getOperatorID(), getRuntimeContext().getIndexOfThisSubtask()); } final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.ctx = StreamSourceContexts.getSourceContext( timeCharacteristic, getProcessingTimeService(), lockingObject, streamStatusMaintainer, collector, watermarkInterval, -1); try { userFunction.run(ctx); // if we get here, then the user function either exited after being done (finite source) // or the function was canceled or stopped. For the finite source case, we should emit // a final watermark that indicates that we reached the end of event-time if (!isCanceledOrStopped()) { ctx.emitWatermark(Watermark.MAX_WATERMARK); } } finally { // make sure that the context is closed in any case ctx.close(); if (latencyEmitter != null) { latencyEmitter.close(); } } }
/**
 * Reads text from a socket and emits one record per delimiter-terminated chunk.
 *
 * <p>While {@code isRunning} is set, the method connects to {@code hostname:port},
 * appends everything it reads to a buffer and emits each prefix that ends at
 * {@code delimiter}. When the read loop ends while still running, it either sleeps
 * {@code delayBetweenRetries} and reconnects, or leaves the loop, depending on
 * {@code maxNumRetries}. Whatever is left in the buffer at the end is emitted as a
 * final record.
 *
 * @param ctx source context that receives the records
 */
public void run(SourceContext<String> ctx) throws Exception {
	// the buffer outlives a single connection, so a partial record survives a reconnect
	final StringBuilder buffer = new StringBuilder();
	long attempt = 0;

	while (isRunning) {

		try (Socket socket = new Socket()) {
			currentSocket = socket;

			LOG.info("Connecting to server socket " + hostname + ':' + port);
			socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
			// NOTE(review): no charset is passed to InputStreamReader, so the platform default is used
			try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

				char[] cbuf = new char[8192];
				// number of chars returned by the last read (despite the name)
				int bytesRead;
				while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
					buffer.append(cbuf, 0, bytesRead);
					int delimPos;
					// emit every complete record currently held in the buffer
					while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
						String record = buffer.substring(0, delimPos);
						// truncate trailing carriage return
						if (delimiter.equals("\n") && record.endsWith("\r")) {
							record = record.substring(0, record.length() - 1);
						}
						ctx.collect(record);
						// drop the emitted record together with its delimiter
						buffer.delete(0, delimPos + delimiter.length());
					}
				}
			}
		}

		// if we dropped out of this loop due to an EOF, sleep and retry
		if (isRunning) {
			attempt++;
			// -1 means retry without limit
			if (maxNumRetries == -1 || attempt < maxNumRetries) {
				LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
				Thread.sleep(delayBetweenRetries);
			} else {
				// this should probably be here, but some examples expect simple exists of the stream source
				// throw new EOFException("Reached end of stream and reconnects are not enabled.");
				break;
			}
		}
	}

	// collect trailing data
	if (buffer.length() > 0) {
		ctx.collect(buffer.toString());
	}
}
/**
 * Emits one element while holding the checkpoint lock.
 *
 * <p>The stream status is toggled to {@code StreamStatus.ACTIVE} first. If no idle
 * check is pending ({@code nextCheck} is null) one is scheduled, otherwise the
 * pending check is told not to fail. Finally the element is handed to
 * {@code processAndCollect}.
 *
 * @param element the element to emit
 */
public void collect(T element) {
	synchronized (checkpointLock) {
		streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);

		if (nextCheck == null) {
			scheduleNextIdleDetectionTask();
		} else {
			this.failOnNextCheck = false;
		}

		processAndCollect(element);
	}
}
/**
 * Places the element into the reused record instance and passes that record to
 * the output.
 *
 * @param element the element to emit
 */
protected void processAndCollect(T element) {
	this.output.collect(this.reuse.replace(element));
}
/**
 * Increments the {@code numRecordsOut} counter and forwards the record to the
 * wrapped output.
 *
 * @param record the record to forward
 */
public void collect(StreamRecord<OUT> record) {
	this.numRecordsOut.inc();
	this.output.collect(record);
}
public void collect(StreamRecord<T> record) { if (this.outputTag != null) { // we are only responsible for emitting to the main input return; } pushToOperator(record); }
protected <X> void pushToOperator(StreamRecord<X> record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator (and Serializer) expects. @SuppressWarnings("unchecked") StreamRecord<T> castRecord = (StreamRecord<T>) record; numRecordsIn.inc(); StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue())); operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { if (outputTag != null) { // Enrich error message ClassCastException replace = new ClassCastException( String.format( "%s. Failed to push OutputTag with id '%s' to operator. " + "This can occur when multiple OutputTags with different types " + "but identical names are being used.", e.getMessage(), outputTag.getId())); throw new ExceptionInChainedOperatorException(replace); } else { throw new ExceptionInChainedOperatorException(e); } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } } }
/**
 * Asks the user function for the element's timestamp and emits the element with
 * that timestamp attached.
 *
 * <p>The user function also receives the element's previous timestamp, or
 * {@code Long.MIN_VALUE} when the element has none.
 *
 * @param element the incoming record
 */
public void processElement(StreamRecord<T> element) throws Exception {
	final T value = element.getValue();

	final long previousTimestamp;
	if (element.hasTimestamp()) {
		previousTimestamp = element.getTimestamp();
	} else {
		previousTimestamp = Long.MIN_VALUE;
	}

	final long newTimestamp = userFunction.extractTimestamp(value, previousTimestamp);

	output.collect(element.replace(element.getValue(), newTimestamp));
}
/**
 * Counts the outgoing record on {@code numRecordsOut}, then hands it to the
 * wrapped output.
 *
 * @param record the record to forward
 */
public void collect(StreamRecord<OUT> record) {
	this.numRecordsOut.inc();
	this.output.collect(record);
}
0 = {StreamSink@6060} 1 = {WindowOperator@6041}
protected void run() throws Exception { // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessor<IN> inputProcessor = this.inputProcessor; while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }
/**
 * Processes input until one stream record has been handed to the operator or the
 * input is exhausted.
 *
 * <p>Watermarks and stream statuses are passed to {@code statusWatermarkValve},
 * latency markers go to the operator, and the loop continues after each of them.
 * A regular record is processed by the operator under {@code lock}.
 *
 * @return {@code true} after a record was processed, {@code false} once the input
 *         is finished
 * @throws IOException if an event other than {@code EndOfPartitionEvent} arrives
 * @throws IllegalStateException if the barrier handler is not empty when the input ends
 */
public boolean processInput() throws Exception {
	if (isFinished) {
		return false;
	}
	// lazily look up the records-in counter; fall back to a plain counter on failure
	if (numRecordsIn == null) {
		try {
			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
		} catch (Exception e) {
			LOG.warn("An exception occurred during the metrics setup.", e);
			numRecordsIn = new SimpleCounter();
		}
	}

	while (true) {
		// first drain the deserializer that currently holds a buffer, if any
		if (currentRecordDeserializer != null) {
			DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

			if (result.isBufferConsumed()) {
				currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
				currentRecordDeserializer = null;
			}

			if (result.isFullRecord()) {
				StreamElement recordOrMark = deserializationDelegate.getInstance();

				if (recordOrMark.isWatermark()) {
					// handle watermark
					statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
					continue;
				} else if (recordOrMark.isStreamStatus()) {
					// handle stream status
					statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
					continue;
				} else if (recordOrMark.isLatencyMarker()) {
					// handle latency marker
					synchronized (lock) {
						streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
					}
					continue;
				} else {
					// now we can do the actual processing
					StreamRecord<IN> record = recordOrMark.asRecord();
					synchronized (lock) {
						numRecordsIn.inc();
						streamOperator.setKeyContextElement1(record);
						streamOperator.processElement(record);
					}
					return true;
				}
			}
		}

		// nothing (more) to deserialize: fetch the next buffer or event
		final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
		if (bufferOrEvent != null) {
			if (bufferOrEvent.isBuffer()) {
				// switch to the deserializer of the channel the buffer came from
				currentChannel = bufferOrEvent.getChannelIndex();
				currentRecordDeserializer = recordDeserializers[currentChannel];
				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
			} else {
				// Event received
				final AbstractEvent event = bufferOrEvent.getEvent();
				if (event.getClass() != EndOfPartitionEvent.class) {
					throw new IOException("Unexpected event: " + event);
				}
			}
		} else {
			// a null result marks the end of the input
			isFinished = true;
			if (!barrierHandler.isEmpty()) {
				throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
			}
			return false;
		}
	}
}
分配Window,也就是計算Window的開始時間和結束時間。這個算法會把元素的時間戳對齊到size的整數倍,因此全部元素都會被分配到長度爲size的整段時間區間內。例如size = 5秒(5000毫秒)時,時間戳落在下列某個範圍內的元素,就會被分配到對應的區間:
[2019-03-06 20:00:00 -> 2019-03-06 20:00:05) [2019-03-06 20:00:05 -> 2019-03-06 20:00:10) [2019-03-06 20:00:10 -> 2019-03-06 20:00:15) [2019-03-06 20:00:15 -> 2019-03-06 20:00:20) [2019-03-06 20:00:20 -> 2019-03-06 20:00:25) [2019-03-06 20:00:25 -> 2019-03-06 20:00:30) ......
long start = timestamp - (timestamp - offset + windowSize) % windowSize;
long end = start + size
final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext);
若是當前元素被分配到的Window的結束時間(加上允許的延遲時間)已經不大於當前的水印,就說明水印已經越過了該Window,後面的Window纔是當前有效的Window;也就是這一條數據延時了,該Window會被直接跳過
// drop if the window is already late if (isWindowLate(window)) { continue; }
windowState.add(element.getValue());
TriggerResult triggerResult = triggerContext.onElement(element);
/**
 * Adds the element to every window it is assigned to and evaluates the trigger
 * for each of those windows.
 *
 * <p>Windows reported late by {@code isWindowLate} are skipped. A FIRE result emits
 * the window contents, a PURGE result clears the window state, and a cleanup timer
 * is registered for each window that took the element. With a merging window
 * assigner, windows are first merged through the merging window set. An element
 * that no window accepted and that {@code isElementLate} reports late is sent to
 * the side output when {@code lateDataOutputTag} is set, otherwise counted as
 * dropped.
 *
 * @param element the incoming record
 * @throws UnsupportedOperationException if a merge would produce a window that is
 *         already behind the current watermark / processing time
 * @throws IllegalStateException if a merged window has no state window
 */
public void processElement(StreamRecord<IN> element) throws Exception {
	final Collection<W> elementWindows = windowAssigner.assignWindows(
		element.getValue(), element.getTimestamp(), windowAssignerContext);

	//if element is handled by none of assigned elementWindows
	boolean isSkippedElement = true;

	final K key = this.<K>getKeyedStateBackend().getCurrentKey();

	if (windowAssigner instanceof MergingWindowAssigner) {
		MergingWindowSet<W> mergingWindows = getMergingWindowSet();

		for (W window: elementWindows) {

			// adding the new window might result in a merge, in that case the actualWindow
			// is the merged window and we work with that. If we don't merge then
			// actualWindow == window
			W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
				@Override
				public void merge(W mergeResult,
						Collection<W> mergedWindows, W stateWindowResult,
						Collection<W> mergedStateWindows) throws Exception {

					// reject a merge result that is already behind the watermark / processing time
					if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
						throw new UnsupportedOperationException("The end timestamp of an " +
								"event-time window cannot become earlier than the current watermark " +
								"by merging. Current watermark: " + internalTimerService.currentWatermark() +
								" window: " + mergeResult);
					} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
						throw new UnsupportedOperationException("The end timestamp of a " +
								"processing-time window cannot become earlier than the current processing time " +
								"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
								" window: " + mergeResult);
					}

					triggerContext.key = key;
					triggerContext.window = mergeResult;

					triggerContext.onMerge(mergedWindows);

					// clear trigger state and cleanup timer of every window that was merged away
					for (W m: mergedWindows) {
						triggerContext.window = m;
						triggerContext.clear();
						deleteCleanupTimer(m);
					}

					// merge the merged state windows into the newly resulting state window
					windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
				}
			});

			// drop if the window is already late
			if (isWindowLate(actualWindow)) {
				mergingWindows.retireWindow(actualWindow);
				continue;
			}
			isSkippedElement = false;

			W stateWindow = mergingWindows.getStateWindow(actualWindow);
			if (stateWindow == null) {
				throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
			}

			// state is stored under the state window, the trigger works on the actual window
			windowState.setCurrentNamespace(stateWindow);
			windowState.add(element.getValue());

			triggerContext.key = key;
			triggerContext.window = actualWindow;

			TriggerResult triggerResult = triggerContext.onElement(element);

			if (triggerResult.isFire()) {
				ACC contents = windowState.get();
				if (contents == null) {
					continue;
				}
				emitWindowContents(actualWindow, contents);
			}

			if (triggerResult.isPurge()) {
				windowState.clear();
			}
			registerCleanupTimer(actualWindow);
		}

		// need to make sure to update the merging state in state
		mergingWindows.persist();
	} else {
		for (W window: elementWindows) {

			// drop if the window is already late
			if (isWindowLate(window)) {
				continue;
			}
			isSkippedElement = false;

			windowState.setCurrentNamespace(window);
			windowState.add(element.getValue());

			triggerContext.key = key;
			triggerContext.window = window;

			TriggerResult triggerResult = triggerContext.onElement(element);

			if (triggerResult.isFire()) {
				ACC contents = windowState.get();
				if (contents == null) {
					continue;
				}
				emitWindowContents(window, contents);
			}

			if (triggerResult.isPurge()) {
				windowState.clear();
			}
			registerCleanupTimer(window);
		}
	}

	// side output input event if
	// element not handled by any window
	// late arriving tag has been set
	// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
	if (isSkippedElement && isElementLate(element)) {
		if (lateDataOutputTag != null){
			sideOutput(element);
		} else {
			this.numLateRecordsDropped.inc();
		}
	}
}
/**
 * Handles an event-time timer for the window stored in the timer's namespace.
 *
 * <p>The trigger is evaluated for the timer's timestamp. A FIRE result emits the
 * window contents (if any) and a PURGE result clears the window state. When the
 * assigner is event-time based and {@code isCleanupTime} holds for the timer's
 * timestamp, all state of the window is cleared.
 *
 * @param timer the timer that fired; supplies key, window (namespace) and timestamp
 */
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
	// point the shared trigger context at the key and window of this timer
	triggerContext.key = timer.getKey();
	triggerContext.window = timer.getNamespace();

	MergingWindowSet<W> mergingWindows;

	if (windowAssigner instanceof MergingWindowAssigner) {
		mergingWindows = getMergingWindowSet();
		W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
		if (stateWindow == null) {
			// Timer firing for non-existent window, this can only happen if a
			// trigger did not clean up timers. We have already cleared the merging
			// window and therefore the Trigger state, however, so nothing to do.
			return;
		} else {
			windowState.setCurrentNamespace(stateWindow);
		}
	} else {
		windowState.setCurrentNamespace(triggerContext.window);
		// null marks the non-merging case for the checks below
		mergingWindows = null;
	}

	TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

	if (triggerResult.isFire()) {
		ACC contents = windowState.get();
		if (contents != null) {
			emitWindowContents(triggerContext.window, contents);
		}
	}

	if (triggerResult.isPurge()) {
		windowState.clear();
	}

	if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
		clearAllState(triggerContext.window, windowState, mergingWindows);
	}

	if (mergingWindows != null) {
		// need to make sure to update the merging state in state
		mergingWindows.persist();
	}
}
/**
 * Emits the contents of a window through the user window function.
 *
 * <p>The collector's absolute timestamp is set to the window's maximum timestamp
 * and the process context is pointed at the window before the function is called.
 *
 * @param window the window being emitted
 * @param contents the accumulated contents of that window
 */
private void emitWindowContents(W window, ACC contents) throws Exception {
	final long emitTimestamp = window.maxTimestamp();
	this.timestampedCollector.setAbsoluteTimestamp(emitTimestamp);

	this.processContext.window = window;

	this.userFunction.process(
		this.triggerContext.key,
		window,
		this.processContext,
		contents,
		this.timestampedCollector);
}
/**
 * Stores the window and the internal context on this function's own context, then
 * calls the wrapped {@code ProcessAllWindowFunction} with it.
 *
 * @param key key argument; not read by this method
 * @param window the window being evaluated
 * @param context internal window context to expose through {@code ctx}
 * @param input the elements of the window
 * @param out collector for the function's results
 */
public void process(
		Byte key,
		final W window,
		final InternalWindowContext context,
		Iterable<IN> input,
		Collector<OUT> out) throws Exception {

	this.ctx.window = window;
	this.ctx.internalContext = context;

	this.wrappedFunction.process(this.ctx, input, out);
}
/**
 * Bumps the {@code numRecordsOut} counter and passes the record on to the wrapped
 * output.
 *
 * @param record the record to forward
 */
public void collect(StreamRecord<OUT> record) {
	this.numRecordsOut.inc();
	this.output.collect(record);
}
protected <X> void pushToOperator(StreamRecord<X> record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator (and Serializer) expects. @SuppressWarnings("unchecked") StreamRecord<T> castRecord = (StreamRecord<T>) record; numRecordsIn.inc(); StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue())); operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { if (outputTag != null) { // Enrich error message ClassCastException replace = new ClassCastException( String.format( "%s. Failed to push OutputTag with id '%s' to operator. " + "This can occur when multiple OutputTags with different types " + "but identical names are being used.", e.getMessage(), outputTag.getId())); throw new ExceptionInChainedOperatorException(replace); } else { throw new ExceptionInChainedOperatorException(e); } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } } }
/**
 * Records the element on the sink context and invokes the user sink function with
 * the element's value and that context.
 *
 * @param element the record to write to the sink
 */
public void processElement(StreamRecord<IN> element) throws Exception {
	this.sinkContext.element = element;
	this.userFunction.invoke(element.getValue(), this.sinkContext);
}