RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然:html
單向解耦java
雙向解耦(如:RPC)服務器
例如一個日誌系統,很容易使用RabbitMQ簡化工做量,一個Consumer能夠進行消息的正常處理,另外一個Consumer負責對消息進行日誌記錄,只要在程序中指定兩個Consumer所監聽的queue以相同的方式綁定到同一個exchange便可,剩下的消息分發工做由RabbitMQ完成。
併發
使用RabbitMQ server須要:負載均衡
1. ErLang語言包;分佈式
2. RabbitMQ安裝包;性能
RabbitMQ同時提供了java的客戶端(一個jar包)。測試
1. 接收消息,轉發消息到綁定的隊列。四種類型:direct, topic, headers and fanoutfetch
direct:轉發消息到routigKey指定的隊列優化
topic:按規則轉發消息(最靈活)
headers:(這個尚未接觸到)
fanout:轉發消息到全部綁定隊列
2. 若是沒有隊列綁定在交換機上,則發送到該交換機上的消息會丟失。
3. 一個交換機能夠綁定多個隊列,一個隊列能夠被多個交換機綁定。
4. topic類型交換器經過模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分紅單詞。這些單詞之間用點隔開。它一樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,可是不匹配stock.nana。
還有一些其餘的交換器類型,如header、failover、system等,如今在當前的RabbitMQ版本中均未實現。
5. 由於交換器是命名實體,聲明一個已經存在的交換器,可是試圖賦予不一樣類型是會致使錯誤。客戶端須要刪除這個已經存在的交換器,而後從新聲明而且賦予新的類型。
6. 交換器的屬性:
- 持久性:若是啓用,交換器將會在server重啓前都有效。
- 自動刪除:若是啓用,那麼交換器將會在其綁定的隊列都被刪除掉以後自動刪除掉自身。
- 惰性:若是沒有聲明交換器,那麼在執行到使用的時候會致使異常,並不會主動聲明。
1. 隊列是RabbitMQ內部對象,存儲消息。相同屬性的queue能夠重複定義。
2. 臨時隊列。channel.queueDeclare(),有時不須要指定隊列的名字,並但願斷開鏈接時刪除隊列。
3. 隊列的屬性:
- 持久性:若是啓用,隊列將會在server重啓前都有效。
- 自動刪除:若是啓用,那麼隊列將會在全部的消費者中止使用以後自動刪除掉自身。
- 惰性:若是沒有聲明隊列,那麼在執行到使用的時候會致使異常,並不會主動聲明。
- 排他性:若是啓用,隊列只能被聲明它的消費者使用。
這些性質能夠用來建立例如排他和自刪除的transient或者私有隊列。這種隊列將會在全部連接到它的客戶端斷開鏈接以後被自動刪除掉。它們只是短暫地鏈接到server,可是能夠用於實現例如RPC或者在AMQ上的對等通訊。4. RPC的使用是這樣的:RPC客戶端聲明一個回覆隊列,惟一命名(例如用UUID),而且是自刪除和排他的。而後它發送請求給一些交換器,在消息的reply-to字段中包含了以前聲明的回覆隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to做爲routing key(默認綁定器會綁定全部的隊列到默認交換器,名稱爲「amp.交換器類型名」)發送到默認交換器。注意這僅僅是慣例而已,能夠根據和RPC服務器的約定,它能夠解釋消息的任何屬性(甚至數據體)來決定回覆給誰。
1. 消息在隊列中保存,以輪詢的方式將消息發送給監聽消息隊列的消費者,能夠動態的增長消費者以提升消息的處理能力。
2. 爲了實現負載均衡,能夠在消費者端通知RabbitMQ,一個消息處理完以後纔會接受下一個消息。
channel.basic_qos(prefetch_count=1)
注意:要防止若是全部的消費者都在處理中,則隊列中的消息會累積的狀況。
3. 消息有14個屬性,最經常使用的幾種:
deliveryMode:持久化屬性
contentType:編碼
replyTo:指定一個回調隊列
correlationId:消息id
實例代碼:
4. 消息生產者能夠選擇是否在消息被髮送到交換器而且還未投遞到隊列(沒有綁定器存在)和/或沒有消費者可以當即處理的時候獲得通知。經過設置消息的mandatory和/或immediate屬性爲真,這些投遞保障機制的能力獲得了強化。
5. 此外,一個生產者能夠設置消息的persistent屬性爲真。這樣一來,server將會嘗試將這些消息存儲在一個穩定的位置,直到server崩潰。固然,這些消息確定不會被投遞到非持久的隊列中。
1. 消息ACK,通知RabbitMQ消息已被處理,能夠從內存刪除。若是消費者因宕機或連接失敗等緣由沒有發送ACK(不一樣於ActiveMQ,在RabbitMQ裏,消息沒有過時的概念),則RabbitMQ會將消息從新發送給其餘監聽在隊列的下一個消費者。
channel.basicConsume(queuename, noAck=false, consumer);
2. 消息和隊列的持久化。定義隊列時能夠指定隊列的持久化屬性(問:持久化隊列如何刪除?)
channel.queueDeclare(queuename, durable=true, false, false, null);
發送消息時能夠指定消息持久化屬性:
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
這樣,即便RabbitMQ服務器重啓,也不會丟失隊列和消息。
3. publisher confirms
4. master/slave機制,配合Mirrored Queue,這種狀況下,publisher會正常發送消息和接收消息的confirm,但對於subscriber來講,須要接收Consumer Cancellation Notifications來獲得主節點失敗的通知,而後re-consume from the queue,此時要求client有處理重複消息的能力。注意:若是queue在一個新加入的節點上增長了一個slave,此時slave上沒有此前queue的信息(目前尚未同步機制)。
(經過命令行或管理插件能夠查看哪一個slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
當一個slave從新加入mirrored-queue時,若是queue是durable的,則會被清空。
1. 不支持跨網段(如需支持,須要shovel或federation插件)
2. 能夠隨意的動態增長或減小、啓動或中止節點,容許節點故障
3. 集羣分爲RAM節點和DISK節點,一個集羣最好至少有一個DISK節點保存集羣的狀態。
4. 集羣的配置能夠經過命令行,也能夠經過配置文件,命令行優先。
在Openstack中,組件之間對RabbitMQ使用基本都是「Remote Procedure Calls」的方式。每個Nova服務(好比計算服務、存儲服務等)初始化時會建立兩個隊列,一個名爲「NODE-TYPE.NODE-ID」,另外一個名爲「NODE-TYPE」,NODE-TYPE是指服務的類型,NODE-ID指節點名稱。
從抽象層面上講,RabbitMQ的組件的使用相似於下圖所示:
每一個服務會綁定兩個隊列到同一個topic類型的exchange,從不一樣的隊列中接收不一樣類型的消息。消息的發送者若是關心消息的返回值,則會監聽另外一個隊列,該隊列綁定在一個direct類型的exchange。接受者收到消息並處理後,會將消息的返回發送到此exchange。
在Openstack中,若是不關心消息返回,消息的流程圖以下:
若是關心消息返回值,流程圖以下:
曾經有過一我的作過一個測試(http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html),發送1百萬個併發消息,對性能有很高的需求,因而做者對比了RabbitMQ、MSMQ、ActiveMQ、ZeroMQueue,整個過程共產生1百萬條1K的消息。測試的執行是在一個Windows Vista上進行的,測試結果以下:
雖然ZeroMQ性能較高,但這個產品不提供消息持久化,須要本身實現審計和數據恢復,所以在易用性和HA上不是使人滿意,經過測試結果能夠看到,RabbitMQ的性能確實不錯。
我在本機也作了一些測試,但個人測試是基於組件的原生配置,沒有作任何的配置優化,所以總覺的不靠譜。我只測試了RabbitMQ和ActiveMQ兩款產品,雖然網上都說ActiveMQ性能不如前者,但平心而論,ActiveMQ提供了不少配置,存在很大的調優空間,也許修改一個配置參數就會使組件的性能有一個質的飛躍。