在上一個教程中,咱們提升了消息傳遞的靈活 咱們使用direct交換而不是使用僅可以進行虛擬廣播的fanout交換,java
而且得到了基於路由key 有選擇地接收消息的可能性。spring
雖然使用direct 交換改進了咱們的系統,但它仍然有侷限性 - 它不能基於多個標準進行路由。併發
在咱們的消息傳遞系統中,咱們可能不只要根據路由key訂閱隊列,還要根據生成消息的源來訂閱隊列.app
爲了在咱們的日誌記錄系統中實現這種靈活性,咱們須要瞭解更復雜的topic交換。ide
Topic Exchangeui
發送到topic 交換的消息不能具備任意 routing_key - 它必須是由點分隔的單詞列表。單詞能夠是任何內容,但一般它們指定與消息相關的一些功能。一些有效的路由密鑰示例:「 stock.usd.nyse 」,「 nyse.vmw 」,「 quick.orange.rabbit 」。路由密鑰中能夠包含任意數量的單詞,最多可達255個字節。this
綁定密鑰也必須採用相同的形式。spa
topic 交換背後的邏輯 相似於direct 交換- 使用特定路由key發送的消息將被傳遞到與匹配綁定key綁定的全部隊列。可是,綁定鍵有兩個重要的特殊狀況:日誌
在一個例子中解釋這個是最容易的:code
在這個例子中,咱們將發送全部描述動物的消息。
消息將與包含三個單詞(兩個點)的路由鍵一塊兒發送。路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是物種:
<speed>.<colour>.<species>
咱們建立了三個綁定
這些綁定能夠歸納爲:
路由密鑰設置爲「 quick.orange.rabbit 」的消息將傳遞到兩個隊列。
消息「 lazy.orange.elephant 」也將同時發送給他們。
另外一方面,「 quick.orange.fox 」只會進入第一個隊列,而「 lazy.brown.fox 」只會進入第二個隊列。
「 lazy.pink.rabbit 」將僅傳遞到第二個隊列一次,即便它匹配兩個綁定。
「 quick.brown.fox 」與任何綁定都不匹配,所以它將被丟棄。
若是咱們違反約定併發送帶有一個或四個單詞的消息,例如「 orange 」或「 quick.orange.male.rabbit」,會發生什麼?好吧,這些消息將不匹配任何綁定,將丟失。
另外一方面,「 lazy.orange.male.rabbit 」,即便它有四個單詞,也會匹配最後一個綁定,並將被傳遞到第二個隊列。
Topic Exchange
topic exchange 功能強大,能夠像其餘exchange同樣。
當隊列與「 # 」(哈希)綁定密鑰綁定時 - 它將接收全部消息,而無論路由密鑰 - 如扇出交換。
當特殊字符「 * 」(星號)和「 # 」(哈希)未在綁定中使用時,主題交換的行爲就像直接交換同樣
主類
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.EnableScheduling; import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner; @SpringBootApplication @EnableScheduling public class RabbitMq0x05SpringAmqpTopicSampleApplication { public static void main(String[] args) { SpringApplication.run(RabbitMq0x05SpringAmqpTopicSampleApplication.class, args); } @Profile("usage_message") @Bean public CommandLineRunner usage() { return new CommandLineRunner() { @Override public void run(String... arg0) throws Exception { System.out.println("This app uses Spring Profiles to control its behavior.\n"); System.out.println("Sample usage: java -jar " + "RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar " + "--spring.profiles.active=topics" + ",sender"); } }; } @Profile("!usage_message") @Bean public CommandLineRunner tutorial() { return new RabbitAmqpTutorialsRunner(); } }
Tut5Config.java
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import com.xingyun.springamqp.business.Tut5Receiver; import com.xingyun.springamqp.business.Tut5Sender; @Profile({"tut5","topics"}) @Configuration public class Tut5Config { @Bean public TopicExchange topic() { return new TopicExchange("tut.topic"); } @Profile("receiver") private static class ReceiverConfig { @Bean public Tut5Receiver receiver() { return new Tut5Receiver(); } @Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); } @Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); } @Bean public Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(topic) .with("*.orange.*"); } @Bean public Binding binding1b(TopicExchange topic, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(topic) .with("*.*.rabbit"); } @Bean public Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(topic) .with("lazy.#"); } } @Profile("sender") @Bean public Tut5Sender sender() { return new Tut5Sender(); } }
RabbitAmqpTutorialsRunner.java
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ConfigurableApplicationContext; public class RabbitAmqpTutorialsRunner implements CommandLineRunner { /** * application.properties文件中配置tutorial.client.duration=10000 須要 * */ @Value("${tutorial.client.duration:0}") private int duration; @Autowired private ConfigurableApplicationContext ctx; @Override public void run(String... args) throws Exception { // TODO Auto-generated method stub System.out.println("Ready ... running for " + duration + "ms"); Thread.sleep(duration); ctx.close(); } }
Tut5Sender.java
import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; public class Tut5Sender { @Autowired private RabbitTemplate template; @Autowired private TopicExchange topic; private int index; private int count; private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"}; @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { StringBuilder builder = new StringBuilder("Hello to "); if (++this.index == keys.length) { this.index = 0; } String key = keys[this.index]; builder.append(key).append(' '); builder.append(Integer.toString(++this.count)); String message = builder.toString(); template.convertAndSend(topic.getName(), key, message); System.out.println(" [x] Sent '" + message + "'"); } }
Tut5Receiver.java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.util.StopWatch; public class Tut5Receiver { @RabbitListener(queues = "#{autoDeleteQueue1.name}") public void receive1(String in) throws InterruptedException { receive(in, 1); } @RabbitListener(queues = "#{autoDeleteQueue2.name}") public void receive2(String in) throws InterruptedException { receive(in, 2); } public void receive(String in, int receiver) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + receiver + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + receiver + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
查看用法
java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar
啓動生產者
java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=topics,sender
啓動消費者
java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=topics,receiver