RabbitMQ學習:RabbitMQ的六種工做模式終結篇(四)

前言,在前面我講到了RabbitMQ的六種工做模式中簡單模式和工做模式 -- http://www.javashuo.com/article/p-pldmjukv-nw.html ,這裏呢,我就一次性將剩下的四種--發佈訂閱模式/路由模式/主題模式及Rpc異步調用模式,給你們進行分析,講解一下,同時也給本身複習複習!!!java


3、發佈訂閱模式

在前面的例子中,咱們任務消息只交付給一個工做進程。在這部分,咱們將作一些徹底不一樣的事情——咱們將向多個消費者傳遞同一條消息。這種模式稱爲「發佈/訂閱」。算法

爲了說明該模式,咱們將構建一個簡單的日誌系統。它將由兩個程序組成——第一個程序將發出日誌消息,第二個程序接收它們。json

在咱們的日誌系統中,接收程序的每一個運行副本都將得到消息。這樣,咱們就能夠運行一個消費者並將日誌保存到磁盤; 同時咱們能夠運行另外一個消費者在屏幕上打印日誌。安全

最終, 消息會被廣播到全部消息接受者。服務器

Exchanges 交換機

RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列。實際上,一般生產者甚至不知道消息是否會被傳遞到任何隊列。app

相反,生產者只能向交換機(Exchange)發送消息。交換機是一個很是簡單的東西。一邊接收來自生產者的消息,另外一邊將消息推送到隊列。交換器必須確切地知道如何處理它接收到的消息。它應該被添加到一個特定的隊列中嗎?它應該添加到多個隊列中嗎?或者它應該被丟棄。這些規則由exchange的類型定義。dom

有幾種可用的交換類型:direct、topic、header和fanout。咱們將關注最後一個——fanout。讓咱們建立一個這種類型的交換機,並稱之爲 logs: ch.exchangeDeclare("logs", "fanout");異步

fanout交換機很是簡單。它只是將接收到的全部消息廣播給它所知道的全部隊列。這正是咱們的日誌系統所須要的。ide

咱們前面使用的隊列具備特定的名稱(還記得hello和task_queue嗎?)可以爲隊列命名對咱們來講相當重要——咱們須要將工做進程指向同一個隊列,在生產者和消費者之間共享隊列ui

但日誌記錄案例不是這種狀況。咱們想要接收全部的日誌消息,而不只僅是其中的一部分。咱們還只對當前的最新消息感興趣,而不是舊消息。

要解決這個問題,咱們須要兩件事。首先,每當咱們鏈接到Rabbitmq時,咱們須要一個新的空隊列。爲此,咱們能夠建立一個具備隨機名稱的隊列,或者,更好的方法是讓服務器爲咱們選擇一個隨機隊列名稱。其次,一旦斷開與使用者的鏈接,隊列就會自動刪除。在Java客戶端中,當咱們不向queueDeclare()提供任何參數時,會建立一個具備生成名稱的、非持久的、獨佔的、自動刪除隊列

//自動生成隊列名
//非持久,獨佔,自動刪除
String queueName = ch.queueDeclare().getQueue();

綁定Bindings

咱們已經建立了一個fanout交換機和一個隊列。如今咱們須要告訴exchange向指定隊列發送消息。exchange和隊列之間的關係稱爲綁定。

//指定的隊列,與指定的交換機關聯起來
//成爲綁定 -- binding
//第三個參數時 routingKey, 因爲是fanout交換機, 這裏忽略 routingKey
ch.queueBind(queueName, "logs", "");

如今, logs交換機將會向咱們指定的隊列添加消息

列出綁定關係:

rabbitmqctl list_bindings

完成代碼實現

生產者

生產者發出日誌消息,看起來與前一教程沒有太大不一樣。最重要的更改是,咱們如今但願將消息發佈到logs交換機,而不是無名的日誌交換機。咱們須要在發送時提供一個routingKey,可是對於fanout交換機類型,該值會被忽略

package rabbitmq.publishsubscribe;

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//定義名字爲logs的交換機,交換機類型爲fanout
		//這一步是必須的,由於禁止發佈到不存在的交換。
		ch.exchangeDeclare("logs", "fanout");
		
		while (true) {
			System.out.print("輸入消息: ");
			String msg = new Scanner(System.in).nextLine();
			if ("exit".equals(msg)) {
				break;
			}
			
			//第一個參數,向指定的交換機發送消息
			//第二個參數,不指定隊列,由消費者向交換機綁定隊列
			//若是尚未隊列綁定到交換器,消息就會丟失,
			//但這對咱們來講沒有問題;即便沒有消費者接收,咱們也能夠安全地丟棄這些信息。
			ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
			System.out.println("消息已發送: "+msg);
		}

		c.close();
	}
}
消費者

若是尚未隊列綁定到交換器,消息就會丟失,但這對咱們來講沒有問題;若是尚未消費者在聽,咱們能夠安全地丟棄這些信息。

package rabbitmq.publishsubscribe;

import java.io.IOException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//定義名字爲 logs 的交換機, 它的類型是 fanout
		ch.exchangeDeclare("logs", "fanout");
		
		//自動生成對列名,
		//非持久,獨佔,自動刪除
		String queueName = ch.queueDeclare().getQueue();
		
		//把該隊列,綁定到 logs 交換機
		//對於 fanout 類型的交換機, routingKey會被忽略,不容許null值
		ch.queueBind(queueName, "logs", "");
		
		System.out.println("等待接收數據");
		
		//收到消息後用來處理消息的回調對象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
			}
		};
		
		//消費者取消時的回調對象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume(queueName, true, callback, cancel);
	}
}

4、路由模式

在上一小節,咱們構建了一個簡單的日誌系統。咱們可以向多個接收者廣播日誌消息。

在這一節,咱們將向其添加一個特性—咱們將只訂閱全部消息中的一部分。例如,咱們只接收關鍵錯誤消息並保存到日誌文件(以節省磁盤空間),同時仍然可以在控制檯上打印全部日誌消息。

綁定 Bindings

在上一節,咱們已經建立了隊列與交換機的綁定。使用下面這樣的代碼:

ch.queueBind(queueName, "logs", "");

綁定是交換機和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。

綁定可使用額外的routingKey參數。爲了不與basic_publish參數混淆,咱們將其稱爲bindingKey。這是咱們如何建立一個鍵綁定:

ch.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的含義取決於交換機類型。咱們前面使用的fanout交換機徹底忽略它。

直連交換機 Direct exchange

上一節中的日誌系統向全部消費者廣播全部消息。咱們但願擴展它,容許根據消息的嚴重性過濾消息。例如,咱們但願將日誌消息寫入磁盤的程序只接收關鍵error,而不是在warning或info日誌消息上浪費磁盤空間。

前面咱們使用的是fanout交換機,這並無給咱們太多的靈活性——它只能進行簡單的廣播。

咱們將用直連交換機(Direct exchange)代替。它背後的路由算法很簡單——消息傳遞到bindingKey與routingKey徹底匹配的隊列。爲了說明這一點,請考慮如下設置

其中咱們能夠看到直連交換機X,它綁定了兩個隊列。第一個隊列用綁定鍵orange綁定,第二個隊列有兩個綁定,一個綁定black,另外一個綁定鍵green

這樣設置,使用路由鍵orange發佈到交換器的消息將被路由到隊列Q1。帶有blackgreen路由鍵的消息將轉到Q2。而全部其餘消息都將被丟棄。

多重綁定 Multiple bindings

使用相同的bindingKey綁定多個隊列是徹底容許的。如圖所示,可使用binding key blackXQ1Q2綁定。在這種狀況下,直連交換機的行爲相似於fanout,並將消息廣播給全部匹配的隊列。一條路由鍵爲black的消息將同時發送到Q1和Q2

發送日誌

咱們將在日誌系統中使用這個模型。咱們把消息發送到一個Direct交換機,而不是fanout。咱們將提供日誌級別做爲routingKey。這樣,接收程序將可以選擇它但願接收的級別。讓咱們首先來看發出日誌。

和前面同樣,咱們首先須要建立一個exchange:

//參數1: 交換機名
//參數2: 交換機類型
ch.exchangeDeclare("direct_logs", "direct");

接着來看發送消息的代碼

//參數1: 交換機名
//參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning"
//參數3: 其餘配置屬性
//參數4: 發佈的消息數據 
ch.basicPublish("direct_logs", "error", null, message.getBytes());

訂閱

接收消息的工做原理與前面章節同樣,但有一個例外——咱們將爲感興趣的每一個日誌級別建立一個新的綁定, 示例代碼以下:

ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");

最終代碼實現

生產者
package rabbitmq.routing;

import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		String[] a = {"warning", "info", "error"};
		
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//參數1: 交換機名
		//參數2: 交換機類型
		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
		
		while (true) {
			System.out.print("輸入消息: ");
			String msg = new Scanner(System.in).nextLine();
			if ("exit".equals(msg)) {
				break;
			}
			
			//隨機產生日誌級別
			String level = a[new Random().nextInt(a.length)];
			
			//參數1: 交換機名
			//參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning"
			//參數3: 其餘配置屬性
			//參數4: 發佈的消息數據 
			ch.basicPublish("direct_logs", level, null, msg.getBytes());
			System.out.println("消息已發送: "+level+" - "+msg);
			
		}

		c.close();
	}
}
消費者
package rabbitmq.routing;

import java.io.IOException;
import java.util.Scanner;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//定義名字爲 direct_logs 的交換機, 它的類型是 "direct"
		ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
		
		//自動生成對列名,
		//非持久,獨佔,自動刪除
		String queueName = ch.queueDeclare().getQueue();
		
		System.out.println("輸入接收的日誌級別,用空格隔開:");
		String[] a = new Scanner(System.in).nextLine().split("\\s");
		
		//把該隊列,綁定到 direct_logs 交換機
		//容許使用多個 bindingKey
		for (String level : a) {
			ch.queueBind(queueName, "direct_logs", level);
		}
		
		System.out.println("等待接收數據");
		
		//收到消息後用來處理消息的回調對象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				String routingKey = message.getEnvelope().getRoutingKey();
				System.out.println("收到: "+routingKey+" - "+msg);
			}
		};
		
		//消費者取消時的回調對象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume(queueName, true, callback, cancel);
	}
}

5、主題模式

在上一小節,咱們改進了日誌系統。咱們沒有使用只能進行廣播的fanout交換機,而是使用Direct交換機,從而能夠選擇性接收日誌。

雖然使用Direct交換機改進了咱們的系統,但它仍然有侷限性——它不能基於多個標準進行路由。

在咱們的日誌系統中,咱們可能不只但願根據級別訂閱日誌,還但願根據發出日誌的源訂閱日誌。

這將給咱們帶來很大的靈活性——咱們可能只想接收來自「cron」的關鍵錯誤,但也要接收來自「kern」的全部日誌。

要在日誌系統中實現這一點,咱們須要瞭解更復雜的Topic交換機。

主題交換機 Topic exchange

發送到Topic交換機的消息,它的的routingKey,必須是由點分隔的多個單詞。單詞能夠是任何東西,但一般是與消息相關的一些特性。幾個有效的routingKey示例:「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」。routingKey能夠有任意多的單詞,最多255個字節

bindingKey也必須採用相同的形式。Topic交換機的邏輯與直連交換機相似——使用特定routingKey發送的消息將被傳遞到全部使用匹配bindingKey綁定的隊列。bindingKey有兩個重要的特殊點:

  • * 能夠通配單個單詞。

  • # 能夠通配零個或多個單詞。

用一個例子來解釋這個問題是最簡單的

在本例中,咱們將發送描述動物的消息。這些消息將使用由三個單詞(兩個點)組成的routingKey發送。routingKey中的第一個單詞表示速度,第二個是顏色,第三個是物種:「<速度>.<顏色>.<物種>」。

咱們建立三個綁定:Q1與bindingKey 「.orange.」 綁定。和Q2是 「*.*.rabbit」 和 「lazy.#」 。

這些綁定可歸納爲:

  • Q1對全部橙色的動物感興趣。
  • Q2想接收關於兔子和慢速動物的全部消息。

將routingKey設置爲"quick.orange.rabbit"的消息將被髮送到兩個隊列。消息 "lazy.orange.elephant「也發送到它們兩個。另外」quick.orange.fox「只會發到第一個隊列,」lazy.brown.fox「只發給第二個。」lazy.pink.rabbit「將只被傳遞到第二個隊列一次,即便它匹配兩個綁定。」quick.brown.fox"不匹配任何綁定,所以將被丟棄。

若是咱們違反約定,發送一個或四個單詞的信息,好比"orange「或」quick.orange.male.rabbit",會發生什麼?這些消息將不匹配任何綁定,並將丟失。

另外,"lazy.orange.male.rabbit",即便它有四個單詞,也將匹配最後一個綁定,並將被傳遞到第二個隊列。

最終代碼實現

生產者
package rabbitmq.topic;

import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//參數1: 交換機名
		//參數2: 交換機類型
		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
		
		while (true) {
			System.out.print("輸入消息: ");
			String msg = new Scanner(System.in).nextLine();
			if ("exit".contentEquals(msg)) {
				break;
			}
			System.out.print("輸入routingKey: ");
			String routingKey = new Scanner(System.in).nextLine();
			
			//參數1: 交換機名
			//參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning"
			//參數3: 其餘配置屬性
			//參數4: 發佈的消息數據 
			ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
			
			System.out.println("消息已發送: "+routingKey+" - "+msg);
		}

		c.close();
	}
}
消費者
package rabbitmq.topic;

import java.io.IOException;
import java.util.Scanner;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
		
		//自動生成對列名,
		//非持久,獨佔,自動刪除
		String queueName = ch.queueDeclare().getQueue();
		
		System.out.println("輸入bindingKey,用空格隔開:");
		String[] a = new Scanner(System.in).nextLine().split("\\s");
		
		//把該隊列,綁定到 topic_logs 交換機
		//容許使用多個 bindingKey
		for (String bindingKey : a) {
			ch.queueBind(queueName, "topic_logs", bindingKey);
		}
		
		System.out.println("等待接收數據");
		
		//收到消息後用來處理消息的回調對象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				String routingKey = message.getEnvelope().getRoutingKey();
				System.out.println("收到: "+routingKey+" - "+msg);
			}
		};
		
		//消費者取消時的回調對象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume(queueName, true, callback, cancel);
	}
}

6、RPC模式

客戶端

在客戶端定義一個RPCClient類,並定義一個call()方法,這個方法發送一個RPC請求,並等待接收響應結果

RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四個斐波那契數是: " + result);

回調隊列 Callback Queue

使用RabbitMQ去實現RPC很容易。一個客戶端發送請求信息,並獲得一個服務器端回覆的響應信息。爲了獲得響應信息,咱們須要在請求的時候發送一個「回調」隊列地址。咱們可使用默認隊列。下面是示例代碼:

//定義回調隊列,
//自動生成對列名,非持久,獨佔,自動刪除
callbackQueueName = ch.queueDeclare().getQueue();

//用來設置回調隊列的參數對象
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
//發送調用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息屬性 Message Properties

AMQP 0-9-1協議定義了消息的14個屬性。大部分屬性不多使用,下面是比較經常使用的4個:

deliveryMode:將消息標記爲持久化(值爲2)或非持久化(任何其餘值)。

contentType:用於描述mime類型。例如,對於常用的JSON格式,將此屬性設置爲:application/json。

replyTo:一般用於指定回調隊列。

correlationId:將RPC響應與請求關聯起來很是有用。

關聯id (correlationId):

在上面的代碼中,咱們會爲每一個RPC請求建立一個回調隊列。 這是很是低效的,這裏還有一個更好的方法:讓咱們爲每一個客戶端建立一個回調隊列。

這就提出了一個新的問題,在隊列中獲得一個響應時,咱們不清楚這個響應所對應的是哪一條請求。這時候就須要使用關聯id(correlationId)。咱們將爲每一條請求設置惟一的的id值。稍後,當咱們在回調隊列裏收到一條消息的時候,咱們將查看它的id屬性,這樣咱們就能夠匹配對應的請求和響應。若是咱們發現了一個未知的id值,咱們能夠安全的丟棄這條消息,由於它不屬於咱們的請求。

最終實現代碼

RPC的工做方式是這樣的:
  • 對於RPC請求,客戶端發送一條帶有兩個屬性的消息:replyTo,設置爲僅爲請求建立的匿名獨佔隊列,和correlationId,設置爲每一個請求的唯一id值。

  • 請求被髮送到rpc_queue隊列。

  • RPC工做進程(即:服務器)在隊列上等待請求。當一個請求出現時,它執行任務,並使用replyTo字段中的隊列將結果發回客戶機。

  • 客戶機在迴應消息隊列上等待數據。當消息出現時,它檢查correlationId屬性。若是匹配請求中的值,則向程序返回該響應數據。

服務器端
package rabbitmq.rpc;

import java.io.IOException;
import java.util.Random;
import java.util.Scanner;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCServer {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		/*
		 * 定義隊列 rpc_queue, 將從它接收請求信息
		 * 
		 * 參數:
		 * 1. queue, 對列名
		 * 2. durable, 持久化
		 * 3. exclusive, 排他
		 * 4. autoDelete, 自動刪除
		 * 5. arguments, 其餘參數屬性
		 */
		ch.queueDeclare("rpc_queue",false,false,false,null);
		ch.queuePurge("rpc_queue");//清除隊列中的內容
		
		ch.basicQos(1);//一次只接收一條消息
		
		
		//收到請求消息後的回調對象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//處理收到的數據(要求第幾個斐波那契數)
				String msg = new String(message.getBody(), "UTF-8");
				int n = Integer.parseInt(msg);
				//求出第n個斐波那契數
				int r = fbnq(n);
				String response = String.valueOf(r);
				
				//設置發回響應的id, 與請求id一致, 這樣客戶端能夠把該響應與它的請求進行對應
				BasicProperties replyProps = new BasicProperties.Builder()
						.correlationId(message.getProperties().getCorrelationId())
						.build();
				/*
				 * 發送響應消息
				 * 1. 默認交換機
				 * 2. 由客戶端指定的,用來傳遞響應消息的隊列名
				 * 3. 參數(關聯id)
				 * 4. 發回的響應消息
				 */
				ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
				//發送確認消息
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//消費者開始接收消息, 等待從 rpc_queue接收請求消息, 不自動確認
		ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
	}

	protected static int fbnq(int n) {
		if(n == 1 || n == 2) return 1;
		
		return fbnq(n-1)+fbnq(n-2);
	}
}
客戶端
package rabbitmq.rpc;

import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;

public class RPCClient {
	Connection con;
	Channel ch;
	
	public RPCClient() throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		con = f.newConnection();
		ch = con.createChannel();
	}
	
	public String call(String msg) throws Exception {
		//自動生成對列名,非持久,獨佔,自動刪除
		String replyQueueName = ch.queueDeclare().getQueue();
		//生成關聯id
		String corrId = UUID.randomUUID().toString();
		
		//設置兩個參數:
		//1. 請求和響應的關聯id
		//2. 傳遞響應數據的queue
		BasicProperties props = new BasicProperties.Builder()
				.correlationId(corrId)
				.replyTo(replyQueueName)
				.build();
		//向 rpc_queue 隊列發送請求數據, 請求第n個斐波那契數
		ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
		
		//用來保存結果的阻塞集合,取數據時,沒有數據會暫停等待
		BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
		
		//接收響應數據的回調對象
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//若是響應消息的關聯id,與請求的關聯id相同,咱們來處理這個響應數據
				if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
					//把收到的響應數據,放入阻塞集合
					response.offer(new String(message.getBody(), "UTF-8"));
				}
			}
		};

		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//開始從隊列接收響應數據
		ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
		//返回保存在集合中的響應數據
		return response.take();
	}
	
	public static void main(String[] args) throws Exception {
		RPCClient client = new RPCClient();
		while (true) {
			System.out.print("求第幾個斐波那契數:");
			int n = new Scanner(System.in).nextInt();
			String r = client.call(""+n);
			System.out.println(r);
		}
	}
}

7、RabbitMQ六種工做模式總結:

相關文章
相關標籤/搜索