2013年3月23日 小白 學習筆記php
在瞭解RabbitMQ以前,首先要了解AMQP協議。AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。html
當前各類應用大量使用異步消息模型,並隨之產生衆多消息中間件產品及協議,標準的不一導致應用與中間件之間的耦合限制產品的選擇,並增長維護成本。AMQP是一個提供統一消息服務的應用層標準協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受不一樣客戶端/中間件產品,不一樣開發語言等條件的限制。java
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。node
AMQP的實現有:mysql
OpenAMQweb
AMQP的開源實現,用C語言編寫,運行於Linux、AIX、Solaris、Windows、OpenVMSsql
Apache Qpidshell
Apache的開源項目,支持C++、Ruby、Java、JMS、Python和.NET緩存
Redhat Enterprise MRG安全
實現了AMQP的最新版本0-10,提供了豐富的特徵集,好比徹底管理、聯合、Active-Active集羣,有Web控制檯,還有許多企業級特徵,客戶端支持C++、Ruby、Java、JMS、Python和.NET
RabbitMQ
一個獨立的開源實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ發佈在Ubuntu、FreeBSD平臺
AMQP Infrastructure
Linux下,包括Broker、管理工具、Agent和客戶端
ØMQ
一個高性能的消息平臺,在分佈式消息網絡可做爲兼容AMQP的Broker節點,綁定了多種語言,包括Python、C、C++、Lisp、Ruby等
Zyre
是一個Broker,實現了RestMS協議和AMQP協議,提供了RESTful HTTP訪問網絡AMQP的能力
以上是AMQP中的核心概念:
Broker
消息服務器的實體
虛擬主機(Virtual Host)
一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。客戶端應用程序在登陸到服務器以後,能夠選擇一個虛擬主機。每一個鏈接(包括全部channel)都必須關聯至一個虛擬主機
交換器(Exchange)
服務器中的實體,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列
消息隊列(Message Queue)
服務器中的實體,用來保存消息直到發送給消費者
生產者(Producer)
一個向交換器發佈消息的客戶端應用程序
消費者(Consumer)
一個從消息隊列中請求消息的客戶端應用程序
綁定器(Binding)
將交換器和隊列鏈接起來,而且封裝消息的路由信息
全部這些組件的屬性各不相同,可是隻有交換器和隊列被命名。客戶端能夠經過交換器的名字來發送消息,也能夠經過隊列的名字收取信息。由於AMQ 協議沒有一個通用的標準方法來得到全部組件的名稱,因此客戶端對隊列和交換器的訪問被限制在僅能使用熟知的或者只有本身知道的名字。
綁定器沒有名字,它們的生命期依賴於所緊密鏈接的交換器和隊列。若是這二者任意一個被刪除掉,那麼綁定器便失效了。這就說明,若要知道交換器和隊列的名字,還須要設置消息路由。
消息是一個不透明的數據包,這些包有以下性質:
元數據,例如內容的編碼或者代表來源的字段
標誌位,標記消息投遞時候的一些保障機制
一個特殊的字段叫作routing key
發送消息是一個很是簡單的過程。客戶端聲明一個它想要發送消息的目的交換器,而後將消息傳遞給交換器。
接受消息的最簡單辦法是設置一個訂閱。客戶端須要聲明一個隊列,而且使用一個綁定器將以前的交換器和隊列綁定起來,這樣的話,訂閱就設置完畢。
交換器的類型:
fanout交換器
不會解釋任何東西:它只是將消息投遞到全部綁定到它的隊列中
direct交換器
將消息根據其routing-key屬性投遞到包含對應key屬性的綁定器上
topic交換器
模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分紅單詞。這些單詞之間用點隔開。它一樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key *.stock.#匹配routing-key usd.stcok和eur.stock.db,可是不匹配stock.nasdaq
header交換器
根據應用程序消息的特定屬性進行匹配
failover和system交換器
當前RabbitMQ版本中均未實現
沒有綁定器,哪怕是最簡單的消息,交換器也不能將其投遞到隊列中,只能拋棄它。經過訂閱一個隊列,消費者可以從隊列中獲取消息,而後在使用事後將其從隊列中刪除。
不一樣於隊列的是,交換器有相應的類型,代表它們的投遞方式(一般是在和綁定器協做的時候)。由於交換器是命名實體,因此聲明一個已經存在的交換器, 可是試圖賦予不一樣類型是會致使錯誤。客戶端須要刪除這個已經存在的交換器,而後從新聲明而且賦予新的類型。
交換器也有一些性質:
持久性:若是啓用,交換器將會在Broker重啓前都有效
自動刪除:若是啓用,那麼交換器將會在其綁定的隊列都被刪除掉以後自動刪除掉自身
惰性:若是沒有聲明交換器,那麼在執行到使用的時候會致使異常,並不會主動聲明
AMQP Broker都會對其支持的每種交換器類型(爲每個虛擬主機)聲明一個實例。這些交換器的命名規則是amq.前綴加上類型名。例如 amq.fanout。空的交換器名稱等於amq.direct。對這個默認的direct交換器(也僅僅是對這個交換器),Broker將會聲明一個綁定了系統中全部隊列的綁定器。
這個特色告訴咱們,在系統中,任意隊列均可以和默認的direct交換器綁定在一塊兒,只要其routing-key等於隊列名字。
默認綁定器的行爲揭示了多綁定器的存在,將一個或者多個隊列和一個或者多個交換器綁定起來。這使得能夠將發送到不一樣交換器的具備不一樣routing key(或者其餘屬性)的消息發送到同一個隊列中。
隊列也有如下屬性,這些屬性和交換器所具備的屬性相似。
持久性:若是啓用,隊列將會在Broker重啓前都有效
自動刪除:若是啓用,那麼隊列將會在全部的消費者中止使用以後自動刪除掉自身
惰性:若是沒有聲明隊列,那麼在執行到使用的時候會致使異常,並不會主動聲明
排他性:若是啓用,隊列只能被聲明它的消費者使用
這些性質能夠用來建立例如排他和自刪除的transient或者私有隊列。這種隊列將會在全部連接到它的客戶端斷開鏈接以後被自動刪除掉 – 它們只是短暫地鏈接到Broker,可是能夠用於實現例如RPC或者在AMQ上的對等通訊。
AMQP上的RPC是這樣的:RPC客戶端聲明一個回覆隊列,惟一命名(例如用UUID19), 而且是自刪除和排他的。而後它發送請求給一些交換器,在消息的reply-to字段中包含了以前聲明的回覆隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to做爲routing key(以前提到過默認綁定器會綁定全部的隊列到默認交換器)發送到默認交換器。注意僅僅是慣例而已。根據和RPC服務器的約定,它能夠解釋消息的任何屬性(甚至數據體)來決定回覆給誰。
隊列也能夠是持久的,可共享,非自動刪除以及非排他的。使用同一個隊列的多個用戶接收到的並非發送到這個隊列的消息的一份拷貝,而是這些用戶共享這隊列中的一份數據,而後在使用完以後刪除掉。
RabbitMQ是一個遵循AMQP協議的消息中間件,它從生產者接收消息並遞送給消費者,在這個過程當中,根據規則進行路由,緩存與持久化。
幾個概念說明(徹底遵循AMQP中的概念):
Broker:簡單來講就是消息隊列服務器實體
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離
producer:消息生產者,就是投遞消息的程序
consumer:消息消費者,就是接受消息的程序
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務
消息隊列的使用過程大概以下:
客戶端鏈接到消息隊列服務器,打開一個channel
客戶端聲明一個exchange,並設置相關屬性
客戶端聲明一個queue,並設置相關屬性
客戶端使用routing key,在exchange和queue之間創建好綁定關係
客戶端投遞消息到exchange
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。對key進行模式匹配後進行投遞的叫作Topic交換機,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。還有一種不須要key的,叫作Fanout交換機,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
RabbitMQ支持消息的持久化,消息隊列持久化包括3個部分:
exchange持久化,在聲明時指定durable爲true
queue持久化,在聲明時指定durable爲true
消息持久化,在投遞時指定delivery_mode 爲2(1是非持久化)
若是exchange和queue都是持久化的,那麼它們之間的binding也是持久化的。若是exchange和queue二者之間有一個持久化,一個非持久化,就不容許創建綁定。
RabbitMQ的特性:
可靠性:包括消息持久化,消費者和生產者的消息確認
靈活路由:遵循AMQP協議,支持多種Exchange類型實現不一樣路由策略
分佈式:集羣的支持,包括本地網絡與遠程網絡
高可用性:支持主從備份與鏡像隊列
多語言支持:支持多語言的客戶端
WEB界面管理:能夠管理用戶權限,exhange,queue,binding,與實時監控
訪問控制:基於vhosts實現訪問控制
調試追蹤:支持tracing,方便調試
由於RabbitMQ由ERLANG實現,安裝RabbitMQ以前要先安裝ERLANG
安裝包:otp_src_R15B03-1.tar.gz ERLANG安裝包
rabbitmq-server-generic-unix-3.0.0.tar.gz RabbitMQ服務端
rabbitmq-java-client-bin-3.0.0.tar.gz RabbitMQ客戶端,包含性能測試腳本
如下是上述版本爲例的安裝步驟,後續章節描述的內容都對應此版本ERLANG的安裝步驟:
tar -zxf otp_src_R15B03-1.tar.gzz cd otp_src_R15B03 ./configure make make install
RabbitMQ客戶端與服務端的安裝直接解壓安裝包便可,客戶端的目錄中,rabbitmq-client.jar爲JAVA版的客戶端,編寫客戶端程序時須要引用,腳本文件爲性能測試腳本
$RABBIT_MQ_HOME/sbin目錄中的文件說明及命令使用請參考http://www.rabbitmq.com/manpages.html
RabbitMQ的啓停:
rabbitmq-server啓動服務,如要之後臺方式啓動,增長-detached參數
rabbitmqctl stop中止服務
rabbitmq-plugins enable rabbitmq_management打開WEB管理界面插件,默認訪問地址:
http://服務器IP:15672
經過配置環境變量或者配置文件,修改諸如端口,綁定IP,broker的名稱等,參考配置管理章節
例如:
修改$RABBIT_MQ_HOME/sbin/rabbitmq-env文件,增長配置:
HOSTNAME=broker_138 若是是集羣,每臺機器的名稱要不一樣
RABBITMQ_NODE_IP_ADDRESS=192.168.100.138 綁定機器IP
RabbitMQ集羣的運行須要集羣中的全部節點共享erlang.cookie,以其中一臺RabbitMQ中用戶目錄下~/.erlang.cookie文件爲準,複製文件內容,將全部節點的erlang.cookie文件都修改成此值。
先啓動全部節點的RabbitMQ,而後依次在每臺RabbitMQ中執行命令:
./rabbitmqctl stop_app ./rabbitmqctl join_cluster rabbit@broker_138 ./rabbitmqctl start_app
rabbit@broker_138爲其中一臺RabbitMQ的實例名稱,全部RabbitMQ節點都添加同一節點便可。
一個簡單的示例,P是生產者,C是消費者。P發送消息到隊列,C從隊列取消息。代碼以下:
生產者:
首先創建鏈接,在鏈接上創建channel,一般一個鏈接會創建多個channel,能夠提升消息的發送速度。這裏只創建了一個鏈接,創建多個channel時,客戶端可以使用多線程,每一個線程裏使用一個channel
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //而後聲明隊列,若是隊列沒有預先建立,會建立隊列。消息以字節碼的形式發送,因此在客戶端可使用任何編碼格式。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //最後別忘了關閉channel和connection channel.close(); connection.close();
消費者:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
一個隊列能夠有多個消費者,隊列發送消息採用Round-robin方式,即順序發送給每一個消費者,在單位時間內,每一個消費者收到的消息數量是相同的。
以上是假設每一個消費者處理消息的速度是同樣的,若是每一個消費者處理消息的速度不一樣,那麼Round-robin方式的效率就不高了,這時能夠設置prefetch參數。prefetch的值表示在消費者爲返回上一條消息的確認信息時,隊列最多發送給此消費者的消息數目。若是消息數目達到prefetch值,隊列就中止發送消息給這個消費者,並隨之發送給不忙的消費者。prefetch經過如下代碼設置:
channel.basicQos(prefetchCount);
在上一個示例中,隊列保證每條消息發送給其中一個消費者,即每一個消息只被處理一次。在實際應用中,常常會有這樣的需求,每條消息要同時發送給多個消費者或者更復雜的狀況。也就是說消息須要根據必定的規則發送給不一樣的消費者。
爲實現消息路由,須要引入Exchange,圖中用X表示。生產者再也不直接發送消息給隊列,而是先發送到Exchange。而後Exchange與隊列綁定。這樣消息會根據不一樣規則發送給不一樣隊列,最終到達不一樣的消費者。
實現代碼以下:
生產者:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close();
消費者:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }
不少時候,隊列是非持久而且是自動刪除的,這時隊列名稱也就不重要了,能夠經過如下代碼,由服務器自動生成隊列名稱。自動生成的隊列以amq開頭
String queueName = channel.queueDeclare().getQueue();
Exchange的類型不一樣,消息的路由規則也不一樣,Exchange的類型介紹請參考RabbitMQ簡介。如下是以direct類型的Exchange爲例的生產者代碼實現, 最重要的兩步就是聲明Exhange類型與發送時指定routeKey
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); //返回info,error,warning做爲routeKey String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close();
如下是以topic類型的Exchange爲例的生產者代碼實現:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close();
一條以quick.orange.rabbit爲routeKey的消息,Q1和Q2都會收到,lazy.orange.elephant也是。quick.orange.fox只能發送到Q1,lazy.brown.fox只能到Q2. lazy.pink.rabbit雖然符合兩個匹配規則,但只發送到Q2,由於先匹配的lasy.#規則。quick.brown.fox則Q1和Q2都收不到,會被直接丟棄。
以上示例都是異步的,即生產者不須要等待消費者的反饋。在實際狀況中,有些時候在消息處理比較快,且須要及時反饋時,則須要同步的方式,生產者發送消息,在收到消費者的反饋前一直處於阻塞狀態。由於等待的返回來自遠程主機,這種方式也被稱爲RPC(Remote procedure call)。RPC的實現有不少,好比JAVA平臺下的RMI,JMX。
如下是在RabbitMQ中的實現:
RPCClient:
private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }
RPCServer:
private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
工做流程以下:
客戶端啓動,創建發送隊列與反饋隊列
當RPC客戶端發送消息時,設置replyTo和correlationId參數。replyTo參數爲反饋隊列的名稱,correlationId做爲一次請求的惟一標識,要每次請求都不一樣,用於關聯服務端的反饋消息
請求發送到rpc_queue
服務端等待請求,當收到請求後,處理請求,並將反饋經過replyTo指定的反饋隊列發送回去
客戶端收到反饋,並校驗correlationId的值是否與發送的一致,若是一致,則一次請求完成
RabbitMQ不支持鏈接的failover,因此須要客戶端本身實現失敗重連。
爲保證消息的可靠傳遞,服務器使用持久化保證消息不丟失。包括exchange與queue必須定義爲持久的,同時發送消息時,也要設置消息爲持久消息。
在代碼中能夠經過如下語句設置發送持久消息:
channel.basicPublish(exchangeName, routeKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg)
或者:
BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build(); // deliveryMode爲1是非持久 channel.basicPublish(exchangeName, routeKey, basicProperties, msg)
生產者的消息確認叫作confirm,confirm確保消息已經發送到MQ中。當connection或channel異常時,會從新發送消息,若是消息是持久的,並不能必定保證消息持久化到磁盤中,由於消息可能存在與磁盤的緩存中。爲進一步提升可靠性,可使用事務。Confirm與事務不能同時使用。
當生產者收不到confirm時,消息可能會重複,因此若是消息不容許重複,則消費者須要本身實現消息去重。
使用如下代碼打開confirm,默認是關閉的
channel.confirmSelect();
消費者的消息確認叫作Acknowledgements,Acknowledgements確保消費者已經處理了消息,若是收不到消費者的Acknowledgements,MQ會從新發送消息。
默認Acknowledgements是自動確認,如需客戶端控制,在消費者的代碼中設置:
channel.basicConsume(queueName,false,consumer);//聲明隊列時,設置autoack爲false 。。。 //消息處理代碼 。。。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //發送確認
一樣,MQ也可能收不到消費者的Acknowledgements,就會重複發送消息,若要避免,消費者須要本身實現消息去重。
RabbitMQ提供了3中分佈式的解決方案,cluster,federation,shovel。cluster用於可靠的本地局域網,後兩種用於不可靠的網絡。
Cluster將多臺機器鏈接爲一個邏輯broker,各機器之間使用Erlang消息通訊,因此cluster中各機器必須有同樣的Erlang cookie,而且機器之間的網絡要是可靠的,而且都運行相同版本的Erlang。
Virtual hosts,exchanges,用戶及權限都在全部節點同步,queues能夠位於本機,也能夠做爲鏡像隊列,在各個機器之間同步。
一般使用cluster來提升可靠性與增長吞吐量。
Federation容許一個exchange從另一臺機器或者cluster的exchange中接收消息,由於是兩個exchange聯合起來,因此必須有相同的用戶權限。
聯合起來的exchange是單向的點對點的鏈接。
一般應該在經過internet鏈接broker的時候使用Federation
Shovel與Federation的概念相似,只是工做在更低的層次。
Federation是從一個exchange到另外一個exchange,而Shovel是從一邊的queue中取走消息併發送到另外一個exchange。
一般在經過internet鏈接broker的時,而且須要得到比Federation更多控制權的時候使用Shovel。
如下是三種分佈式模式的簡要對比:
Federation / Shovel |
Clustering |
Brokers are logically separate and may have different owners. |
A cluster forms a single logical broker. |
Brokers can run different versions of RabbitMQ and Erlang. |
Nodes must run the same version of RabbitMQ, and frequently Erlang. |
Brokers can run different versions of RabbitMQ and Erlang. |
Brokers must be connected via reliable LAN links. Communication is via Erlang internode messaging, requiring a shared Erlang cookie. |
Brokers can be connected in whatever topology you arrange. Links can be one- or two-way. |
All nodes connect to all other nodes in both directions. |
Brokers can be connected in whatever topology you arrange. Links can be one- or two-way. |
Chooses Consistency and Availability from the CAP theorem. |
Some exchanges in a broker may be federated while some may be local. |
Clustering is all-or-nothing. |
A client connecting to any broker can only see queues in that broker. |
A client connecting to any node can see queues on all nodes. |
當生產者發送消息的速率大於消息被路由到queue的速率時,會觸發流量控制,發送速率受到限制,但不會徹底阻塞。
當內存使用達到vm_memory_high_watermark的值時,會觸發流量控制,生產者被阻塞。vm_memory_high_watermark的默認值是系統內存的40%,這個值能夠在配置文件中修改。
[{rabbit, [{vm_memory_high_watermark, 0.4}]}].
或者在運行時經過命令rabbitmqctlset_vm_memory_high_watermark fraction修改,修改當即生效,但下次重啓後恢復。因此要永久修改,必須同時修改配置文件。
當磁盤剩餘空間小於disk_free_limit的值時,觸發流量控制,生產者被阻塞。disk_free_limit的默認值是1GB,可在配置文件中修改。
[{rabbit, [{disk_free_limit, 25000000000}]}].
經過命令rabbitmqctl status能夠查看內存使用狀態,或者在WEB管理界面中點擊節點後查看。
其中Queues表示隊列中消息佔用的內存
Mnesia表示MQ中定義的exchange,queue,bindings,用戶及權限佔用的內存
詳細說明請參考http://www.rabbitmq.com/memory-use.html
RabbitMQ的默認配置在大部分狀況下是最佳配置,若是服務運行良好,不須要修改。RabbitMQ支持3種方式修改配置:環境變量、配置文件、運行時參數與策略。
環境變量能夠配置到shell環境變量中,也能夠在RabbitMQ的環境變量中配置。例如:配置服務綁定IP,能夠在shell環境變量裏配置RABBITMQ_NODE_IP_ADDRESS的值,也能夠在RabbitMQ的環境變量中配置NODE_IP_ADDRESS的值,即RabbitMQ的環境變量中變量名稱要去掉RABBITMQ_。RabbitMQ的環境變量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。配置的優先級爲shell環境變量優先於RabbitMQ的環境變量,RabbitMQ的環境變量優先於RabbitMQ默認的環境變量。
經過配置文件配置,要先在環境變量中指定配置文件路徑,例如:
CONFIG_FILE=/etc/rabbitmq/rabbitmq.config
而後添加配置,例如:
[ {mnesia, [{dump_log_write_threshold, 1000}]}, {rabbit, [{tcp_listeners, [5673]}]} ].
經過rabbitmqctl命令能夠在運行時修改配置,例如修改vm_memory_high_watermark。還有些配置,好比鏡像隊列,是經過管理界面或命令配置策略實現的。
詳細的配置項請參考http://www.rabbitmq.com/configure.html
RabbitMQ支持主從備份,當主服務器不可用時,存在磁盤中的消息能夠由從服務器恢復。
也能夠在集羣的基礎上配置主從備份。主從備份依賴Pacemaker來管理資源,主從備份的方式已不推薦使用,而鏡像隊列則更容易使用,且可靠性更高。
雖然使用cluster能夠提升可靠性,exchange,binding在各個機器是共享的,可是queue中的消息實際上仍是存在單獨的機器,若是一臺機器不可用,那麼在這臺機器恢復前,這臺機器中存儲的消息也是不可用的。
爲解決這樣的問題,引入了鏡像隊列,鏡像隊列是在集羣中爲隊列創建的一個或多個物理鏡像,這些鏡像分別存儲在主節點以外的其餘節點,全部節點中的隊列共同組成一個邏輯隊列。將一個隊列作鏡像後,即便此機器不可用,RabbitMQ會自動從鏡像中選擇一個繼續使用,不會致使隊列中的消息不可用。
若是爲一個隊列創建多個鏡像,前者稱爲主節點,後者稱爲從節點。若是主節點有問題,那麼RabbitMQ會從從節點中選擇最先同步的一個做爲新的主節點,以保證儘可能不丟失消息,然而原主節點中同步以前的消息仍是會丟失。
鏡像隊列運行在cluster中,不建議經過WAN使用,也就是不建議在Federation和Shovel中使用。
鏡像隊列是經過策略配置的,添加一個策略,匹配相應的隊列,而後指定一個key爲ha-mode的參數,例如:
rabbitmqctl set_policy ha-all "^ha." '{"ha-mode":"all"}'
這個策略設置全部的節點都爲ha.開頭的隊列作鏡像。這個設置也能夠在管理界面中添加,詳細信息請參考http://www.rabbitmq.com/ha.html
RabbitMQ的JAVA客戶端中附帶了性能測試腳本,如下數據都由此腳本測試獲得。
硬件環境:CPU::Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz
內存:4G
磁盤:500G 10000轉/分
軟件環境:otp_src_R15B03-1.tar.gz
rabbitmq-server-generic-unix-3.0.0.tar.gz (單臺)
rabbitmq-java-client-bin-3.0.0.tar.gz
Red Hat 4.1.2-48 (Linux version 2.6.18)
如下是發送0.5KB大小消息的測試結果:
producer consumer confirm(max unconfirmed publishes 100) ack persistent throughput (msg/s)
1 1 N N N 17650
1 1 Y N N 15640
1 1 N Y N 17100
1 1 N N Y 17368
1 1 Y N Y 15635
1 1 N Y Y 9154
1 1 Y Y N 15266
1 1 Y Y Y 6111
max unconfirmed publishes的值對於吞吐量的影響較大.
在發送持久消息與打開消費者的acknowledgements時,吞吐量變化明顯。
關於性能,請參考如下文章:
http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
RabbitMQ中的隊列性能是一個值得關注的地方。在設計方案時就應該考慮到。隊列只有在保持隊列中不積壓消息時,性能纔是最佳的,隊列中積壓的消息越多,性能降低越多。
例如生產者發送消息的速度是600msg/s,消費者接收的速度是1200msg/s,正常狀況下,是沒有性能問題的。這時若是中止消費者一段時間,讓消息在隊列中積壓,而後在打開消費者。按理消費者的速度大於生產者速度,能夠轉發新消息,並把老消息也取走,最終隊列又回到爲空的狀態。但實際狀況則不是,隊列中的消息會繼續積壓,並且會繼續變多,而這時消費者的速度就不如以前的了。
RabbitMQ中的隊列,在實現上又分爲多個小的隊列,每一個隊列裏存儲着不一樣狀態的消息。當消息不積壓時,消息由交換器到達隊列,就會被直接發送給消費者。而當消息堆積時,因爲佔用較多內存,RabbitMQ會把消息放入更深層次的隊列,例如將內存中的消息換出到磁盤上(無論消息是否持久化),而這些操做會消耗更多的CPU等系統資源,從而致使影響隊列中消息的發送。
爲了避免使消息積壓,能夠採起兩種方法:
中止向隊列發送消息
中止發送消息,讓系統資源都集中到向消費者發送消息,隊列中的消息逐漸減小,隊列最終會恢復至爲空狀態。
轉移負載
有些時候不能中止生產者,這時能夠改變綁定,讓新消息發送到新的隊列,新隊列必須位於新的機器上。固然也須要新的消費者來鏈接。這樣可讓老隊列中的消息慢慢取走,也不影響新消息的發送。
默認的集羣模式下,雖然消息能夠發送到一臺機器,而後從另外一臺機器取出,可是由於每臺機器的queue實際上消息是本地存儲,因此消息發到A的queue,從B中取,首先須要從A再次發送到B中,這樣會致使取消息的效率不高。
若是使用鏡像模式,A中的消息會同步到B中,消費者從B中取消息,消息是從本地取了,可是隊列作鏡像依然對性能影響很大,尤爲是鏡像的數目增長,性能會成倍降低。鏡像隊列優於普通模式的地方在於可靠性,普通模式中,A若是有故障,那麼A中的消息就沒法取出。鏡像模式中,A有故障,消息依然能夠從B中取出。
如下是咱們生產環境的集羣配置方案,由於對於吞吐量要求很高,單臺RabbitMQ沒法知足性能要求,因此選擇使用cluster,而鏡像模式對於性能影響很大,只能採起其餘方案:假設3臺RabbitMQ組成一個集羣。而後創建多個queue,exchange使用direct類型,並綁定全部queue,routeKey爲0到2(和MQ的數量一致)中隨機發送。生產者發送消息到exchange,並路由到各個queue,消費者也有多個,同時從各個queue獲取消息。生產者與消費者使用多channel提升速度,同時消費者使用異步接收方式。
使用多個隊列,能夠顯著提升集羣的吞吐量,每一個隊列要位於不一樣的物理機器上。考慮性能優先,也取消了消息持久化。可是在可靠性方面,若是某個隊列不可用,那麼發送給這個隊列的消息就會被丟棄。爲避免這種狀況,採用備用綁定與備用隊列的方式,即創建多個綁定,默認狀況exchange經過routeKey 0,1,2綁定隊列a,b,c(橙色線路) ,備用綁定是exchange經過routeKey 0,1,2 綁定隊列d(紫色線路)。好比當隊列a不可用時,默認的綁定routeKey爲0的消息就沒法發送到a隊列,這時備用策略自動生效,routeKey爲0的消息會被髮送到隊列d上(走紫色線路),routeKey爲1和2的消息照常發到b和c(仍是橙色線路)。這樣就能夠確保消息不丟失。若要進一步提升可靠性,下降備用隊列的壓力,能夠創建多個備用隊列,而後將綁定分散開來。
1百萬條1K的消息
[1] http://www.rabbitmq.com/documentation.html
[2] http://www.infoq.com/cn/articles/AMQP-RabbitMQ#ftn.26
[3] http://langyu.iteye.com/blog/759663/
[4] http://mysql.taobao.org/index.php/Rabbitmq
[5] http://blog.163.com/clevertanglei900@126/blog/static/111352259201011121041853/
[6] http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/
[7] http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
[8] http://www.rabbitmq.com/blog/2011/10/27/performance-of-queues-when-less-is-more/
[9] http://www.rabbitmq.com/blog/2011/09/24/sizing-your-rabbits/
[10] http://www.oschina.net/news/17973/message-queue-shootout
轉自:http://changmengnan.com/284.html