在上一個教程中,咱們構建了一個簡單的fanout(扇出)交換。咱們可以向許多接收者廣播消息。java
在本教程中,咱們將爲其添加一個功能 - 咱們將只能訂閱一部分消息。例如,咱們將只能將消息指向感興趣的特定顏色(「orange」,「black」,「green」),同時仍然可以在控制檯上打印全部消息。算法
在前面的例子中,咱們已經在建立綁定。您能夠在咱們的Tut3Config文件中調用這樣的代碼:spring
@Bean public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1).to(fanout); }
綁定是交換和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。app
綁定能夠採用額外的routingKey參數。Spring-amqp使用流暢的API來使這種關係很是清晰。咱們將交換和隊列傳遞到BindingBuilder,並簡單地將隊列綁定到交換機「與路由密鑰」,以下所示:ui
@Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange"); }
綁定密鑰的含義取決於交換類型。咱們以前使用的 fanout 交換只是忽略了它的價值this
咱們上一個教程中的消息系統向全部消費者廣播全部消息。咱們但願擴展它以容許根據顏色類型過濾消息。例如,咱們可能須要一個程序將日誌消息寫入磁盤以僅接收嚴重錯誤,而不是在警告或信息日誌消息上浪費磁盤空間。spa
咱們使用的是fanout exchange(扇出交換),它沒有給咱們太大的靈活性 - 它只能進行無心識的廣播。日誌
咱們將使用direct exchange (直接交換)。direct change (直接交換) 背後的路由算法很簡單 - 消息進入隊列,其 綁定密鑰與消息的路由密鑰徹底匹配。code
爲了說明這一點,請考慮如下設置:blog
在此設置中,咱們能夠看到direct exchange(直接交換) X與兩個綁定到它的隊列。第一個隊列綁定orange綁定,第二個綁定有兩個綁定,一個綁定密鑰爲黑色,另外一個綁定爲綠色。
在這樣的設置中,使用路由密鑰orange發佈到exchange的消息 將被路由到隊列Q1。路由鍵爲black 或green 的消息將轉到Q2。全部其餘消息將被丟棄。
使用相同的綁定密鑰綁定多個隊列是徹底合法的。在咱們的例子中,咱們能夠在X和Q1之間添加綁定鍵black的綁定。
在這種狀況下,direct exchange(直接交換)將表現得像fanout (扇出)同樣,並將消息廣播到全部匹配的隊列。路由密鑰爲black的消息將傳送到 Q1和Q2。
咱們將此模型用於咱們的路由系統。咱們會將消息發送給direct change (直接交換),而不是fanout(扇出)。
咱們將提供顏色做爲路由鍵。這樣接收程序將可以選擇它想要接收(或訂閱)的顏色。讓咱們首先關注發送消息。
與往常同樣,咱們在Tut4Config中進行一些Spring Boot 配置:
@Bean public FanoutExchange fanout() { return new FanoutExchange("tut.fanout"); }
咱們已經準備好發送消息了。顏色,如圖中所示,能夠是「orange」,「black」或「green」之一。
接收消息將像上一個教程同樣工做,但有一個例外 - 咱們將爲咱們感興趣的每種顏色建立一個新的綁定。這也適用於Tut4Config
@Bean public DirectExchange direct() { return new DirectExchange("tut.direct"); } ... @Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange"); }
配置類
Tut4Config.java
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import com.xingyun.springamqp.business.Tut4Receiver; import com.xingyun.springamqp.business.Tut4Sender; @Profile({"tut4","routing"}) @Configuration public class Tut4Config { @Bean public DirectExchange direct() { return new DirectExchange("tut.direct"); } @Profile("receiver") private static class ReceiverConfig { @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } @Bean public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("orange"); } @Bean public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(direct) .with("black"); } @Bean public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(direct) .with("green"); } @Bean public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(direct) .with("black"); } @Bean public Tut4Receiver receiver() { return new Tut4Receiver(); } } @Profile("sender") @Bean public Tut4Sender sender() { return new Tut4Sender(); } }
生產者
Tut4Sender.java
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; public class Tut4Sender { @Autowired private RabbitTemplate template; @Autowired private DirectExchange direct; private int index; private int count; private final String[] keys = {"orange", "black", "green"}; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { StringBuilder builder = new StringBuilder("Hello to "); if (++this.index == 3) { this.index = 0; } String key = keys[this.index]; builder.append(key).append(' '); builder.append(Integer.toString(++this.count)); String message = builder.toString(); template.convertAndSend(direct.getName(), key, message); System.out.println(" [x] Sent '" + message + "'"); } }
消費者
Tut4Receiver.java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.util.StopWatch; public class Tut4Receiver { @RabbitListener(queues = "#{autoDeleteQueue1.name}") public void receive1(String in) throws InterruptedException { receive(in, 1); } @RabbitListener(queues = "#{autoDeleteQueue2.name}") public void receive2(String in) throws InterruptedException { receive(in, 2); } public void receive(String in, int receiver) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + receiver + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
查看用法
java -jar RabbitMQ_0x04_SpringAMQP_Routing_Sample-0.0.1-SNAPSHOT.jar
啓動生產者
java -jar RabbitMQ_0x04_SpringAMQP_Routing_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=routing,sender
啓動消費者
java -jar RabbitMQ_0x04_SpringAMQP_Routing_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=routing,receiver