1、鏡像模式集羣簡介node
若是RabbitMQ集羣只有一個broker節點,那麼該節點的失效將致使整個服務臨時性的不可用,而且可能會致使message的丟失(尤爲是在非持久化message存儲於非持久化queue中的時候)。固然能夠將全部的publish的message都設置爲持久化的,而且使用持久化的queue,可是這樣仍然沒法避免因爲緩存致使的問題:由於message在發送以後和被寫入磁盤並執行fsync之間存在一個雖然短暫可是會產生問題的時間窗。經過publisher的confirm機制可以確保客戶端知道哪些message已經存入磁盤,儘管如此,通常不但願遇到因單點故障致使的服務不可用。linux
若是RabbitMQ集羣是由多個broker節點構成的,那麼從服務的總體可用性上來說,該集羣對於單點失效是有彈性的,可是同時也須要注意:儘管exchange和binding可以在單點失效問題上倖免於難,可是queue和其上持有的message卻不行,這是由於queue及其內容僅僅存儲於單個節點之上,因此一個節點的失效表現爲其對應的queue不可用。正則表達式
引入RabbitMQ的鏡像隊列機制,將queue鏡像到cluster中其餘節點之上。在該實現下,若是集羣中的一個節點失效了,queue能自動地切換到鏡像中的另外一個節點以保證服務的可用性。在一般的用法中,針對每個鏡像隊列都包含一個master和多個slave,分別對應於不一樣的節點。slave會準確地按照master執行命令的順序進行命令執行,故slave與master上維護的狀態應該是相同的。除了publish外全部動做都只會向master發送,而後由master將命令執行的結果廣播給slave們,故看似從鏡像隊列中的消費操做其實是在master上執行的。一旦完成了選中的slave被提高爲master的動做,發送到鏡像隊列的message將不會再丟失:publish到鏡像隊列的全部消息老是被直接publish到master和全部的slave之上。這樣一旦master失效了,message仍然能夠繼續發送到其餘slave上。shell
RabbitMQ的鏡像隊列同時支持publisher confirm和事務兩種機制。在事務機制中,只有當前事務在所有鏡像queue中執行以後,客戶端纔會收到Tx.CommitOk的消息。一樣的,在publisher confirm機制中,向publisher進行當前message確認的前提是該message被所有鏡像所接受了。express
2、開始配置緩存
鏡像隊列是基於普通的集羣模式的,因此仍是得先配置普通集羣,而後才能設置鏡像隊列,這裏再也不贅述,能夠參考個人另外一篇文章http://linuxg.blog.51cto.com/4410110/1965369配置普通集羣。 安全
一、首先查看策略 #rabbitmqctl -n rabbit1 list_policies #以節點rabbit2和rabbit3查詢結果是同樣的 Listing policies 二、設置鏡像隊列策略 用法: set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition} Sets a policy. name The name of the policy. pattern The regular expression, which when matches on a given resources causes the policy to apply. definition The definition of the policy, as a JSON term. In most shells you are very likely to need to quote this. priority The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0. apply-to Which types of object this policy should apply to - "queues", "exchanges" or "all". The default is "all". For example: rabbitmqctl set_policy federate-me "^amq." '{"federation-upstream-set":"all"}' This command sets the policy federate-me in the default virtual host so that built-in exchanges are federated. 釋義: -p Vhost: 可選參數,針對指定vhost下的queue進行設置 Name: policy的名稱,能夠自定義 Pattern: queue的匹配模式(正則表達式) Definition: 鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode ha-mode: 指明鏡像隊列的模式,有效值爲 all/exactly/nodes all: 表示在集羣中全部的節點上進行鏡像 exactly: 表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定 nodes: 表示在指定的節點上進行鏡像,節點名稱經過ha-params指定 ha-params: ha-mode模式須要用到的參數 ha-sync-mode: 進行隊列中消息的同步方式,有效值爲automatic和manual priority: 可選參數,policy的優先級 ha-promote-on-shutdown: 用來控制選主的行爲的,有效值爲when-synced,always 三、開始設置: #rabbitmqctl -n rabbit1 set_policy mirror_queue "^" '{"ha-mode":"all","ha-sync-mode":"automatic","ha-promote-on-shutdown":"always"}' Setting policy "mirror_queue" for pattern "^" to "{\"ha-mode\":\"all\",\"ha-sync-mode\":\"automatic\",\"ha-promote-on-shutdown\":\"always\"}" with priority "0" 注意:"^" 可使用正則表達式,好比"^queue_" 表示對隊列名稱以「queue_」開頭的全部隊列進行鏡像 四、再次查看策略 #rabbitmqctl -n rabbit1 list_policies Listing policies / mirror_queue all ^ {"ha-mode":"all","ha-sync-mode":"automatic","ha-promote-on-shutdown":"always"} 0 #rabbitmqctl -n rabbit2 list_policies Listing policies / mirror_queue all ^ {"ha-mode":"all","ha-sync-mode":"automatic","ha-promote-on-shutdown":"always"} 0 #rabbitmqctl -n rabbit3 list_policies Listing policies / mirror_queue all ^ {"ha-mode":"all","ha-sync-mode":"automatic","ha-promote-on-shutdown":"always"} 0
能夠經過rabbitmq_management界面查看策略,Admin-->Policies, 以下圖:bash
一樣的,也能夠經過此界面添加policies,下面會介紹。服務器
五、取消鏡像隊列(在哪個節點均可以) #rabbitmqctl -n rabbit1 clear_policy mirror_queue Clearing policy "mirror_queue" 在每一個節點查看, 就不會在看到鏡像隊列了 #rabbitmqctl -n rabbit3 list_policies Listing policies #rabbitmqctl -n rabbit2 list_policies Listing policies #rabbitmqctl -n rabbit1 list_policies Listing policies
六、經過rabbitmq_management界面建立策略:Admin-->Policiesapp
填寫好策略信息以後,點擊「Add Policy」,而後會看到添加好的策略:
3、消息的同步
將新節點加入已存在的鏡像隊列是,默認狀況下ha-sync-mode=manual,鏡像隊列中的消息不會主動同步到新節點,除非顯式調用同步命令。當調用同步命令後,隊列開始阻塞,沒法對其進行操做,直到同步完畢。當ha-sync-mode=automatic時,新加入節點時會默認同步已知的鏡像隊列。因爲同步過程的限制,因此不建議在生產的active隊列(有生產消費消息)中操做。
使用下面的命令來查看那些slaves已經完成同步: #rabbitmqctl -n rabbit1 list_queues consumers messages name slave_pids synchronised_slave_pids Listing queues 0 0 book.data.chapter.queue [<rabbit2@localhost.2.5129.0>, <rabbit3@localhost.3.5082.0>] [<rabbit2@localhost.2.5129.0>, <rabbit3@localhost.3.5082.0>] 能夠經過手動的方式同步一個queue: #rabbitmqctl -n rabbit1 sync_queue name 一樣也能夠取消某個queue的同步功能: #rabbitmqctl -n rabbit1 cancel_sync_queue name 這些均可以經過management插件來設置。
4、rabbitmq持久化
RabbitMQ持久層的目的是爲了獲得好的結果,在大多數狀況下沒有配置。然而,一些配置有時是有用的。此頁解釋瞭如何配置它。在採起任何行動以前,你都應該閱讀這一切。
持久化如何工做?
首先,先講一下背景: 持久化和短暫消息均可以寫入磁盤.持久化消息一旦到達隊列,就會寫入磁盤,而短暫消息只在內存壓力較大被趕出內存時纔會寫入磁盤.持久化消息在內存緊張釋放內存時,依然也會存在內存中. 持久層指的是存儲這兩種類型消息到磁盤的機制.咱們說的隊列是指無鏡像隊列或master隊列或slave隊列. 隊列鏡像會發生以上的持久化.
持久層有兩個組件: 隊列索引和消息存儲.隊列索引負責維護消息在隊列的位置,以及是否被投遞,是否應答的信息. 所以,每一個隊列都有一個隊列索引。
消息存儲是消息的key-value存儲, 由服務器中的全部隊列共享.消息(消息體, 消息屬性或消息頭)可直接存儲於隊列索引,也能夠寫到消息存儲中.在技術上有兩個消息存儲(一個暫時的和一個持久的消息),但他們一般一塊兒被被認爲是「消息存儲」。
queue的持久化是經過durable=true來實現的。 通常程序中這麼使用: Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queue.persistent.name", true, false, false, null); 關鍵的是第二個參數設置爲true,即durable=true. 參考連接:
RabbitMQ安全特性
一、publish消息確認機制 若是採用標準的 AMQP 協議,則惟一可以保證消息不會丟失的方式是利用事務機制 — 令 channel 處於 transactional 模式、向其 publish 消息、執行commit動做 在這種方式下,事務機制會帶來大量的多餘開銷,並會致使吞吐量降低 250% 。爲了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。 confirm 機制是在channel上使用 confirm.select方法,處於 transactional 模式的 channel 不能再被設置成 confirm 模式,反之亦然。 在 channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被 nack 一次。可是沒有對消息被confirm的快慢作任何保證 而且同一條消息不會既被 confirm 又被 nack 。 RabbitMQ 將在下面的狀況中對消息進行 confirm : RabbitMQ發現當前消息沒法被路由到指定的 queues 中; 非持久屬性的消息到達了其所應該到達的全部 queue 中(和鏡像 queue 中); 持久消息到達了其所應該到達的全部 queue 中(和鏡像 queue 中),並被持久化到了磁盤(被 fsync); 持久消息從其所在的全部 queue 中被 consume 了(若是必要則會被 acknowledge)。 二、consumer消息確認機制 爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。 若是沒啓動消息確認機制,RabbitMQ在consumer收到消息後就會把消息刪除。 啓用消息確認後,consumer在處理數據後應經過回調函數顯示發送ack, RabbitMQ收到ack後纔會刪掉數據。若是consumer一段時間內不回饋,RabbitMQ會將該消息重 新分配給另一個綁定在該隊列上的consumer。另外一種狀況是consumer斷開鏈接,可是獲取到的消息沒有回饋,則RabbitMQ一樣從新分配。 注意:若是consumer 沒調用basic.qos 方法設置prefetch_count=1,那即便該consumer有未ack的messages,RabbitMQ仍會繼續發messages給它。 三、消息持久化 消息確認機制確保了consumer退出時消息不會丟失,但若是是RabbitMQ自己因故障退出,消息仍是會丟失。爲了保證在RabbitMQ出現意外狀況時數據仍沒有丟失, 須要將queue和message都要持久化。 queue持久化: channel.queue_declare(queue=’hello’, durable=True) message持久化: channel.basic_publish(exchange=」, routing_key=」task_queue」, body=message, properties=pika.BasicProperties( delivery_mode = 2,) #消息持久化 ) 即便有消息持久化,數據也有可能丟失,由於rabbitmq是先將數據緩存起來,到必定條件才保存到硬盤上,這期間rabbitmq出現意外數據有可能丟失。 網上有測試代表:持久化會對RabbitMQ的性能形成比較大的影響,可能會降低10倍不止。
參考連接:http://blog.csdn.net/u013256816/article/details/71097186