分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

前言

對於分佈式消息中間件,首先要了解兩個基礎的概念,即什麼是分佈式系統,什麼又是中間件。java

分佈式系統node

「A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messasges.」——《Distributed Systems Concepts and Design》web

從上面這個解釋能夠獲得分佈式系統的兩個特色:組件分佈在網絡計算機上組件之間經過消息來協調行動正則表達式

中間件算法

Middleware is computer software that provides services to software applications beyond those available from the operating system. It can be described as "software glue". Middleware makes it easier for software developers to implement communication and input/output, so they can focus on the specific purpose of their application.——維基百科api

中間件被描述爲爲應用程序提供操做系統所提供的服務以外的服務,簡化應用程序的通訊、輸入輸出的開發,使他們專一於本身的業務邏輯。從維基百科上對中間件的解釋感受有點繞,其實能夠從「空間」的角度去理解中間件,即中間件是處於「中間層」的組件,是上層的應用程序和底層的服務之間的橋樑(好比DB中間件的上層是應用程序,底層是DB服務),也是應用與應用之間的橋樑(好比分佈式服務組件)。緩存

分佈式消息中間件服務器

「Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.」——維基百科網絡

維基百科給出的消息中間件的定義是支持在分佈式系統中發送和接受消息的硬件或軟件基礎設施(對咱們這裏討論的範圍來講確定就是軟件了)。session

那麼分佈式消息中間件其實就是指消息中間件自己也是一個分佈式系統。

消息中間件能作什麼?

任何中間件必然都是要去解決特定領域的某個問題,消息中間件解決的就是分佈式系統之間消息傳遞的問題。消息傳遞是分佈式系統必然要面對的一個問題。

簡單歸納一下消息中間件的應用場景大體以下:

  • 業務解耦:交易系統不須要知道短信通知服務的存在,只須要發佈消息
  • 削峯填谷:好比上游系統的吞吐能力高於下游系統,在流量洪峯時可能會沖垮下游系統,消息中間件能夠在峯值時堆積消息,而在峯值過去後下游系統慢慢消費消息解決流量洪峯的問題
  • 事件驅動:系統與系統之間能夠經過消息傳遞的形式驅動業務,以流式的模型處理

分佈式消息中間件長什麼樣?

一個抽象的對分佈式消息中間件的認知大概是這樣:

  • 有一個SDK,提供給業務系統發送、消費消息的接口
  • 有一批Server節點用於接受和存儲消息,並在合適的時候發送給下游的系統進行消費

別嫌囉嗦,大體介紹一下,方便下面的理解,本系列主要講三個經常使用的消息中間件,也就是RabbitmqRocketMqKafka,固然篇幅所限確定講不完,只能挑比較重要的東西寫,但也能讓不會的同窗初步掌握怎麼去使用。

完整版的消息中間件學習資料和我我的整理的筆記

能夠直接點擊藍字領取

好了,話很少說,發車嘍!


分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

RabbitMQ除了像兔子同樣跑的很快之外,還有這些特色:

  • 開源、性能優秀,穩定性保障
  • 提供可靠性消息投遞模式、返回模式
  • 與Spring AMQP完美整合,API豐富
  • 集羣模式豐富,表達式配置,HA模式,鏡像隊列模型
  • 保證數據不丟失的前提作到高可靠性、可用性

1、Rabbitmq消息隊列應用

一、RabbitMQ介紹

  RabbitMQ是一款基於AMQP(消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、Erlang等。RabbitMQ與其餘消息隊列組件性能比較,在此不做介紹,網上有大把的資料。


二、RabbitMQ原理簡介

分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

  RabbitMQ中間件分爲服務端(RabbitMQ Server)和客戶端(RabbitMQ Client),服務端能夠理解爲是一個消息的代理消費者,客戶端又分爲消息生產者(Producer)和消息消費者(Consumer)。

  2.1 消息生產者(Producer):主要生產消息並將消息基於TCP協議,經過創建Connection和Channel,將消息傳輸給RabbitMQ Server,對於Producer而言基本就完成了工做。

  2.2 服務端(RabbitMQ Server):主要負責處理消息路由、分發、入隊列、緩存和出列。主要由三部分組成:Exchange、RoutingKey、Queue。

    (1)Exchange:用於接收消息生產者發送的消息,有三種類型的exchange:direct, fanout,topic,不一樣類型實現了不一樣的路由算法;

    A. direct exchange:將與routing key 比配的消息,直接推入相對應的隊列,建立隊列時,默認就建立同名的routing key。

    B. fanout exchange:是一種廣播模式,忽略routingkey的規則。

    C. topic exchange:應用主題,根據key進行模式匹配路由,例如:若爲abc則推入到全部abc相對應的queue;若爲abc.#則推入到abc.xx.one ,abc.yy.two對應的queue。

    (2)RoutingKey:是RabbitMQ實現路由分發到各個隊列的規則,並結合Binging提供於Exchange使用將消息推送入隊列;

    (3)Queue:是消息隊列,能夠根據須要定義多個隊列,設置隊列的屬性,好比:消息移除、消息緩存、回調機制等設置,實現與Consumer通訊;

  2.3 消息消費者(Consumer):主要負責消費Queue的消息,一樣基於TCP協議,經過創建Connection和Channel與Queue傳輸消息,一個消息能夠給多個Consumer消費;

  2.4 關鍵名詞說明:Connection、Channel、Binging等;

    (1)Connection:是創建客戶端與服務端的鏈接。

    (2)Channel:是基於Connection之上創建通訊通道,由於每次Connection創建TCP協議通訊開銷及性能消耗較大,因此一次創建Connection後,使用多個Channel通道通訊減小開銷和提升性能。

    (3)Binging:是一個捆綁定義,將exchange和queue捆綁,定義routingkey相關策略。


三、RabbitMQ安裝部署

   以上對RabbitMQ簡介,接下來咱們經過實際搭建消息隊列服務實踐。RabbitMQ服務端能運行於Window、Linux和Mac平臺,客戶端也支持多種技術的實現。本次咱們將在Linux之CentOS7平臺搭建。

  3.1 安裝Erlang運行環境

    因爲RabbitMQ使用Erlang技術開發,因此須要先安裝Erlang運行環境後,才能安裝消息隊列服務。

    (1)配置系統能正常訪問公網,設置默認網關

`route add ``default` `gw 192.168.1.1`

    (2)安裝erlang

`su -c ``'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm'`

`sudo yum install erlang`

    (3)檢查erlang是否安裝成功

`erl`

    (4)安裝成功

    分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

  3.2 安裝RabbitMQ服務端

    (1)下載安裝包

`wget http:``//www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm`

    (2)安裝和配置RabbitMQ服務端,3.6.0版本:

`rpm --import https:``//www.rabbitmq.com/rabbitmq-signing-key-public.asc`
`yum install rabbitmq-server-3.6.0-1.noarch.rpm`

    (3)啓用web管理插件

`rabbitmq-plugins enable rabbitmq_management`

    (4)啓動RabbitMQ

`chkconfig rabbitmq-server ``on`
`/sbin/service rabbitmq-server start`

    (5)防火牆開通端口

`# firewall-cmd --permanent --zone=public --add-port=5672/tcp`
`# firewall-cmd --permanent --zone=public --add-port=15672/tcp`
`# firewall-cmd --reload`

    (6)rabbitmq默認會建立guest帳號,只能用於localhost登陸頁面管理員,本機訪問地址:

http://localhost:15672/

`rabbitmqctl add_user test test`
`rabbitmqctl set_user_tags test administrator<br>rabbitmqctl set_permissions -p / test ``".*"` `".*"` `".*"`

    分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

  RabbitMQ 管理員頁面。


四、RabbitMQ應用

   本章節描述,web應用生產的日誌,經過rabbitmq傳輸,而後日誌服務接收消息隊列的消息。

    分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

  本系統採用官方的Client,經過nuget引用。

  分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

   4.1 Web應用生產業務日誌

[HttpPost]
        public ActionResult Create()
        {
            this.HttpContext.Session["mysession"] = DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss");
            var txt = Request.Form["txtSite"].ToString();
            RabbitMQHelper helper = new RabbitMQHelper();
            helper.SendMsg(txt + ",操做日誌,時間:" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));

            return RedirectToAction("Index");
        }

`}`

  分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

頁面效果圖。

  4.2 日誌服務接收日誌消息

     基於window form開發一個日誌處理服務,並將接收的消息打印出來。

private void btnReceive_Click(object sender, EventArgs e)
        {
            isConnected = true;
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("MyLog", false, false, false, null);

                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume("MyLog", true, consumer);

                while (isConnected)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    txtMsg.Text += message + "\r\n";

                }
            }
        }

分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

   4.3 RabbitMQ頁面監控狀況

   RabbitMQ自帶頁面監控工具,經過此工具能夠監控MQ的狀況:

分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

  分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

完整版的消息中間件學習資料和我我的整理的筆記能夠直接點擊藍字領取

2、Rabbitmq消息確認機制

一、生產端 Confirm 消息確認機制

消息的確認,是指生產者投遞消息後,若是 Broker 收到消息,則會給咱們生產者一個應答。生產者進行接收應答,用來肯定這條消息是否正常的發送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!

Confirm 確認機制流程圖

分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

二、如何實現Confirm確認消息?

  • 第一步:在 channel 上開啓確認模式: channel.confirmSelect()
  • 第二步:在 channel 上添加監聽: channel.addConfirmListener(ConfirmListener listener);, 監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日誌等後續處理!
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "item.update";

        //指定消息的投遞模式:confirm 確認模式
        channel.confirmSelect();

        //發送
        final long start = System.currentTimeMillis();
        for (int i = 0; i < 5 ; i++) {
            String msg = "this is confirm msg ";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            System.out.println("Send message : " + msg);
        }

        //添加一個確認監聽, 這裏就不關閉鏈接了,爲了能保證能收到監聽消息
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功的回調函數
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
                System.out.println(multiple);
                System.out.println("耗時:" + (System.currentTimeMillis() - start) + "ms");
            }
            /**
             * 返回失敗的回調函數
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.printf("defeat ack");
                System.out.println("耗時:" + (System.currentTimeMillis() - start) + "ms");
            }
        });
    }
}
`import com.rabbitmq.client.*;
import java.io.IOException;

public class ConfirmConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //通常不用代碼綁定,在管理界面手動綁定
        channel.queueBind(queueName, exchangeName, routingKey);

        //建立消費者並接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        //設置 Channel 消費者綁定隊列
        channel.basicConsume(queueName, true, consumer);

    }
}

咱們此處只關注生產端輸出消息

Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
succuss ack
true
耗時:3ms
succuss ack
true
耗時:4ms

注意事項

  • 咱們採用的是異步 confirm 模式:提供一個回調方法,服務端 confirm 了一條或者多條消息後 Client 端會回調這個方法。除此以外還有單條同步 confirm 模式、批量同步 confirm 模式,因爲現實場景中不多使用咱們在此不作介紹,若有興趣直接參考官方文檔。

  • 咱們運行生產端會發現每次運行結果都不同,會有多種狀況出現,由於 Broker 會進行優化,有時會批量一次性 confirm ,有時會分開幾條 confirm。

    `succuss ack
    true
    耗時:3ms
    succuss ack
    false
    耗時:4ms
    
    或者
    succuss ack
    true
    耗時:3ms`

三、Return 消息機制

  • Return Listener 用於處理一-些不可路 由的消息!

  • 消息生產者,經過指定一個 Exchange 和 Routingkey,把消息送達到某一個隊列中去,而後咱們的消費者監聽隊列,進行消費處理操做!

  • 可是在某些狀況下,若是咱們在發送消息的時候,當前的 exchange 不存在或者指定的路由 key 路由不到,這個時候若是咱們須要監聽這種不可達的消息,就要使用 Return Listener !

  • 在基礎API中有一個關鍵的配置項:Mandatory:若是爲 true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲 false,那麼 broker 端自動刪除該消息!

Return 消息機制流程圖

分佈式消息中間件(1):Rabbitmq入門到高可用實戰!

Return 消息示例

  • 首先咱們須要發送三條消息,而且故意將第 0 條消息的 routing Key設置爲錯誤的,讓他沒法正常路由到消費端。

  • mandatory 設置爲 true 路由不可達的消息會被監聽到,不會被自動刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

  • 最後添加監聽便可監聽到不可路由到消費端的消息channel.addReturnListener(ReturnListener r))
    `import com.rabbitmq.client.*;
    import java.io.IOException;

public class ReturnListeningProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");

Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    String exchangeName = "test_return_exchange";
    String routingKey = "item.update";
    String errRoutingKey = "error.update";

    //指定消息的投遞模式:confirm 確認模式
    channel.confirmSelect();

    //發送
    for (int i = 0; i < 3 ; i++) {
        String msg = "this is return——listening msg ";
        //@param mandatory 設置爲 true 路由不可達的消息會被監聽到,不會被自動刪除
        if (i == 0) {
            channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
        } else {
            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
        }
        System.out.println("Send message : " + msg);
    }

    //添加一個確認監聽, 這裏就不關閉鏈接了,爲了能保證能收到監聽消息
    channel.addConfirmListener(new ConfirmListener() {
        /**
         * 返回成功的回調函數
         */
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("succuss ack");
        }
        /**
         * 返回失敗的回調函數
         */
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.printf("defeat ack");
        }
    });

    //添加一個 return 監聽
    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("return relyCode: " + replyCode);
            System.out.println("return replyText: " + replyText);
            System.out.println("return exchange: " + exchange);
            System.out.println("return routingKey: " + routingKey);
            System.out.println("return properties: " + properties);
            System.out.println("return body: " + new String(body));
        }
    });

}

}

 
 

`import com.rabbitmq.client.*;
import java.io.IOException;

public class ReturnListeningConsumer {
public static void main(String[] args) throws Exception {
//1. 建立一個 ConnectionFactory 並進行設置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);

//2\. 經過鏈接工廠來建立鏈接
    Connection connection = factory.newConnection();

    //3\. 經過 Connection 來建立 Channel
    Channel channel = connection.createChannel();

    //4\. 聲明
    String exchangeName = "test_return_exchange";
    String queueName = "test_return_queue";
    String routingKey = "item.#";

    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    channel.queueDeclare(queueName, false, false, false, null);

    //通常不用代碼綁定,在管理界面手動綁定
    channel.queueBind(queueName, exchangeName, routingKey);

    //5\. 建立消費者並接收消息
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        }
    };

    //6\. 設置 Channel 消費者綁定隊列
    channel.basicConsume(queueName, true, consumer);
}

}

咱們只關注生產端結果,消費端只收到兩條消息。

`Send message : this is return——listening msg
Send message : this is return——listening msg
Send message : this is return——listening msg
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return——listening msg
succuss ack
succuss ack
succuss ack

### 四、消費端 Ack 和 Nack 機制
消費端進行消費的時候,若是因爲業務異常咱們能夠進行日誌的記錄,而後進行補償!若是因爲服務器宕機等嚴重問題,那咱們就須要手工進行ACK保障消費端消費成功!消費端重回隊列是爲了對沒有處理成功的消息,把消息從新會遞給Broker!通常咱們在實際應用中,都會關閉重回隊列,也就是設置爲False。

#### 參考 api

>`void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;`
`void basicAck(long deliveryTag, boolean multiple) throws IOException;`

#### 如何設置手動 Ack 、Nack 以及重回隊列

*   首先咱們發送五條消息,將每條消息對應的循環下標 i 放入消息的 `properties` 中做爲標記,以便於咱們在後面的回調方法中識別。

*   其次, 咱們將消費端的 ·`channel.basicConsume(queueName, false, consumer);` 中的 `autoAck`屬性設置爲 `false`,若是設置爲`true`的話 將會正常輸出五條消息。

*   咱們經過 `Thread.sleep(2000)`來延時一秒,用以看清結果。咱們獲取到`properties`中的`num`以後,經過`channel.basicNack(envelope.getDeliveryTag(), false, true);`將 `num`爲0的消息設置爲 nack,即消費失敗,而且將 `requeue`屬性設置爲`true`,即消費失敗的消息重回隊列末端。

`import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;

public class AckAndNackProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");

Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    String exchangeName = "test_ack_exchange";
    String routingKey = "item.update";

    String msg = "this is ack msg";
    for (int i = 0; i < 5; i++) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("num" ,i);

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .headers(headers)
                .build();

        String tem = msg + ":" + i;

        channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
        System.out.println("Send message : " + msg);
    }

    channel.close();
    connection.close();
}

}

 
 

import com.rabbitmq.client.*;
import java.io.IOException;

public class AckAndNackConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

    final Channel channel = connection.createChannel();

    String exchangeName = "test_ack_exchange";
    String queueName = "test_ack_queue";
    String routingKey = "item.#";
    channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    channel.queueDeclare(queueName, false, false, false, null);

    //通常不用代碼綁定,在管理界面手動綁定
    channel.queueBind(queueName, exchangeName, routingKey);

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body)
                throws IOException {

            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if ((Integer) properties.getHeaders().get("num") == 0) {
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            } else {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    };

    //6\. 設置 Channel 消費者綁定隊列
    channel.basicConsume(queueName, false, consumer);

}

}

咱們此處只關心消費端輸出,能夠看到第 0 條消費失敗從新回到隊列尾部消費。

[x] Received 'this is ack msg:1'
[x] Received 'this is ack msg:2'
[x] Received 'this is ack msg:3'
[x] Received 'this is ack msg:4'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'

## 3、Rabbitmq鏡像隊列
### 一、 鏡像隊列的設置

鏡像隊列的配置經過添加policy完成,policy添加的命令爲:

rabbitmqctl  set_policy  [-p Vhost]  Name  Pattern  Definition  [Priority]

**-p Vhost**:  可選參數,針對指定vhost下的queue進行設置

**Name**:  policy的名稱

**Pattern**:  queue的匹配模式(正則表達式)

**Definition**:  鏡像定義,包括三個部分 ha-mode,ha-params,ha-sync-mode

 **ha-mode**:  指明鏡像隊列的模式,有效值爲 all/exactly/nodes

   - all表示在集羣全部的節點上進行鏡像

    - exactly表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定

-  nodes表示在指定的節點上進行鏡像,節點名稱經過ha-params指定

**ha-params**: ha-mode模式須要用到的參數

 **ha-sync-mode**:  鏡像隊列中消息的同步方式,有效值爲automatic,manually

**Priority**:  可選參數, policy的優先級

例如,對隊列名稱以hello開頭的全部隊列進行鏡像,並在集羣的兩個節點上完成鏡像,policy的設置命令爲:

rabbitmqctl  set_policy  hello-ha  "^hello"  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

### 二、 鏡像隊列的大概實現

#### 2.1  總體介紹

一般隊列由兩部分組成:一部分是amqqueue_process,負責協議相關的消息處理,即接收生產者發佈的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另外一部分是backing_queue,它提供了相關的接口供amqqueue_process調用,完成消息的存儲以及可能的持久化工做等。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e13f28de0f56400eb1cdd566dff820b3~tplv-k3u1fbpfcp-zoom-1.image)

鏡像隊列一樣由這兩部分組成,
- amqqueue_process仍舊進行協議相關的消息處理
- backing_queue則是由master節點和slave節點組成的一個特殊的backing_queue
- master節點和slave節點都由一組進程組成,一個負責消息廣播的gm,一個負責對gm收到的廣播消息進行回調處理。
- master節點上回調處理是coordinator
- slave節點上則是mirror_queue_slave。mirror_queue_slave中包含了普通的backing_queue進行消息的存儲
- master節點中backing_queue包含在mirror_queue_master中由amqqueue_process進行調用。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/748ddfc5d1944a199c6dea71f455b798~tplv-k3u1fbpfcp-zoom-1.image)

注意:消息的發佈與消費都是經過master節點完成。master節點對消息進行處理的同時將消息的處理動做經過gm廣播給全部的slave節點,slave節點的gm收到消息後,經過回調交由mirror_queue_slave進行實際的處理。

#### 2.2 gm(Guaranteed Multicast)

傳統的主從複製方式:由master節點負責向全部slave節點發送須要複製的消息,在複製過程當中,若是有slave節點出現異常,master節點須要做出相應的處理;若是是master節點自己出現問題,那麼slave節點間可能會進行通訊決定本次複製是否繼續。固然爲了處理各類異常狀況,整個過程當中的日誌記錄是免不了的。

然而rabbitmq中並無採用這種方式,而是將全部的節點造成一個循環鏈表,每一個節點都會監控位於本身左右兩邊的節點,當有節點新增時,相鄰的節點保證當前廣播的消息會複製到新的節點上;當有節點失效時,相鄰的節點會接管保證本次廣播的消息會複製到全部節點。

在master節點和slave節點上的這些gm造成一個group,group的信息會記錄在mnesia中。不一樣的鏡像隊列造成不一樣的group。

消息從master節點對應的gm發出後,順着鏈表依次傳送到全部節點,因爲全部節點組成一個循環鏈表,master節點對應的gm最終會收到本身發送的消息,這個時候master節點就知道消息已經複製到全部slave節點了。

#### 2.3 重要的表結構

rabbit_queue表記錄隊列的相關信息:

-record(amqqueue,
{
name, %%隊列的名稱
durable, %%標識隊列是否持久化
auto_delete, %%標識隊列是否自動刪除
exclusive_owner, %%標識是否獨佔模式
arguments, %%隊列建立時的參數
pid, %%amqqueue_process進程PID
slave_pids, %%mirror_queue_slave進程PID集合
sync_slave_pids, %%已同步的slave進程PID集合
policy, %%與隊列有關的policy
%%經過set_policy設置,沒有則爲undefined
gm_pids, %%{gm,mirror_queue_coordinator},{gm,mirror_queue_slave}進程PID集合
decorator %%
}).

注意:slave_pids的存儲是按照slave加入的時間來排序的,以便master節點失效時,提高"資格最老"的slave節點爲新的master。

gm_group表記錄gm造成的group的相關信息:

-record(gm_group,
{
name, %%group的名稱,與queue的名稱一致
version, %%group的版本號, 新增節點/節點失效時會遞增
members, %%group的成員列表, 按照節點組成的鏈表順序進行排序
}).

### 三、鏡像隊列的一些細節

#### 3.1 新增節點

slave節點先從gm_group中獲取對應group的全部成員信息,而後隨機選擇一個節點並向這個節點發送請求,這個節點收到請求後,更新gm_group對應的信息,同時通知左右節點更新鄰居信息(調整對左右節點的監控)及當前正在廣播的消息,而後回覆通知請求節點成功加入group。請求加入group的節點收到回覆後再更新rabbit_queue中的相關信息,並根據須要進行消息的同步。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f82561fb9cac4bf9b698991fac141449~tplv-k3u1fbpfcp-zoom-1.image)
![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b72e557d07084562a930664a6a17c75f~tplv-k3u1fbpfcp-zoom-1.image)

#### 3.2 消息的廣播

消息從master節點發出,順着節點鏈表發送。在這期間,全部的slave節點都會對消息進行緩存,當master節點收到本身發送的消息後,會再次廣播ack消息,一樣ack消息會順着節點鏈表通過全部的slave節點,其做用是通知slave節點能夠清除緩存的消息,當ack消息回到master節點時對應廣播消息的生命週期結束。

下圖爲一個簡單的示意圖,A節點爲master節點,廣播一條內容爲"test"的消息。"1"表示消息爲廣播的第一條消息;"id=A"表示消息的發送者爲節點A。右邊是slave節點記錄的狀態信息。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1410e28b45cb43d1a878a0f752c12ef2~tplv-k3u1fbpfcp-zoom-1.image)

爲何全部的節點都須要緩存一份發佈的消息呢?

master發佈的消息是依次通過全部slave節點,在這期間的任什麼時候刻,有可能有節點失效,那麼相鄰的節點可能須要從新發送給新的節點。例如,A->B->C->D->A造成的循環鏈表,A爲master節點,廣播消息發送給節點B,B再發送給C,若是節點C收到B發送的消息還未發送給D時異常結束了,那麼節點B感知後節點C失效後須要從新將消息發送給D。一樣,若是B節點將消息發送給C後,B,C節點中新增了E節點,那麼B節點須要再將消息發送給新增的E節點。

gm的狀態記錄:

-record(state,
{
self, %%gm自己的ID
left, %%該節點左邊的節點
right, %%該節點右邊的節點
group_name, %%group名稱 與隊列名一致
module, %%回調模塊 rabbit_mirror_queue_slave或者
%%rabbit_mirror_queue_coordinator
view, %%group成員列表視圖信息
%%記錄了成員的ID及每一個成員的左右鄰居節點
pub_count, %%當前已發佈的消息計數
members_state, %%group成員狀態列表 記錄了廣播狀態:[#member{}]
callback_args, %%回調函數的參數信息
%%rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator進程PID
confirms, %%confirm列表
broadcast_buffer, %%緩存待廣播的消息
broadcast_timer, %%廣播消息定時器
txn_executor
}).

-record(member,
{
pending_ack, %%待確認的消息,也就是已發佈的消息緩存的地方
last_pub, %%最後一次發佈的消息計數
last_ack %%最後一次確認的消息計數
}).

#### 3.3 節點的失效

當slave節點失效時,僅僅是相鄰節點感知,而後從新調整鄰居節點信息、更新rabbit_queue、gm_group的記錄等。若是是master節點失效,"資格最老"的slave節點被提高爲master節點,slave節點會建立出新的coordinator,並告知gm修改回調處理爲coordinator,原來的mirror_queue_slave充當amqqueue_process處理生產者發佈的消息,向消費者投遞消息等。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1158c2ed69384873bc2366b3509b38eb~tplv-k3u1fbpfcp-zoom-1.image)

上面提到若是是slave節點失效,只有相鄰的節點能感知到,那麼master節點失效是否是也是隻有相鄰的節點能感知到?假如是這樣的話,若是相鄰的節點不是"資格最老"的節點,怎麼通知"資格最老"的節點提高爲新的master節點呢?

實際上,全部的slave節點在加入group時,mirror_queue_slave進程會對master節點的amqqueue_process進程(也多是mirror_queue_slave進程)進行監控,若是master節點失效的話,mirror_queue_slave會感知,而後再經過gm進行廣播,這樣全部的節點最終都會知道master節點失效。固然,只有"資格最老"的節點會提高本身爲新的master。

另外,在slave提高爲master時,`mirror_queue_slave`內部來了一次"偷樑換柱",即本來須要回調`mirror_queue_slave`的`handle_call/handle_info/handle_cast`等接口進行處理的消息,所有改成調用`amqqueue_process`的`handle_call/handle_info/handle_cast`等接口,從而能夠解釋上面說的,`mirror_queue_slave`進程充當了`amqqueue_process`完成協議相關的消息的處理。

rabbit_mirror_queue_slave.erl

handle_call({gm_deaths,LiveGMPids},From,
State = #state{q = Q = #amqqueue{name=QName,pid=MPid}})->
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName,
Self,
LiveGMPids) of
{ok,Pid,DeadPids} ->
case Pid of
MPid ->
%% master hasn't changed
gen_server2:reply(From, ok),
noreply(State);
Self ->
%% we've become master
QueueState = promote_me(From,State),
{become,
%% 改由rabbit_amqqueue_process模塊處理消息
rabbit_amqqueue_process,
QueueState, hibernate};
...

gen_server2.erl

handle_common_reply(Reply,Msg,GS2State = #gs2_state{name=Name,
debug=Debug})->
case Reply of
...
{become, Mod, NState, Time1} ->
Debug1=common_become(Name,Mod,NState,Debug),
loop(find_prioritisers(
GS2State#gs2_state{mod=Mod,
state=NState,
time=Time1,
debug=Debug1}));
...

handle_msg({'gen_call',From,Msg},
GS2State=#gs2_state{mod=Mod,
state=State,
name=Name,
debug=Debug}) ->
case catch Mod:handle_call(Msg, From, State) of
...

handle_msg(Msg,GS2State=#gs2_state{mod=Mod,state=State})->
Reply = (catch dispatch(Msg,Mod,State)),
handle_common_reply(Reply, Msg, GS2State).

dispatch({'$gen_cast',Msg},Mod,State)->
Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State)->
Mod:handle_info(Info,State).

### 四、消息的同步

配置鏡像隊列的時候有個ha-sync-mode屬性,這個有什麼用呢?

新節點加入到group後,最多能從左邊節點獲取到當前正在廣播的消息內容,加入group以前已經廣播的消息則沒法獲取到。若是此時master節點不幸失效,而新節點有剛好成爲了新的master,那麼加入group以前已經廣播的消息則會所有丟失。

注意:這裏的消息具體是指新節點加入前已經發布並複製到全部slave節點的消息,而且這些消息還未被消費者消費或者未被消費者確認。若是新節點加入前,全部廣播的消息被消費者消費並確認了,master節點刪除消息的同時會通知slave節點完成相應動做。這種狀況等同於新節點加入前沒有發佈任何消息。

避免這種問題的解決辦法就是對新的slave節點進行消息同步。當ha-sync-mode配置爲自動同步(automatic)時,新節點加入group時會自動進行消息的同步;若是配置爲manually則須要手動操做完成同步

---
就先寫到這把,原本是想一篇文把三個中間件都寫了的,沒想到不知不覺寫了這麼多我都感受Rabbitmq還有不少東西還沒寫到,後面會再寫兩篇專門講一下 RocketMq和kafka,感興趣的朋友能夠給我點個關注。

[**完整版的消息中間件學習資料和我我的整理的筆記**](https://jq.qq.com/?_wv=1027&k=PjEVS3qt)直接點擊藍字領取

若是能夠點個贊就更好了,你說呢

---
end
相關文章
相關標籤/搜索