今天上午被 Flink 的一個算子困惑了下,具體問題是什麼呢?html
我有這麼個需求:有不一樣種類型的告警數據流(包含恢復數據),而後我要將這些數據流作一個拆分,拆分後的話,每種告警裏面的數據又想將告警數據和恢復數據拆分出來。java
結果,這個需求用 Flink 的 Split 運算符出現了問題。git
需求以下圖所示:github
我是指望如上這樣將數據流進行拆分的,最後將每種告警和恢復用不一樣的消息模版作一個渲染,渲染後再經過各類其餘的方式(釘釘羣
郵件、短信)進行告警通知。docker
因而個人代碼大概的結構以下代碼所示:apache
//dataStream 是總的數據流 //split 是拆分後的數據流 SplitStream<AlertEvent> split = dataStream.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); switch (value.getType()) { case MIDDLEWARE: tags.add(MIDDLEWARE); break; case HEALTH_CHECK: tags.add(HEALTH_CHECK); break; case DOCKER: tags.add(DOCKER); break; //... //固然這裏還能夠不少種類型 } return tags; } }); //而後你想獲取每種不一樣的數據類型,你可使用 select DataStream<AlertEvent> middleware = split.select(MIDDLEWARE); //選出中間件的數據流 //而後你又要將中間件的數據流分流成告警和恢復 SplitStream<AlertEvent> middlewareSplit = middleware.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); if(value.isRecover()) { tags.add(RECOVER) } else { tags.add(ALERT) } return tags; } }); middlewareSplit.select(ALERT).print(); DataStream<AlertEvent> healthCheck = split.select(HEALTH_CHECK); //選出健康檢查的數據流 //而後你又要將健康檢查的數據流分流成告警和恢復 SplitStream<AlertEvent> healthCheckSplit = healthCheck.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); if(value.isRecover()) { tags.add(RECOVER) } else { tags.add(ALERT) } return tags; } }); healthCheckSplit.select(ALERT).print(); DataStream<AlertEvent> docekr = split.select(DOCKER); //選出容器的數據流 //而後你又要將容器的數據流分流成告警和恢復 SplitStream<AlertEvent> dockerSplit = docekr.split(new OutputSelector<AlertEvent>() { @Override public Iterable<String> select(AlertEvent value) { List<String> tags = new ArrayList<>(); if(value.isRecover()) { tags.add(RECOVER) } else { tags.add(ALERT) } return tags; } }); dockerSplit.select(ALERT).print();
結構我抽象後大概就長上面這樣,而後我先本地測試的時候只把容器的數據那塊代碼打開了,其餘種告警的分流代碼註釋掉了,一運行,發現居然容器告警的數據怎麼還摻雜着健康檢查的數據也一塊兒打印出來了,一開始我覺得本身出了啥問題,就再起碼運行了三遍 IDEA 才發現結果一直都是這樣的。編程
因而,我只好在第二步分流前將 docekr 數據流打印出來,發現是沒什麼問題,打印出來的數據都是容器相關的,沒有摻雜着其餘種的數據啊。這會兒遍陷入了沉思,懵逼發呆了一會。微信
因而仍是開始面向 Google 編程:session
發現第一條就找到答案了,簡直不要太快,點進去能夠看到他也有這樣的需求:ide
而後這個小夥伴還掙扎了下用不一樣的方法(雖然結果更慘):
最後換了個姿式就行了(果真小夥子會的姿式挺多的):
但從這篇文章中,我找到了關聯到的兩個 Flink Issue,分別是:
一、https://issues.apache.org/jira/browse/FLINK-5031
二、https://issues.apache.org/jira/browse/FLINK-11084
而後呢,從第二個 Issue 的討論中我發現了一些頗有趣的討論:
對話頗有趣,可是我忽然想到以前個人知識星球裏面一位很細心的小夥伴問的一個問題了:
能夠發現代碼上確實是標明瞭過時了,可是註釋裏面沒寫清楚推薦用啥,幸虧我看到了這個 Issue,否則腦子裏面估計這個問題一直會存着呢。
那麼這個問題解決方法是否是意味着就能夠利用 Side Outputs 來解決呢?固然能夠啦,官方都推薦了,還不能都話,那麼不是打臉啪啪啪的響嗎?不過這裏仍是賣個關子將 Side Outputs 後面專門用一篇文章來說,感興趣的能夠先看看官網介紹:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
另外其實也能夠經過 split + filter 組合來解決這個問題,反正關鍵就是不要連續的用 split 來分流。
用 split + filter 的方案代碼大概以下:
DataStream<AlertEvent> docekr = split.select(DOCKER); //選出容器的數據流 //容器告警的數據流 docekr.filter(new FilterFunction<AlertEvent>() { @Override public boolean filter(AlertEvent value) throws Exception { return !value.isRecover(); } }) .print(); //容器恢復的數據流 docekr.filter(new FilterFunction<AlertEvent>() { @Override public boolean filter(AlertEvent value) throws Exception { return value.isRecover(); } }) .print();
上面這種就是屢次 filter 也能夠知足需求,可是就是代碼有點囉嗦。
Flink 中不支持連續的 Split/Select 分流操做,要實現連續分流也能夠經過其餘的方式(split + filter 或者 side output)來實現
本篇文章鏈接是:http://www.54tianzhisheng.cn/2019/06/12/flink-split/
微信公衆號:zhisheng
另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號了。你能夠加個人微信:zhisheng_tian,而後回覆關鍵字:Flink 便可無條件獲取到。
更多私密資料請加入知識星球!
https://github.com/zhisheng17/flink-learning/
之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客。
一、Flink 從0到1學習—— Apache Flink 介紹
二、Flink 從0到1學習—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門
四、Flink 從0到1學習—— Data Source 介紹
五、Flink 從0到1學習—— 如何自定義 Data Source ?
七、Flink 從0到1學習—— 如何自定義 Data Sink ?
八、Flink 從0到1學習—— Flink Data transformation(轉換)
九、Flink 從0到1學習—— 介紹Flink中的Stream Windows
十、Flink 從0到1學習—— Flink 中的幾種 Time 詳解
十一、Flink 從0到1學習—— Flink 寫入數據到 ElasticSearch
十二、Flink 從0到1學習—— Flink 項目如何運行?
1三、Flink 從0到1學習—— Flink 寫入數據到 Kafka
1四、Flink 從0到1學習—— Flink JobManager 高可用性配置
1五、Flink 從0到1學習—— Flink parallelism 和 Slot 介紹
1六、Flink 從0到1學習—— Flink 讀取 Kafka 數據批量寫入到 MySQL
1七、Flink 從0到1學習—— Flink 讀取 Kafka 數據寫入到 RabbitMQ
1八、Flink 從0到1學習》—— 你上傳的 jar 包藏到哪裏去了?
1九、Flink 從0到1學習 —— Flink 中如何管理配置?
四、Flink 源碼解析 —— standalonesession 模式啓動流程
五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動
六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動
七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程
八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程
九、Flink 源碼解析 —— 如何獲取 JobGraph?
十、Flink 源碼解析 —— 如何獲取 StreamGraph?
十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?
十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?
1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程
1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程
1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制
1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制
1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng