11.RabbitMQ單機集羣

RabbitMQ集羣設計用於完成兩個目標:容許消費者和生產者在RabbitMQ節點崩潰的狀況下繼續運行,以及經過添加更多的節點來擴展消息通訊的吞吐量。html

RabbitMQ會始終記錄如下四種類型的內部元數據:java

1.         隊列元數據-隊列的名稱和它們的屬性(是否持久化,是否自動刪除)node

2.         交換器元數據-交換器類型、名稱和屬性(可持久化等)緩存

3.         綁定元數據-一張簡單的表格展現瞭如何將消息路由到隊列安全

4.         vhost元數據-爲vhost內的隊列、交換器和綁定提供命名空間和安全屬性bash

 在單一節點內,RabbitMQ會將全部這些信息存儲在內存中,同時將那些標記爲可持久化的隊列和交換器(以及它們的綁定)存儲到硬盤上。當你引入集羣時,RabbitMQ須要追蹤新的元數據類型:集羣節點位置,以及節點與已記錄的其餘類型元數據的關係。集羣提供了選擇:將元數據存儲到磁盤上,或者存儲在內存中。服務器

Erlang Cookie

Erlang Cookie是保證不一樣節點能夠相互通訊的密鑰,要保證集羣中的不一樣節點相互通訊必須共享相同的Erlang Cookie。具體的目錄存放在/var/lib/rabbitmq/.erlang.cookie。cookie

說明: 這就要從rabbitmqctl命令的工做原理提及,RabbitMQ底層是經過Erlang架構來實現的,因此rabbitmqctl會啓動Erlang節點,並基於Erlang節點來使用Erlang系統鏈接RabbitMQ節點,在鏈接過程當中須要正確的Erlang Cookie和節點名稱,Erlang節點經過交換Erlang Cookie以得到認證。網絡

鏡像隊列

功能和原理 
RabbitMQ的Cluster集羣模式通常分爲兩種,普通模式和鏡像模式。架構

  • 普通模式:默認的集羣模式,以兩個節點(rabbit0一、rabbit02)爲例來進行說明。對於Queue來講,消息實體只存在於其中一個節點rabbit01(或者rabbit02),rabbit01和rabbit02兩個節點僅有相同的元數據,即隊列的結構。當消息進入rabbit01節點的Queue後,consumer從rabbit02節點消費時,RabbitMQ會臨時在rabbit0一、rabbit02間進行消息傳輸,把A中的消息實體取出並通過B發送給consumer。因此consumer應儘可能鏈接每個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點創建物理Queue。不然不管consumer連rabbit01或rabbit02,出口總在rabbit01,會產生瓶頸。當rabbit01節點故障後,rabbit02節點沒法取到rabbit01節點中還未消費的消息實體。若是作了消息持久化,那麼得等rabbit01節點恢復,而後纔可被消費;若是沒有持久化的話,就會產生消息丟失的現象。

  • 鏡像模式:將須要消費的隊列變爲鏡像隊列,存在於多個節點,這樣就能夠實現RabbitMQ的HA高可用性。做用就是消息實體會主動在鏡像節點之間實現同步,而不是像普通模式那樣,在consumer消費數據時臨時讀取。缺點就是,集羣內部的同步通信會佔用大量的網絡帶寬。

 

內存節點和磁盤節點

每一個RabbitMQ節點,要麼是內存節點(ram node),要麼是磁盤節點(disk node)。內存節點將全部的隊列、交換器、綁定、用戶、權限和vhost的元數據定義都僅存在內存中。而磁盤節點則將元數據存儲在磁盤中。

 內存節點的效率更高,內存節點惟一存儲到磁盤上的是磁盤節點的地址。

 

RabbitMQ要求集羣中至少有一個磁盤節點。當節點加入或者離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,並且不湊巧的是它又崩潰了,那麼集羣能夠繼續路由消息,可是不能作如下操做了:

1.         建立隊列

2.         建立交換器

3.         建立綁定

4.         添加用戶

 

5.         更改權限

 

單機環境搭建多節點羣集

一、禁用管理後臺插件rabbitmq-plugins disable rabbitmq_management

二、建立三個Shell文件


rabbitmq1.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5672

export RABBITMQ_NODENAME=rabbit 

 

rabbitmq-server

 

rabbitmq2.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5673

export RABBITMQ_NODENAME=rabbit2

 

rabbitmq-server

 

rabbitmq3.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5674

export RABBITMQ_NODENAME=rabbit3

 

rabbitmq-server

 

三、中止在Erlang節點上運行的節點2和節點3 RabbitMQ Server 並清空(重置)它們的元數據

rabbitmqctl -n rabbit1@localhost stop_app

rabbitmqctl -n rabbit2@localhost stop_app

 

rabbitmqctl -n rabbit1@localhost reset

rabbitmqctl -n rabbit2@localhost reset

 

四、將節點2做爲磁盤節點加入集羣並啓動應用

rabbitmqctl -n rabbit1@localhost join_cluster rabbit@localhost

rabbitmqctl -n rabbit1@localhost start_app

 

五、將節點3做爲內存節點加入集羣並啓動應用

rabbitmqctl -n rabbit2@localhost join_cluster --ram rabbit@localhost

rabbitmqctl -n rabbit2@localhost start_app

 

六、運行命令rabbitmqctl cluster_status查看集羣狀態

Cluster status of node rabbit@localhost ...

[{nodes,[{disc,[rabbit1@localhost,rabbit@localhost]},

         {ram,[rabbit2@localhost]}]},

 {running_nodes,[rabbit2@localhost,rabbit1@localhost,rabbit@localhost]},

 {cluster_name,<<"rabbit@localhost">>},

 {partitions,[]},

 {alarms,[{rabbit2@localhost,[]},

          {rabbit1@localhost,[]},

          {rabbit@localhost,[]}]}]

 

 

集羣安裝成功,這時候java客戶端能夠鏈接任何一個RabbitMQ Server的端口來訪問集羣了。

 

七、鏡像隊列

在聲明隊列時,能夠經過參數"x-ha-policy"設置爲"all"來把消息發送到集羣的全部節點上。

Map arg = new HashMap();

arg.put("x-ha-policy", "all");

channel.queueDeclare(queueName, false, false, false, arg);

 

客戶端發送代碼

package com.test.cluster;

 

import com.rabbitmq.client.*;

 

import java.io.IOException;

import java.lang.String;

import java.lang.System;

import java.util.HashMap;

import java.util.Map;

import java.util.Scanner;

 

public class Producer {

 

    public static void main(String[] args) throws Exception {

   

    //使用默認端口鏈接MQ

        ConnectionFactory factory = new ConnectionFactory();

    factory.setUsername("admin");

    factory.setPassword("admin");

        factory.setHost("192.168.169.142"); //使用默認端口5672

        Connection conn = factory.newConnection(); //聲明一個鏈接

        Channel channel = conn.createChannel(); //聲明消息通道

   

        String exchangeName = "TestEXG";//交換機名稱

        String routingKey = "RouteKey1";//RoutingKey關鍵字

        channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機

        String queueName = "ClusterQueue";//隊列名稱

        Map arg = new HashMap();

        arg.put("x-ha-policy", "all");

        channel.queueDeclare(queueName, false, false, false, arg);

 

        channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象

        

        byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容

        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發佈消息

        //關閉通道和鏈接

channel.close();

conn.close();

    }

 

 

}

 

消費者代碼

package com.test.cluster;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

//經過channel.basicAck向服務器發送回執,刪除服務上的消息

public class Consumer {

 

    public static void main(String[] args) throws IOException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();

    factory.setUsername("admin");

    factory.setPassword("admin");

        factory.setHost("192.168.169.142"); //使用默認端口5672

        factory.setPort(5672);

        Connection conn = factory.newConnection(); //聲明一個鏈接

        Channel channel = conn.createChannel(); //聲明消息通道

        String exchangeName = "TestEXG";//交換機名稱

        String queueName = "ClusterQueue";//隊列名稱

        channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機

        channel.queueBind(queueName, exchangeName, "RouteKey1");

 

        channel.basicQos(1); //server push消息時的隊列長度

 

        //用來緩存服務器推送過來的消息

        QueueingConsumer consumer = new QueueingConsumer(channel);

 

        channel.basicConsume(queueName, false, consumer);

 

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            System.out.println("Received " + new String(delivery.getBody()));

 

            //回覆ack包,若是不回覆,消息不會在服務器刪除

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }

    }

 

}

相關文章
相關標籤/搜索