以前文章 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch 寫了如何將 Kafka 中的數據存儲到 ElasticSearch 中,裏面其實就已經用到了 Flink 自帶的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一種狀況,那麼若是咱們有多個地方須要這份經過 Flink 轉換後的數據,是否是又要咱們繼續寫個 sink 的插件呢?確實,因此 Flink 裏面就默認支持了很多 sink,好比也支持 Kafka sink connector(FlinkKafkaProducer),那麼這篇文章咱們就講講如何將數據寫入到 Kafka。java
Flink 裏面支持 Kafka 0.八、0.九、0.十、0.11 ,之後有時間能夠分析下源碼的實現。git
這裏咱們須要安裝下 Kafka,請對應添加對應的 Flink Kafka connector 依賴的版本,這裏咱們使用的是 0.11 版本:github
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>${flink.version}</version> </dependency>
這裏就不寫這塊內容了,能夠參考我之前的文章 Kafka 安裝及快速入門。apache
這裏咱們演示把其餘 Kafka 集羣中 topic 數據原樣寫入到本身本地起的 Kafka 中去。微信
kafka.brokers=xxx:9092,xxx:9092,xxx:9092 kafka.group.id=metrics-group-test kafka.zookeeper.connect=xxx:2181 metrics.topic=xxx stream.parallelism=5 kafka.sink.brokers=localhost:9092 kafka.sink.topic=metric-test stream.checkpoint.interval=1000 stream.checkpoint.enable=false stream.sink.parallelism=5
目前咱們先看下本地 Kafka 是否有這個 metric-test topic 呢?須要執行下這個命令:學習
bin/kafka-topics.sh --list --zookeeper localhost:2181
能夠看到本地的 Kafka 是沒有任何 topic 的,若是等下咱們的程序運行起來後,再次執行這個命令出現 metric-test topic,那麼證實個人程序確實起做用了,已經將其餘集羣的 Kafka 數據寫入到本地 Kafka 了。ui
Main.java插件
public class Main { public static void main(String[] args) throws Exception{ final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env); data.addSink(new FlinkKafkaProducer011<Metrics>( parameterTool.get("kafka.sink.brokers"), parameterTool.get("kafka.sink.topic"), new MetricSchema() )).name("flink-connectors-kafka") .setParallelism(parameterTool.getInt("stream.sink.parallelism")); env.execute("flink learning connectors kafka"); } }
啓動程序,查看運行結果,不段執行上面命令,查看是否有新的 topic 出來:code
執行命令能夠查看該 topic 的信息:orm
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test
上面代碼咱們使用 Flink Kafka Producer 只傳了三個參數:brokerList、topicId、serializationSchema(序列化)
其實也能夠傳入多個參數進去,如今有的參數用的是默認參數,由於這個內容比較多,後面能夠抽出一篇文章單獨來說。
本篇文章寫了 Flink 讀取其餘 Kafka 集羣的數據,而後寫入到本地的 Kafka 上。我在 Flink 這層沒作什麼數據轉換,只是原樣的將數據轉發了下,若是大家有什麼其餘的需求,是能夠在 Flink 這層將數據進行各類轉換操做,好比這篇文章中的一些轉換:《從0到1學習Flink》—— Flink Data transformation(轉換),而後將轉換後的數據發到 Kafka 上去。
本文原創地址是: http://www.54tianzhisheng.cn/2019/01/06/Flink-Kafka-sink/ , 未經容許禁止轉載。
微信公衆號:zhisheng
另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號了。你能夠加個人微信:zhisheng_tian,而後回覆關鍵字:Flink 便可無條件獲取到。
https://github.com/zhisheng17/flink-learning/
之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客
一、《從0到1學習Flink》—— Apache Flink 介紹
二、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門
三、《從0到1學習Flink》—— Flink 配置文件詳解
四、《從0到1學習Flink》—— Data Source 介紹
五、《從0到1學習Flink》—— 如何自定義 Data Source ?
六、《從0到1學習Flink》—— Data Sink 介紹
七、《從0到1學習Flink》—— 如何自定義 Data Sink ?
八、《從0到1學習Flink》—— Flink Data transformation(轉換)
九、《從0到1學習Flink》—— 介紹Flink中的Stream Windows
十、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解
十一、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch