《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 RabbitMQ

前言

以前有文章 《從0到1學習Flink》—— Flink 寫入數據到 Kafka 寫過 Flink 將處理後的數據後發到 Kafka 消息隊列中去,固然咱們經常使用的消息隊列可不止這一種,還有 RocketMQ、RabbitMQ 等,恰好 Flink 也支持將數據寫入到 RabbitMQ,因此今天咱們就來寫篇文章講講如何將 Flink 處理後的數據寫入到 RabbitMQ。java

前提準備

安裝 RabbitMQ

這裏我直接用 docker 命令安裝吧,先把 docker 在 mac 上啓動起來。docker

在命令行中執行下面的命令:apache

1
docker run -d  -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

對這個命令不懂的童鞋能夠看看我之前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/api

登陸用戶名和密碼分別是:admin / admin ,登陸進去是這個樣子就表明安裝成功了:工具

依賴

pom.xml 中添加 Flink connector rabbitmq 的依賴以下:學習

1
2
3
4
5
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

生產者

這裏咱們依舊本身寫一個工具類一直的往 RabbitMQ 中的某個 queue 中發數據,而後由 Flink 去消費這些數據。ui

注意按照個人步驟來一步步操做,不然可能會出現一些錯誤!spa

RabbitMQProducerUtil.java命令行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQProducerUtil {
    public final static String QUEUE_NAME = "zhisheng";

    public static void main(String[] args) throws Exception {
        //建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();

        //設置RabbitMQ相關信息
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        //建立一個新的鏈接
        Connection connection = factory.newConnection();

        //建立一個通道
        Channel channel = connection.createChannel();

        // 聲明一個隊列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //發送消息到隊列中
        String message = "Hello zhisheng";

        //咱們這裏演示發送一千條數據
        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));
            System.out.println("Producer Send +'" + message + i);
        }

        //關閉通道和鏈接
        channel.close();
        connection.close();
    }
}

Flink 主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.zhisheng.common.utils.ExecutionEnvUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * 從 rabbitmq 讀取數據
 */
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;

        //這些配置建議能夠放在配置文件中,而後經過 parameterTool 來獲取對應的參數值
        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig
                .Builder().setHost("localhost").setVirtualHost("/")
                .setPort(5672).setUserName("admin").setPassword("admin")
                .build();

        DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,
                "zhisheng",
                true,
                new SimpleStringSchema()))
                .setParallelism(1);
        zhisheng.print();

        //若是想保證 exactly-once 或 at-least-once 須要把 checkpoint 開啓
//        env.enableCheckpointing(10000);
        env.execute("flink learning connectors rabbitmq");
    }
}

運行 RabbitMQProducerUtil 類,再運行 Main 類!scala

注意⚠️:

一、RMQConnectionConfig 中設置的用戶名和密碼要設置成 admin/admin,若是你換成是 guest/guest,實際上是在 RabbitMQ 裏面是沒有這個用戶名和密碼的,因此就會報這個錯誤:

1
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

不出意外的話應該你運行 RabbitMQProducerUtil 類後,立馬兩個運行的結果都會出來,速度仍是很快的。

二、若是你在 RabbitMQProducerUtil 工具類中把註釋的那行代碼打開的話:

1
2
// 聲明一個隊列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

就會出現這種錯誤:

1
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

這是由於你打開那個註釋的話,一旦你運行了該類就會建立一個叫作

的 Queue,當你再運行 Main 類中的時候,它又會建立這樣一個叫 ```zhisheng``` 的 Queue,而後由於已經有同名的 Queue 了,因此就有了衝突,解決方法就是把那行代碼註釋就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
三、該 connector(鏈接器)中提供了 RMQSource 類去消費 RabbitMQ queue 中的消息和確認 checkpoints 上的消息,它提供了三種不同的保證:

+ Exactly-once(只消費一次): 前提條件有,1 是要開啓 checkpoint,由於只有在 checkpoint 完成後,纔會返回確認消息給 RabbitMQ(這時,消息纔會在 RabbitMQ 隊列中刪除);2 是要使用 Correlation ID,在將消息發往 RabbitMQ 時,必須在消息屬性中設置 Correlation ID。數據源根據 Correlation ID 把從 checkpoint 恢復的數據進行去重;3 是數據源不能並行,這種限制主要是因爲 RabbitMQ 將消息從單個隊列分派給多個消費者。
+ At-least-once(至少消費一次): 開啓了 checkpoint,但未使用相 Correlation ID 或 數據源是並行的時候,那麼就只能保證數據至少消費一次了
+ No guarantees(沒法保證): Flink 接收到數據就返回確認消息給 RabbitMQ

### Sink 數據到 RabbitMQ

RabbitMQ 除了能夠做爲數據源,也能夠看成下游,Flink 消費數據作了一些處理以後也能把數據發往 RabbitMQ,下面演示下 Flink 消費 Kafka 數據後寫入到 RabbitMQ。

```java
public class Main1 {
    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig
                .Builder().setHost("localhost").setVirtualHost("/")
                .setPort(5672).setUserName("admin").setPassword("admin")
                .build();

        //注意,換一個新的 queue,不然也會報錯
        data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema()));
        env.execute("flink learning connectors rabbitmq");
    }
}

 

是否是很簡單?可是須要注意的是,要換一個以前不存在的 queue,不然是會報錯的。

不出意外的話,你能夠看到 RabbitMQ 的監控頁面會出現新的一個 queue 出來,以下圖:

總結

本文先把 RabbitMQ 做爲數據源,寫了個 Flink 消費 RabbitMQ 隊列裏面的數據進行打印出來,而後又寫了個 Flink 消費 Kafka 數據後寫入到 RabbitMQ 的例子!

相關文章
相關標籤/搜索