在上一篇中使用直接交換器改進了咱們的系統,使得它可以有選擇的進行接收消息,但它仍然有侷限性——它不能基於多個條件進行路由。本節咱們就進行可以基於多個條件進行路由的topics exchange學習。java
發送給主題交換器的消息不能是任意的routing_key—它必須是一個單詞列表,由點分隔。這些詞能夠是任意的,但一般它們指定與消息相關的一些特性。幾個有效的路由示例:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".。路由鍵中能夠有任意多的字,最多能夠有255個字節。
ide
路由鍵也須要是相同的形式,topic交換器背後的邏輯相似於direct交換器——發送帶有特定路由鍵的消息將被傳送到綁定匹配路由鍵的全部隊列中,學習
然而,有兩個重要的特殊狀況須要綁定鍵:
ui
We created three bindings: Q1 is bound with binding key "*.orange.*" and Q2 with "*.*.rabbit" and "lazy.#".spa
These bindings can be summarised as:code
Q1對全部的橙色動物都感興趣。
blog
Q2但願聽到關於兔子的一切,以及關於懶惰動物的一切。
three
設置了路由鍵爲 "quick.orange.rabbit"的消息將被投遞到兩個隊列,消息 "lazy.orange.elephant" 也被投遞到他們兩個,而"quick.orange.fox"將被投遞到第一個隊列,"lazy.brown.fox"僅被投遞到第二個隊列,"quick.brown.fox"沒有匹配將被捨棄。rabbitmq
經過上面的學習,咱們知道,topic主題的交換器投遞消息與redict交換器的不一樣在於,交換器類型和路由鍵的模糊匹配,如今咱們就去把以前的代碼進行改變,只須要將代碼中的交換器類型改成topic,並將綁定的路由鍵更改一下,投遞消息用的是肯定的路由鍵,接收消息經過設置匹配的模糊綁定鍵,能夠訂閱到多條件的消息,接下來上代碼,並將改動的代碼如下劃線形式標記出來。隊列
發送方代碼:
package com.rabbitmq.HelloWorld;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Publish {
private static final String EXCHANGE_NAME = "exchangeC";
public static void main(String[] args) throws IOException, TimeoutException {
// TODO Auto-generated method stub
// 建立工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.185");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 建立鏈接
Connection connetion = factory.newConnection();
// 得到信道
Channel channel = connetion.createChannel();
// 聲明交換器(聲明瞭一個名字位exchangeA,類型修改fanout爲direct類型的交換器)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "555,2,2,33,66";
// 發送消息,將第二項參數routingkey
channel.basicPublish(EXCHANGE_NAME, "my.hello.haha", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connetion.close();
}
}
接收方一:
package com.rabbitmq.HelloWorld;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Subscribe {
private static final String EXCHANGE_NAME = "exchangeC";
private static final String QUEUE_NAME = "queueA";
public static void main(String[] args) throws IOException, TimeoutException {
// TODO Auto-generated method stub
// 建立工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.185");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 建立鏈接
Connection connetion = factory.newConnection();
// 得到信道
Channel channel = connetion.createChannel();
// 聲明交換器(聲明瞭一個名字位exchangeA,類型修改fanout爲direct的交換器)
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 聲明一個隊列,在此採用臨時隊列
String queueName = channel.queueDeclare().getQueue();
// channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 隊列和交換器進行綁定,並設定路由鍵爲error
channel.queueBind(queueName, EXCHANGE_NAME, "my.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
String message = new String(body,"utf-8");
System.out.println("[x] received'"+message+"'");
}
};
channel.basicConsume(queueName, consumer);
}
}
接收方二:
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Subscribe {
private static final String EXCHANGE_NAME = "exchangeC";
private static final String QUEUE_NAME = "queueA";
public static void main(String[] args) throws IOException, TimeoutException {
// TODO Auto-generated method stub
// 建立工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.185");
factory.setUsername("admin");
factory.setPassword("123456");
factory.setPort(5672);
// 建立鏈接
Connection connetion = factory.newConnection();
// 得到信道
Channel channel = connetion.createChannel();
// 聲明交換器(聲明瞭一個名字位exchangeA,修改fanout類型爲direct類型的交換器�?
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 聲明�?個隊列,在此採用臨時隊列
String queueName = channel.queueDeclare().getQueue();
// channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 隊列和交換器進行綁定,未設定路由鍵
channel.queueBind(queueName, EXCHANGE_NAME, "my.hello.*");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
// TODO Auto-generated method stub
String message = new String(body,"utf-8");
System.out.println("[x] received'"+message+"'");
}
};
channel.basicConsume(queueName, consumer);
}
}
運行後,兩個接收方均接收道理髮送方發送的消息,儘管咱們在兩個接收方配置的綁定鍵並不相同,可是其模糊匹配規則都可以匹配到發送方發送消息的路由鍵,若是有大量接收方,咱們就能夠經過設置不一樣的綁定鍵來有選擇的接收較多的消息或者是不接受消息。