若是你瞭解 Apache Flink 的話,那麼你應該熟悉該如何像 Flink 發送數據或者如何從 Flink 獲取數據。可是在某些狀況下,咱們須要將配置數據發送到 Flink 集羣並從中接收一些額外的數據。java
在本文的第一部分中,我將描述如何將配置數據發送到 Flink 集羣。咱們須要配置不少東西:方法參數、配置文件、機器學習模型。Flink 提供了幾種不一樣的方法,咱們將介紹如何使用它們以及什麼時候使用它們。在本文的第二部分中,我將描述如何從 Flink 集羣中獲取數據。git
在咱們深刻研究如何在 Apache Flink 中的不一樣組件之間發送數據以前,讓咱們先談談 Flink 集羣中的組件,下圖展現了 Flink 中的主要組件以及它們是如何相互做用的:github
當咱們運行 Flink 應用程序時,它會與 Flink JobManager 進行交互,這個 Flink JobManager 存儲了那些正在運行的 Job 的詳細信息,例如執行圖。 JobManager 它控制着 TaskManager,每一個 TaskManager 中包含了一部分數據來執行咱們定義的數據處理方法。算法
在許多的狀況下,咱們但願可以去配置 Flink Job 中某些運行的函數參數。根據用例,咱們可能須要設置單個變量或者提交具備靜態配置的文件,咱們下面將討論在 Flink 中該如何實現?sql
除了向 TaskManager 發送配置數據外,有時咱們可能還但願從 Flink Job 的函數方法中返回數據。緩存
假設咱們有一個從 CSV 文件中讀取電影列表的應用程序(它要過濾特定類型的全部電影):微信
//讀取電影列表數據集合 DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv") .ignoreFirstLine() .parseQuotedStrings('"') .ignoreInvalidLines() .types(Long.class, String.class, String.class); lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> { // 以「|」符號分隔電影類型 String[] genres = movie.f2.split("\\|"); // 查找全部 「動做」 類型的電影 return Stream.of(genres).anyMatch(g -> g.equals("Action")); }).print();
咱們極可能想要提取不一樣類型的電影,爲此咱們須要可以配置咱們的過濾功能。 當你要實現這樣的函數時,最直接的配置方法是實現構造函數:session
// 傳遞類型名稱 lines.filter(new FilterGenre("Action")) .print(); ... class FilterGenre implements FilterFunction<Tuple3<Long, String, String>> { //類型 String genre; //初始化構造方法 public FilterGenre(String genre) { this.genre = genre; } @Override public boolean filter(Tuple3<Long, String, String> movie) throws Exception { String[] genres = movie.f2.split("\\|"); return Stream.of(genres).anyMatch(g -> g.equals(genre)); } }
或者,若是你使用 lambda 函數,你能夠簡單地使用它的閉包中的一個變量:閉包
final String genre = "Action"; lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> { String[] genres = movie.f2.split("\\|"); //使用變量 return Stream.of(genres).anyMatch(g -> g.equals(genre)); }).print();
Flink 將序列化此變量並將其與函數一塊兒發送到集羣。架構
若是你須要將大量變量傳遞給函數,那麼這些方法就會變得很是煩人了。 爲了解決這個問題,Flink 提供了 withParameters 方法。 要使用它,你須要實現那些 Rich 函數,好比你沒必要實現 MapFunction 接口,而是實現 RichMapFunction。
Rich 函數容許你使用 withParameters 方法傳遞許多參數:
// Configuration 類來存儲參數 Configuration configuration = new Configuration(); configuration.setString("genre", "Action"); lines.filter(new FilterGenreWithParameters()) // 將參數傳遞給函數 .withParameters(configuration) .print();
要讀取這些參數,咱們須要實現 "open" 方法並讀取其中的參數:
class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> { String genre; @Override public void open(Configuration parameters) throws Exception { //讀取配置 genre = parameters.getString("genre", ""); } @Override public boolean filter(Tuple3<Long, String, String> movie) throws Exception { String[] genres = movie.f2.split("\\|"); return Stream.of(genres).anyMatch(g -> g.equals(genre)); } }
全部這些選項均可以使用,但若是須要爲多個函數設置相同的參數,則可能會很繁瑣。在 Flink 中要處理此種狀況, 你能夠設置全部 TaskManager 均可以訪問的全局環境變量。
爲此,首先須要使用 ParameterTool.fromArgs
從命令行讀取參數:
public static void main(String... args) { //讀取命令行參數 ParameterTool parameterTool = ParameterTool.fromArgs(args); ... }
而後使用 setGlobalJobParameters
設置全局做業參數:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(parameterTool); ... //該函數將可以讀取這些全局參數 lines.filter(new FilterGenreWithGlobalEnv()) //這個函數是本身定義的 .print();
如今咱們來看看這個讀取這些參數的函數,和上面說的同樣,它是一個 Rich 函數:
class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> { @Override public boolean filter(Tuple3<Long, String, String> movie) throws Exception { String[] genres = movie.f2.split("\\|"); //獲取全局的配置 ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); //讀取配置 String genre = parameterTool.get("genre"); return Stream.of(genres).anyMatch(g -> g.equals(genre)); } }
要讀取配置,咱們須要調用 getGlobalJobParameter 來獲取全部全局參數,而後使用 get 方法獲取咱們要的參數。
若是你想將數據從客戶端發送到 TaskManager,上面文章中討論的方法都適合你,但若是數據以數據集的形式存在於 TaskManager 中,該怎麼辦? 在這種狀況下,最好使用 Flink 中的另外一個功能 —— 廣播變量。 它只容許將數據集發送給那些執行你 Job 裏面函數的任務管理器。
假設咱們有一個數據集,其中包含咱們在進行文本處理時應忽略的單詞,而且咱們但願將其設置爲咱們的函數。 要爲單個函數設置廣播變量,咱們須要使用 withBroadcastSet 方法和數據集。
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); // 獲取要忽略的單詞集合 DataSet<String> wordsToIgnore = ... data.map(new RichFlatMapFunction<String, String>() { // 存儲要忽略的單詞集合. 這將存儲在 TaskManager 的內存中 Collection<String> wordsToIgnore; @Override public void open(Configuration parameters) throws Exception { //讀取要忽略的單詞的集合 wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore"); } @Override public String map(String line, Collector<String> out) throws Exception { String[] words = line.split("\\W+"); for (String word : words) //使用要忽略的單詞集合 if (wordsToIgnore.contains(word)) out.collect(new Tuple2<>(word, 1)); } //經過廣播變量傳遞數據集 }).withBroadcastSet(wordsToIgnore, "wordsToIgnore");
你應該記住,若是要使用廣播變量,那麼數據集將會存儲在 TaskManager 的內存中,若是數據集和越大,那麼佔用的內存就會越大,所以使用廣播變量適用於較小的數據集。
若是要向每一個 TaskManager 發送更多數據而且不但願將這些數據存儲在內存中,可使用 Flink 的分佈式緩存向 TaskManager 發送靜態文件。 要使用 Flink 的分佈式緩存,你首先須要將文件存儲在一個分佈式文件系統(如 HDFS)中,而後在緩存中註冊該文件:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //從 HDFS 註冊文件 env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel") ... env.execute()
爲了訪問分佈式緩存,咱們須要實現一個 Rich 函數:
class MyClassifier extends RichMapFunction<String, Integer> { @Override public void open(Configuration config) { File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel"); ... } @Override public Integer map(String value) throws Exception { ... } }
請注意,要訪問分佈式緩存中的文件,咱們須要使用咱們用於註冊文件的 key,好比上面代碼中的 machineLearningModel
。
咱們前面已經介紹瞭如何將數據發送給 TaskManager,但如今咱們將討論如何從 TaskManager 中返回數據。 你可能想知道爲何咱們須要作這種事情。 畢竟,Apache Flink 就是創建數據處理流水線,讀取輸入數據,處理數據並返回結果。
爲了表達清楚,讓咱們來看一個例子。假設咱們須要計算每一個單詞在文本中出現的次數,同時咱們要計算文本中有多少行:
//要處理的數據集合 DataSet<String> lines = ... // Word count 算法 lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split("\\W+"); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } }) .groupBy(0) .sum(1) .print(); // 計算要處理的文本中的行數 int linesCount = lines.count() System.out.println(linesCount);
問題是若是咱們運行這個應用程序,它將運行兩個 Flink 做業!首先獲得單詞統計數,而後計算行數。
這絕對是低效的,但咱們怎樣才能避免這種狀況呢?一種方法是使用累加器。它們容許你從 TaskManager 發送數據,並使用預約義的功能聚合此數據。 Flink 有如下內置累加器:
IntCounter,LongCounter,DoubleCounter:容許將 TaskManager 發送的 int,long,double 值彙總在一塊兒
AverageAccumulator:計算雙精度值的平均值
LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum:累加器,用於肯定不一樣類型的最大值和最小值
直方圖 - 用於計算 TaskManager 的值分佈
要使用累加器,咱們須要建立並註冊一個用戶定義的函數,而後在客戶端上讀取結果。下面咱們來看看該如何使用呢:
lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() { //建立一個累加器 private IntCounter linesNum = new IntCounter(); @Override public void open(Configuration parameters) throws Exception { //註冊一個累加器 getRuntimeContext().addAccumulator("linesNum", linesNum); } @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split("\\W+"); for (String word : words) { out.collect(new Tuple2<>(word, 1)); } // 處理每一行數據後 linesNum 遞增 linesNum.add(1); } }) .groupBy(0) .sum(1) .print(); //獲取累加器結果 int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum"); System.out.println(linesNum);
這樣計算就能夠統計輸入文本中每一個單詞出現的次數以及它有多少行。
若是須要自定義累加器,還可使用 Accumulator 或 SimpleAccumulator 接口實現本身的累加器。
本篇文章由 zhisheng 翻譯,禁止任何無受權的轉載。
翻譯後地址:http://www.54tianzhisheng.cn/2019/03/28/flink-additional-data/
原文地址:https://brewing.codes/2017/10/24/flink-additional-data/
微信公衆號:zhisheng
另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號了。你能夠加個人微信:zhisheng_tian,而後回覆關鍵字:Flink 便可無條件獲取到。
更多私密資料請加入知識星球!
一、Flink 從0到1學習 —— Apache Flink 介紹
二、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門
三、Flink 從0到1學習 —— Flink 配置文件詳解
四、Flink 從0到1學習 —— Data Source 介紹
五、Flink 從0到1學習 —— 如何自定義 Data Source ?
六、Flink 從0到1學習 —— Data Sink 介紹
七、Flink 從0到1學習 —— 如何自定義 Data Sink ?
八、Flink 從0到1學習 —— Flink Data transformation(轉換)
九、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows
十、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解
十一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch
十二、Flink 從0到1學習 —— Flink 項目如何運行?
1三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka
1四、Flink 從0到1學習 —— Flink JobManager 高可用性配置
1五、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹
1六、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL
1七、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ
1八、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase
1九、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS
20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis
2一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra
2二、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume
2三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB
2四、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ
2五、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裏去了
2六、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裏去了
2八、Flink 從0到1學習 —— Flink 中如何管理配置?
2九、Flink 從0到1學習—— Flink 不能夠連續 Split(分流)?
30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文
3二、爲何說流處理即將來?
3三、OPPO 數據中臺之基石:基於 Flink SQL 構建實時數據倉庫
3六、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理
3八、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了
40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)
四、Flink 源碼解析 —— standalone session 模式啓動流程
五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動
六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動
七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程
八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程
九、Flink 源碼解析 —— 如何獲取 JobGraph?
十、Flink 源碼解析 —— 如何獲取 StreamGraph?
十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?
十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?
1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程
1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程
1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制
1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制
1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?
1八、Flink Metrics 源碼解析 —— Flink-metrics-core
1九、Flink Metrics 源碼解析 —— Flink-metrics-datadog
20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard
2一、Flink Metrics 源碼解析 —— Flink-metrics-graphite
2二、Flink Metrics 源碼解析 —— Flink-metrics-influxdb
2三、Flink Metrics 源碼解析 —— Flink-metrics-jmx
2四、Flink Metrics 源碼解析 —— Flink-metrics-slf4j
2五、Flink Metrics 源碼解析 —— Flink-metrics-statsd
2六、Flink Metrics 源碼解析 —— Flink-metrics-prometheus
2七、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?
30、Flink Clients 源碼解析原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng