3、springBoot對接rabbitMq

前情提要:rabbitmq 管理界面查看姿式java

1、快速搭建/基本信息發送和消費git

一、引入依賴web

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、application.ymlspring

spring:
  rabbitmq:
    host: ipXXX
    port: 5672
    username: 帳戶XXX
    password: 密碼XXX
    virtual-host: /wen  # 交換器名稱

以 direct模式爲例數據庫

一、配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class RabbitConfig {
    //隊列 起名:TestDirectQueue
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效
        // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //通常設置一下隊列的持久化就好,其他兩個就是默認false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效
        // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //通常設置一下隊列的持久化就好,其他兩個就是默認false
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效
        // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //通常設置一下隊列的持久化就好,其他兩個就是默認false
        return new Queue("weixin.fanout.queue", true);
    }
    @Bean
    public Queue TTLQueue() {
        Map<String, Object> map = new HashMap<>(16);
        map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒後過時
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {
        return new DirectExchange("TTL_EXCHANGE", true, false);
    }


    //Direct交換機 起名:TestDirectExchange
    @Bean
    public DirectExchange fanoutOrderExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("fanout_exchange", true, false);
    }
    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
    }

    @Bean
    public Binding bindingDirect1() {
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect2() {
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect3() {
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
    }
}


二、生產者
package com.pit.barberShop.common.MQ.Rabbit.fanout;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author :wenye
 * @date :Created in 2021/6/15 21:41
 * @description:廣播模式
 * @version: $
 */
@RestController
@RequestMapping("/rabbitmq")
public class ProducerFanout {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定義交換機
    private String exchangeName = "fanout_exchange";
    // 2: 路由key
    private String routeKey = "";

    @RequestMapping("/fanout")
    public void markerFanout() {
        String message ="shua";
        // 發送消息
        rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
    }

    @RequestMapping("/ttl")
    public String testTTL() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 設置過時時間,單位:毫秒
        byte[] msgBytes = "測試消息自動過時".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
        return "ok";
    }
}

三、消費者

package com.pit.barberShop.common.MQ.Rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * @author :wenye
 * @date :Created in 2021/6/15 22:07
 * @description:fanout消費者
 * @version: $
 */
@Component
public class ConsumerFanout {

    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。
            value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
            // order.fanout 交換機的名字 必須和生產者保持一致
            exchange = @Exchange(value = "fanout_exchange",
                    // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void messagerevice(String message){
        // 此處省略發郵件的邏輯
        System.out.println("sms-two111------------->" + message);
    }


    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。
            value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
            // order.fanout 交換機的名字 必須和生產者保持一致
            exchange = @Exchange(value = "fanout_exchange",
                    // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void messageWXrevice(String message){
        // 此處省略發郵件的邏輯
        System.out.println("weixin----two---------->" + message);
    }
}

2、過時時間緩存

一、生產者發送消息時設置過時時間

    public String testTTL() {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 設置過時時間,單位:毫秒
        byte[] msgBytes = "測試消息自動過時".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message);
        return "ok";
    }

二、隊列中的全部消息設置過時時間

配置中添加
@Bean
    public Queue TTLQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒後過時
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

  @Bean
    public Queue TTLQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒後過時
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {
        return new DirectExchange("TTL_EXCHANGE", true, false);
    }

    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
    }

3、消息確認機制配置
參考:https://blog.csdn.net/qq33098...
默認是自動應答app

spring:
  rabbitmq:
    # 開啓發送確認
    publisher-confirms: true
    # 開啓發送失敗退回
    publisher-returns: true
目前回調存在ConfirmCallback和ReturnCallback二者。他們的區別在於
若是消息沒有到exchange,則ConfirmCallback回調,ack=false,
若是消息到達exchange,則ConfirmCallback回調,ack=true

exchange到queue成功,則不回調ReturnCallback
rabbitMQ 消息生產者發送消息的流程
輸入圖片說明less

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /**
    * correlationData:對象內部只有一個 id 屬性,用來表示當前消息的惟一性。
    * ack:消息投遞到broker 的狀態,true表示成功。
    * cause:表示投遞失敗的緣由。
    **/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){
        if (!ack) {
            log.error("消息發送異常!");
        } else {
            log.info("發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }

    }


}

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    //重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey路由(隊列)
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

配置文件
一、防止重複簽發ack須要在配置類中重寫
  @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //此處也設置爲手動ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

二、從新建立設置交換器和隊列屬性
@Bean
   public Queue chongfuQueue() {
    // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效
    // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable
    // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
    //   return new Queue("TestDirectQueue",true,true,false);
    //通常設置一下隊列的持久化就好,其他兩個就是默認false
    return new Queue("chongfu.fanout.queue", true);
}

 //Direct交換機 起名:TestDirectExchange
@Bean
public DirectExchange chongfuExchange() {
    //  return new DirectExchange("TestDirectExchange",true,true);
    return new DirectExchange("chongfu_exchange", true, false);
}

@Bean
public Binding bindingDirect4() {
    return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with("");
}


生產者

public void markerchongfu() {
        /**
         * 確保消息發送失敗後能夠從新返回到隊列中
         * 注意:yml須要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消費者確認收到消息後,手動ack回執回調處理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投遞到隊列失敗回調處理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 發送消息
         */
        String s = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帥哥",
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(s));
    }

消費者

@RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。
            value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"),
            // order.fanout 交換機的名字 必須和生產者保持一致
            exchange = @Exchange(value = "chongfu_exchange",
                    // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {

            log.info("小富收到消息:{}", msg);
//            log.info("序號:{}", message.getMessageProperties().getDeliveryTag());
//            System.out.println(msg);
            //TODO 具體業務
            // 收到消息 basicAck()
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重複處理失敗,拒絕再次接收...");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                log.error("消息即將再次返回隊列處理...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
消費消息有三種回執方法

一、basicAckdom

basicAck:表示成功確認,使用此回執方法後,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple) 分佈式

  • deliveryTag:表示消息投遞序號,每次消費消息或者消息從新投遞後,deliveryTag都會增長。手動消息確認模式下,咱們能夠對指定deliveryTag的消息進行ack、nack、reject等操做。
  • multiple:是否批量確認,值爲 true 則會一次性 ack全部小於當前消息 deliveryTag 的消息。

舉個栗子: 假設我先發送三條消息deliveryTag分別是五、六、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag爲8,multiple設置爲 true,會將五、六、七、8的消息所有進行確認。

二、basicNack

basicNack :表示失敗確認,通常在消費消息業務異常時用到此方法,能夠將消息從新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:表示消息投遞序號。
  • multiple:是否批量確認。
  • requeue:值爲 true 消息將從新入隊列。

三、basicReject

basicReject:拒絕消息,與basicNack區別在於不能進行批量操做,其餘用法很類似。

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:表示消息投遞序號。
  • requeue:值爲 true 消息將從新入隊列。

4、死信隊列

死信隊列其實和普通的隊列沒啥大的區別,都須要建立本身的Queue、Exchange,而後經過RoutingKey綁定到Exchange上去,只不過死信隊列的RoutingKey和Exchange要做爲參數,綁定到正常的隊列上去,一種應用場景是正常隊列裏面的消息被basicNack或者reject時,消息就會被路由到正常隊列綁定的死信隊列中,還有一種還有經常使用的場景就是開啓了自動簽收,而後消費者消費消息時出現異常,超過了重試次數,那麼這條消息也會進入死信隊列,若是配置了話,

輸入圖片說明

例子

//模擬異經常使用的交換器 ,topic交換器會通配符匹配,固然字符串如出一轍也會匹配
    @Bean
    TopicExchange emailExchange() {
        return new TopicExchange("demoTopicExchange");
    }

    //死信隊列
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("demo.dead.letter");
    }
    //死信交換器
    @Bean
    TopicExchange deadLetterExchange() {
        return new TopicExchange("demoDeadLetterTopicExchange");
    }
    //綁定死信隊列
    @Bean
    Binding bindingDeadLetterQueue() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter");
    }

生產者
@RequestMapping("/sixin")
    public void sendEmailMessage() {

        CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData);
        log.info("---發送 email 消息---{}---messageId---{}","111",correlationData.getId());
    }

消費者

  /**
     * 郵件消費者
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。
            value = @Queue(value = "demo.email",autoDelete = "false",
            arguments = {
                    @Argument(name =  "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"),
                    @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"),
                    @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long")
            }),
            key = "demo.email",
            // order.fanout 交換機的名字 必須和生產者保持一致
            exchange = @Exchange(value = "demoTopicExchange",
                    // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式
                    type = ExchangeTypes.TOPIC)
    ))
    public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException {

        try {
            log.info("---接受到消息---{}",msg);
            //主動異常
            int m=1/0;
            //手動簽收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            //異常,ture 從新入隊,或者false,進入死信隊列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

        }
    }

    /**
     * 死信消費者,自動簽收開啓狀態下,超太重試次數,或者手動簽收,reject或者Nack
     * @param message
     */
    @RabbitListener(queues = "demo.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {

        //能夠考慮數據庫記錄,每次進來查數量,達到必定的數量,進行預警,人工介入處理
        log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
        //回覆ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

輸出打印

一樣也可以使用java類配置

@Bean
    public Queue emailQueue() {

        Map<String, Object> arguments = new HashMap<>(2);
        // 綁定死信交換機
        arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange");
        // 綁定死信的路由key
        arguments.put("x-dead-letter-routing-key", "demo.dead.letter");
        arguments.put("x-message-ttl", 3000);

        return new Queue(emailQueue,true,false,false,arguments);
    }

    
    @Bean
    TopicExchange emailExchange() {
        return new TopicExchange(topicExchange);
    }


    @Bean
    Binding bindingEmailQueue() {
        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
    }

5、持久化機制和內存磁盤監控

一、持久化
RabbitMQ的持久化隊列分爲:

1:隊列持久化
2:消息持久化
3:交換機持久化

不管是持久化的消息仍是非持久化的消息均可以寫入到磁盤中,只不過非持久的是等內存不足的狀況下才會被寫入到磁盤中。

二、內存磁盤監控
輸入圖片說明

6、分佈式事務

7、配置詳解

rabbitmq:
    addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client鏈接到的server的地址,多個以逗號分隔(優先取addresses,而後再取host)
#    port:
    ##集羣配置 addresses之間用逗號隔開
    # addresses: ip:port,ip:port
    password: admin
    username: 123456
    virtual-host: / # 鏈接到rabbitMQ的vhost
    requested-heartbeat: #指定心跳超時,單位秒,0爲不指定;默認60s
    publisher-confirms: #是否啓用 發佈確認
    publisher-reurns: # 是否啓用發佈返回
    connection-timeout: #鏈接超時,單位毫秒,0表示無窮大,不超時
    cache:
      channel.size: # 緩存中保持的channel數量
      channel.checkout-timeout: # 當緩存數量被設置時,從緩存中獲取一個channel的超時時間,單位毫秒;若是爲0,則老是建立一個新channel
      connection.size: # 緩存的鏈接數,只有是CONNECTION模式時生效
      connection.mode: # 鏈接工廠緩存模式:CHANNEL 和 CONNECTION
    listener:
      simple.auto-startup: # 是否啓動時自動啓動容器
      simple.acknowledge-mode: # 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto
      simple.concurrency: # 最小的消費者數量
      simple.max-concurrency: # 最大的消費者數量
      simple.prefetch: # 指定一個請求能處理多少個消息,若是有事務的話,必須大於等於transaction數量.
      simple.transaction-size: # 指定一個事務處理的消息數量,最好是小於等於prefetch的數量.
      simple.default-requeue-rejected: # 決定被拒絕的消息是否從新入隊;默認是true(與參數acknowledge-mode有關係)
      simple.idle-event-interval: # 多少長時間發佈空閒容器時間,單位毫秒
      simple.retry.enabled: # 監聽重試是否可用
      simple.retry.max-attempts: # 最大重試次數
      simple.retry.initial-interval: # 第一次和第二次嘗試發佈或傳遞消息之間的間隔
      simple.retry.multiplier: # 應用於上一重試間隔的乘數
      simple.retry.max-interval: # 最大重試時間間隔
      simple.retry.stateless: # 重試是有狀態or無狀態
    template:
      mandatory: # 啓用強制信息;默認false
      receive-timeout: # receive() 操做的超時時間
      reply-timeout: # sendAndReceive() 操做的超時時間
      retry.enabled: # 發送重試是否可用
      retry.max-attempts: # 最大重試次數
      retry.initial-interval: # 第一次和第二次嘗試發佈或傳遞消息之間的間隔
      retry.multiplier: # 應用於上一重試間隔的乘數
      retry.max-interval: #最大重試時間間隔
相關文章
相關標籤/搜索