springboot rabbitmq整合

這一篇咱們來把消息中間件整合到springboot中web

=====================================================================spring

首先在服務器上安裝rabbitmq的服務,用docker拉取便可,再也不詳細描述。docker

直接來擼代碼springboot

首先咱們先添加rabbitmq的依賴服務器

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加必要的配置信息併發

spring.rabbitmq.host=192.168.0.86
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

好了,基本的配置就已經配置完畢了tcp

rabbitmq有六種模式spring-boot

 

咱們逐個來看springboot是怎麼實現的呢高併發

1.hello world

 

 P表明生產者,C表明消費者,紅色代碼消息隊列。P將消息發送到消息隊列,C對消息進行處理。測試

咱們先建立一個隊列

@Bean
    public Queue Queue() {
        return new Queue("hello");
    }

而後我再建立一個生產者

@Controller
public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

再建立一個消費者

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}

再寫一個測試用例看看

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
        @Autowired
        private HelloSender helloSender;

        @Test
        public void hello() throws Exception {
            helloSender.send();
        }
}

 

成功!

2.工做模式(競爭)

一個消息產生者,多個消息的消費者。競爭搶消息

咱們先建立一個隊列

@Bean
    public Queue Queue2() {
        return new Queue("neo");
    }

再建立一個消息生產者

@Controller
public class NeoSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(int i) {
        String context = "spirng boot neo queue"+" ****** "+i;
        System.out.println("Sender1 : " + context);
        this.rabbitTemplate.convertAndSend("neo", context);
    }
}

再建立兩個消息的消費者

 1 @Component
 2 @RabbitListener(queues = "neo")
 3 public class NeoReceiver1 {
 4     @RabbitHandler
 5     public void process(String neo) {
 6         System.out.println("Receiver 1: " + neo);
 7     }
 8 }
 9 
10 
11 
12 @Component
13 @RabbitListener(queues = "neo")
14 public class NeoReceiver2 {
15     @RabbitHandler
16     public void process(String neo) {
17         System.out.println("Receiver 2: " + neo);
18     }
19 
20 }

咱們寫一個測試用例

@Test
    public void oneToMany() throws Exception {
        for (int i=0;i<100;i++){
           // Thread.sleep(10);
            neoSender.send(i);
        }
    }

運行

 

能夠看到消息均勻的被兩個消費者消費了。

經過這個例子咱們能夠看作高併發狀況下的消息產生和消費,這會產生一個消息丟失的問題。萬一客戶端在處理消息的時候掛了,那這條消息就至關於被浪費了,針對這種狀況,rabbitmq推出了消息ack機制,熟悉tcp三次握手的必定不會陌生。

咱們看看springboot是實現ack的

很簡單,在咱們的配置類中,配置一個新的消費者,將原先的消費者先都去掉:

@Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(Queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息確認後才能刪除
        container.setPrefetchCount(5);//每次處理5條消息
        container.setMessageListener(new ChannelAwareMessageListener() {

            public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("消費端接收到消息 : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

 

但這裏會有個問題,test模式下消息發送完畢系統就會直接shutdown,因此只能消費部分消息,不過等真正啓動項目,這個問題就不存在了。

3.發佈訂閱模式

生產者將消息不是直接發送到隊列,而是發送到X交換機,而後由交換機發送給兩個隊列,兩個消費者各自監聽一個隊列,來消費消息。

這種方式實現同一個消息被多個消費者消費。工做模式是同一個消息只能有一個消費者。

咱們新建三個隊列

@Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

再新建一個交換機

@Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

再把這些隊列綁定到交換機上去

@Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

基本的配置完成後,再新建一個消息生產者

@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }
}

一樣的,咱們再新建三個消息消費者

 1 @Component
 2 @RabbitListener(queues = "fanout.A")
 3 public class FanoutReceiveA {
 4 
 5     @RabbitHandler
 6     public void process(String message) {
 7         System.out.println("fanout Receiver A  : " + message);
 8     }
 9 }
10 
11 @Component
12 @RabbitListener(queues = "fanout.B")
13 public class FanoutReceiverB {
14     @RabbitHandler
15     public void process(String message) {
16         System.out.println("fanout Receiver B: " + message);
17     }
18 }
19 
20 @Component
21 @RabbitListener(queues = "fanout.C")
22 public class FanoutReceiverC {
23     @RabbitHandler
24     public void process(String message) {
25         System.out.println("fanout Receiver C: " + message);
26     }
27 }

三個消費者分別監聽3個隊列的內容

新建一個測試用例:

@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void setFanoutSender(){
        fanoutSender.send();
    }

}

 

 三個隊列都接受到了消息

4:路由模式

須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配,這是一個完整的匹配。

 

 

5.主題模式

發送端不僅按固定的routing key發送消息,而是按字符串匹配發送,接收端一樣如此

符號#匹配一個或多個詞,符號*匹配很少很多一個詞。

 

4/5二者模式很類似,咱們放在一塊兒演示

 

新建兩個隊列

final static String message = "topic.A";
    final static String messages = "topic.B";


    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

新建一個交換機

@Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

綁定隊列到交換機上,路由模式,須要完整匹配topic.message,才能接受

@Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

topic模式,前綴匹配到topic.便可接受

@Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

咱們新建三個消息生產者

@Component
public class TopicSend {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, i am message all";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
    }

    public void send1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
    }

    public void send2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
    }
}

send的key是topic.1  send1的key是topic.message,send2的key是topic.messages

因此理論上send會被兩個隊列消費,1.2都應該只有一個隊列消費

咱們再新建兩個消費者

@Component
@RabbitListener(queues = "topic.A")
public class TopicReceiver {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1  : " + message);
    }

}

@Component
@RabbitListener(queues = "topic.B")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2  : " + message);
    }
}

寫三個測試用例

@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {
    @Autowired
    private TopicSend sender;

    @Test
    public void topic() throws Exception {
        sender.send();
    }

    @Test
    public void topic1() throws Exception {
        sender.send1();
    }

    @Test
    public void topic2() throws Exception {
        sender.send2();
    }

}

send的運行結果

send1的運行結果

send2的運行結果

 

結果符合預期。 

相關文章
相關標籤/搜索