快速掌握RabbitMQ(五)——搭建高可用的RabbitMQ集羣

  RabbitMQ的集羣是依賴erlang集羣的,而erlang集羣是經過.erlang.cookie文件進行通訊認證的,因此咱們使用RabbitMQ集羣時只須要配置一下.erlang.cookie文件便可。下邊簡單演示一下RabbitMQ高可用集羣的搭建,附帶一個簡單使用C#驅動RabbtiMQ集羣的小栗子。html

1 搭建RabbitMQ高可用集羣

  首先準備三臺設備,這裏採用的三臺Centos7的虛擬機,測試一下各個虛擬機能不能相互ping通,若是能夠相互ping通的話,在每臺虛擬機上分別安裝RabbitMQ,能夠參考第一篇的安裝方法node

第1步 修改主機配置

   爲了方便機器間的相互訪問,三臺centos都執行  vim /etc/hosts ,添加下邊的配置(注意修改爲本身設備的IP):redis

192.168.70.129 rabbitmq1
192.168.70.131 rabbitmq2
192.168.70.133 rabbitmq3

  通常狀況,hosts文件中內容以下:vim

第2步:修改.erlang.cookie文件

  修改三臺設備的.erlang.cookie中的key一致。若是使用的是前邊的安裝方法,.erlang.cookie的位置爲 /var/lib/rabbitmq/.erlang.cookie (採用其餘安裝方式找不到文件的話,可使用命令  find / -name '.erlang.cookie' 找到文件位置)。這裏三臺虛擬機的key都採用192.168.70.129的key(值爲CRRQPKHDXEEIUJUOGYKN),在另外兩臺設備上 執行命令: vim /var/lib/rabbitmq/.erlang.cookie ,修改文件內容爲CRRQPKHDXEEIUJUOGYKN。在修改時若是遇到權限問題,可執行命令 chmod 600 /var/lib/rabbitmq/.erlang.cookie 修改文件的權限爲可寫,修改內容完成後,執行命令 chmod 400 /var/lib/rabbitmq/.erlang.cookie 把文件再次改爲只讀的。centos

  完成上邊的兩步後,erlang集羣就搭建好了,重啓全部的設備便可。在rabbitmq1節點的虛擬機上執行命令 rabbitmqctl cluster_status 查看集羣狀態:數組

 第3步:添加/刪除節點

添加節點bash

  把rabbitmq2節點添加到集羣中去,在rabbitmq2節點執行如下命令:cookie

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app

  執行完成後,查看集羣狀態,看到rabbitmq2已經在集羣中了,以下:app

  重複上邊的步驟,把rabbitmq3也添加到集羣中,在rabbitmq3節點執行下邊命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit2
rabbitmqctl start_app
  查看集羣狀態,rabbitmq3也在集羣中了,以下:  

  這時咱們打開任意一個節點的Web管理界面,顯示以下,看到集羣已經配置完成了:

刪除節點
  把某一節點從集羣中刪除很簡單,reset一下節點便可。如刪除rabbitmq3節點,在rabbitmq3上執行如下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

  執行完成後,查看集羣狀態以下:負載均衡

   如今搭建的集羣是默認的普通集羣,普通集羣中節點能夠共享集羣中的exchange,routingKey和queue,可是queue中的消息只保存在首次聲明queue的節點中。任意節點的消費者均可以消費其餘節點的消息,好比消費者鏈接rabbitmq1節點的消費者(代碼中創建Connection時,使用的rabbitmq1的IP)能夠消費節點rabbitmq2的隊列myqueue2中的消息,消息傳輸過程是:rabbitmq2把myqueue2中的消息傳輸給rabbtimq1,而後rabbitmq1節點把消息發送給consumer。由於queue中的消息只保存在首次聲明queue的節點中,這樣就有一個問題:若是某一個node節點掛掉了,那麼只能等待該節點從新鏈接才能繼續處理該節點內的消息(若是沒有設置持久化的話,節點掛掉後消息會直接丟失)。以下圖,rabbitmq1節點掛掉後,myqueue隊列就down掉了,不能被訪問。

  針對上邊的問題,咱們可能會想到:若是可讓rabbitmq中的節點像redis集羣的節點同樣,每個節點都保存全部的消息,好比讓rabbitmq1不只僅保存本身隊列myqueue的消息,還保存其餘節點的隊列myqueue2和myqueue3中的消息,rabbitmq2和rabbitmq3節點也同樣,這樣就不用擔憂宕機的問題了。rabbitmq也提供了這樣的功能:鏡像隊列。鏡像隊列由一個master和多個slave組成,使用鏡像隊列消息會自動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。

第4步:配置鏡像隊列

  rabbitmq配置鏡像隊列十分簡單,咱們在任意一個node節點下執行下邊的命令就能夠完成鏡像隊列的配置(固然也能夠在Web管理界面上添加policy):

rabbitmqctl set_policy ha-all "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# ha-all:爲策略名稱;
# ^my:爲匹配符,只有一個^表明匹配全部,^abc爲匹配名稱以abc開頭的queue或exchange;
# ha-mode:爲同步模式,一共3種模式:
#    ①all-全部(全部的節點都同步消息),
#    ②exctly-指定節點的數目(需配置ha-params參數,此參數爲int類型好比2,在集羣中隨機抽取2個節點同步消息)
#    ③nodes-指定具體節點(需配置ha-params參數,此參數爲數組類型好比["rabbit@rabbitmq1","rabbit@rabbitmq2"],明確指定在這兩個節點上同步消息)。

  打開Web管理界面,若是效果以下表示鏡像隊列已經配置完成了。當前myqueue的master節點爲rabbitmq1:

  若是首次聲明queue的節點(master)掛了,其餘節點會自動變成master,如上圖myqueue的master爲rabbitmq1,停掉rabbtmq1後,結果以下:rabbitmq2成爲了master。

 

  咱們發現rabbitmq1節點掛了後,rabbitmq2自動成爲了myqueue的master,myqueue不會down掉,能夠正常的添加/刪除/獲取消息,這就解決了普通集羣宕機的問題。使用鏡像隊列,由於各個節點要同步消息,因此比較耗費資源,通常在可靠性比較高的場景使用鏡像隊列。

還能夠配置其餘策略的鏡像隊列,也是一行命令便可完成配置,一些其它同步模式的栗子:

#策略名爲ha-twe,匹配以「my」開頭的queue或exchange,在集羣中隨機挑選鏡像節點,同步的節點爲2個
rabbitmqctl set_policy ha-two "^my"    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
#策略名爲ha-nodes,匹配以「my」開頭的queue或exchange,指定rabbit@rabbitmq2和rabbit@rabbitmq3爲同步節點
rabbitmqctl set_policy ha-nodes "^my" \ '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq2", "rabbit@rabbitmq3"]}'

第5步:C#驅動RabbitMQ集羣

  C#驅動RabbitMQ集羣與C#驅動單機RabbtiMQ的方式基本同樣,區別在於使用集羣時,建立Connection指定的是一個host集合。看一個簡單的栗子:

生產者代碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                UserName = "wyy",//用戶名
                Password = "123456",//密碼
                AutomaticRecoveryEnabled = true,//Connection斷了,自動從新鏈接
            };
            //集羣中的三個rabbitmq節點
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" };
            //隨機鏈接一個rabbitmq節點
            using (var connection = factory.CreateConnection(hosts))
            {
                //建立通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者準備就緒....");
                    #region 發佈100條消息
                    for (int i = 0; i < 100; i++)
                    {
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes($"第{i}條消息"));
                    }
                    #endregion
                }
            }
            Console.ReadKey();
        }

消費者代碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                UserName = "wyy",//用戶名
                Password = "123456",//密碼
            };
            //集羣中的三個rabbitmq節點
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" };
            //隨機鏈接一個rabbitmq節點
            using (var connection = factory.CreateConnection(hosts))
            {
                using (var channel = connection.CreateModel())
                {
                    //使用Qos,每次接收1條消息
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);
                    Console.WriteLine("消費者準備就緒....");
                    #region EventingBasicConsumer

                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, ea) =>
                    {
                        //處理一條消息須要10s時間
                        Thread.Sleep(1000);
                        //顯示確認消息
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成!");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

   執行這兩個應用程序,結果以下:

 

   到這裏RabbtMQ的集羣搭建就告一段落了,有一個小問題:RabbitMQ的集羣默認不支持負載均衡的。咱們能夠根據設備的性能,使用Qos給各個消費者指定合適的最大發送條數,這樣能夠在必定程度上實現負載均衡。也有園友經過Haproxy實現RabbitMQ集羣的負載均衡,有興趣的小夥伴能夠研究一下,爲何使用Haprpxy而不用Ngnix呢?這是由於Haproxy支持四層(tcp,udp等)和七層(http,https,email等)的負載均衡,而Nginx只支持七層的負載均衡,而Rabbitmq是經過tcp傳輸的。本節也是RabbitMQ系列的最後一篇,若是文中有錯誤的話,但願你們能夠指出,我會及時修改,謝謝。

相關文章
相關標籤/搜索