RabbitMQ集羣設計用於完成兩個目標:容許消費者和生產者在RabbitMQ節點崩潰的狀況下繼續運行,以及經過添加更多的節點來擴展消息通訊的吞吐量。html
RabbitMQ會始終記錄如下四種類型的內部元數據:java
1. 隊列元數據-隊列的名稱和它們的屬性(是否持久化,是否自動刪除)node
2. 交換器元數據-交換器類型、名稱和屬性(可持久化等)緩存
3. 綁定元數據-一張簡單的表格展現瞭如何將消息路由到隊列安全
4. vhost元數據-爲vhost內的隊列、交換器和綁定提供命名空間和安全屬性bash
在單一節點內,RabbitMQ會將全部這些信息存儲在內存中,同時將那些標記爲可持久化的隊列和交換器(以及它們的綁定)存儲到硬盤上。當你引入集羣時,RabbitMQ須要追蹤新的元數據類型:集羣節點位置,以及節點與已記錄的其餘類型元數據的關係。集羣提供了選擇:將元數據存儲到磁盤上,或者存儲在內存中。服務器
Erlang Cookie是保證不一樣節點能夠相互通訊的密鑰,要保證集羣中的不一樣節點相互通訊必須共享相同的Erlang Cookie。具體的目錄存放在/var/lib/rabbitmq/.erlang.cookie。cookie
說明: 這就要從rabbitmqctl命令的工做原理提及,RabbitMQ底層是經過Erlang架構來實現的,因此rabbitmqctl會啓動Erlang節點,並基於Erlang節點來使用Erlang系統鏈接RabbitMQ節點,在鏈接過程當中須要正確的Erlang Cookie和節點名稱,Erlang節點經過交換Erlang Cookie以得到認證。網絡
功能和原理
RabbitMQ的Cluster集羣模式通常分爲兩種,普通模式和鏡像模式。架構
普通模式:默認的集羣模式,以兩個節點(rabbit0一、rabbit02)爲例來進行說明。對於Queue來講,消息實體只存在於其中一個節點rabbit01(或者rabbit02),rabbit01和rabbit02兩個節點僅有相同的元數據,即隊列的結構。當消息進入rabbit01節點的Queue後,consumer從rabbit02節點消費時,RabbitMQ會臨時在rabbit0一、rabbit02間進行消息傳輸,把A中的消息實體取出並通過B發送給consumer。因此consumer應儘可能鏈接每個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點創建物理Queue。不然不管consumer連rabbit01或rabbit02,出口總在rabbit01,會產生瓶頸。當rabbit01節點故障後,rabbit02節點沒法取到rabbit01節點中還未消費的消息實體。若是作了消息持久化,那麼得等rabbit01節點恢復,而後纔可被消費;若是沒有持久化的話,就會產生消息丟失的現象。
鏡像模式:將須要消費的隊列變爲鏡像隊列,存在於多個節點,這樣就能夠實現RabbitMQ的HA高可用性。做用就是消息實體會主動在鏡像節點之間實現同步,而不是像普通模式那樣,在consumer消費數據時臨時讀取。缺點就是,集羣內部的同步通信會佔用大量的網絡帶寬。
每一個RabbitMQ節點,要麼是內存節點(ram node),要麼是磁盤節點(disk node)。內存節點將全部的隊列、交換器、綁定、用戶、權限和vhost的元數據定義都僅存在內存中。而磁盤節點則將元數據存儲在磁盤中。
內存節點的效率更高,內存節點惟一存儲到磁盤上的是磁盤節點的地址。
RabbitMQ要求集羣中至少有一個磁盤節點。當節點加入或者離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,並且不湊巧的是它又崩潰了,那麼集羣能夠繼續路由消息,可是不能作如下操做了:
1. 建立隊列
2. 建立交換器
3. 建立綁定
4. 添加用戶
5. 更改權限
單機環境搭建多節點羣集
一、禁用管理後臺插件rabbitmq-plugins disable rabbitmq_management
二、建立三個Shell文件
rabbitmq1.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_NODENAME=rabbit
rabbitmq-server
rabbitmq2.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5673
export RABBITMQ_NODENAME=rabbit2
rabbitmq-server
rabbitmq3.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5674
export RABBITMQ_NODENAME=rabbit3
rabbitmq-server
三、中止在Erlang節點上運行的節點2和節點3 RabbitMQ Server 並清空(重置)它們的元數據
rabbitmqctl -n rabbit1@localhost stop_app
rabbitmqctl -n rabbit2@localhost stop_app
rabbitmqctl -n rabbit1@localhost reset
rabbitmqctl -n rabbit2@localhost reset
四、將節點2做爲磁盤節點加入集羣並啓動應用
rabbitmqctl -n rabbit1@localhost join_cluster rabbit@localhost
rabbitmqctl -n rabbit1@localhost start_app
五、將節點3做爲內存節點加入集羣並啓動應用
rabbitmqctl -n rabbit2@localhost join_cluster --ram rabbit@localhost
rabbitmqctl -n rabbit2@localhost start_app
六、運行命令rabbitmqctl cluster_status查看集羣狀態
Cluster status of node rabbit@localhost ...
[{nodes,[{disc,[rabbit1@localhost,rabbit@localhost]},
{ram,[rabbit2@localhost]}]},
{running_nodes,[rabbit2@localhost,rabbit1@localhost,rabbit@localhost]},
{cluster_name,<<"rabbit@localhost">>},
{partitions,[]},
{alarms,[{rabbit2@localhost,[]},
{rabbit1@localhost,[]},
{rabbit@localhost,[]}]}]
集羣安裝成功,這時候java客戶端能夠鏈接任何一個RabbitMQ Server的端口來訪問集羣了。
七、鏡像隊列
在聲明隊列時,能夠經過參數"x-ha-policy"設置爲"all"來把消息發送到集羣的全部節點上。
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
客戶端發送代碼
package com.test.cluster;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//使用默認端口鏈接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默認端口5672
Connection conn = factory.newConnection(); //聲明一個鏈接
Channel channel = conn.createChannel(); //聲明消息通道
String exchangeName = "TestEXG";//交換機名稱
String routingKey = "RouteKey1";//RoutingKey關鍵字
channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
String queueName = "ClusterQueue";//隊列名稱
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象
byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發佈消息
//關閉通道和鏈接
channel.close();
conn.close();
}
}
消費者代碼
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//經過channel.basicAck向服務器發送回執,刪除服務上的消息
public class Consumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默認端口5672
factory.setPort(5672);
Connection conn = factory.newConnection(); //聲明一個鏈接
Channel channel = conn.createChannel(); //聲明消息通道
String exchangeName = "TestEXG";//交換機名稱
String queueName = "ClusterQueue";//隊列名稱
channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
channel.queueBind(queueName, exchangeName, "RouteKey1");
channel.basicQos(1); //server push消息時的隊列長度
//用來緩存服務器推送過來的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("Received " + new String(delivery.getBody()));
//回覆ack包,若是不回覆,消息不會在服務器刪除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}