需求背景java
將阿里雲同一個VPC下的RabbitMQ集羣的消息從一個網段集羣遷移到另外一個網段集羣。消息中間件的消息是即時消費,爲什麼還有歷史消息,由於是歷史遺留問題。故要遷移node
整個網絡拓撲圖以下安全
注意:cookie
若對於跨VPC網絡網絡
1. 確保各主機網絡互通app
2. 配置好各主機名ide
兩邊安全組出方向開發:15672、25672、5672、4369端口函數
否在在加入集羣會出現問題測試
資源清單阿里雲
主機名 |
IP地址 |
角色 |
備註 |
node171 |
172.20.0.171 |
老的MQ集羣_1 |
|
node172 |
172.20.0.172 |
老的MQ集羣_2 |
|
node173 |
192.168.0.173 |
新的MQ集羣_1 |
|
node174 |
192.168.0.174 |
新的MQ集羣_2 |
基礎軟件及環境信息
操做系統:CentOS Linux release 7.3.1611
Erlang:Erlang/OTP 20 [erts-9.3.3.3]
RabbitMQ:rabbitmq_server-3.7.8
集羣的部署
node171、node172組成集羣A
node173、node174組成集羣B
這裏的環境部署略
建立測試帳戶
在【node171上進行操做】
rabbitmqctl add_user root root123
rabbitmqctl add_vhost kcvhost
rabbitmqctl set_permissions -p kcvhost root ".*" ".*" ".*"
rabbitmqctl add_user admin admin123
rabbitmqctl set_permissions -p kcvhost admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop_app
rabbitmqctl start_app
生成測試數據
消息生產者代碼:
package com.zjkj.rabbitmq.demo; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * 消息的生產者 * @author zjkj * */ public class Rabbitmq_Producer { private static final String EXCHANGE_NAME = "exchange_test_3"; private static final String ROUTING_KEY = "routingkey_demo"; private static final String QUEUE_NAME = "queue_test_3"; private static final String IP_ADDRESS = "172.20.0.171"; private static final int PORT = 5672; //RabbitMQ服務默認端口號爲5672 public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("root"); factory.setPassword("root123"); Connection connection = factory.newConnection(); //建立鏈接 Channel channel = connection.createChannel(); // 建立信道 //建立一個type="direct"、持久化、非自動刪除的交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct",true, false, null); // 建立一個持久化、非排他的、非自動刪除的隊列 channel.queueDeclare(QUEUE_NAME, true, false, false,null); // 將交換器與隊列經過路由鍵綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 發送一條持久化的消息:hello world! for(int i=1;i<=100000;i++){ String msg = "交換器_1與隊列1綁定:Message_"+i; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); } // 關閉資源 channel.close(); connection.close(); } }
消費者代碼
package com.zjkj.rabbitmq.demo; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 消息的消費者 * @author zjkj * */ public class Rabbitmq_Consumer { private static final String QUEUE_NAME = "queue_test_3"; private static final String IP_ADDRESS = "192.168.6.171"; private static final int PORT = 5672; public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{ Address[] addresses = new Address[]{ new Address(IP_ADDRESS,PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("root"); factory.setPassword("root123"); // 這裏的鏈接方式與生產者的demo略有不一樣,注意區別 Connection connection = factory.newConnection(addresses); //建立鏈接 final Channel channel = connection.createChannel(); // 建立信道 channel.basicQos(64);// 設置客戶端最多接收未被ack的消息的個數 /** * 這裏採用了繼承DefaultConsumer的方式來實現消費,有過RabbitMQ使用經驗的開發者 * 可能喜歡使用QueueingConsumer的方式來實現消費 * 由於使用QueueingConsumer會有一些隱患。 * 同時在RabbitMQ Java客戶端4.0.0版本開始將QueueingConsumer標記爲@Deprecated了 */ Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ System.out.println("recv message : " + new String(body)); try{ TimeUnit.SECONDS.sleep(1); }catch(InterruptedException e){ e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(QUEUE_NAME,true, consumer); //等待回調函數執行完畢以後,關閉資源 TimeUnit.SECONDS.sleep(5); channel.close(); connection.close(); } }
查看集羣中諸如用戶數、交換器數量、隊列數量等
[root@node171 rabbitmq]# rabbitmqctl list_users
Listing users ...
admin [monitoring]
guest [administrator]
root []
[root@node171 rabbitmq]# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic topic
amq.headers headers
exchange_test_3 direct
amq.direct direct
exchange_test_2 direct
amq.rabbitmq.trace topic
amq.match headers
direct
exchange_test_1 direct
amq.fanout fanout
[root@node171 rabbitmq]# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
queue_test_3 100000
queue_test_2 200
queue_test_1 10000
遷移方案
遷移步驟
1. 中止全部生產者和消費者的應用程序
2. 將集羣B中的機器依次一臺一臺加入集羣A中,並確認全部隊列鏡像完成
3. 剔除集羣A中的一臺一臺機器
4. 將應用指向集羣B
方案1【不可行】
將集羣B中的一臺機器加入集羣A中,而後再集羣B中的另外一他機器已加入集羣,而後剔除集羣A中的一臺機器,而後再剔除集羣A中的另外一臺機器
此方案對於RabbitMQ的普通集羣也便是Cluster模式是無效的
1. 中止A集羣中的全部鏈接
2. 將集羣B中的一臺節點加入到A集羣中
將集羣A中的.erlang.cookie的值拷貝到集羣B中的node173上
[root@node171 rabbitmq]# cat .erlang.cookie
ORMTFBMHOXOGFKRLQSPU[root@node171 rabbitmq]#
[root@node173 plugins]# cp /var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/erlang.cookie.bak
[root@node173 plugins]# chmod 700 /var/lib/rabbitmq/.erlang.cookie
[root@node173 plugins]# vi /var/lib/rabbitmq/.erlang.cookie
ORMTFBMHOXOGFKRLQSPU |
[root@node173 plugins]# chmod 400 /var/lib/rabbitmq/.erlang.cookie
[root@node173 plugins]# ls -lrth /var/lib/rabbitmq/.erlang.cookie
-r-------- 1 rabbitmq rabbitmq 21 Oct 24 18:51 /var/lib/rabbitmq/.erlang.cookie
3.將集羣B中的node173加入到集羣A中
[root@node173 rabbitmq]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
[root@node173 rabbitmq]# rabbitmqctl stop_app
Stopping rabbit application on node mq_173@node173 ...
[root@node173 rabbitmq]# rabbitmqctl reset
Resetting node mq_173@node173 ...
[root@node173 rabbitmq]# rabbitmqctl join_cluster mq171@node171
Clustering node mq_173@node173 with mq171@node171
[root@node173 rabbitmq]# rabbitmqctl start_app
Starting node mq_173@node173 ...
completed with 3 plugins.
4. 一樣的方法將集羣B中的node174加入到集羣A中
[root@node174 rabbitmq]# rabbitmqctl cluster_status
Cluster status of node mq_174@node174 ...
[{nodes,[{disc,[mq_174@node174]}]},
{running_nodes,[mq_174@node174]},
{cluster_name,<<"mq_174@node174">>},
{partitions,[]},
{alarms,[{mq_174@node174,[]}]}]
[root@node174 rabbitmq]# rabbitmqctl stop_app
Stopping rabbit application on node mq_174@node174 ...
[root@node174 rabbitmq]# rabbitmqctl reset
Resetting node mq_174@node174 ...
[root@node174 rabbitmq]# rabbitmqctl join_cluster mq171@node171
Clustering node mq_174@node174 with mq171@node171
[root@node174 rabbitmq]# rabbitmqctl start_app
Starting node mq_174@node174 ...
completed with 0 plugins.
5.將集羣A中的node171剔除集羣
[root@node171 rabbitmq]# rabbitmqctl stop
Stopping and halting node mq171@node171 ...
這時訪問node172 的Web集羣管理
一樣在node173上的Web管理界面查看
至此對於普通的集羣模式,這種方案是不行的。
方案2【可行】
若RabbitMQ採用鏡像隊列,將集羣A中的消息數據遷移到集羣B中,
集羣A中的node171、node172採用鏡像隊列
構建集羣A的鏡像隊列環境
1.首先集羣A中的node172加入集羣中
【在node172上操做】
[root@node172 ~]# rabbitmqctl stop_app
Stopping rabbit application on node mq172@node172 ...
[root@node172 ~]# rabbitmqctl reset
Resetting node mq172@node172 ...
[root@node172 ~]# rabbitmqctl join_cluster mq171@node171
Clustering node mq172@node172 with mq171@node171
ra[root@node172 ~]# rabbitmqctl start_app
Starting node mq172@node172 ...
2.設置鏡像策略
【在node171上操做】
[root@node171 ~]# rabbitmqctl set_policy ha-all -p kcvhost "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all","ha-sync-mode":"automatic"}" with priority "0" for vhost "kcvhost" ...
[root@node171 ~]# rabbitmqctl set_policy rabbit_mirror "^" '{"ha-mode":"all"}'
Setting policy "rabbit_mirror" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
開始集羣A中的鏡像隊列遷移
1.中止全部消息的生產者和消費者相關應用服務
2.中止集羣A中的全部機器,並備份原始數據
【node171、node172】都要操做
Node172執行以下:
[root@node172 ~]# service rabbitmq-server stop
Redirecting to /bin/systemctl stop rabbitmq-server.service
[root@node172 ~]# cd /var/lib/rabbitmq/
[root@node172 rabbitmq]# ls
mnesia
[root@node172 rabbitmq]# cp -rf mnesia mnesia.20181025.bak
[root@node172 rabbitmq]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
node171執行以下:
[root@node171 ~]# service rabbitmq-server stop
Redirecting to /bin/systemctl stop rabbitmq-server.service
[root@node171 ~]# cd /var/lib/rabbitmq/
[root@node171 rabbitmq]# cp -rf mnesia mnesia.20181025.bak
[root@node171 rabbitmq]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
2.首先將集羣B的node173機器加入到集羣A中
[root@node173 network-scripts]# service rabbitmq-server stop
Redirecting to /bin/systemctl stop rabbitmq-server.service
[root@mq04 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak
[root@mq04 rabbitmq]# cd /var/lib/rabbitmq
[root@mq04 rabbitmq]# rm -rf .erlang.cookie mnesia/
[root@mq01 rabbitmq]# scp .erlang.cookie root@mq04:/var/lib/rabbitmq
The authenticity of host 'mq04 (192.168.0.232)' can't be established.
ECDSA key fingerprint is SHA256:zgAicKOpvRLLCyhdUbpNvyanKYrPt/Pp9g+Sdq9mAoo.
ECDSA key fingerprint is MD5:15:7d:1e:c2:86:d5:4a:40:63:df:f5:4e:65:c4:24:62.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'mq04' (ECDSA) to the list of known hosts.
root@mq04's password:
Permission denied, please try again.
root@mq04's password:
.erlang.cookie 100% 20 19.6KB/s 00:00
[root@mq04 rabbitmq]# chmod 400 .erlang.cookie
[root@mq04 rabbitmq]# chown -R rabbitmq:rabbitmq .erlang.cookie
[root@mq04 rabbitmq]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
[root@mq04 rabbitmq]# rabbitmqctl stop_app
Stopping rabbit application on node mq04@mq04 ...
[root@mq04 rabbitmq]# rabbitmqctl reset
Resetting node mq04@mq04 ...
對於阿里雲ECS必定要在安全組先臨時開啓15672、25672、5672、4369端口
[root@node173 network-scripts]# rabbitmqctl join_cluster mq171@node171
Clustering node mq_173@node173 with mq171@node171
[root@node173 network-scripts]# rabbitmqctl start_app
Starting node mq_173@node173 ...
completed with 3 plugins.
3. 而後再將B集羣中的node174機器加入到集羣A中
使用上面一樣的方法,將node174加入到集羣中去
4.剔除集羣A中的node171、node172機器
Node172上執行
[root@mq02 rabbitmq]# rabbitmqctl stop_app
Stopping rabbit application on node mq02@mq02 ...
[root@mq02 rabbitmq]# service rabbitmq-server stop
Redirecting to /bin/systemctl stop rabbitmq-server.service
[root@mq02 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak
[root@mq02 rabbitmq]# service rabbitmq-server start
Redirecting to /bin/systemctl start rabbitmq-server.service
[root@mq02 rabbitmq]# rabbitmqctl stop_app
Stopping rabbit application on node mq02@mq02 ...
[root@mq02 rabbitmq]# rabbitmqctl reset
Resetting node mq02@mq02 ...
一樣的node171上執行一樣的命令
對於採用鏡像隊列集羣,此方案可行