SpringBoot之集成RabbitMQ

環境:Spring Boot 2.0

<!-- Spring Boot RabbitMQ 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

yml配置

# Broker connection settings for the basic examples. The confirm/return/ACK
# switches below are left commented out at this stage.
spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # Enable publisher confirms
        #publisher-confirms: true
        # Enable returning of messages whose delivery failed
        #publisher-returns: true
        # Enable manual ACK
        #listener.direct.acknowledge-mode: manual
        #listener.simple.acknowledge-mode: manual
RabbitConfig
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queues, exchanges and bindings used by the topic, fanout and
 * direct messaging examples.
 */
@Configuration
public class RabbitConfig {

    // ---- Topic example: queue and exchange names ----
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    // ---- Fanout example: queue and exchange names ----
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";

    // ---- Direct example: queue and exchange names ----
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    // NOTE(review): no bean in this class declares a queue with this name.
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String DIRECT_EXCHANGE = "direct.exchange";

    // ------------------------------------------------------------------
    // Topic mode
    // ------------------------------------------------------------------

    /** @return the first topic queue, named {@link #TOPIC_QUEUE1} */
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }

    /** @return the second topic queue, named {@link #TOPIC_QUEUE2} */
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }

    /** @return the topic exchange, named {@link #TOPIC_EXCHANGE} */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /** @return binding of the first topic queue with the key {@code zns.message} */
    @Bean
    public Binding topicBinding1() {
        Queue queue = topicQueue1();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("zns.message");
    }

    /** @return binding of the second topic queue with the pattern {@code zns.#} */
    @Bean
    public Binding topicBinding2() {
        Queue queue = topicQueue2();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("zns.#");
    }

    // ------------------------------------------------------------------
    // Fanout mode: the familiar broadcast / subscribe model. A message sent
    // to a fanout exchange is received by every queue bound to that exchange.
    // ------------------------------------------------------------------

    /** @return the first fanout queue, named {@link #FANOUT_QUEUE1} */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1);
    }

    /** @return the second fanout queue, named {@link #FANOUT_QUEUE2} */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2);
    }

    /** @return the fanout exchange, named {@link #FANOUT_EXCHANGE} */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /** @return binding of the first fanout queue to the fanout exchange */
    @Bean
    public Binding fanoutBinding1() {
        Queue queue = fanoutQueue1();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /** @return binding of the second fanout queue to the fanout exchange */
    @Bean
    public Binding fanoutBinding2() {
        Queue queue = fanoutQueue2();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    // ------------------------------------------------------------------
    // Direct mode: when a message's routing key equals the binding key of a
    // binding, the exchange delivers the message to the bound queue.
    // ------------------------------------------------------------------

    /** @return the direct queue, named {@link #DIRECT_QUEUE1} */
    @Bean
    public Queue directQueue1() {
        return new Queue(DIRECT_QUEUE1);
    }

    /** @return the direct exchange, named {@link #DIRECT_EXCHANGE} */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    /** @return binding of the direct queue with the key {@code direct.pwl} */
    @Bean
    public Binding directBinding1() {
        Queue queue = directQueue1();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with("direct.pwl");
    }

}

發送消息

import com.zns.admin.api.TestModel;
import com.zns.admin.api.ZNSAdminApiApplication;
import com.zns.admin.api.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Publishes one sample {@link TestModel} per exchange type declared in
 * {@link RabbitConfig}.
 *
 * <p>The tests only send; they make no assertions. Delivery is observed through
 * whatever the listeners bound to the target queues print.
 */
@RunWith(SpringRunner.class)
// Boot the context from the application class this file actually imports; the
// previously referenced "Application" is not imported anywhere in the file.
@SpringBootTest(classes = ZNSAdminApiApplication.class)
public class RabbitmqTest {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Sends the same payload to the topic exchange twice, once with routing key
     * {@code zns.message} and once with {@code zns.lzc}.
     */
    @Test
    public void testTopic() throws Exception {
        TestModel data = new TestModel(1, "topic...");
        this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "zns.message", data);
        this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "zns.lzc", data);
    }

    /** Sends one payload to the fanout exchange with an empty routing key. */
    @Test
    public void testFanout() throws Exception {
        TestModel data = new TestModel(2, "fanout...");
        this.rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", data);
    }

    /** Sends one payload to the direct exchange with routing key {@code direct.pwl}. */
    @Test
    public void testDirect() throws Exception {
        TestModel data = new TestModel(3, "direct...");
        this.rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "direct.pwl", data);
    }
}

接收消息

import com.zns.admin.api.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the queues declared in {@link RabbitConfig}. Each handler
 * prints the received {@code TestModel} prefixed with the handler's own tag.
 */
@Component
public class RabbitmqReceiver {

    // ---- topic queues ----

    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1)
    public void receiveTopic1(TestModel data) {
        report("【receiveTopic1監聽到消息】", data);
    }

    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2)
    public void receiveTopic2(TestModel data) {
        report("【receiveTopic2監聽到消息】", data);
    }

    // ---- fanout queues ----

    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1)
    public void receiveFanout1(TestModel data) {
        report("【receiveFanout1監聽到消息】", data);
    }

    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2)
    public void receiveFanout2(TestModel data) {
        report("【receiveFanout2監聽到消息】", data);
    }

    // ---- direct queue: both handlers below listen on DIRECT_QUEUE1 ----

    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1)
    public void receiveDirect1(TestModel data) {
        report("【receiveDirect1監聽到消息】", data);
    }

    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1)
    public void receiveDirect2(TestModel data) {
        report("【receiveDirect2監聽到消息】", data);
    }

    /** Prints {@code tag} immediately followed by the payload's string form. */
    private static void report(String tag, TestModel data) {
        System.out.println(tag + data.toString());
    }

}

 

ACK消息確認的實現

配置文件打開相關配置開關

# Broker connection settings with the confirm/return/ACK switches turned on
# for the ACK example.
spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
        # Enable publisher confirms
        publisher-confirms: true
        # Enable returning of messages whose delivery failed
        publisher-returns: true
        # Enable manual ACK
        listener.direct.acknowledge-mode: manual
        listener.simple.acknowledge-mode: manual

這裏使用Fanout類型的Exchange爲例,主要是設置隊列,交換機及綁定

/**
 * Declares the queue, fanout exchange and binding used by the manual-ACK
 * example.
 */
@Configuration
public class RabbitMqFanoutACKConfig {

    /** @return the queue named {@code ackQueue} */
    @Bean
    public Queue ackQueue() {
        return new Queue("ackQueue");
    }

    /**
     * Declares the exchange named {@code fanoutExchange}.
     *
     * <p>The bean is deliberately NOT called {@code fanoutExchange}:
     * {@code RabbitConfig} already registers a bean under that name for a
     * different exchange. If both configurations are loaded into one context
     * (assumed here - confirm), the two definitions would collide, so one would
     * override the other or startup would fail where overriding is disabled.
     *
     * @return the fanout exchange for the ACK example
     */
    @Bean
    FanoutExchange ackFanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * Binds the ACK queue to the ACK fanout exchange.
     *
     * @param ackQueue          the queue declared by {@link #ackQueue()}
     * @param ackFanoutExchange the exchange declared by {@link #ackFanoutExchange()};
     *                          the parameter name matches the bean name so the right
     *                          exchange is chosen when several exist
     * @return the binding between the two
     */
    @Bean
    Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange ackFanoutExchange) {
        return BindingBuilder.bind(ackQueue).to(ackFanoutExchange);
    }

}

消息發送服務

/**
 * Sends a timestamped text message with routing key {@code ackQueue} and prints
 * the broker's publisher-confirm and return notifications.
 */
@Service
public class AckSenderService implements RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Confirm callback, held in a field so that every {@link #send()} registers
     * the same instance. RabbitTemplate supports only one ConfirmCallback and
     * rejects a different instance on a later registration.
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        if (ack) {
            System.out.println("消息發送成功!");
        } else {
            // correlationData is null here because send() passes none; plain
            // concatenation prints "null" instead of throwing NullPointerException.
            System.out.println("消息發送失敗," + cause + correlationData);
        }
    };

    /**
     * Prints the returned message together with its reply code, exchange and
     * routing key.
     *
     * @param message    the returned message
     * @param replyCode  the reply code
     * @param replyText  the reply text (not printed)
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("AckSender returnedMessage " + message.toString() + " === " + replyCode + " === " + exchange + " === " + routingKey);
    }

    /**
     * Sends one message containing the current local date-time.
     */
    public void send() {
        final String content = "如今時間是" + LocalDateTime.now(ZoneId.systemDefault());

        // Register the return callback (always this same instance).
        rabbitTemplate.setReturnCallback(this);
        // Register the confirm callback (always the same field instance).
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.convertAndSend("ackQueue", content);
    }
}

消息消費者

/**
 * Consumer for the {@code ackQueue} queue that acknowledges each delivery
 * explicitly on the channel.
 */
@Component
@RabbitListener(queues = "ackQueue")
public class MyAckReceiver {

    /**
     * Prints the received text and the time of receipt, then acknowledges the
     * delivery.
     *
     * @param sendMsg the message payload
     * @param channel the channel on which the acknowledgement is sent
     * @param message the message whose properties supply the delivery tag
     */
    @RabbitHandler
    public void process(String sendMsg, Channel channel, Message message) {
        final LocalDateTime receivedAt = LocalDateTime.now(ZoneId.systemDefault());
        System.out.println("AckReceiver  : 收到發送消息 " + sendMsg + ",收到消息時間"
                + receivedAt);

        try {
            // Acknowledging tells the broker this consumer has handled the message,
            // so it can be removed from the queue and will not be redelivered.
            // Without the ack the broker treats the message as unprocessed and
            // sends it again later.
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            // Second argument: false acknowledges only this delivery; true would
            // acknowledge every delivery up to and including this tag.
            final boolean multiple = false;
            channel.basicAck(deliveryTag, multiple);
            System.out.println("process success");
        } catch (Exception e) {
            System.out.println("process fail");
            e.printStackTrace();
        }
    }
}

調用senderService.send()測試

相關文章
相關標籤/搜索