消息中間件-RabbitMq(高可用方案&集羣搭建)

消息中間件-RabbitMq(高可用方案&集羣搭建)

上一篇咱們搭建了rabbit單機節點,咱們知道不少個開發小組均可以經過rabbit開發(由於它有不一樣的虛擬主機),但是問題來了,若是rabbit宕機了,怎麼玩?那天然而然就想到集羣搭建了,而集羣會產生一個新的問題,使用哪一種方式作數據的共享,下面咱們就會聊到這些問題,咱們來搭建它,而且解決這個問題。也會說到內存和磁盤滿了rabbit怎麼告知咱們,以及什麼告知咱們的機制。如下:java

  • 高可用集羣搭建以及介紹方案
  • 模擬中止某個節點看是否能夠正常吞吐
  • rabbit對於內存和磁盤的控制

搭建集羣

rabbit提供的集羣方案:服務器

  • cluster:
    • 【普通模式】:有兩種部署方式:【多機多節點、單機多節點】(下面會使用這種方式,由於服務器資源有限)
      • 【問題】
        • 【不可以保障消息的萬無一失】:由於咱們的節點之間並無持有真正的隊列數據,而持有的是一個叫作【元數據】的東西,java客戶端會幫你自動分發請求到任何一個節點,然而這些節點不必定恰好存儲了你想要的東西,那當前節點就會根據他持有的元數據把你的請求分發到其餘的節點,可是可能給你分發過去的節點宕機了,那你就沒法獲取你的數據。
          •  元數據:【隊列元數據】:隊列的名稱以及屬性、【交換機】:名稱以及屬性 【綁定關係元數據】:交換機和隊列、或者交換機和交換機 【虛擬主機】:虛擬主機中的各類綁定關係
      • 【解決】:使用鏡像方式
      • 【優勢】:速度快、所佔空間少
    • 【鏡像模式】:每個rabbit節點都存儲相同的數據 【問題】:所佔內存隨着節點的個數增長,成比例增加 ,而且數據冗餘(由於相同數據在不一樣的節點上)【解決】:有錢就行,換一個大一點內存的服務器
  • 集羣聯邦(使得mq在不一樣的節點之間進行消息傳遞,而不需搭建集羣,好處:不須要搭建集羣、而且Erlang的版本不同能夠)他們就相似於插件,不多使用(因此不會寫demo和展開)
    • (Federation)應用場景】:好比有三臺mq,在不一樣的地方進行部署,北上廣,那就可使用這個,
    • (Federation)原理】:他在本身鏈接本身內部的交換機,當你發送想給你在北京想給上海的mq發送消息的時候,他會發送給本身本地,而後本地的federation插件會進行數據的傳遞,這樣就減小了響應時間
    • (shovel)他是不斷的拉取別的節點上的數據。

【單機多節點開始搭建】:app

  • 【中止當前rabbit】:sudo systemctl stop rabbitmq-server
  • 【啓動第一個節點】:rabbitmq-server -detached
  • 【啓動第二個節點】:修改主機名稱(rabbit2 )、修改默認端口(15673)
    RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management
    listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
  • 【啓動第三個節點】:修改主機名稱(rabbit3 )、修改默認端口(15674)
    RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management
    listener [{port,15674}]" RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached
  • 將rabbit2加入集羣
    •   # 中止 rabbit2 的應用           rabbitmqctl -n rabbit2 stop_app
    •   # 重置 rabbit2 的設置           rabbitmqctl -n rabbit2 reset  
    •   # rabbit2 節點加入到rabbit的集羣中   rabbitmqctl -n rabbit2 join_cluster rabbit --ram
    •   # 啓動 rabbit2 節點             rabbitmqctl -n rabbit2 start_app 
  • 【將rabbit3加入集羣】
    •   # 中止 rabbit3 的應用         rabbitmqctl -n rabbit3 stop_app
    •   # 重置 rabbit3 的設置          rabbitmqctl -n rabbit3 reset
    •   # rabbit3 節點加入到 rabbit的集羣中  rabbitmqctl -n rabbit3 join_cluster rabbit --ram
    •   # 啓動 rabbit3 節點            rabbitmqctl -n rabbit3 start_app
  • 使用【rabbitmqctl cluster_status】能夠查詢到集羣狀態

 至此,集羣搭建完畢ide

對集羣的高可用進行檢驗(咱們這裏使用一個for循環去模擬生產者建立100個消息,而後一個消費者去獲取數據,這個時候,咱們隨機中止一個節點,看一下時候消息依然收發正常 runspa

生產者操作系統

public class ProducerCluster {

    public static void main(String[] args) {
        // 一、建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 二、設置鏈接屬性
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("v1");

        Connection connection = null;
        Channel channel = null;

        // 三、設置每一個節點的連接地址和端口
        Address[] addresses = new Address[]{
                new Address("你的ip", 5672),
                new Address("你的ip", 5673),
                new Address("你的ip", 5674),
        };

        try {
            // 開啓/關閉鏈接自動恢復,默認是開啓狀態
            factory.setAutomaticRecoveryEnabled(true);

            // 設置每100毫秒嘗試恢復一次,默認是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
            factory.setNetworkRecoveryInterval(100);

            factory.setTopologyRecoveryEnabled(false);

            // 四、使用鏈接集合裏面的地址獲取鏈接
            connection = factory.newConnection(addresses, "生產者");

            // 添加劇連監聽器
            ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
                /**
                 * 重連成功後的回調
                 * @param recoverable
                 */
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已從新創建鏈接!");
                }

                /**
                 * 開始重連時的回調
                 * @param recoverable
                 */
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 開始嘗試重連!");
                }
            });

            // 五、從連接中建立通道
            channel = connection.createChannel();

            /**
             * 六、聲明(建立)隊列
             * 若是隊列不存在,纔會建立
             * RabbitMQ 不容許聲明兩個隊列名相同,屬性不一樣的隊列,不然會報錯
             *
             * queueDeclare參數說明:
             * @param queue 隊列名稱
             * @param durable 隊列是否持久化
             * @param exclusive 是否排他,便是否爲私有的,若是爲true,會對當前隊列加鎖,其它通道不能訪問,而且在鏈接關閉時會自動刪除,不受持久化和自動刪除的屬性控制
             * @param autoDelete 是否自動刪除,當最後一個消費者斷開鏈接以後是否自動刪除
             * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中全部消息的生命週期等等
             */
            channel.queueDeclare("queue1", true, false, false, null);

            for (int i = 0; i < 100; i++) {
                // 消息內容
                String message = "Hello World " + i;
                try {
                    // 七、發送消息
                    channel.basicPublish("", "queue1", null, message.getBytes());
                } catch (AlreadyClosedException e) {
                    // 可能鏈接已關閉,等待重連
                    System.out.println("消息 " + message + " 發送失敗!");
                    i--;
                    TimeUnit.SECONDS.sleep(2);
                    continue;
                }
                System.out.println("消息 " + i + " 已發送!");
                TimeUnit.SECONDS.sleep(2);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 八、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 九、關閉鏈接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

消費者插件

public class ConsumerCluster {

    public static void main(String[] args) {
        // 一、建立鏈接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 二、設置鏈接屬性
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("v1");

        Connection connection = null;
        Channel channel = null;

        // 三、設置每一個節點的連接地址和端口
        Address[] addresses = new Address[]{
                new Address("你的ip", 5672),
                new Address("你的ip", 5673),
                new Address("你的ip", 5674),
        };

        try {
            // 開啓/關閉鏈接自動恢復,默認是開啓狀態
            factory.setAutomaticRecoveryEnabled(true);

            // 設置每100毫秒嘗試恢復一次,默認是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
            factory.setNetworkRecoveryInterval(100);

            // 四、從鏈接工廠獲取鏈接
            connection = factory.newConnection(addresses, "消費者");

            // 添加劇連監聽器
            ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
                /**
                 * 重連成功後的回調
                 * @param recoverable
                 */
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已從新創建鏈接!");
                }

                /**
                 * 開始重連時的回調
                 * @param recoverable
                 */
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 開始嘗試重連!");
                }
            });

            // 五、從連接中建立通道
            channel = connection.createChannel();

            /**
             * 六、聲明(建立)隊列
             * 若是隊列不存在,纔會建立
             * RabbitMQ 不容許聲明兩個隊列名相同,屬性不一樣的隊列,不然會報錯
             *
             * queueDeclare參數說明:
             * @param queue 隊列名稱
             * @param durable 隊列是否持久化
             * @param exclusive 是否排他,便是否爲私有的,若是爲true,會對當前隊列加鎖,其它通道不能訪問,
             *                  而且在鏈接關閉時會自動刪除,不受持久化和自動刪除的屬性控制。
             *                  通常在隊列和交換器綁定時使用
             * @param autoDelete 是否自動刪除,當最後一個消費者斷開鏈接以後是否自動刪除
             * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中全部消息的生命週期等等
             */
            channel.queueDeclare("queue1", true, false, false, null);

            // 七、定義收到消息後的回調
            final Channel finalChannel = channel;
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                    finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 八、監聽隊列
            channel.basicConsume("queue1", false, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("開始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 九、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 十、關閉鏈接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

而後咱們中止依次中止節點2 、33d

結果是:他們重新鏈接後依舊收發正常(集羣工做正常)code

 

 

 【鏡像模式】:咱們這裏新建立一套,避免衝突orm

建立用戶:test,把用戶綁定到虛擬主機

 建立鏡像策略

 咱們發現它自動進行了數據同步

 如今各個節點的數據都是同樣的了

 內存控制

內存使用超過配置的閾值或者 磁盤剩餘空間低於配置的閾值時RabbitMQ 會暫時阻塞客戶端的鏈接,並中止接收從客戶端發來的消息,以此避免服務崩潰。
當出現內存預警,可使用【 rabbitmqctl set_vm_memory_high_watermark <fractioion>】臨時調整內存大小,通常設置爲0.4,這裏咱們設置他爲50m就報警

【rabbitmqctl set_vm_memory_high_watermark absolute 50MB】能夠看到當前就變紅啦

【relative】 相對值,即前面的fraction,建議取值在0.4~0.66之間,不建議超過0.7
 【absolute 】絕對值,單位爲KB、MB、GB

 內存翻頁

在某個 Broker 節點觸及內存並阻塞生產者以前 ,它會嘗試將隊列中的消息換頁到磁盤以釋放內存空間。默認狀況下, 在內存到達內存閾值的 50%時會進行換頁動做。在配置文件(/etc/rabbitmq/rabbitmq.conf)中能夠進行配置預警和翻頁的閾值。
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.75
 

 磁盤控制

當磁盤剩餘空間低於肯定的閾值時,RabbitMQ 一樣會阻塞生產者,這樣能夠避免因非持久化的消息持續換頁而耗盡磁盤空間致使服務崩潰。默認狀況下,磁盤閾值爲50MB,表示當磁盤剩餘空間低於50MB 時會阻塞生產者並中止內存中消息的換頁動做 。這個閾值的設置能夠減少,但不能徹底消除因磁盤耗盡而致使崩潰的可能性。好比在兩次磁盤 空間檢測期間內,磁盤空間從大於50MB被耗盡到0MB 。通常咱們是將磁盤閾值設置爲與操做系統所顯示的內存大小一致。

這裏使用命令告訴它當剩下100個G的時候預警,可是咱們總共纔不到6G,因此他會立馬預警 【rabbitmqctl set_disk_free_limit 100GB

相關文章
相關標籤/搜索