以前有文章 《從0到1學習Flink》—— Flink 寫入數據到 Kafka 寫過 Flink 將處理後的數據後發到 Kafka 消息隊列中去,固然咱們經常使用的消息隊列可不止這一種,還有 RocketMQ、RabbitMQ 等,恰好 Flink 也支持將數據寫入到 RabbitMQ,因此今天咱們就來寫篇文章講講如何將 Flink 處理後的數據寫入到 RabbitMQ。java
這裏我直接用 docker 命令安裝吧,先把 docker 在 mac 上啓動起來。docker
在命令行中執行下面的命令:apache
1 |
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management |
對這個命令不懂的童鞋能夠看看我之前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/api
登陸用戶名和密碼分別是:admin / admin ,登陸進去是這個樣子就表明安裝成功了:工具
pom.xml 中添加 Flink connector rabbitmq 的依賴以下:學習
1 2 3 4 5 |
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> |
這裏咱們依舊本身寫一個工具類一直的往 RabbitMQ 中的某個 queue 中發數據,而後由 Flink 去消費這些數據。ui
注意按照個人步驟來一步步操做,不然可能會出現一些錯誤!spa
RabbitMQProducerUtil.java命令行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQProducerUtil { public final static String QUEUE_NAME = "zhisheng"; public static void main(String[] args) throws Exception { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ相關信息 factory.setHost("localhost"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); //建立一個新的鏈接 Connection connection = factory.newConnection(); //建立一個通道 Channel channel = connection.createChannel(); // 聲明一個隊列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發送消息到隊列中 String message = "Hello zhisheng"; //咱們這裏演示發送一千條數據 for (int i = 0; i < 1000; i++) { channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8")); System.out.println("Producer Send +'" + message + i); } //關閉通道和鏈接 channel.close(); connection.close(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import com.zhisheng.common.utils.ExecutionEnvUtil; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * 從 rabbitmq 讀取數據 */ public class Main { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL; //這些配置建議能夠放在配置文件中,而後經過 parameterTool 來獲取對應的參數值 final RMQConnectionConfig connectionConfig = new RMQConnectionConfig .Builder().setHost("localhost").setVirtualHost("/") .setPort(5672).setUserName("admin").setPassword("admin") .build(); DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig, "zhisheng", true, new SimpleStringSchema())) .setParallelism(1); zhisheng.print(); //若是想保證 exactly-once 或 at-least-once 須要把 checkpoint 開啓 // env.enableCheckpointing(10000); env.execute("flink learning connectors rabbitmq"); } } |
運行 RabbitMQProducerUtil 類,再運行 Main 類!scala
注意⚠️:
一、RMQConnectionConfig 中設置的用戶名和密碼要設置成 admin/admin,若是你換成是 guest/guest,實際上是在 RabbitMQ 裏面是沒有這個用戶名和密碼的,因此就會報這個錯誤:
1 |
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile. |
不出意外的話應該你運行 RabbitMQProducerUtil 類後,立馬兩個運行的結果都會出來,速度仍是很快的。
二、若是你在 RabbitMQProducerUtil 工具類中把註釋的那行代碼打開的話:
1 2 |
// 聲明一個隊列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null); |
就會出現這種錯誤:
1 |
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10) |
這是由於你打開那個註釋的話,一旦你運行了該類就會建立一個叫作
的 Queue,當你再運行 Main 類中的時候,它又會建立這樣一個叫 ```zhisheng``` 的 Queue,而後由於已經有同名的 Queue 了,因此就有了衝突,解決方法就是把那行代碼註釋就行了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
三、該 connector(鏈接器)中提供了 RMQSource 類去消費 RabbitMQ queue 中的消息和確認 checkpoints 上的消息,它提供了三種不同的保證: + Exactly-once(只消費一次): 前提條件有,1 是要開啓 checkpoint,由於只有在 checkpoint 完成後,纔會返回確認消息給 RabbitMQ(這時,消息纔會在 RabbitMQ 隊列中刪除);2 是要使用 Correlation ID,在將消息發往 RabbitMQ 時,必須在消息屬性中設置 Correlation ID。數據源根據 Correlation ID 把從 checkpoint 恢復的數據進行去重;3 是數據源不能並行,這種限制主要是因爲 RabbitMQ 將消息從單個隊列分派給多個消費者。 + At-least-once(至少消費一次): 開啓了 checkpoint,但未使用相 Correlation ID 或 數據源是並行的時候,那麼就只能保證數據至少消費一次了 + No guarantees(沒法保證): Flink 接收到數據就返回確認消息給 RabbitMQ ### Sink 數據到 RabbitMQ RabbitMQ 除了能夠做爲數據源,也能夠看成下游,Flink 消費數據作了一些處理以後也能把數據發往 RabbitMQ,下面演示下 Flink 消費 Kafka 數據後寫入到 RabbitMQ。 ```java public class Main1 { public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env); final RMQConnectionConfig connectionConfig = new RMQConnectionConfig .Builder().setHost("localhost").setVirtualHost("/") .setPort(5672).setUserName("admin").setPassword("admin") .build(); //注意,換一個新的 queue,不然也會報錯 data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema())); env.execute("flink learning connectors rabbitmq"); } } |
是否是很簡單?可是須要注意的是,要換一個以前不存在的 queue,不然是會報錯的。
不出意外的話,你能夠看到 RabbitMQ 的監控頁面會出現新的一個 queue 出來,以下圖:
本文先把 RabbitMQ 做爲數據源,寫了個 Flink 消費 RabbitMQ 隊列裏面的數據進行打印出來,而後又寫了個 Flink 消費 Kafka 數據後寫入到 RabbitMQ 的例子!