SpringBoot集成RabbitMQ

原文:https://blog.csdn.net/linpeng_1/article/details/80505828html

AmqpTemplate,RabbitTemplate

Spring AMQP提供了一個發送和接收消息的操做模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一個實現。java

RabbitTemplate支持消息的確認與返回,爲了返回消息,RabbitTemplate 須要設置mandatory 屬性爲true,而且CachingConnectionFactory 的publisherReturns屬性也須要設置爲true。返回的消息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,spring

一個RabbitTemplate僅能支持一個ReturnCallback 。數據庫

爲了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也須要設置爲true,確認的消息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.json

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

SpringBoot集成RabbitMQ

server.port=8083
#服務器配置
spring.application.name=rabbitmq-hello-sending
#rabbitmq鏈接參數
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3 #集羣或單機 均可配置
spring.rabbitmq.username=linpeng
spring.rabbitmq.password=123456

# rabbitmq服務器的虛擬主機名,能夠在後臺管理系統上查看和新建
spring.rabbitmq.virtual-host=/test
# 鏈接超時
spring.rabbitmq.connection-timeout=5s

# 發送方
# 開啓發送確認(未到達MQ服務器)
spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退回(未找到對應queue)
spring.rabbitmq.publisher-returns=true

# 消費方 開啓手動ACK(坑:當序列化爲JSON時,此配置會失效,見下文)
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

# 消費方
spring.rabbitmq.listener.concurrency=2  //最小消息監聽線程數
spring.rabbitmq.listener.max-concurrency=2 //最大消息監聽線程數

#消費者每次從隊列獲取的消息數量 (默認一次250個)
#經過查看後臺管理器中queue的unacked數量
spring.rabbitmq.listener.simple.prefetch= 5

#消費者自動啓動
spring.rabbitmq.listener.simple.auto-startup=true

#消費失敗,自動從新入隊
spring.rabbitmq.listener.simple.default-requeue-rejected= true

#啓用發送重試
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

RabbitTemplate

默認一個RabbitTemplate在RabbitMQ中至關於一個connection,每發送一次消息至關於channel,MQ接收消息後釋放channel。每一個connection最多支持2048個channel,加入從一個connection同時超過2048個線程併發發送,channel超過2048,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。數組

測試啓動publisher-confirms後,400個線程經過一個RabbitTemplate併發發送10000消息,同時就可能產生1000左右的channel。由於channel等在confirm。10000消息所有發送在幾秒內完成,10000消息所有confirm回調完成用時22秒。緩存

後臺管理頁面查看connection+channel
springboot

此connection中有10個線程併發發送消息,監控到10個channel生成,MQ完成接收後釋放channel。若是是publisher-confirms模式,channel會保持到confirm回調完成再釋放,影響併發性能。每一個connection最多支持2048個channel。bash

測試啓動publisher-confirms後,500個線程併發發送,部分消息報AmqpResourceNotAvailableException。400個線程經過一個RabbitTemplate併發發送10000消息,最高同時就可能產生1000多的channel。由於channel在等待執行confirm回調。10000消息所有發送在幾秒內完成,10000消息所有confirm回調完成用時22秒,此時全部channel所有釋放。服務器

 

綁定隊列

若在rabbitmq的管理頁面手動建立隊列和交換機,則能夠再也不代碼中聲明 

package com.example.demo;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitConfig {
 
    @Bean
    public Queue QueueA() {
        return new Queue("hello");
    }
 
    @Bean
    public Queue QueueB() {
        return new Queue("helloObj");
    }
 
    /**
     * Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("ABExchange");
    }

    @Bean
    DirectExchange Exchange() {
        return new DirectExchange("DExchange");
    }
  
    @Bean
    Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(QueueA).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(QueueB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchange() {
        return BindingBuilder.bind(QueueA()).to(Exchange()).with("TEST");//routingKey
    }
}

 

消息發送者 

ConfirmCallback :ACK=true僅僅標示消息已被Broker接收到,並不表示已成功投放至消息隊列中, ACK=false標示消息因爲Broker處理錯誤,消息並未處理成功。如未找到對應交換機返回ACK=false。

ReturnCallback:當消息發送出去找不到對應路由隊列時,將會把消息退回 。若是有任何一個路由隊列接收投遞消息成功,則不會退回消息。MQ成功接收,可是未找到對應隊列觸發

經過以上異步確認機制,增長降級、補償處理。好比發送時保存信息和消息ID,ConfirmCallback 經過ID找到對應信息重發,注意要保證冪等性

package com.example.demo;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
 
import java.util.Date;
//RabbitTemplate.ConfirmCallback
@Service
public class HelloSender implements RabbitTemplate.ReturnCallback {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String context = "你好如今是 " + new Date() +"";
        System.out.println("HelloSender發送內容 : " + context);

        //消息序列化設置
        //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        
        //自身實現ReturnCallback接口 設置異步回調對象爲this
        this.rabbitTemplate.setReturnCallback(this);

        
        //如果當前類實現RabbitTemplate.ConfirmCallback接口,則能夠設置爲this
        //發送前給RabbitTemplate設置一個異步回調對象  RabbitTemplate.ConfirmCallback接口的匿名類
        this.rabbitTemplate.setConfirmCallback((correlationData, confirm, cause) -> {
        //若發送時沒有CorrelationData,則這裏correlationData==null
            if (!confirm) {
                System.out.println("HelloSender消息發送失敗" + cause + correlationData.getId() );
               //correlationData.getReturnedMessage(); Message
               //correlationData.toString();
            } else {
                System.out.println("HelloSender 消息發送成功 ");
            }
        });
        //this.rabbitTemplate.setConfirmCallback(this);
        //rabbitTemplate.convertAndSend("hello", context);

        //這裏指定路由鍵,注意不是隊列名
        //發送時 能夠指定消息ID,方便在ConfirmCallback時候二次處理消息
        rabbitTemplate.convertAndSend("DExchange","QueueRoutingKey", context, new CorrelationData("自定義消息ID"));
    }
 
    public void sendObj() {
       MessageObj obj = new MessageObj();
       obj.setACK(false);
       obj.setId(123);
       obj.setName("zhangsan");
       obj.setValue("data");
       System.out.println("發送 : " + obj);
       this.rabbitTemplate.convertAndSend("helloObj", obj);
    }
 
    @Override
    public void returnedMessage(Message message, int i, String cause, String exchange, String queue) {
        //沒有找到queue
        //Message中的成員,Body爲消息內容   
       //(Body:'hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
    }
 
//    @Override
//    public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
//        System.out.println("sender success");
//    }
 
}

測試發送:

使用Spring默認的rabbitTemplate發送消息,CorrelationData能夠重複。

交換機+路由鍵+消息Object+CorrelationData

rabbitTemplate.convertAndSend("TEST.EX","TEST","String:message",new CorrelationData("111"));

在rabbitmq控制檯上getmessage查看 ,rabbitTemplate默認發送deliverymode=2消息,已經設置了消息持久化。

測試速度:

測試100個線程同時併發向同一隊列發送簡單消息(15左右長度的字符串)。從發送到100個消息所有完成ConfirmCallback,用時爲600ms左右。此過程不計入消費速度。

400個線程經過一個RabbitTemplate併發發送10000消息,同時就可能產生1000左右的channel。由於channel等在confirm。10000消息所有發送在幾秒內完成,10000消息所有confirm回調完成用時22秒。

 

測試ConfirmCallback回調:

public void confirm(CorrelationData correlationData, boolean confirm, String cause) ;

confirm==true僅僅標示消息已被Broker接收到,並不表示已成功投放至消息隊列中, confirm==false標示消息因爲Broker處理錯誤,消息並未處理成功。如未找到對應交換機返回confirm==false。

在此方法中針對confirm==false的消息實現降級/補償處理:重發、本地緩存、計入數據庫/Redis等、更新狀態.....

測試環境:實例化一個ConfirmCallback接口對象,做爲rabbitTemplate共用回調處理對象。

回調測試結果:

      1 先發送到MQ的消息,先完成confirm回調。

      2 ConfirmCallback默認是由同一個線程執行回調,打印線程名能夠看到線程名爲【AMQP Connection rabbitmqIp:port】

      3 若發送時沒有攜帶CorrelationData,回調時這裏correlationData==null

      4.設置消息確認會影響併發性能,每一個線程發送生成一個channel,channel會保持到confirm回調完成再釋放。由於每一個connection最多支持2048個channel,當channel達到2048時,會報錯org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later

 

測試ReturnCallback 回調:

 public void returnedMessage(Message message, int i, String cause, String exchange, String queue) ;

MQ成功接收消息,可是未找到對應路由鍵的隊列後回調。實現降級/補償處理。

測試環境:實例化一個ReturnCallback接口對象,做爲rabbitTemplate共用回調處理對象。

回調測試結果:

默認是由同一個線程執行回調,打印線程名能夠看到線程名爲【AMQP Connection rabbitmqIp:port】

message=返回的Message對象中的成員,Body爲發送時的消息內容   ,receivedDeliveryMode=PERSISTENT=2 爲持久化消息。spring_returned_message_correlation=發送時的CorrelationData
(Body:'String:message' MessageProperties [headers={spring_returned_message_correlation=111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])

cause=NO_ROUTE

exchange、queue 爲發送時的配置

 

消息消費者

設置QOS,避免觸發流控機制

#消費者每次從隊列獲取的消息數量 (默認一次250個)
spring.rabbitmq.listener.simple.prefetch= 5

當QUEUE達到5條Unacked消息時,不會再推送消息給Consumer。查看後臺管理器中queue的unacked數量

package com.example.demo;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.util.Date;
import java.util.Map;
 
@Component
public class HelloReceiver {
 
    @RabbitListener(queues = "hello") //這裏是隊列名,不是路由鍵
    public void process(String msg,Channel channel, Message message) throws IOException {
        System.out.println("HelloReceiver收到  : " + msg +"收到時間"+new Date());
        try {
            //告訴MQ服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("receiver success");
        } catch (IOException e) {
            e.printStackTrace();
            //丟棄這條消息
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            System.out.println("receiver fail");
        }
 
    }
}

msg是消息內容,至關於Message對象中的body。

Message對象的成員:

能夠看到有消息信息BODY,發送方生成的消息CorrelationData,還有執行的Method對象(@RabbitListener標註的方法),目標BEAN

 

備註:咱們用註解的方式來接受消息 就不要用 本身建立對象實現ChannelAwareMessageListener的方式來接受消息 這種方式還要去全局裏面配置麻煩,直接用@RabbitListener(queues = "hello")最簡單

消息確認  由於我在屬性配置文件裏面開啓了ACK確認 因此若是代碼沒有執行ACK確認 你在RabbitMQ的後臺會看到消息會一直留在隊列裏面未消費掉 只要程序一啓動開始接受該隊列消息的時候 又會收到

方法參數詳解:http://www.javashuo.com/article/p-dwmtzvog-ey.html

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

deliveryTag:該消息的index,由發送方生成
multiple:是否批量.true:將一次性ack全部小於deliveryTag的消息。

 

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

deliveryTag:該消息的index
multiple:是否批量.true:將一次性拒絕全部小於deliveryTag的消息。
requeue:被拒絕的是否從新入隊列,true 放在隊首,false 消息進入綁定的DLX。必定注意:若此消息一直Nack重入隊會致使的死循環

channel.basicNack 與 channel.basicReject 的區別在於basicNack能夠拒絕多條消息,而basicReject一次只能拒絕一條消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

deliveryTag:該消息的index
requeue:被拒絕的是否從新入隊列。false 消息進入綁定的DLX

ShutdownSignalException

1 隊列名找不到

2 代碼中有ack,可是沒有配置手動ACK

 

消費超時

消費超時,queue中unacked的消息會退回到queue中,且消費者ACK時會失敗。

 

使用@Payload和@Headers註解

@Component
public class MessageHandler {

    //獲取消息的頭屬性和body屬性
    @RabbitListener(queues = "zhihao.miao.order")
    public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){
        System.out.println("====消費消息===handleMessage");
        System.out.println(headers);
        System.out.println(body);
    }
}

 

@RabbitListener 和 @RabbitHandler 搭配使用

  • @RabbitListener 能夠標註在類上面,需配合 @RabbitHandler 註解一塊兒使用
  • @RabbitListener 標註在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪一個方法處理,根據 MessageConverter 轉換後的參數類型
@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {

    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }

    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}

 

 

序列化

當中默認的序列化類爲SimpleMessageConverter。

僅僅有調用了convertAndSend方法纔會使用對應的MessageConvert進行消息的序列化與反序列化。

SimpleMessageConverter對於要發送的消息體body爲字節數組時。不進行處理。

對於假設是String。則將String轉成字節數組。

對於假設是Java對象,則使用jdk序列化Serializable將消息轉成字節數組。轉出來的結果較大,含class類名。類對應方法等信息。所以性能較差。

當使用RabbitMq做爲中間件時,數據量比較大,此時就要考慮使用相似Jackson2JsonMessageConverter。hessian等序列化形式。以此提升性能。

 

使用 JSON 序列化與反序列化

https://www.jianshu.com/p/911d987b5f11

發送

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());        
        return rabbitTemplate;
    }
User user = new User("linyuan");
rabbitTemplate.convertAndSend("topic.exchange","queue1",user);

接收

@Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//json序列化時,若想手動ACK,則必須配置
        return factory;
    }
@Component
@RabbitListener(queues = "queue1")
public class Receiver {

    @RabbitHandler
    public void processMessage1(@Payload User user) {
        System.out.println(user.getName());
    }
}

消費者+json反序列化 形成手動ACK配置失效

解決方案: https://blog.csdn.net/m912595719/article/details/83787486

這是springboot集成RabbitMQ的一個大坑。當消費者配置JSON反序列化時,配置文件中的手動ACK會失效,消費者會變成自動ACK模式。spring.rabbitmq.listener.direct.acknowledge-mode=manual,spring.rabbitmq.listener.simple.acknowledge-mode=manual  配置失效。

解決方法是消費者配置RabbitListenerContainerFactory這個Bean時(見上),設置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)。把消費者強制轉換爲手動ACK。

若是配置失效切換爲自動ACK,可是代碼中又使用channel.basicAck手動ACK。這樣會形成雙ACK的ERROR,接着信道會重啓重連。以下:

o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

unknown delivery tag 1表示當前Channel中找不到delivery-tag=1的消息,實際上是這個消息已經自動ACK了,basicAck時就會出錯。測試顯示,消息並不會丟失而是在出現ERROR異常後走向Nack後從新入隊,再屢次重複消費後最終ACK成功,嚴重下降消費者的執行效率。

 

Delivery Tags投遞的標識

當一個消費者向RabbitMQ註冊後,RabbitMQ會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它在一個channel中惟一表明瞭一次投遞。delivery tag的惟一標識範圍限於channel. delivery tag是單調遞增的正整數,客戶端獲取投遞的方法用用dellivery tag做爲一個參數。

 

 

TestController測試

@Autowired
    private HelloSender helloSender;
 
    /**
     * 單生產者-單個消費者
     */
    @RequestMapping("/test")
    public void hello() throws Exception {
        helloSender.send();
    }

 發送消息

ACK場景測試

咱們把HelloReceiver的ACK確認代碼註釋掉 ,那消息就算程序收到了, 可是未確認ACK致使消息服務器覺得他是未成功消費的,若此時消費者斷開則消息返回隊列,後續還會再發。

 

相關文章
相關標籤/搜索