SpringBoot集成RabbitMQ消息隊列搭建與ACK消息確認入門

1.RabbitMQ介紹
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。java

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。web

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。spring

2.AmqpTemplate,RabbitTemplateapache


Spring AMQP提供了一個發送和接收消息的操做模板類AmqpTemplate。 AmqpTemplate它定義包含了發送和接收消息等的一些基本的操做功能。RabbitTemplate是AmqpTemplate的一個實現。api

RabbitTemplate支持消息的確認與返回,爲了返回消息,RabbitTemplate 須要設置mandatory 屬性爲true,而且CachingConnectionFactory 的publisherReturns屬性也須要設置爲true。返回的消息會根據它註冊的RabbitTemplate.ReturnCallback setReturnCallback 回調發送到給客戶端,安全

一個RabbitTemplate僅能支持一個ReturnCallback 。服務器

爲了確認Confirms消息, CachingConnectionFactory 的publisherConfirms 屬性也須要設置爲true,確認的消息會根據它註冊的RabbitTemplate.ConfirmCallback setConfirmCallback回調發送到給客戶端。一個RabbitTemplate也僅能支持一個ConfirmCallback.app

3.SpringBoot集成RabbitMQmaven

pom分佈式

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>rabbitMQ</name>
    <description>Demo project for Spring Boot</description>
 
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
 
</project>

自動配置信息  這裏我開啓ACK消息確認

server.port=8083
#服務器配置
spring.application.name=rabbitmq-hello-sending
#rabbitmq鏈接參數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=linpeng
spring.rabbitmq.password=123456
# 開啓發送確認
spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退回
spring.rabbitmq.publisher-returns=true
# 開啓ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
建立消息隊列  隊列名:hello 和 helloObj

package com.example.demo;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitConfig {
 
    @Bean
    public Queue QueueA() {
        return new Queue("hello");
    }
 
    @Bean
    public Queue QueueB() {
        return new Queue("helloObj");
    }
 
    /**
     * Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout交換機發送消息,綁定了這個交換機的全部隊列都收到這個消息。
     * @return
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("ABExchange");
    }
 
 
    @Bean
    Binding bindingExchangeA(Queue QueueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(QueueA).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(Queue QueueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(QueueB).to(fanoutExchange);
    }
}
消息發送者 Sender  使用 RabbitTemplate  不採用 AmqpTemplate

package com.example.demo;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
 
import java.util.Date;
//RabbitTemplate.ConfirmCallback
@Service
public class HelloSender implements RabbitTemplate.ReturnCallback {
 
    @Autowired
//    private AmqpTemplate rabbitTemplate;
    private RabbitTemplate rabbitTemplate;
    public void send() {
        String context = "你好如今是 " + new Date() +"";
        System.out.println("HelloSender發送內容 : " + context);
//        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                System.out.println("HelloSender消息發送失敗" + cause + correlationData.toString());
            } else {
                System.out.println("HelloSender 消息發送成功 ");
            }
        });
        this.rabbitTemplate.convertAndSend("hello", context);
    }
 
    public void sendObj() {
       MessageObj obj = new MessageObj();
       obj.setACK(false);
       obj.setId(123);
       obj.setName("zhangsan");
       obj.setValue("data");
       System.out.println("發送 : " + obj);
       this.rabbitTemplate.convertAndSend("helloObj", obj);
    }
 
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2);
    }
 
//    @Override
//    public void confirm(CorrelationData correlationData, boolean b, String s) {
//        System.out.println("sender success");
//    }
 
}

消息接受者 Receiver 註解方式接受消息 

package com.example.demo;
 
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
 
import java.io.IOException;
import java.util.Date;
import java.util.Map;
 
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
 
    @RabbitHandler
    public void process(String hello,Channel channel, Message message) throws IOException {
        System.out.println("HelloReceiver收到  : " + hello +"收到時間"+new Date());
        try {
            //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("receiver success");
        } catch (IOException e) {
            e.printStackTrace();
            //丟棄這條消息
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            System.out.println("receiver fail");
        }
 
    }
}

備註:咱們用註解的方式來接受消息 就不要用 本身建立對象實現ChannelAwareMessageListener的方式來接受消息 這種方式還要去全局裏面配置 麻煩,直接用@RabbitListener(queues = "hello")最簡單

消息確認  由於我在屬性配置文件裏面開啓了ACK確認 因此若是代碼沒有執行ACK確認 你在RabbitMQ的後臺會看到消息會一直留在隊列裏面未消費掉 只要程序一啓動開始接受該隊列消息的時候 又會收到

 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

//ack返回false,並從新回到隊列,api裏面解釋得很清楚

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

//拒絕消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

TestController測試

@Autowired
    private HelloSender helloSender;
 
    /**
     * 單生產者-單個消費者
     */
    @RequestMapping("/test")
    public void hello() throws Exception {
        helloSender.send();
    }
RabbitMQ後臺 兩個隊列

發送消息

ACK場景測試

咱們把HelloReceiver的ACK確認代碼註釋掉 那消息就算程序收到了 可是未確認ACK致使消息服務器覺得他是未成功消費的 後續還會再發

重啓程序

相關文章
相關標籤/搜索