譯: 5. RabbitMQ Spring AMQP 之 Topic 主題

在上一個教程中,咱們提升了消息傳遞的靈活 咱們使用direct交換而不是使用僅可以進行虛擬廣播的fanout交換,java

而且得到了基於路由key 有選擇地接收消息的可能性。spring

雖然使用direct 交換改進了咱們的系統,但它仍然有侷限性 - 它不能基於多個標準進行路由。併發

在咱們的消息傳遞系統中,咱們可能不只要根據路由key訂閱隊列,還要根據生成消息的源來訂閱隊列.app

爲了在咱們的日誌記錄系統中實現這種靈活性,咱們須要瞭解更復雜的topic交換。ide

Topic Exchangeui

發送到topic 交換的消息不能具備任意 routing_key - 它必須是由點分隔的單詞列表。單詞能夠是任何內容,但一般它們指定與消息相關的一些功能。一些有效的路由密鑰示例:「 stock.usd.nyse 」,「 nyse.vmw 」,「 quick.orange.rabbit 」。路由密鑰中能夠包含任意數量的單詞,最多可達255個字節。this

綁定密鑰也必須採用相同的形式。spa

topic 交換背後的邏輯 相似於direct 交換- 使用特定路由key發送的消息將被傳遞到與匹配綁定key綁定的全部隊列。可是,綁定鍵有兩個重要的特殊狀況:日誌

  • *(星號)能夠替代一個單詞。
  • (hash)能夠替換零個或多個單詞。

在一個例子中解釋這個是最容易的:code

在這個例子中,咱們將發送全部描述動物的消息。

消息將與包含三個單詞(兩個點)的路由鍵一塊兒發送。路由鍵中的第一個單詞將描述速度,第二個是顏色,第三個是物種:

<speed>.<colour>.<species>

咱們建立了三個綁定

  • Q1   .orange.*
  • Q2   *.*.rabbit" and "lazy.# 

這些綁定能夠歸納爲:

  • Q1對全部orange橙色動物感興趣。
  • Q2但願聽到關於rabbit兔子的一切,以及關於lazy懶惰動物的一切。

路由密鑰設置爲「 quick.orange.rabbit 」的消息將傳遞到兩個隊列。

消息「 lazy.orange.elephant 」也將同時發送給他們。

另外一方面,「 quick.orange.fox 」只會進入第一個隊列,而「 lazy.brown.fox 」只會進入第二個隊列。

「 lazy.pink.rabbit 」將僅傳遞到第二個隊列一次,即便它匹配兩個綁定。

「 quick.brown.fox 」與任何綁定都不匹配,所以它將被丟棄。

若是咱們違反約定併發送帶有一個或四個單詞的消息,例如「 orange 」或「 quick.orange.male.rabbit」,會發生什麼?好吧,這些消息將不匹配任何綁定,將丟失。

另外一方面,「 lazy.orange.male.rabbit 」,即便它有四個單詞,也會匹配最後一個綁定,並將被傳遞到第二個隊列。

Topic Exchange

topic exchange 功能強大,能夠像其餘exchange同樣。

當隊列與「  」(哈希)綁定密鑰綁定時 - 它將接收全部消息,而無論路由密鑰 - 如扇出交換。

當特殊字符「 * 」(星號)和「  」(哈希)未在綁定中使用時,主題交換的行爲就像直接交換同樣

放在一塊兒

主類

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.xingyun.springamqp.config.RabbitAmqpTutorialsRunner;

@SpringBootApplication
@EnableScheduling
public class RabbitMq0x05SpringAmqpTopicSampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMq0x05SpringAmqpTopicSampleApplication.class, args);
    }
    
    @Profile("usage_message")
    @Bean
    public CommandLineRunner usage() {
        return new CommandLineRunner() {

            @Override
            public void run(String... arg0) throws Exception {
                System.out.println("This app uses Spring Profiles to control its behavior.\n");
                System.out.println("Sample usage: java -jar "
                        + "RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar "
                        + "--spring.profiles.active=topics"
                        + ",sender");
            }
        };
    }
    
    @Profile("!usage_message")
    @Bean
    public CommandLineRunner tutorial() {
        return new RabbitAmqpTutorialsRunner();
    }
}

Tut5Config.java

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

import com.xingyun.springamqp.business.Tut5Receiver;
import com.xingyun.springamqp.business.Tut5Sender;

@Profile({"tut5","topics"})
@Configuration
public class Tut5Config {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("tut.topic");
    }

    @Profile("receiver")
    private static class ReceiverConfig {

        @Bean
        public Tut5Receiver receiver() {
            return new Tut5Receiver();
        }

        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding binding1a(TopicExchange topic, 
            Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                .to(topic)
                .with("*.orange.*");
        }

        @Bean
        public Binding binding1b(TopicExchange topic, 
            Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                .to(topic)
                .with("*.*.rabbit");
        }

        @Bean
        public Binding binding2a(TopicExchange topic, 
            Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                .to(topic)
                .with("lazy.#");
        }

    }

    @Profile("sender")
    @Bean
    public Tut5Sender sender() {
        return new Tut5Sender();
    }

}

RabbitAmqpTutorialsRunner.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ConfigurableApplicationContext;

public class RabbitAmqpTutorialsRunner implements CommandLineRunner {

    /**
     * application.properties文件中配置tutorial.client.duration=10000 須要
     * */
    @Value("${tutorial.client.duration:0}")
    private int duration;

    @Autowired
    private ConfigurableApplicationContext ctx;

    @Override
    public void run(String... args) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("Ready ... running for " + duration + "ms");
        Thread.sleep(duration);
        ctx.close();
    }

}

Tut5Sender.java

import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;

public class Tut5Sender {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;


    private int index;

    private int count;

    private final String[] keys = {"quick.orange.rabbit", 
            "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (++this.index == keys.length) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String message = builder.toString();
        template.convertAndSend(topic.getName(), key, message);
        System.out.println(" [x] Sent '" + message + "'");
    }

}

Tut5Receiver.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut5Receiver {

    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receive1(String in) throws InterruptedException {
        receive(in, 1);
    }

    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receive2(String in) throws InterruptedException {
        receive(in, 2);
    }

    public void receive(String in, int receiver) throws 
        InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + receiver + " [x] Received '" 
            + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + receiver + " [x] Done in " 
            + watch.getTotalTimeSeconds() + "s");
    }

    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

 

查看用法

java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar

啓動生產者

java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=topics,sender

啓動消費者

java -jar RabbitMQ_0x05_SpringAMQP_Topic_Sample-0.0.1-SNAPSHOT.jar --spring.profiles.active=topics,receiver

相關文章
相關標籤/搜索