RabbitMQ的原理和使用

轉載:RabbitMQ從入門到精通html

轉載:輕鬆搞定RabbitMQjava

轉載:RabbitMQ Java入門教程web

1、RabbitMQ

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
spring

2、RabbitMQ的使用場景

對於一個大型的軟件系統來講,它會有不少的組件或者說模塊或者說子系統或者(subsystem or Component or submodule)。那麼這些模塊的如何通訊?這和傳統的IPC有很大的區別。傳統的IPC不少都是在單一系統上的,模塊耦合性很大,不適合擴展(Scalability);若是使用socket那麼不一樣的模塊的確能夠部署到不一樣的機器上,可是仍是有不少問題須要解決。好比:
 1)信息的發送者和接收者如何維持這個鏈接,若是一方的鏈接中斷,這期間的數據如何方式丟失?
 2)如何下降發送者和接收者的耦合度?
 3)如何讓Priority高的接收者先接到數據?
 4)如何作到load balance?有效均衡接收者的負載?
 5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不一樣的數據,如何作有效的filter。
 6)如何作到可擴展,甚至將這個通訊模塊發到cluster上?
 7)如何保證接收者接收到了完整,正確的數據?
  AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP。
安全

3、RabbitMQ的結構

RabbitMQ的應用場景架構圖以下:服務器


  1. Broker:簡單來講就是消息隊列服務器實體。
  2. Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
  3. Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
  4. Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
  5. Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  6. vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
  7. producer:消息生產者,就是投遞消息的程序。
  8. consumer:消息消費者,就是接受消息的程序。
  9. channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。

4、RabbitMQ的使用過程

AMQP模型中,消息在producer中產生,發送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊列的使用過程大概以下:
網絡

  1. 客戶端鏈接到消息隊列服務器,打開一個channel。
  2. 客戶端聲明一個exchange,並設置相關屬性。
  3. 客戶端聲明一個queue,並設置相關屬性。
  4. 客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  5. 客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。 exchange也有幾個類型,徹底根據key進行投遞的叫作Direct交換機,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。

4.0 安裝和配置

RabbitMQ使用Erlang語言實現,所以在使用時首先要安裝和配置erlang環境,並安裝服務器後進行相關配置,因爲不是本文主要內容因此忽略,詳見RabbitMQ簡介架構

RabbitMQ的客戶端使用時須要添加相關依賴。dom

4.1 點對點


消息生產者的代碼以下:socket

  
  
  
  
  1. package com.zenhobby.rabbit.demo;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class Send
  6. {
  7. //隊列名稱
  8. private final static String QUEUE_NAME = "hello";
  9. public static void main(String[] argv) throws java.io.IOException
  10. {
  11. /**
  12. * 建立鏈接鏈接到MabbitMQ
  13. */
  14. ConnectionFactory factory = new ConnectionFactory();
  15. //設置MabbitMQ所在主機ip或者主機名
  16. factory.setHost( "localhost");
  17. //建立一個鏈接
  18. Connection connection = factory.newConnection();
  19. //建立一個頻道
  20. Channel channel = connection.createChannel();
  21. //指定一個隊列
  22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  23. //發送的消息
  24. String message = "hello world!";
  25. //往隊列中發出一條消息
  26. channel.basicPublish( "", QUEUE_NAME, null, message.getBytes());
  27. System.out.println( " [x] Sent '" + message + "'");
  28. //關閉頻道和鏈接
  29. channel.close();
  30. connection.close();
  31. }
  32. }
消息消費者的代碼以下:

  
  
  
  
  1. package com.zenhobby.rabbit.demo;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.QueueingConsumer;
  6. public class Recv
  7. {
  8. //隊列名稱
  9. private final static String QUEUE_NAME = "hello";
  10. public static void main(String[] argv) throws java.io.IOException,
  11. java.lang.InterruptedException
  12. {
  13. //打開鏈接和建立頻道,與發送端同樣
  14. ConnectionFactory factory = new ConnectionFactory();
  15. factory.setHost( "localhost");
  16. Connection connection = factory.newConnection();
  17. Channel channel = connection.createChannel();
  18. //聲明隊列,主要爲了防止消息接收者先運行此程序,隊列還不存在時建立隊列。
  19. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  20. System.out.println( " [*] Waiting for messages. To exit press CTRL+C");
  21. //建立隊列消費者
  22. QueueingConsumer consumer = new QueueingConsumer(channel);
  23. //指定消費隊列,關閉默認的消息應答
  24. channel.basicConsume(QUEUE_NAME, true, consumer);
  25. while ( true)
  26. {
  27. //nextDelivery是一個阻塞方法(內部實現實際上是阻塞隊列的take方法)
  28. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  29. String message = new String(delivery.getBody());
  30. System.out.println( " [x] Received '" + message + "'");
  31. }
  32. }
  33. }
隊列分別在生產者和消費者處建立,主要是爲了防止有一端未創建起來的時候丟失消息。

4.2 工做隊列

工做隊列的主要任務是:避免馬上執行資源密集型任務,而後必須等待其完成。相反地,咱們進行任務調度:咱們把任務封裝爲消息發送給隊列。工做進行在後臺運行並不斷的從隊列中取出任務而後執行。當你運行了多個工做進程時,任務隊列中的任務將會被工做進程共享執行。這樣的概念在web應用中極其有用,當在很短的HTTP請求間須要執行復雜的任務。

1.消息分發機制

默認的,RabbitMQ會一個一個的發送信息給下一個消費者(consumer),而不考慮每一個任務的時長等等,且是一次性分配,並不是一個一個分配。平均的每一個消費者將會得到相等數量的消息。這樣分發消息的方式叫作round-robin。

默認的任務分發雖然看似公平但存在弊端。好比:如今有2個消費者,全部的奇數的消息都是繁忙的,而偶數則是輕鬆的。按照輪詢的方式,奇數的任務交給了第一個消費者,因此一直在忙個不停。偶數的任務交給另外一個消費者,則當即完成任務,而後閒得不行。而RabbitMQ則是不瞭解這些的。這是由於當消息進入隊列,RabbitMQ就會分派消息。它不看消費者爲應答的數目,只是盲目的將第n條消息發給第n個消費者。

爲了解決這個問題,咱們使用basicQos( prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢後,有了反饋,纔會進行第二次發送。

  
  
  
  
  1. int prefetchCount = 1;
  2. channel.basicQos(prefetchCount);
使用公平分發,必須關閉自動應答,改成手動應答。

2. 消息確認

每一個Consumer可能須要一段時間才能處理完收到的數據。若是在這個過程當中,Consumer出錯了,異常退出了,而數據尚未處理完成,那麼很是不幸,這段數據就丟失了。由於咱們採用no-ack的方式進行確認,也就是說,每次Consumer接到數據後,而無論是否處理完成,RabbitMQ Server會當即把這個Message標記爲完成,而後從queue中刪除了。
爲了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。爲了保證數據能被正確處理而不只僅是被Consumer收到,那麼咱們不能採用no-ack。而應該是在處理完數據後發送ack。在處理數據後發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ能夠去安全的刪除它了。若是Consumer退出了可是沒有發送ack,那麼RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的狀況下數據也不會丟失。這裏並無用到超時機制。RabbitMQ僅僅經過Consumer的鏈接中斷來確認該Message並無被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來作數據處理。
默認狀況下,消息確認是打開的(enabled):

  
  
  
  
  1. boolean autoAck = false;
  2. channel.basicConsume(QUEUE_NAME, autoAck, consumer);
修改消費者以下:

  
  
  
  
  1. channel.basicQos( 1); //保證一次只分發一個
  2. // 建立隊列消費者
  3. final Consumer consumer = new DefaultConsumer(channel) {
  4. @Override
  5. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  6. String message = new String(body, "UTF-8");
  7. System.out.println( " [x] Received '" + message + "'");
  8. System.out.println( " [x] Proccessing... at " + new Date().toLocaleString());
  9. try {
  10. for ( char ch: message.toCharArray()) {
  11. if (ch == '.') {
  12. Thread.sleep( 1000);
  13. }
  14. }
  15. } catch (InterruptedException e) {
  16. } finally {
  17. System.out.println( " [x] Done! at " + new Date().toLocaleString());
  18. channel.basicAck(envelope.getDeliveryTag(), false);
  19. }
  20. }
  21. };
其中:

  
  
  
  
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
用於在消息處理完畢時返回應答狀態。若是MQ服務器未收到應答則在消費者掛掉以後從新把消息放入到隊列中以供其餘消費者使用。若是關閉了自動消息應答,手動也未設置應答,這是一個很簡單的錯誤,可是後果倒是極其嚴重的。消息在分發出去之後,得不到迴應,因此不會在內存中刪除,結果RabbitMQ會愈來愈佔用內存,致使服務器掛掉。

3. 消息持久化

爲了保證在RabbitMQ退出或者crash了數據仍沒有丟失,須要將queueMessage都要持久化。

queue的持久化須要在聲明時指定durable=True:

  
  
  
  
channel.queue_declare(queue='hello', durable=True)
message的持久化須要在發送時指定property:

  
  
  
  
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

修改後的生產者以下所示:

  
  
  
  
  1. static void Main(string[] args)
  2. {
  3. var factory = new ConnectionFactory() { HostName = "localhost" };
  4. using (var connection = factory.CreateConnection())
  5. {
  6. using (var channel = connection.CreateModel())
  7. {
  8. bool durable = true;
  9. channel.QueueDeclare( "task_queue", durable, false, false, null); //queue的持久化須要在聲明時指定durable=True
  10. var message = GetMessage(args);
  11. var body = Encoding.UTF8.GetBytes(message);
  12. var properties = channel.CreateBasicProperties();
  13. properties.SetPersistent( true); //須要持久化Message,即在Publish的時候指定一個properties,
  14. channel.BasicPublish( "", "task_hello", properties, body);
  15. }
  16. }
  17. }

4.3 Publish/Subscribe

1. 交換器

在工做隊列一節中使用的分發以下:

  
  
  
  
channel.basicPublish("", "hello", null, message.getBytes());
其中第一個入參爲空即爲默認的交換器,交換器是RabbitMQ中的概念,其主要工做是接受生產者發出的消息,並推送到消息隊列中(生產者並無直接向queue中發送任何消息,而是發給交換器由交換器轉交)。

這裏寫圖片描述
交換器的規則有:

  1. direct (直連):
  2. topic (主題)
  3. headers (標題)
  4. fanout (分發)

Direct Exchange – 處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵 「dog」,則只有被標記爲「dog」的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。 

  
  
  
  
  1. Channel channel = connection.createChannel();
  2. channel.exchangeDeclare( "exchangeName", "direct"); //direct fanout topic
  3. channel.queueDeclare( "queueName");
  4. channel.queueBind( "queueName", "exchangeName", "routingKey");
  5. byte[] messageBodyBytes = "hello world".getBytes();
  6. //須要綁定路由鍵
  7. channel.basicPublish( "exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);


Fanout Exchange – 不處理路由鍵。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。 Fanout交換機轉發消息是最快的。
  
  
  
  
  1. Channel channel = connection.createChannel();
  2. channel.exchangeDeclare( "exchangeName", "fanout"); //direct fanout topic
  3. channel.queueDeclare( "queueName");
  4. channel.queueBind( "queueName", "exchangeName", "routingKey");
  5. channel.queueDeclare( "queueName1");
  6. channel.queueBind( "queueName1", "exchangeName", "routingKey1");
  7. byte[] messageBodyBytes = "hello world".getBytes();
  8. //路由鍵須要設置爲空
  9. channel.basicPublish( "exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);


Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。

  
  
  
  
  1. Channel channel = connection.createChannel();
  2. channel.exchangeDeclare( "exchangeName", "topic"); //direct fanout topic
  3. channel.queueDeclare( "queueName");
  4. channel.queueBind( "queueName", "exchangeName", "routingKey.*");
  5. byte[] messageBodyBytes = "hello world".getBytes();
  6. channel.basicPublish( "exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Header Exchange

Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。

Headers是一個鍵值對,能夠定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也能夠再綁定時候傳入一些鍵值對,二者匹配的話,則對應的隊列就能夠收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必需要用鍵值"x-mactch"來定義。

all表明定義的多個鍵值對都要知足,而any則代碼只要知足一個就能夠了。

fanout,direct,topic exchange的routingKey都須要要字符串形式的,而headers exchange則沒有這個要求,由於鍵值對的值能夠是任何類型

消息生產者以下:

  
  
  
  
  1. package cn.slimsmart.rabbitmq.demo.headers;
  2. import java.util.Date;
  3. import java.util.Hashtable;
  4. import java.util.Map;
  5. import org.springframework.amqp.core.ExchangeTypes;
  6. import com.rabbitmq.client.AMQP;
  7. import com.rabbitmq.client.AMQP.BasicProperties;
  8. import com.rabbitmq.client.AMQP.BasicProperties.Builder;
  9. import com.rabbitmq.client.Channel;
  10. import com.rabbitmq.client.Connection;
  11. import com.rabbitmq.client.ConnectionFactory;
  12. public class Producer {
  13. private final static String EXCHANGE_NAME = "header-exchange";
  14. @SuppressWarnings( "deprecation")
  15. public static void main(String[] args) throws Exception {
  16. // 建立鏈接和頻道
  17. ConnectionFactory factory = new ConnectionFactory();
  18. factory.setHost( "192.168.36.102");
  19. // 指定用戶 密碼
  20. factory.setUsername( "admin");
  21. factory.setPassword( "admin");
  22. // 指定端口
  23. factory.setPort(AMQP.PROTOCOL.PORT);
  24. Connection connection = factory.newConnection();
  25. Channel channel = connection.createChannel();
  26. //聲明轉發器和類型headers
  27. channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
  28. String message = new Date().toLocaleString() + " : log something";
  29. Map<String,Object> headers = new Hashtable<String, Object>();
  30. headers.put( "aaa", "01234");
  31. Builder properties = new BasicProperties.Builder();
  32. properties.headers(headers);
  33. // 指定消息發送到的轉發器,綁定鍵值對headers鍵值對
  34. channel.basicPublish(EXCHANGE_NAME, "",properties.build(),message.getBytes());
  35. System.out.println( "Sent message :'" + message + "'");
  36. channel.close();
  37. connection.close();
  38. }
  39. }

消息消費者以下:

  
  
  
  
  1. package cn.slimsmart.rabbitmq.demo.headers;
  2. import java.util.Hashtable;
  3. import java.util.Map;
  4. import org.springframework.amqp.core.ExchangeTypes;
  5. import com.rabbitmq.client.AMQP;
  6. import com.rabbitmq.client.Channel;
  7. import com.rabbitmq.client.Connection;
  8. import com.rabbitmq.client.ConnectionFactory;
  9. import com.rabbitmq.client.QueueingConsumer;
  10. public class Consumer {
  11. private final static String EXCHANGE_NAME = "header-exchange";
  12. private final static String QUEUE_NAME = "header-queue";
  13. public static void main(String[] args) throws Exception {
  14. // 建立鏈接和頻道
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost( "192.168.36.102");
  17. // 指定用戶 密碼
  18. factory.setUsername( "admin");
  19. factory.setPassword( "admin");
  20. // 指定端口
  21. factory.setPort(AMQP.PROTOCOL.PORT);
  22. Connection connection = factory.newConnection();
  23. Channel channel = connection.createChannel();
  24. //聲明轉發器和類型headers
  25. channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS, false, true, null);
  26. channel.queueDeclare(QUEUE_NAME, false, false, true, null);
  27. Map<String, Object> headers = new Hashtable<String, Object>();
  28. headers.put( "x-match", "any"); //all any
  29. headers.put( "aaa", "01234");
  30. headers.put( "bbb", "56789");
  31. // 爲轉發器指定隊列,設置binding 綁定header鍵值對
  32. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);
  33. QueueingConsumer consumer = new QueueingConsumer(channel);
  34. // 指定接收者,第二個參數爲自動應答,無需手動應答
  35. channel.basicConsume(QUEUE_NAME, true, consumer);
  36. while ( true) {
  37. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  38. String message = new String(delivery.getBody());
  39. System.out.println(message);
  40. }
  41. }
  42. }


Default Exchange

其實除了上面四種之外還有一種Default Exchange,它是一種特別的Direct Exchange
當你手動建立一個隊列時,後臺會自動將這個隊列綁定到一個名稱爲空的Direct類型交換機上,綁定路由名稱與隊列名稱相同。有了這個默認的交換機和綁定,咱們就能夠像其餘輕量級的隊列,如Redis那樣,直接操做隊列來處理消息。不過只是看起來是,實際上在RabbitMQ裏直接操做是不可能的。消息始終都是先發送到交換機,由交換級通過路由傳送給隊列,消費者再從隊列中獲取消息的。不過因爲這個默認交換機和路由的關係,使咱們只關心隊列這一層便可,這個比較適合作一些簡單的應用,畢竟沒有發揮RabbitMQ的最大功能,若是都用這種方式去使用的話就真是殺雞用宰牛刀了。

2. 臨時隊列

若是要在生產者和消費者之間建立一個新的隊列,又不想使用原來的隊列,臨時隊列就是爲這個場景而生的:
首先,每當咱們鏈接到RabbitMQ,咱們須要一個新的空隊列,咱們能夠用一個隨機名稱來建立,或者說讓服務器選擇一個隨機隊列名稱給咱們。
一旦咱們斷開消費者,隊列應該當即被刪除。Java客戶端提供queuedeclare()爲咱們建立一個非持久化、獨立、自動刪除的隊列名稱。

  
  
  
  
String queueName = channel.queueDeclare().getQueue();
經過上面的代碼就能獲取到一個隨機隊列名稱。 例如:它多是:amq.gen-jzty20brgko-hjmujj0wlg。

3. 綁定

這裏寫圖片描述

若是咱們已經建立了一個分發交換器和隊列,如今咱們就能夠就將咱們的隊列跟交換器進行綁定。

  
  
  
  
channel.queueBind(queueName, "logs", "");
執行完這段代碼後,日誌交換器會將消息添加到咱們的隊列中。


5、RabbitMQ實現RPC

RabbitMQ能夠用於實現RPC,二者有相像之處,使用RabbitMQ實現RPC分爲以下幾個步驟:

1. Client interface(客戶端接口)

爲了說明RPC服務可使用,咱們建立一個簡單的客戶端類。暴露一個方法——發送RPC請求,而後阻塞直到得到結果。

  
  
  
  
  1. FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
  2. String result = fibonacciRpc.call( "4");
  3. System.out.println( "fib(4) is " + result);

2. Callback queue(回調隊列)

通常在RabbitMQ中作RPC是很簡單的。客戶端發送請求消息,服務器回覆響應的消息。爲了接受響應的消息,咱們須要在請求消息中發送一個回調隊列。能夠用默認的隊列:

  
  
  
  
  1. BasicProperties props = new BasicProperties
  2. .Builder()
  3. .replyTo(callbackQueueName)
  4. .build();
  5. channel.basicPublish( "", "rpc_queue", props, message.getBytes());
  6. // ... then code to read a response message from the callback_queue ...

3. Message properties(消息屬性)

AMQP協議爲消息預約義了一組14個屬性。大部分的屬性是不多使用的。除了一下幾種:

  1. deliveryMode:標記消息傳遞模式,2-消息持久化,其餘值-瞬態。在第二篇文章中還提到過。
  2. contentType:內容類型,用於描述編碼的mime-type。例如常常爲該屬性設置JSON編碼。
  3. replyTo:應答,通用的回調隊列名稱
  4. correlationId:關聯ID,方便RPC響應與請求關聯
咱們須要添加一個新的導入:

  
  
  
  
import com.rabbitmq.client.AMQP.BasicProperties;

4. Correlation Id

在上述方法中爲每一個RPC請求建立一個回調隊列。這是很低效的。幸運的是,一個解決方案:能夠爲每一個客戶端建立一個單一的回調隊列
新的問題被提出,隊列收到一條回覆消息,可是不清楚是那條請求的回覆。這是就須要使用correlationId屬性了。咱們要爲每一個請求設置惟一的值。而後,在回調隊列中獲取消息,看看這個屬性,關聯response和request就是基於這個屬性值的。若是咱們看到一個未知的correlationId屬性值的消息,能夠放心的無視它——它不是咱們發送的請求。
你可能問道,爲何要忽略回調隊列中未知的信息,而不是看成一個失敗?這是因爲在服務器端競爭條件的致使的。雖然不太可能,可是若是RPC服務器在發送給咱們結果後,發送請求反饋前就掛掉了,這有可能會發送未知correlationId屬性值的消息。若是發生了這種狀況,重啓RPC服務器將會從新處理該請求。這就是爲何在客戶端必須很好的處理重複響應,RPC應該是冪等的

5. 實現


咱們的RPC的處理流程:

  1. 當客戶端啓動時,建立一個匿名的回調隊列
  2. 客戶端爲RPC請求設置2個屬性:replyTo:設置回調隊列名字;correlationId:標記request
  3. 請求被髮送到rpc_queue隊列中。
  4. RPC服務器端監聽rpc_queue隊列中的請求,當請求到來時,服務器端會處理而且把帶有結果的消息發送給客戶端。接收的隊列就是replyTo設定的回調隊列。
  5. 客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,若是與request中匹配,那就是結果了。

RPC服務器端(RPCServer.java)

  
  
  
  
  1. /**
  2. * RPC服務器端
  3. *
  4. * @author arron
  5. * @date 2015年9月30日 下午3:49:01
  6. * @version 1.0
  7. */
  8. public class RPCServer {
  9. private static final String RPC_QUEUE_NAME = "rpc_queue";
  10. public static void main(String[] args) throws Exception {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. // 設置MabbitMQ所在主機ip或者主機名
  13. factory.setHost( "127.0.0.1");
  14. // 建立一個鏈接
  15. Connection connection = factory.newConnection();
  16. // 建立一個頻道
  17. Channel channel = connection.createChannel();
  18. //聲明隊列
  19. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  20. //限制:每次最多給一個消費者發送1條消息
  21. channel.basicQos( 1);
  22. //爲rpc_queue隊列建立消費者,用於處理請求
  23. QueueingConsumer consumer = new QueueingConsumer(channel);
  24. channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
  25. System.out.println( " [x] Awaiting RPC requests");
  26. while ( true) {
  27. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  28. //獲取請求中的correlationId屬性值,並將其設置到結果消息的correlationId屬性中
  29. BasicProperties props = delivery.getProperties();
  30. BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
  31. //獲取回調隊列名字
  32. String callQueueName = props.getReplyTo();
  33. String message = new String(delivery.getBody(), "UTF-8");
  34. System.out.println( " [.] fib(" + message + ")");
  35. //獲取結果
  36. String response = "" + fib(Integer.parseInt(message));
  37. //先發送回調結果
  38. channel.basicPublish( "", callQueueName, replyProps,response.getBytes());
  39. //後手動發送消息反饋
  40. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  41. }
  42. }
  43. /**
  44. * 計算斐波列其數列的第n項
  45. *
  46. * @param n
  47. * @return
  48. * @throws Exception
  49. */
  50. private static int fib(int n) throws Exception {
  51. if (n < 0)
  52. throw new Exception( "參數錯誤,n必須大於等於0");
  53. if (n == 0)
  54. return 0;
  55. if (n == 1)
  56. return 1;
  57. return fib(n - 1) + fib(n - 2);
  58. }
  59. }
RPC客戶端(RPCClient.java):

  
  
  
  
  1. /**
  2. *
  3. * @author arron
  4. * @date 2015年9月30日 下午3:44:43
  5. * @version 1.0
  6. */
  7. public class RPCClient {
  8. private static final String RPC_QUEUE_NAME = "rpc_queue";
  9. private Connection connection;
  10. private Channel channel;
  11. private String replyQueueName;
  12. private QueueingConsumer consumer;
  13. public RPCClient() throws Exception {
  14. ConnectionFactory factory = new ConnectionFactory();
  15. // 設置MabbitMQ所在主機ip或者主機名
  16. factory.setHost( "127.0.0.1");
  17. // 建立一個鏈接
  18. connection = factory.newConnection();
  19. // 建立一個頻道
  20. channel = connection.createChannel();
  21. //聲明隊列
  22. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  23. //爲每個客戶端獲取一個隨機的回調隊列
  24. replyQueueName = channel.queueDeclare().getQueue();
  25. //爲每個客戶端建立一個消費者(用於監聽回調隊列,獲取結果)
  26. consumer = new QueueingConsumer(channel);
  27. //消費者與隊列關聯
  28. channel.basicConsume(replyQueueName, true, consumer);
  29. }
  30. /**
  31. * 獲取斐波列其數列的值
  32. *
  33. * @param message
  34. * @return
  35. * @throws Exception
  36. */
  37. public String call(String message) throws Exception{
  38. String response = null;
  39. String corrId = java.util.UUID.randomUUID().toString();
  40. //設置replyTo和correlationId屬性值
  41. BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
  42. //發送消息到rpc_queue隊列
  43. channel.basicPublish( "", RPC_QUEUE_NAME, props, message.getBytes());
  44. while ( true) {
  45. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  46. if (delivery.getProperties().getCorrelationId().equals(corrId)) {
  47. response = new String(delivery.getBody(), "UTF-8");
  48. break;
  49. }
  50. }
  51. return response;
  52. }
  53. public static void main(String[] args) throws Exception {
  54. RPCClient fibonacciRpc = new RPCClient();
  55. String result = fibonacciRpc.call( "4");
  56. System.out.println( "fib(4) is " + result);
  57. }
  58. }

這裏的例子只是RabbitMQ中RPC服務的一個實現,你也能夠根據業務須要實現更多。rpc有一個優勢,若是一個RPC服務器處理不來,能夠再增長一個、兩個、三個。咱們的例子中的代碼還比較簡單,還有不少問題沒有解決:
  • 若是沒有發現服務器,客戶端如何處理?
  • 若是客戶端的RPC請求超時了怎麼辦?
  • 若是服務器出現了故障,發生了異常,是否將異常發送到客戶端?
  • 在處理消息前,怎樣防止無效的消息?檢查範圍、類型?

以上內容轉載自網絡,詳見開頭轉載聲明。

相關文章
相關標籤/搜索