RabbitMQ鏡像隊列集羣搭建、與SpringBoot整合

鏡像模式

集羣模式很是經典的就是Mirror鏡像模式,保證100%數據不丟失,在實際工做中也是用的最多的,而且實現集羣比較的簡單。
Mirror鏡像隊列,目的是爲了保證 RabbitMQ 數據的高可靠性解決方案,主要就是實現數據的同步,通常來說2--3個節點實現數據同步(對於100%數據可靠性解決方案通常是3節點)
感興趣的胖友能夠體驗一哈新的閱讀地址:http://www.zhouhong.icu/post/142  (*^▽^*)

1 前提準備

1.1 服務節點分配

服務器IP
hostname
節點說明
端口
管控臺地址
192.168.2.121
zhouhong121
rabbitmq master
5672
http://192.168.2.121:15672
192.168.2.122
zhouhong122
rabbitmq slave
5672
http://192.168.2.122:15672
192.168.2.123
zhouhong123
rabbitmq slave
5672
http://192.168.2.123:15672

2 集羣搭建

前提條件:修改12一、12二、123三臺服務器的 hostname 而且可使用hostname 兩兩之間 ping 通。
  • 修改每臺服務器的 hostname

vim /etc/hostname
## 修改對應的名字,好比:
zhouhong121
  • 更改每臺服務器的 hosts

vim /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.2.121 zhouhong121
192.168.2.122 zhouhong122
192.168.2.123 zhouhong123
  •  測試,用122的hostname ping 123

2.1 集羣節點安裝
RabbitMQ下載:

rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm​
 若是下載卡頓請使用我下載好的網盤進行下載便可
連接:https://pan.baidu.com/s/1diapYC19UlDy4G-4lgZWHA
提取碼:jf5r
複製這段內容後打開百度網盤手機App,操做更方便哦

由於以前在121這檯安裝過:全部在另外兩臺上面快速安裝便可,詳細的安裝請參照:

1、安裝
rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm​
2、啓動
systemctl start rabbitmq-server
3、安裝web管控臺
rabbitmq-plugins enable rabbitmq_management
4、添加用戶
sudo rabbitmqctl add_user admin admin
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin "." "." ".*"  
5、重啓
systemctl start rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
 瀏覽器以admin登陸檢查安裝是否成功:

2.2 文件同步(注意:.erlang.cookie爲隱藏文件,須要使用 -a 查看)

選擇12一、12二、123任意一個節點爲Master(這裏選擇71爲Master),也就是說咱們須要把121的Cookie文件同步到12二、123節點上去,進入/var/lib/rabbitmq目錄下,把/var/lib/rabbitmq/.erlang.cookie文件的權限修改成777,原來是400;而後把.erlang.cookie文件copy到各個節點下;最後把全部cookie文件權限還原爲400便可。
//進入目錄修改權限;遠程copy12二、123節點
cd /var/lib/rabbitmq/
chmod 777 /var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie 192.168.2.122:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie 192.168.2.123:/var/lib/rabbitmq/
// 每臺服務器爲默認修改權限
chmod 400 /var/lib/rabbitmq/.erlang.cookie
2.3 組成集羣
2.3.1 中止服務
咱們首先中止3個節點的服務:(這裏不能使用原來的命令:/etc/init.d/rabbitmq-server stop)

rabbitmqctl stop
2.3.2 組成集羣操做
接下來咱們就可使用集羣命令,配置7一、7二、73爲集羣模式,3個節點(7一、7二、73)執行啓動命令,後續啓動集羣使用此命令便可。

rabbitmq-server -detached
2.3.3 slave加入集羣操做(從新加入集羣也是如此,以最開始的主節點爲加入節點)

//注意作這個步驟的時候:須要配置/etc/hosts 必須相互可以尋址到
//在122節點上執行如下操做
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@zhouhong121
rabbitmqctl start_app
//一樣在123節點上執行如下操做
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@zhouhong121
rabbitmqctl start_app
//在另外其餘節點上操做要移除的集羣節點
//rabbitmqctl forget_cluster_node rabbit@zhouhong12二、12二、123
2.3.4 修改集羣名稱
修改集羣名稱(默認爲第一個node名稱):

rabbitmqctl set_cluster_name rabbitmq_cluster1
2.3.5 查看集羣狀態
最後在集羣的任意一個節點執行命令:查看集羣狀態

rabbitmqctl cluster_status

2.3.6 管控臺界面(注意:這裏可能以前配置的admin 角色會失效,可能須要從新配置一遍)
訪問任意一個管控臺節點: http://192.168.2.121:15672 如圖所示

如圖:121爲dics 12二、123爲 RAMhtml

2.4 配置鏡像隊列
設置鏡像隊列策略(在任意一個節點上執行)

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

 將全部隊列設置爲鏡像隊列,即隊列會被複制到各個節點,各個節點狀態一致,RabbitMQ高可用集羣就已經搭建好了,咱們能夠重啓服務,查看其隊列是否在從節點同步。java

咱們在任何一個節點上建一個隊列,那這個隊列將會加到其餘兩個節點上面

2.5 消息一致性問題
在使用rabbitmq中,消息的一致性是很是重要的一個話題。下面咱們來研究一下,在數據一致性方面,有哪些須要關注的。發送者發送消息出來,在數據一致性的要求下,咱們一般認爲必須達到如下條件
  1. broker持久化消息
  2. publisher知道消息已經成功持久化
首先,咱們能夠採用事務來解決此問題。每一個消息都必須經歷以上兩個步驟,就算一次事務成功。
事務是同步的。所以,若是採用事務,發送性能必然不好。官方給出來的性能是:

異步的方法的效率是事務方法效率的100倍。

咱們能夠採用異步的方式來解決此問題。publisher發送消息後,不進行等待,而是異步監聽是否成功。這種方式又分爲兩種模式,一種是return,另外一種是confirm. 前一種是publisher發送到exchange後,異步收到消息。第二種是publisher發送消息到exchange,queue,consumer收到消息後纔會收到異步收到消息。可見,第二種方式更加安全可靠。以下所示:

可是,異步也存在些侷限性。若是一旦出現broker掛機或者網絡不穩定,broker已經成功接收消息,可是publisher並無收到confirm或return.這時,對於publisher來講,只能重發消息解決問題。而在這裏面,咱們會發生重複消息的問題。固然,若是業務類型要求數據一致性很是高,能夠採用低效率的事務型解決方案。

3 整合SpringBoot

生產端:
  • 引入依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.zhouhong</groupId>
  <artifactId>rabbit-producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>    
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency> 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- springboot rabbitmq(amqp) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>                                    
    </dependencies>
</project>
  •  配置文件 application.properties
server.servlet.context-path=/
server.port=8011

## 鏡像隊列地址
spring.rabbitmq.addresses=192.168.2.121,192.168.2.122,192.168.2.123
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
## 默認虛擬主機
spring.rabbitmq.virtual-host=/
## 鏈接超時
spring.rabbitmq.connection-timeout=15000
## 是否使用啓用消息確認模式(可靠性投遞)
spring.rabbitmq.publisher-confirms=true
## 設置reture消息模式,注意要和mandatory一塊兒配合使用
## spring.rabbitmq.publisher-returns=true
## spring.rabbitmq.template.mandatory=true

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
  •  消息發送
package com.zhouhong.rabbit.producer.component;
import java.util.Map;
import java.util.UUID;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 這裏是確認消息的回調監聽接口,用於確認消息是否被 broker 所收到
     */
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * @param CorrelationData 做爲一個惟一的標識
         * @param ack broker是否落盤成功
         * @param cause 失敗的一些異常信息
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // TODO Auto-generated method stub        
        }
    };

    /**
     * 對外發送消息的方法
     * @param massage 具體的消息內容
     * @param properties 額外的屬性
     * @throws Exception
     */
    public void send(Object message, Map<String, Object> properties) throws Exception{
        
        MessageHeaders mhs = new MessageHeaders(properties);
        Message<?> msg = MessageBuilder.createMessage(message, mhs);
        /**
         * 使用的是confirms模式,因此在發消息以前須要監控
         */
        rabbitTemplate.setConfirmCallback(confirmCallback);
        //指定業務惟一的ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        MessagePostProcessor mpp = new MessagePostProcessor() {    
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message)
                    throws AmqpException {
                System.out.println("post todo: "+ message);
                return message;
            }
        };        
        rabbitTemplate.convertAndSend("exchange-1", "rabbitmq.*", msg,
                 correlationData);
        
    }
 }
 消費端:
  • 配置文件 application.properties

server.servlet.context-path=/
server.port=8012

## 鏡像隊列地址
spring.rabbitmq.addresses=192.168.2.121,192.168.2.122,192.168.2.123
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
## 默認虛擬主機
spring.rabbitmq.virtual-host=/
## 鏈接超時
spring.rabbitmq.connection-timeout=15000

## 表示消費者消息消費成功之後,須要手工的進行簽收(ACK) 默認爲 auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 線程數
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
## 一條一條消費
spring.rabbitmq.listener.simple.prefetch=1

##    最好不要在代碼裏寫死配置信息,儘可能使用這種方式也就是配置文件的方式
##    在代碼裏使用 ${}    方式進行設置配置: ${spring.rabbitmq.listener.order.exchange.name}
## 交換機名稱 
## spring.rabbitmq.listener.order.exchange.name=order-exchange
## 是否持久化 
## spring.rabbitmq.listener.order.exchange.durable=true
## type 類型
## spring.rabbitmq.listener.order.exchange.type=topic
## 規則
## spring.rabbitmq.listener.order.exchange.key=rabbitmq.*

spring.application.name=rabbit-producer
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL
  •  接收消息
package com.zhouhong.rabbit.consumer.component;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
@Component
public class RabbbitReceive {
    /**
     * 組合使用監聽
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "queue-1", durable = "true"),
                exchange = @Exchange(name = "exchange-1", 
                durable = "true", 
                type = "topic",
                ignoreDeclarationExceptions = "true"),
                key = "rabbitmq.*"
            )
            )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        //一、收到消息之後進行業務端消費處理
        System.err.println("======================");
        System.err.println("消息消費:" + message.getPayload());        
        //二、處理成功以後獲取deliveryTay 而且進行手工的ACK操做,由於咱們配置文件裏面配置的是手工簽收
        Long deliveryTay = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTay, false);
    }
}
 測試:
在發送端建測試類:

 

package com.zhouhong.rabbit.producer.test;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.zhouhong.rabbit.producer.component.RabbbitSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
    @Autowired
    private RabbbitSender rabbbitSender;
    
    @Test
    public void testSender() throws Exception{
        Map<String , Object> properties = new HashMap<String, Object>();
        properties.put("key1", "你好呀,RabbitMQ!!");
        properties.put("key2", "你好呀,Kafka!!");
        rabbbitSender.send("rabbitmq-test", properties);
    
        Thread.sleep(10000);
    }
}
 一、啓動消費者,觀察管控臺

 創建了一個咱們代碼裏面指定的交換機 exchange-1,而且綁定了咱們指定的隊列queue-1,路由規則爲 rabbitmq.*node

二、關閉消費者,只運行咱們發送端的測試方法,觀察管控臺

 咱們發現會有一條未消費的消息。web

三、接着,咱們再啓動消費端,觀察管控臺

  以前堆積的消息已經被消費掉了,因而可知,咱們的消息能夠成功地投遞,並且被成功地消費。
  消費端會保證,在消息未消費以前不會將消息刪掉,以此來保證消息的可靠性。通常咱們只須要作生產端跟broker的可靠性就能夠了,消費端MQ會本身幫咱們作可靠性工做。
相關文章
相關標籤/搜索