以前有小夥伴在羣裏說:滑動窗口使用觸發器讓每條數據都觸發一次計算dom
可是他並無獲得預期的結果:每條數據都觸發一次計算,輸出一條結果,而是天天數據都輸出了不少條結果ide
爲何會這樣呢?spa
寫了個小案例,來解釋這種狀況code
爲了方便使用自定義的 source 開發數據:orm
class StringSourceFunction extends SourceFunction[String] { var flag = true override def cancel(): Unit = { flag = false } override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (flag) { val str = StringUtil.getRandomString(1).toUpperCase ctx.collect(str + "," + StringUtil.getRandomString(1).toUpperCase) Thread.sleep(1000) } } }
就是個簡單的 souce,每秒對外發出隨機的 string 字符串(基本一分鐘 60 條)blog
對應的計算程序以下:element
env.addSource(new StringSourceFunction) .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10))) // 每條數據觸發一次計算 //.trigger(CountTrigger.of(1)) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { // 窗口 val window = context.window.toString // 簡單計算下窗口裏面的元素個數 var count: Long = 0 elements.iterator.foreach(s => count += 1) out.collect("time : " + sdf.format(System.currentTimeMillis()) + ", window : " + window + ", element counter : " + count) } }) .print("")
定義了一個 一分鐘的窗口,滑動間隔是10秒,一條數據就應該屬於6個窗口開發
好比: 5 這條數據屬於:(-50,10)(-40,20)(-30,30)(-20,40)(-10,50)(0,60) 這6 個窗口字符串
註釋 trigger 看結果:get
10秒滑動間隔,就是10秒有一個滑動一次,一個窗口結束,觸發一次計算,輸出一個結果(前面6個窗口,由於剛啓動數據不夠60條)
開啓了tirgger 結果就徹底不同了
能夠看到,第一條數據進去的時候,觸發了6次計算,由於它屬於6個窗口,tirgger 會觸發6次
歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文