RabbitMQ集羣跨網段消息遷移

需求背景java

將阿里雲同一個VPC下的RabbitMQ集羣消息一個網段集羣遷移到另外一個網段集羣。消息中間件的消息是即時消費,爲什麼還有歷史消息,由於歷史遺留問題。要遷移node

整個網絡拓撲圖以下安全

注意:cookie

對於跨VPC網絡網絡

1. 確保各主機網絡互通app

2. 配置主機名ide

兩邊安全組方向開發:156722567256724369端口函數

否在加入集羣會出現問題測試

image.png

資源清單阿里雲

主機名

IP地址

角色

備註

node171

172.20.0.171

老的MQ集羣_1


node172

172.20.0.172

老的MQ集羣_2


node173

192.168.0.173

MQ集羣_1


node174

192.168.0.174

新的MQ集羣_2


基礎軟件及環境信息

操做系統:CentOS Linux release 7.3.1611

ErlangErlang/OTP 20 [erts-9.3.3.3]

RabbitMQrabbitmq_server-3.7.8

集羣的部署

node171node172組成集羣A

node173node174組成集羣B

這裏的環境部署略

建立測試帳戶

在【node171進行操做

rabbitmqctl add_user root root123

rabbitmqctl add_vhost kcvhost

rabbitmqctl set_permissions -p kcvhost root  ".*" ".*" ".*"

rabbitmqctl add_user admin admin123

rabbitmqctl set_permissions -p kcvhost admin  ".*" ".*" ".*"

rabbitmqctl set_user_tags admin administrator

rabbitmq-plugins enable rabbitmq_management

rabbitmqctl stop_app

rabbitmqctl start_app

生成測試數據

消息生產者代碼

package com.zjkj.rabbitmq.demo;
 
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
/**
 * 消息的生產者
 * @author zjkj
 *
 */
public class Rabbitmq_Producer {

private static final String EXCHANGE_NAME = "exchange_test_3";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_test_3";
private static final String IP_ADDRESS = "172.20.0.171";
private static final int PORT = 5672; //RabbitMQ服務默認端口號爲5672


public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection(); //建立鏈接
Channel channel = connection.createChannel(); // 建立信道
 //建立一個type="direct"、持久化、非自動刪除的交換器
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true, false, null);
// 建立一個持久化、非排他的、非自動刪除的隊列
channel.queueDeclare(QUEUE_NAME, true, false, false,null);
// 將交換器與隊列經過路由鍵綁定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 發送一條持久化的消息:hello world!
for(int i=1;i<=100000;i++){
String msg = "交換器_1與隊列1綁定:Message_"+i;
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
// 關閉資源
channel.close();
connection.close();
 
}
 
}

消費者代碼

package com.zjkj.rabbitmq.demo;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * 消息的消費者
 * @author zjkj
 *
 */
public class Rabbitmq_Consumer {
 
private static final String QUEUE_NAME = "queue_test_3";
private static final String IP_ADDRESS = "192.168.6.171";
private static final  int PORT = 5672;


public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
Address[] addresses = new Address[]{
new Address(IP_ADDRESS,PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root123");
// 這裏的鏈接方式與生產者的demo略有不一樣,注意區別
Connection connection = factory.newConnection(addresses); //建立鏈接
final Channel channel = connection.createChannel(); // 建立信道
channel.basicQos(64);// 設置客戶端最多接收未被ack的消息的個數


/**
 * 這裏採用了繼承DefaultConsumer的方式來實現消費,有過RabbitMQ使用經驗的開發者
 * 可能喜歡使用QueueingConsumer的方式來實現消費
 * 由於使用QueueingConsumer會有一些隱患。
 * 同時在RabbitMQ Java客戶端4.0.0版本開始將QueueingConsumer標記爲@Deprecated了
 */
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)
   throws IOException{
System.out.println("recv message : " + new String(body));
try{
TimeUnit.SECONDS.sleep(1);

}catch(InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
//等待回調函數執行完畢以後,關閉資源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
 
}
 
}

查看集羣中諸如用戶數、交換器數量、隊列數量等

[root@node171 rabbitmq]# rabbitmqctl list_users

Listing users ...

admin   [monitoring]

guest   [administrator]

root    []

[root@node171 rabbitmq]# rabbitmqctl list_exchanges

Listing exchanges for vhost / ...

amq.topic       topic

amq.headers     headers

exchange_test_3 direct

amq.direct      direct

exchange_test_2 direct

amq.rabbitmq.trace      topic

amq.match       headers

        direct

exchange_test_1 direct

amq.fanout      fanout

[root@node171 rabbitmq]# rabbitmqctl list_queues

Timeout: 60.0 seconds ...

Listing queues for vhost / ...

queue_test_3    100000

queue_test_2    200

queue_test_1    10000

遷移方案

遷移步驟

1. 中止全部生產者和消費應用程序

2. 集羣B機器依次一臺一臺加入集羣A,並確認全部隊列鏡像完成

3. 剔除集羣A一臺一臺機器

4. 應用指向集羣B

方案1不可行

將集羣B中的一臺機器加入集羣A中,而後再集羣B中的另外一他機器已加入集羣,而後剔除集羣A中一臺機器,而後剔除集羣A的另外一臺機器

方案對於RabbitMQ普通集羣也便是Cluster模式是無效的

1. 中止A集羣的全部鏈接

2. 集羣B中的一臺節點加入到A集羣中

集羣A中的.erlang.cookie拷貝集羣B中的node173

[root@node171 rabbitmq]# cat .erlang.cookie

ORMTFBMHOXOGFKRLQSPU[root@node171 rabbitmq]#

[root@node173 plugins]# cp /var/lib/rabbitmq/.erlang.cookie  /var/lib/rabbitmq/erlang.cookie.bak

[root@node173 plugins]# chmod 700 /var/lib/rabbitmq/.erlang.cookie

 [root@node173 plugins]# vi /var/lib/rabbitmq/.erlang.cookie

ORMTFBMHOXOGFKRLQSPU

[root@node173 plugins]# chmod 400 /var/lib/rabbitmq/.erlang.cookie 

[root@node173 plugins]# ls -lrth /var/lib/rabbitmq/.erlang.cookie

-r-------- 1 rabbitmq rabbitmq 21 Oct 24 18:51 /var/lib/rabbitmq/.erlang.cookie

3.將集羣B中的node173加入到集羣A

[root@node173 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start  rabbitmq-server.service

[root@node173 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq_173@node173 ...

[root@node173 rabbitmq]# rabbitmqctl reset

Resetting node mq_173@node173 ...

[root@node173 rabbitmq]# rabbitmqctl join_cluster mq171@node171

Clustering node mq_173@node173 with mq171@node171

[root@node173 rabbitmq]# rabbitmqctl start_app

Starting node mq_173@node173 ...

 completed with 3 plugins.

4. 一樣的方法將集羣B中的node174加入到集羣A

[root@node174 rabbitmq]# rabbitmqctl cluster_status

Cluster status of node mq_174@node174 ...

[{nodes,[{disc,[mq_174@node174]}]},

 {running_nodes,[mq_174@node174]},

 {cluster_name,<<"mq_174@node174">>},

 {partitions,[]},

 {alarms,[{mq_174@node174,[]}]}]

 [root@node174 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq_174@node174 ...

[root@node174 rabbitmq]# rabbitmqctl reset

Resetting node mq_174@node174 ...

[root@node174 rabbitmq]# rabbitmqctl join_cluster mq171@node171

Clustering node mq_174@node174 with mq171@node171

[root@node174 rabbitmq]# rabbitmqctl start_app

Starting node mq_174@node174 ...

 completed with 0 plugins.

  5.集羣A中的node171剔除集羣

[root@node171 rabbitmq]# rabbitmqctl stop

Stopping and halting node mq171@node171 ...

這時訪問node172 Web集羣管理

image.png

一樣node173Web管理界面查看

image.png

至此對於普通的集羣模式,這種方案是不行的。

方案2【可行】

RabbitMQ採用鏡像隊列,將集羣A中的消息數據遷移集羣B

集羣A中的node171node172採用鏡像隊列

構建集羣A鏡像隊列環境

1.首先集羣Anode172加入集羣中

【在node172操做

[root@node172 ~]# rabbitmqctl stop_app

Stopping rabbit application on node mq172@node172 ...

[root@node172 ~]# rabbitmqctl reset

Resetting node mq172@node172 ...

 [root@node172 ~]# rabbitmqctl join_cluster mq171@node171

Clustering node mq172@node172 with mq171@node171

ra[root@node172 ~]# rabbitmqctl start_app

Starting node mq172@node172 ...

2.設置鏡像策略

【在node171操做

[root@node171 ~]# rabbitmqctl set_policy ha-all -p kcvhost "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all","ha-sync-mode":"automatic"}" with priority "0" for vhost "kcvhost" ...

 

[root@node171 ~]# rabbitmqctl set_policy rabbit_mirror "^" '{"ha-mode":"all"}'

Setting policy "rabbit_mirror" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

開始集羣A中的鏡像隊列遷移

1.中止全部消息的生產者和消費者相關應用服務

2中止集羣A中的全部機器,並備份原始數據

 node171node172】都要操做

Node172執行以下:

[root@node172 ~]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop  rabbitmq-server.service

[root@node172 ~]# cd /var/lib/rabbitmq/

[root@node172 rabbitmq]# ls

mnesia

[root@node172 rabbitmq]# cp -rf mnesia mnesia.20181025.bak

[root@node172 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start  rabbitmq-server.service

node171執行以下:

[root@node171 ~]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop  rabbitmq-server.service

[root@node171 ~]# cd /var/lib/rabbitmq/

[root@node171 rabbitmq]# cp -rf mnesia mnesia.20181025.bak

[root@node171 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start  rabbitmq-server.service

2.首先將集羣Bnode173機器加入到集羣A

[root@node173 network-scripts]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop rabbitmq-server.service

[root@mq04 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak

[root@mq04 rabbitmq]# cd /var/lib/rabbitmq

[root@mq04 rabbitmq]# rm -rf .erlang.cookie  mnesia/

[root@mq01 rabbitmq]# scp .erlang.cookie  root@mq04:/var/lib/rabbitmq

The authenticity of host 'mq04 (192.168.0.232)' can't be established.

ECDSA key fingerprint is SHA256:zgAicKOpvRLLCyhdUbpNvyanKYrPt/Pp9g+Sdq9mAoo.

ECDSA key fingerprint is MD5:15:7d:1e:c2:86:d5:4a:40:63:df:f5:4e:65:c4:24:62.

Are you sure you want to continue connecting (yes/no)? yes

Warning: Permanently added 'mq04' (ECDSA) to the list of known hosts.

root@mq04's password:

Permission denied, please try again.

root@mq04's password:

.erlang.cookie                                                                   100%   20    19.6KB/s   00:00

[root@mq04 rabbitmq]# chmod 400 .erlang.cookie

[root@mq04 rabbitmq]# chown -R rabbitmq:rabbitmq .erlang.cookie

[root@mq04 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start rabbitmq-server.service

[root@mq04 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq04@mq04 ...

[root@mq04 rabbitmq]# rabbitmqctl reset

Resetting node mq04@mq04 ...

對於阿里雲ECS必定要在安全組先臨時開啓156722567256724369端口

[root@node173 network-scripts]# rabbitmqctl join_cluster mq171@node171

Clustering node mq_173@node173 with mq171@node171

[root@node173 network-scripts]# rabbitmqctl start_app

Starting node mq_173@node173 ...

 completed with 3 plugins.

3. 而後再將B集羣中的node174機器加入到集羣A

 使用上面一樣的方法,將node174加入到集羣中去

4.剔除集羣A中的node171node172機器

Node172上執行

[root@mq02 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq02@mq02 ...

[root@mq02 rabbitmq]# service rabbitmq-server stop

Redirecting to /bin/systemctl stop rabbitmq-server.service

[root@mq02 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak

[root@mq02 rabbitmq]# service rabbitmq-server start

Redirecting to /bin/systemctl start rabbitmq-server.service

[root@mq02 rabbitmq]# rabbitmqctl stop_app

Stopping rabbit application on node mq02@mq02 ...

[root@mq02 rabbitmq]# rabbitmqctl reset

Resetting node mq02@mq02 ...

一樣的node171上執行一樣的命令

對於採用鏡像隊列集羣方案可行

相關文章
相關標籤/搜索