SpringBoot整合rabbitmq

在如今的項目開發過程當中,消息中間件使用的愈來愈多,通常用的比較多的消息中間件有rabbitmq、activemq、rocketmq、kafka等。那麼今天,咱們來學習springboot整合rabbitmq。java

在整合rabbitmq的時候,咱們先要在本地下載安裝rabbitmq,而rabbitmq是用erlang語言開發的,因此在安裝rabbitmq以前,咱們須要先安裝erlang。具體下載安裝的步驟我在這裏就再也不贅述了,能夠參照這篇文章進行安裝:https://www.jianshu.com/p/3d43561bb3eespring

在安裝好了erlang和rabbitmq以後,咱們就開始整合。springboot

一、首先咱們須要在pom.xml中添加rabbitmq的依賴:app

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

二、配置application-boot.yml:dom

spring:
  # 配置rabbitMQspring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest

三、建立兩個POJO實體類:MsgContent1和MsgContent2ide

package com.hry.spring.rabbitmq.boot.msgconvert.pojo;

/**
 * 測試發送對象
 */
public class MsgContent1 {
    private String name;
    private String age;

    @Override
    public String toString(){
        return "[ name = " + name + "; " + " age = " + age + " ]";
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }
}

  

package com.hry.spring.rabbitmq.boot.msgconvert.pojo;

/**
 * 測試發送對象
 */
public class MsgContent2 {
    private String id;
    private String content;

    @Override
    public String toString(){
        return "[ id = " + id + "; " + " content = " + content + " ]";
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

四、設置序列化類RabbitMsgConvertConfigurespring-boot

package com.hry.spring.rabbitmq.boot.msgconvert;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ中使用到隊列、交換機、綁定等信息
 */
@Configuration
public class RabbitMsgConvertConfigure {

    // 隊列名稱
    public final static String SPRING_BOOT_QUEUE = "spring-boot-queue-msg-convert";
    // 交換機名稱
    public final static String SPRING_BOOT_EXCHANGE = "spring-boot-exchange-msg-convert";
    // 綁定的值
    public static final String SPRING_BOOT_BIND_KEY = "spring-boot-bind-key-msg-convert";


    // === 在RabbitMQ上建立queue,exchange,binding 方法一:經過@Bean實現 begin ===
    /**
     * 定義隊列:
     * @return
     */
    @Bean
    Queue queue() {
        return new Queue(SPRING_BOOT_QUEUE, false);
    }

    /**
     * 定義交換機
     * @return
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(SPRING_BOOT_EXCHANGE);
    }

    /**
     * 定義綁定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(SPRING_BOOT_BIND_KEY );
    }

    /**
     * 定義消息轉換實例
     * @return
     */
    @Bean
    MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // === 若是默認的SimpleMessageListenerContainer不符合咱們的要求,咱們也能夠經過以下的方式建立新的SimpleMessageListenerContainer===
//    @Bean
//    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
//        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
//        container.setConnectionFactory(connectionFactory);
//        container.setMessageConverter().
//        container.setConcurrentConsumers(10);
//        return container;
//    }


//    @Bean
//    MessageListenerAdapter listenerAdapter(ProductMessageListener receiver) {
//        return new MessageListenerAdapter(receiver, "receiveMessage");
//    }

}

五、消息發送者:學習

package com.hry.spring.rabbitmq.boot.msgconvert;

import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息發送者
 */
@Component
public class SendMsgConvertMsg {

    // 此接口的默認實現是RabbitTemplate,目前只有一個實現,
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 發送消息
     *
     * @param msgContent
     */
    public void sendMsgContent1(MsgContent1 msgContent) {
        amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent );


    }

    /**
     * 發送消息
     * @param msgContent
     */
    public void sendMsgContent2(MsgContent2 msgContent) {
        amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent);
    }
}

六、消息接收者:測試

@RabbitListener定義在類表示此類是消息監聽者並設置要監聽的隊列 
@RabbitHandler:在類中能夠定義多個@RabbitHandler,spring boot會根據不一樣參數傳送到不一樣方法處理ui

package com.hry.spring.rabbitmq.boot.msgconvert;

import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
// @RabbitListener除了能夠做用在方法,也能夠做用在類上。在後者的狀況下,須要在處理的方法使用@RabbitHandler。一個類能夠配置多個@RabbitHandler
@RabbitListener(queues = RabbitMsgConvertConfigure.SPRING_BOOT_QUEUE)
public class ReceiveMsgConvertMsg {

    /**
     * 獲取信息:
     *  queue也能夠支持RabbitMQ中對隊列的模糊匹配
     * @param content
     */
    @RabbitHandler
    public void receiveMsgContent1(MsgContent1 content) {
        // ...
        System.out.println("[ReceiveMsgConvertMsg-MsgContent1] receive receiveMsgContent1 msg: " + content);
    }

    @RabbitHandler
    public void receiveMsgContent2(MsgContent2 msgContent2) {
        // ...
        System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive receiveMsgContent2 msg: " + msgContent2);
    }

//    @RabbitHandler
//    public void receiveString(@Payload String content) {
//        // ...
//        System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content);
//    }
//
//    @RabbitHandler
//    public void receiveStringb(byte[] content) {
//        // ...
//        System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content);
//    }
}

七、最後咱們編寫測試類;

package com.hry.spring.boot.simple;

import com.hry.spring.rabbitmq.boot.msgconvert.SendMsgConvertMsg;
import com.hry.spring.rabbitmq.boot.msgconvert.SpringBootRabbitMsgConvertApplication;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
import com.hry.spring.rabbitmq.boot.raw.SendRawMsg;
import com.hry.spring.rabbitmq.boot.raw.SpringBootRabbitRawApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ThreadLocalRandom;

/**
 * 測試類
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes= SpringBootRabbitMsgConvertApplication.class, value = "spring.profiles.active=boot")
public class MsgConvertTest {
    @Autowired
    private SendMsgConvertMsg sendMsgConvertMsg;

    @Test
    public void sendMsgContent() throws Exception {
        // 發送消息對象MsgContent1
        MsgContent1 msgContent1 = new MsgContent1();
        msgContent1.setName("send msg via spring boot - msg convert - MsgContent1");
        msgContent1.setAge("" + ThreadLocalRandom.current().nextInt(100));
        sendMsgConvertMsg.sendMsgContent1(msgContent1);

        // 發送消息對象MsgContent2
        MsgContent2 msgContent2 = new MsgContent2();
        msgContent2.setId(ThreadLocalRandom.current().nextInt(100) + "");
        msgContent2.setContent("send msg via spring boot - msg convert - MsgContent1");
        sendMsgConvertMsg.sendMsgContent2(msgContent2);

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


}

在控制檯咱們能夠看到輸出的結果:

說明springboot整合rabbitmq成功,能夠實現業務功能了。

注意:這裏要注意com.fasterxml.jackson的版本兼容問題,當springboot是使用2.1.4.RELEASE版本時,就會報

這裏必定要將springboot的版本設置爲1.5.6.RELEASE,功能才能正常實現。

本博客的demo以下:https://download.csdn.net/download/weixin_38340967/11180464

另:原本是想上傳demo供你們一塊兒學習的,可是上傳資源到CSDN上的時候默認要5積分,還改不了,因此若是有同窗想要demo的能夠私信我,我郵箱發你就好了!包括erlang和rabbitmq安裝包。

相關文章
相關標籤/搜索