RabbitMQ的集羣是依賴erlang集羣的,而erlang集羣是經過.erlang.cookie文件進行通訊認證的,因此咱們使用RabbitMQ集羣時只須要配置一下.erlang.cookie文件便可。下邊簡單演示一下RabbitMQ高可用集羣的搭建,附帶一個簡單使用C#驅動RabbtiMQ集羣的小栗子。html
首先準備三臺設備,這裏採用的三臺Centos7的虛擬機,測試一下各個虛擬機能不能相互ping通,若是能夠相互ping通的話,在每臺虛擬機上分別安裝RabbitMQ,能夠參考第一篇的安裝方法。node
爲了方便機器間的相互訪問,三臺centos都執行 vim /etc/hosts ,添加下邊的配置(注意修改爲本身設備的IP):redis
192.168.70.129 rabbitmq1 192.168.70.131 rabbitmq2 192.168.70.133 rabbitmq3
通常狀況,hosts文件中內容以下:vim
修改三臺設備的.erlang.cookie中的key一致。若是使用的是前邊的安裝方法,.erlang.cookie的位置爲 /var/lib/rabbitmq/.erlang.cookie (採用其餘安裝方式找不到文件的話,可使用命令 find / -name '.erlang.cookie' 找到文件位置)。這裏三臺虛擬機的key都採用192.168.70.129的key(值爲CRRQPKHDXEEIUJUOGYKN),在另外兩臺設備上 執行命令: vim /var/lib/rabbitmq/.erlang.cookie ,修改文件內容爲CRRQPKHDXEEIUJUOGYKN。在修改時若是遇到權限問題,可執行命令 chmod 600 /var/lib/rabbitmq/.erlang.cookie 修改文件的權限爲可寫,修改內容完成後,執行命令 chmod 400 /var/lib/rabbitmq/.erlang.cookie 把文件再次改爲只讀的。centos
完成上邊的兩步後,erlang集羣就搭建好了,重啓全部的設備便可。在rabbitmq1節點的虛擬機上執行命令 rabbitmqctl cluster_status 查看集羣狀態:數組
添加節點bash
把rabbitmq2節點添加到集羣中去,在rabbitmq2節點執行如下命令:cookie
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbit1 rabbitmqctl start_app
執行完成後,查看集羣狀態,看到rabbitmq2已經在集羣中了,以下:app
重複上邊的步驟,把rabbitmq3也添加到集羣中,在rabbitmq3節點執行下邊命令:
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbit2 rabbitmqctl start_app
查看集羣狀態,rabbitmq3也在集羣中了,以下:
這時咱們打開任意一個節點的Web管理界面,顯示以下,看到集羣已經配置完成了:
刪除節點
把某一節點從集羣中刪除很簡單,reset一下節點便可。如刪除rabbitmq3節點,在rabbitmq3上執行如下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
執行完成後,查看集羣狀態以下:負載均衡
如今搭建的集羣是默認的普通集羣,普通集羣中節點能夠共享集羣中的exchange,routingKey和queue,可是queue中的消息只保存在首次聲明queue的節點中。任意節點的消費者均可以消費其餘節點的消息,好比消費者鏈接rabbitmq1節點的消費者(代碼中創建Connection時,使用的rabbitmq1的IP)能夠消費節點rabbitmq2的隊列myqueue2中的消息,消息傳輸過程是:rabbitmq2把myqueue2中的消息傳輸給rabbtimq1,而後rabbitmq1節點把消息發送給consumer。由於queue中的消息只保存在首次聲明queue的節點中,這樣就有一個問題:若是某一個node節點掛掉了,那麼只能等待該節點從新鏈接才能繼續處理該節點內的消息(若是沒有設置持久化的話,節點掛掉後消息會直接丟失)。以下圖,rabbitmq1節點掛掉後,myqueue隊列就down掉了,不能被訪問。
針對上邊的問題,咱們可能會想到:若是可讓rabbitmq中的節點像redis集羣的節點同樣,每個節點都保存全部的消息,好比讓rabbitmq1不只僅保存本身隊列myqueue的消息,還保存其餘節點的隊列myqueue2和myqueue3中的消息,rabbitmq2和rabbitmq3節點也同樣,這樣就不用擔憂宕機的問題了。rabbitmq也提供了這樣的功能:鏡像隊列。鏡像隊列由一個master和多個slave組成,使用鏡像隊列消息會自動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。
rabbitmq配置鏡像隊列十分簡單,咱們在任意一個node節點下執行下邊的命令就能夠完成鏡像隊列的配置(固然也能夠在Web管理界面上添加policy):
rabbitmqctl set_policy ha-all "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}' # ha-all:爲策略名稱; # ^my:爲匹配符,只有一個^表明匹配全部,^abc爲匹配名稱以abc開頭的queue或exchange; # ha-mode:爲同步模式,一共3種模式: # ①all-全部(全部的節點都同步消息), # ②exctly-指定節點的數目(需配置ha-params參數,此參數爲int類型好比2,在集羣中隨機抽取2個節點同步消息) # ③nodes-指定具體節點(需配置ha-params參數,此參數爲數組類型好比["rabbit@rabbitmq1","rabbit@rabbitmq2"],明確指定在這兩個節點上同步消息)。
打開Web管理界面,若是效果以下表示鏡像隊列已經配置完成了。當前myqueue的master節點爲rabbitmq1:
若是首次聲明queue的節點(master)掛了,其餘節點會自動變成master,如上圖myqueue的master爲rabbitmq1,停掉rabbtmq1後,結果以下:rabbitmq2成爲了master。
咱們發現rabbitmq1節點掛了後,rabbitmq2自動成爲了myqueue的master,myqueue不會down掉,能夠正常的添加/刪除/獲取消息,這就解決了普通集羣宕機的問題。使用鏡像隊列,由於各個節點要同步消息,因此比較耗費資源,通常在可靠性比較高的場景使用鏡像隊列。
還能夠配置其餘策略的鏡像隊列,也是一行命令便可完成配置,一些其它同步模式的栗子:
#策略名爲ha-twe,匹配以「my」開頭的queue或exchange,在集羣中隨機挑選鏡像節點,同步的節點爲2個 rabbitmqctl set_policy ha-two "^my" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' #策略名爲ha-nodes,匹配以「my」開頭的queue或exchange,指定rabbit@rabbitmq2和rabbit@rabbitmq3爲同步節點 rabbitmqctl set_policy ha-nodes "^my" \ '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq2", "rabbit@rabbitmq3"]}'
C#驅動RabbitMQ集羣與C#驅動單機RabbtiMQ的方式基本同樣,區別在於使用集羣時,建立Connection指定的是一個host集合。看一個簡單的栗子:
生產者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { UserName = "wyy",//用戶名 Password = "123456",//密碼 AutomaticRecoveryEnabled = true,//Connection斷了,自動從新鏈接 }; //集羣中的三個rabbitmq節點 List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" }; //隨機鏈接一個rabbitmq節點 using (var connection = factory.CreateConnection(hosts)) { //建立通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者準備就緒...."); #region 發佈100條消息 for (int i = 0; i < 100; i++) { channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: Encoding.UTF8.GetBytes($"第{i}條消息")); } #endregion } } Console.ReadKey(); }
消費者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { UserName = "wyy",//用戶名 Password = "123456",//密碼 }; //集羣中的三個rabbitmq節點 List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" }; //隨機鏈接一個rabbitmq節點 using (var connection = factory.CreateConnection(hosts)) { using (var channel = connection.CreateModel()) { //使用Qos,每次接收1條消息 channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false); Console.WriteLine("消費者準備就緒...."); #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { //處理一條消息須要10s時間 Thread.Sleep(1000); //顯示確認消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成!"); }; //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }
執行這兩個應用程序,結果以下:
到這裏RabbtMQ的集羣搭建就告一段落了,有一個小問題:RabbitMQ的集羣默認不支持負載均衡的。咱們能夠根據設備的性能,使用Qos給各個消費者指定合適的最大發送條數,這樣能夠在必定程度上實現負載均衡。也有園友經過Haproxy實現RabbitMQ集羣的負載均衡,有興趣的小夥伴能夠研究一下,爲何使用Haprpxy而不用Ngnix呢?這是由於Haproxy支持四層(tcp,udp等)和七層(http,https,email等)的負載均衡,而Nginx只支持七層的負載均衡,而Rabbitmq是經過tcp傳輸的。本節也是RabbitMQ系列的最後一篇,若是文中有錯誤的話,但願你們能夠指出,我會及時修改,謝謝。