SpringBoot+RabbitMq的幾種姿式

前言

目前主流的消息中間件有activemq,rabbitmq,rocketmq,kafka,咱們要根據實際的業務場景來選擇一款合適的消息中間件,關注的主要指標有,消息投遞的可靠性,可維護性,吞吐量以及中間件的特點等重要指標來選擇,大數據領域確定是kafka,那麼傳統的業務場景就是解耦,異步,削峯。那麼就在剩下的3款產品中選擇一款,從吞吐量,社區的活躍度,消息的可靠性出發,通常的中小型公司選擇rabbitmq來講可能更爲合適。那麼咱們就來看看如何使用它吧。java

環境準備

本案例基於springboot集成rabbitmq,本案例主要側重要實際的code,對於基礎理論知識請自行百度。
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.RELEASEspring

  • pom文件
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複製代碼
  • yml配置文件
spring:
  rabbitmq:
    password: guest
    username: guest
    port: 5672
    addresses: 127.0.0.1
    #開啓發送失敗返回
    publisher-returns: true
    #開啓發送確認
    publisher-confirms: true
    listener:
      simple:
        #指定最小的消費者數量.
        concurrency: 2
        #指定最大的消費者數量.
        max-concurrency: 2
        #開啓ack
        acknowledge-mode: auto
      #開啓ack
      direct:
        acknowledge-mode: auto
    #支持消息的確認與返回
    template:
      mandatory: true
複製代碼

配置rabbitMq的姿式

  • 姿式一

基於javaconfigspringboot

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig
 * @Description rabbitMq配置類
 * @Author lly
 * @Date 2019-05-13 15:05
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {

    public final static String DIRECT_QUEUE = "directQueue";
    public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
    public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
    public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
    public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";

    public final static String TOPIC_EXCHANGE = "topic_exchange";
    public final static String FANOUT_EXCHANGE = "fanout_exchange";

    public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
    public final static String TOPIC_ROUTINGKEY_TWO = "*.key";

//  direct模式隊列
    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);
    }
//  topic 訂閱者模式隊列
    @Bean
    public Queue topicQueueOne() {
        return new Queue(TOPIC_QUEUE_ONE, true);
    }
    @Bean
    public Queue topicQueueTwo() {
        return new Queue(TOPIC_QUEUE_TWO, true);
    }
//  fanout 廣播者模式隊列
    @Bean
    public Queue fanoutQueueOne() {
        return new Queue(FANOUT_QUEUE_ONE, true);
    }
    @Bean
    public Queue fanoutQueueTwo() {
        return new Queue(FANOUT_QUEUE_TWO, true);
    }
//  topic 交換器
    @Bean
    public TopicExchange topExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
//  fanout 交換器
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

//   訂閱者模式綁定
    @Bean
    public Binding topExchangeBingingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
    }

    @Bean
    public Binding topicExchangeBingingTwo() {
        return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
    }
//   廣播模式綁定
    @Bean
    public Binding fanoutExchangeBingingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutExchangeBingingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
}

複製代碼
  • 姿式二

基於註解bash

package com.lly.order.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;


/**
 * @ClassName MQTest
 * @Description 消息隊列測試
 * @Author lly
 * @Date 2019-05-13 10:50
 * @Version 1.0
 **/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private final static String QUEUE = "test_queue";

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public MQTest(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void sendMq() {
        rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
        log.info("發送消息:{}", "test_queue" + LocalTime.now());
    }


    public void sendMqRabbit() {
        //回調id
        CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
//        rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測試",cId);
        Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "廣播者模式測試", cId);
        log.info("發送消息:{},object:{}", "廣播者模式測試" + LocalTime.now(), object);
    }

    //發送訂閱者模式
    public void sendMqExchange() {
        CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
        CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
        log.info("訂閱者模式->發送消息:routing_key_one");
        rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
        log.info("訂閱者模式->發送消息routing_key_two");
        rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
    }
    //若是不存在,自動建立隊列
    @RabbitListener(queuesToDeclare = @Queue("test_queue"))
    public void receiverMq(String msg) {
        log.info("接收到隊列消息:{}", msg);
    }
      //若是不存在,自動建立隊列和交換器而且綁定
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
                    exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
                    key = "routing_key_one")})
    public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            log.info("接收到topic_routing_key_one消息:{}", msg);
            //發生異常
            log.error("發生異常");
            int i = 1 / 0;
            //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("接收消息失敗,從新放回隊列");
            //requeu,爲true,表明從新放入隊列屢次失敗從新放回會致使隊列堵塞或死循環問題,
            // 解決方案,剔除此消息,而後記錄到db中去補償
            //channel.basicNack(deliveryTag, false, true);
            //拒絕消息
            //channel.basicReject(deliveryTag, true);
        }
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
                    exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
                    key = "routing_key_two")})
    public void receiverMqExchageTwo(String msg) {
        log.info("接收到topic_routing_key_two消息:{}", msg);
    }


    @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
    public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("接收到隊列fanout_queue_one消息:{}", msg);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            e.printStackTrace();
            //屢次失敗從新放回會致使隊列堵塞或死循環問題 丟棄這條消息
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("接收消息失敗");
        }
    }

    @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
    public void receiverMqFanoutTwo(String msg) {
        log.info("接收到隊列fanout_queue_two消息:{}", msg);
    }

    /**
     * @return
     * @Author lly
     * @Description 確認消息是否發送到exchange
     * @Date 2019-05-14 15:36
     * @Param [correlationData, ack, cause]
     **/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息惟一標識id:{}", correlationData);
        log.info("消息確認結果!");
        log.error("消息失敗緣由,cause:{}", cause);
    }
    /**
     * @return
     * @Author lly
     * @Description 消息消費發生異常時返回
     * @Date 2019-05-14 16:22
     * @Param [message, replyCode, replyText, exchange, routingKey]
     **/
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息發送失敗id:{}", message.getMessageProperties().getCorrelationId());
        log.info("消息主體 message : ", message);
        log.info("消息主體 message : ", replyCode);
        log.info("描述:" + replyText);
        log.info("消息使用的交換器 exchange : ", exchange);
        log.info("消息使用的路由鍵 routing : ", routingKey);
    }
}

複製代碼

rabbitMq消息確認的三種方式

# 發送消息後直接確認消息
acknowledge-mode:none
# 根據消息消費的狀況,智能斷定消息的確認狀況
acknowledge-mode:auto
# 手動確認消息的狀況
acknowledge-mode:manual
複製代碼

咱們以topic模式來試驗下消息的ack 服務器

自動確認消息模式
手動確認消息模式

而後咱們再次消費消息,發現消息是沒有被確認的,因此能夠被再次消費

發現一樣的消息仍是存在的沒有被隊列刪除,必須手動去ack,咱們修改隊列1的手動ack看看效果

channel.basicAck(deliveryTag, false);
複製代碼

重啓項目再次消費消息dom

再次查看隊列裏的消息,發現隊列01裏的消息被刪除了,隊列02的仍是存在。

消費消息發生異常的狀況,修改代碼 模擬發生異常的狀況下發生了什麼, 異常發生了,消息被重放進了隊列 異步

可是會致使消息不停的循環消費,而後失敗,致死循環調用大量服務器資源
因此咱們正確的處理方式是,發生異常,將消息記錄到db,再經過補償機制來補償消息,或者記錄消息的重複次數,進行重試,超過幾回後再放到db中。

總結

經過實際的code咱們瞭解的rabbitmq在項目的具體的整合狀況,消息ack的幾種狀況,方便在實際的場景中選擇合適的方案來使用。若有不足,還望不吝賜教。ide

相關文章
相關標籤/搜索