前言,在前面我講到了RabbitMQ的六種工做模式中簡單模式和工做模式 -- http://www.javashuo.com/article/p-pldmjukv-nw.html ,這裏呢,我就一次性將剩下的四種--發佈訂閱模式/路由模式/主題模式及Rpc異步調用模式,給你們進行分析,講解一下,同時也給本身複習複習!!!java
3、發佈訂閱模式
在前面的例子中,咱們任務消息只交付給一個工做進程。在這部分,咱們將作一些徹底不一樣的事情——咱們將向多個消費者傳遞同一條消息。這種模式稱爲「發佈/訂閱」。算法
爲了說明該模式,咱們將構建一個簡單的日誌系統。它將由兩個程序組成——第一個程序將發出日誌消息,第二個程序接收它們。json
在咱們的日誌系統中,接收程序的每一個運行副本都將得到消息。這樣,咱們就能夠運行一個消費者並將日誌保存到磁盤; 同時咱們能夠運行另外一個消費者在屏幕上打印日誌。安全
最終, 消息會被廣播到全部消息接受者。服務器
Exchanges 交換機
RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列。實際上,一般生產者甚至不知道消息是否會被傳遞到任何隊列。app
相反,生產者只能向交換機(Exchange)發送消息。交換機是一個很是簡單的東西。一邊接收來自生產者的消息,另外一邊將消息推送到隊列。交換器必須確切地知道如何處理它接收到的消息。它應該被添加到一個特定的隊列中嗎?它應該添加到多個隊列中嗎?或者它應該被丟棄。這些規則由exchange的類型定義。dom
有幾種可用的交換類型:direct、topic、header和fanout。咱們將關注最後一個——fanout。讓咱們建立一個這種類型的交換機,並稱之爲 logs: ch.exchangeDeclare("logs", "fanout");異步
fanout交換機很是簡單。它只是將接收到的全部消息廣播給它所知道的全部隊列。這正是咱們的日誌系統所須要的。ide
咱們前面使用的隊列具備特定的名稱(還記得hello和task_queue嗎?)可以爲隊列命名對咱們來講相當重要——咱們須要將工做進程指向同一個隊列,在生產者和消費者之間共享隊列。ui
但日誌記錄案例不是這種狀況。咱們想要接收全部的日誌消息,而不只僅是其中的一部分。咱們還只對當前的最新消息感興趣,而不是舊消息。
要解決這個問題,咱們須要兩件事。首先,每當咱們鏈接到Rabbitmq時,咱們須要一個新的空隊列。爲此,咱們能夠建立一個具備隨機名稱的隊列,或者,更好的方法是讓服務器爲咱們選擇一個隨機隊列名稱。其次,一旦斷開與使用者的鏈接,隊列就會自動刪除。在Java客戶端中,當咱們不向queueDeclare()提供任何參數時,會建立一個具備生成名稱的、非持久的、獨佔的、自動刪除隊列
//自動生成隊列名 //非持久,獨佔,自動刪除 String queueName = ch.queueDeclare().getQueue();
綁定Bindings
咱們已經建立了一個fanout交換機和一個隊列。如今咱們須要告訴exchange向指定隊列發送消息。exchange和隊列之間的關係稱爲綁定。
//指定的隊列,與指定的交換機關聯起來 //成爲綁定 -- binding //第三個參數時 routingKey, 因爲是fanout交換機, 這裏忽略 routingKey ch.queueBind(queueName, "logs", "");
如今, logs交換機將會向咱們指定的隊列添加消息
列出綁定關係: rabbitmqctl list_bindings
完成代碼實現
生產者
生產者發出日誌消息,看起來與前一教程沒有太大不一樣。最重要的更改是,咱們如今但願將消息發佈到logs交換機,而不是無名的日誌交換機。咱們須要在發送時提供一個routingKey,可是對於fanout交換機類型,該值會被忽略。
package rabbitmq.publishsubscribe; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定義名字爲logs的交換機,交換機類型爲fanout //這一步是必須的,由於禁止發佈到不存在的交換。 ch.exchangeDeclare("logs", "fanout"); while (true) { System.out.print("輸入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".equals(msg)) { break; } //第一個參數,向指定的交換機發送消息 //第二個參數,不指定隊列,由消費者向交換機綁定隊列 //若是尚未隊列綁定到交換器,消息就會丟失, //但這對咱們來講沒有問題;即便沒有消費者接收,咱們也能夠安全地丟棄這些信息。 ch.basicPublish("logs", "", null, msg.getBytes("UTF-8")); System.out.println("消息已發送: "+msg); } c.close(); } }
消費者
若是尚未隊列綁定到交換器,消息就會丟失,但這對咱們來講沒有問題;若是尚未消費者在聽,咱們能夠安全地丟棄這些信息。
package rabbitmq.publishsubscribe; import java.io.IOException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定義名字爲 logs 的交換機, 它的類型是 fanout ch.exchangeDeclare("logs", "fanout"); //自動生成對列名, //非持久,獨佔,自動刪除 String queueName = ch.queueDeclare().getQueue(); //把該隊列,綁定到 logs 交換機 //對於 fanout 類型的交換機, routingKey會被忽略,不容許null值 ch.queueBind(queueName, "logs", ""); System.out.println("等待接收數據"); //收到消息後用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume(queueName, true, callback, cancel); } }
4、路由模式
在上一小節,咱們構建了一個簡單的日誌系統。咱們可以向多個接收者廣播日誌消息。
在這一節,咱們將向其添加一個特性—咱們將只訂閱全部消息中的一部分。例如,咱們只接收關鍵錯誤消息並保存到日誌文件(以節省磁盤空間),同時仍然可以在控制檯上打印全部日誌消息。
綁定 Bindings
在上一節,咱們已經建立了隊列與交換機的綁定。使用下面這樣的代碼:
ch.queueBind(queueName, "logs", "");
綁定是交換機和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。
綁定可使用額外的routingKey參數。爲了不與basic_publish參數混淆,咱們將其稱爲bindingKey。這是咱們如何建立一個鍵綁定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含義取決於交換機類型。咱們前面使用的fanout交換機徹底忽略它。
直連交換機 Direct exchange
上一節中的日誌系統向全部消費者廣播全部消息。咱們但願擴展它,容許根據消息的嚴重性過濾消息。例如,咱們但願將日誌消息寫入磁盤的程序只接收關鍵error,而不是在warning或info日誌消息上浪費磁盤空間。
前面咱們使用的是fanout交換機,這並無給咱們太多的靈活性——它只能進行簡單的廣播。
咱們將用直連交換機(Direct exchange)代替。它背後的路由算法很簡單——消息傳遞到bindingKey與routingKey徹底匹配的隊列。爲了說明這一點,請考慮如下設置
其中咱們能夠看到直連交換機X
,它綁定了兩個隊列。第一個隊列用綁定鍵orange
綁定,第二個隊列有兩個綁定,一個綁定black
,另外一個綁定鍵green
。
這樣設置,使用路由鍵orange
發佈到交換器的消息將被路由到隊列Q1
。帶有black
或green
路由鍵的消息將轉到Q2
。而全部其餘消息都將被丟棄。
多重綁定 Multiple bindings
使用相同的bindingKey綁定多個隊列是徹底容許的。如圖所示,可使用binding key black
將X
與Q1
和Q2
綁定。在這種狀況下,直連交換機的行爲相似於fanout,並將消息廣播給全部匹配的隊列。一條路由鍵爲black的消息將同時發送到Q1和Q2。
發送日誌
咱們將在日誌系統中使用這個模型。咱們把消息發送到一個Direct交換機,而不是fanout。咱們將提供日誌級別做爲routingKey。這樣,接收程序將可以選擇它但願接收的級別。讓咱們首先來看發出日誌。
和前面同樣,咱們首先須要建立一個exchange:
//參數1: 交換機名 //參數2: 交換機類型 ch.exchangeDeclare("direct_logs", "direct");
接着來看發送消息的代碼
//參數1: 交換機名 //參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning" //參數3: 其餘配置屬性 //參數4: 發佈的消息數據 ch.basicPublish("direct_logs", "error", null, message.getBytes());
訂閱
接收消息的工做原理與前面章節同樣,但有一個例外——咱們將爲感興趣的每一個日誌級別建立一個新的綁定, 示例代碼以下:
ch.queueBind(queueName, "logs", "info"); ch.queueBind(queueName, "logs", "warning");
最終代碼實現
生產者
package rabbitmq.routing; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { String[] a = {"warning", "info", "error"}; ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //參數1: 交換機名 //參數2: 交換機類型 ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); while (true) { System.out.print("輸入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".equals(msg)) { break; } //隨機產生日誌級別 String level = a[new Random().nextInt(a.length)]; //參數1: 交換機名 //參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning" //參數3: 其餘配置屬性 //參數4: 發佈的消息數據 ch.basicPublish("direct_logs", level, null, msg.getBytes()); System.out.println("消息已發送: "+level+" - "+msg); } c.close(); } }
消費者
package rabbitmq.routing; import java.io.IOException; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定義名字爲 direct_logs 的交換機, 它的類型是 "direct" ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //自動生成對列名, //非持久,獨佔,自動刪除 String queueName = ch.queueDeclare().getQueue(); System.out.println("輸入接收的日誌級別,用空格隔開:"); String[] a = new Scanner(System.in).nextLine().split("\\s"); //把該隊列,綁定到 direct_logs 交換機 //容許使用多個 bindingKey for (String level : a) { ch.queueBind(queueName, "direct_logs", level); } System.out.println("等待接收數據"); //收到消息後用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); String routingKey = message.getEnvelope().getRoutingKey(); System.out.println("收到: "+routingKey+" - "+msg); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume(queueName, true, callback, cancel); } }
5、主題模式
在上一小節,咱們改進了日誌系統。咱們沒有使用只能進行廣播的fanout交換機,而是使用Direct交換機,從而能夠選擇性接收日誌。
雖然使用Direct交換機改進了咱們的系統,但它仍然有侷限性——它不能基於多個標準進行路由。
在咱們的日誌系統中,咱們可能不只但願根據級別訂閱日誌,還但願根據發出日誌的源訂閱日誌。
這將給咱們帶來很大的靈活性——咱們可能只想接收來自「cron」的關鍵錯誤,但也要接收來自「kern」的全部日誌。
要在日誌系統中實現這一點,咱們須要瞭解更復雜的Topic交換機。
主題交換機 Topic exchange
發送到Topic交換機的消息,它的的routingKey,必須是由點分隔的多個單詞。單詞能夠是任何東西,但一般是與消息相關的一些特性。幾個有效的routingKey示例:「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」。routingKey能夠有任意多的單詞,最多255個字節。
bindingKey也必須採用相同的形式。Topic交換機的邏輯與直連交換機相似——使用特定routingKey發送的消息將被傳遞到全部使用匹配bindingKey綁定的隊列。bindingKey有兩個重要的特殊點:
-
*
能夠通配單個單詞。 -
#
能夠通配零個或多個單詞。
用一個例子來解釋這個問題是最簡單的
在本例中,咱們將發送描述動物的消息。這些消息將使用由三個單詞(兩個點)組成的routingKey發送。routingKey中的第一個單詞表示速度,第二個是顏色,第三個是物種:「<速度>.<顏色>.<物種>」。
咱們建立三個綁定:Q1與bindingKey 「.orange.
」 綁定。和Q2是 「*.*.rabbit
」 和 「lazy.#
」 。
這些綁定可歸納爲:
- Q1對全部橙色的動物感興趣。
- Q2想接收關於兔子和慢速動物的全部消息。
將routingKey設置爲"quick.orange.rabbit
"的消息將被髮送到兩個隊列。消息 "lazy.orange.elephant
「也發送到它們兩個。另外」quick.orange.fox
「只會發到第一個隊列,」lazy.brown.fox
「只發給第二個。」lazy.pink.rabbit
「將只被傳遞到第二個隊列一次,即便它匹配兩個綁定。」quick.brown.fox
"不匹配任何綁定,所以將被丟棄。
若是咱們違反約定,發送一個或四個單詞的信息,好比"orange
「或」quick.orange.male.rabbit
",會發生什麼?這些消息將不匹配任何綁定,並將丟失。
另外,"lazy.orange.male.rabbit
",即便它有四個單詞,也將匹配最後一個綁定,並將被傳遞到第二個隊列。
最終代碼實現
生產者
package rabbitmq.topic; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //參數1: 交換機名 //參數2: 交換機類型 ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); while (true) { System.out.print("輸入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".contentEquals(msg)) { break; } System.out.print("輸入routingKey: "); String routingKey = new Scanner(System.in).nextLine(); //參數1: 交換機名 //參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning" //參數3: 其餘配置屬性 //參數4: 發佈的消息數據 ch.basicPublish("topic_logs", routingKey, null, msg.getBytes()); System.out.println("消息已發送: "+routingKey+" - "+msg); } c.close(); } }
消費者
package rabbitmq.topic; import java.io.IOException; import java.util.Scanner; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); //自動生成對列名, //非持久,獨佔,自動刪除 String queueName = ch.queueDeclare().getQueue(); System.out.println("輸入bindingKey,用空格隔開:"); String[] a = new Scanner(System.in).nextLine().split("\\s"); //把該隊列,綁定到 topic_logs 交換機 //容許使用多個 bindingKey for (String bindingKey : a) { ch.queueBind(queueName, "topic_logs", bindingKey); } System.out.println("等待接收數據"); //收到消息後用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); String routingKey = message.getEnvelope().getRoutingKey(); System.out.println("收到: "+routingKey+" - "+msg); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume(queueName, true, callback, cancel); } }
6、RPC模式
客戶端
在客戶端定義一個RPCClient類,並定義一個call()方法,這個方法發送一個RPC請求,並等待接收響應結果
RPCClient client = new RPCClient(); String result = client.call("4"); System.out.println( "第四個斐波那契數是: " + result);
回調隊列 Callback Queue
使用RabbitMQ去實現RPC很容易。一個客戶端發送請求信息,並獲得一個服務器端回覆的響應信息。爲了獲得響應信息,咱們須要在請求的時候發送一個「回調」隊列地址。咱們可使用默認隊列。下面是示例代碼:
//定義回調隊列, //自動生成對列名,非持久,獨佔,自動刪除 callbackQueueName = ch.queueDeclare().getQueue(); //用來設置回調隊列的參數對象 BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); //發送調用消息 ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息屬性 Message Properties AMQP 0-9-1協議定義了消息的14個屬性。大部分屬性不多使用,下面是比較經常使用的4個: deliveryMode:將消息標記爲持久化(值爲2)或非持久化(任何其餘值)。 contentType:用於描述mime類型。例如,對於常用的JSON格式,將此屬性設置爲:application/json。 replyTo:一般用於指定回調隊列。 correlationId:將RPC響應與請求關聯起來很是有用。
關聯id (correlationId):
在上面的代碼中,咱們會爲每一個RPC請求建立一個回調隊列。 這是很是低效的,這裏還有一個更好的方法:讓咱們爲每一個客戶端建立一個回調隊列。
這就提出了一個新的問題,在隊列中獲得一個響應時,咱們不清楚這個響應所對應的是哪一條請求。這時候就須要使用關聯id(correlationId)。咱們將爲每一條請求設置惟一的的id值。稍後,當咱們在回調隊列裏收到一條消息的時候,咱們將查看它的id屬性,這樣咱們就能夠匹配對應的請求和響應。若是咱們發現了一個未知的id值,咱們能夠安全的丟棄這條消息,由於它不屬於咱們的請求。
最終實現代碼
RPC的工做方式是這樣的:
-
對於RPC請求,客戶端發送一條帶有兩個屬性的消息:replyTo,設置爲僅爲請求建立的匿名獨佔隊列,和correlationId,設置爲每一個請求的唯一id值。
-
請求被髮送到rpc_queue隊列。
-
RPC工做進程(即:服務器)在隊列上等待請求。當一個請求出現時,它執行任務,並使用replyTo字段中的隊列將結果發回客戶機。
-
客戶機在迴應消息隊列上等待數據。當消息出現時,它檢查correlationId屬性。若是匹配請求中的值,則向程序返回該響應數據。
服務器端
package rabbitmq.rpc; import java.io.IOException; import java.util.Random; import java.util.Scanner; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); /* * 定義隊列 rpc_queue, 將從它接收請求信息 * * 參數: * 1. queue, 對列名 * 2. durable, 持久化 * 3. exclusive, 排他 * 4. autoDelete, 自動刪除 * 5. arguments, 其餘參數屬性 */ ch.queueDeclare("rpc_queue",false,false,false,null); ch.queuePurge("rpc_queue");//清除隊列中的內容 ch.basicQos(1);//一次只接收一條消息 //收到請求消息後的回調對象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //處理收到的數據(要求第幾個斐波那契數) String msg = new String(message.getBody(), "UTF-8"); int n = Integer.parseInt(msg); //求出第n個斐波那契數 int r = fbnq(n); String response = String.valueOf(r); //設置發回響應的id, 與請求id一致, 這樣客戶端能夠把該響應與它的請求進行對應 BasicProperties replyProps = new BasicProperties.Builder() .correlationId(message.getProperties().getCorrelationId()) .build(); /* * 發送響應消息 * 1. 默認交換機 * 2. 由客戶端指定的,用來傳遞響應消息的隊列名 * 3. 參數(關聯id) * 4. 發回的響應消息 */ ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); //發送確認消息 ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; // CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //消費者開始接收消息, 等待從 rpc_queue接收請求消息, 不自動確認 ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback); } protected static int fbnq(int n) { if(n == 1 || n == 2) return 1; return fbnq(n-1)+fbnq(n-2); } }
客戶端
package rabbitmq.rpc; import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCClient { Connection con; Channel ch; public RPCClient() throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); con = f.newConnection(); ch = con.createChannel(); } public String call(String msg) throws Exception { //自動生成對列名,非持久,獨佔,自動刪除 String replyQueueName = ch.queueDeclare().getQueue(); //生成關聯id String corrId = UUID.randomUUID().toString(); //設置兩個參數: //1. 請求和響應的關聯id //2. 傳遞響應數據的queue BasicProperties props = new BasicProperties.Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); //向 rpc_queue 隊列發送請求數據, 請求第n個斐波那契數 ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8")); //用來保存結果的阻塞集合,取數據時,沒有數據會暫停等待 BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); //接收響應數據的回調對象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { //若是響應消息的關聯id,與請求的關聯id相同,咱們來處理這個響應數據 if (message.getProperties().getCorrelationId().contentEquals(corrId)) { //把收到的響應數據,放入阻塞集合 response.offer(new String(message.getBody(), "UTF-8")); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //開始從隊列接收響應數據 ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback); //返回保存在集合中的響應數據 return response.take(); } public static void main(String[] args) throws Exception { RPCClient client = new RPCClient(); while (true) { System.out.print("求第幾個斐波那契數:"); int n = new Scanner(System.in).nextInt(); String r = client.call(""+n); System.out.println(r); } } }