在如今的項目開發過程當中,消息中間件使用得越來越多,常用的消息中間件有rabbitmq、activemq、rocketmq、kafka等。那麼今天,我們來學習springboot整合rabbitmq。
在整合rabbitmq的時候,我們先要在本地下載安裝rabbitmq,而rabbitmq是用erlang語言開發的,所以在安裝rabbitmq之前,我們需要先安裝erlang。具體下載安裝的步驟我在這裏就不再贅述了,可以參照這篇文章進行安裝:https://www.jianshu.com/p/3d43561bb3ee
在安裝好了erlang和rabbitmq之後,我們就開始整合。
一、首先我們需要在pom.xml中添加rabbitmq的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、配置application-boot.yml:
# 配置rabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
三、建立兩個POJO實體類:MsgContent1和MsgContent2
package com.hry.spring.rabbitmq.boot.msgconvert.pojo; /** * 測試發送對象 */ public class MsgContent1 { private String name; private String age; @Override public String toString(){ return "[ name = " + name + "; " + " age = " + age + " ]"; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } }
package com.hry.spring.rabbitmq.boot.msgconvert.pojo; /** * 測試發送對象 */ public class MsgContent2 { private String id; private String content; @Override public String toString(){ return "[ id = " + id + "; " + " content = " + content + " ]"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
四、設置序列化類RabbitMsgConvertConfigure
package com.hry.spring.rabbitmq.boot.msgconvert; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 配置RabbitMQ中使用到隊列、交換機、綁定等信息 */ @Configuration public class RabbitMsgConvertConfigure { // 隊列名稱 public final static String SPRING_BOOT_QUEUE = "spring-boot-queue-msg-convert"; // 交換機名稱 public final static String SPRING_BOOT_EXCHANGE = "spring-boot-exchange-msg-convert"; // 綁定的值 public static final String SPRING_BOOT_BIND_KEY = "spring-boot-bind-key-msg-convert"; // === 在RabbitMQ上建立queue,exchange,binding 方法一:經過@Bean實現 begin === /** * 定義隊列: * @return */ @Bean Queue queue() { return new Queue(SPRING_BOOT_QUEUE, false); } /** * 定義交換機 * @return */ @Bean TopicExchange exchange() { return new TopicExchange(SPRING_BOOT_EXCHANGE); } /** * 定義綁定 * @param queue * @param exchange * @return */ @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(SPRING_BOOT_BIND_KEY ); } /** * 定義消息轉換實例 * @return */ @Bean MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // === 若是默認的SimpleMessageListenerContainer不符合咱們的要求,咱們也能夠經過以下的方式建立新的SimpleMessageListenerContainer=== // @Bean // SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { // SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); // container.setConnectionFactory(connectionFactory); // container.setMessageConverter(). 
// container.setConcurrentConsumers(10); // return container; // } // @Bean // MessageListenerAdapter listenerAdapter(ProductMessageListener receiver) { // return new MessageListenerAdapter(receiver, "receiveMessage"); // } }
五、消息發送者:
package com.hry.spring.rabbitmq.boot.msgconvert; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息發送者 */ @Component public class SendMsgConvertMsg { // 此接口的默認實現是RabbitTemplate,目前只有一個實現, @Autowired private AmqpTemplate amqpTemplate; /** * 發送消息 * * @param msgContent */ public void sendMsgContent1(MsgContent1 msgContent) { amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent ); } /** * 發送消息 * @param msgContent */ public void sendMsgContent2(MsgContent2 msgContent) { amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent); } }
六、消息接收者:
@RabbitListener定義在類表示此類是消息監聽者並設置要監聽的隊列
@RabbitHandler:在類中可以定義多個@RabbitHandler,spring boot會根據不同參數傳送到不同方法處理
package com.hry.spring.rabbitmq.boot.msgconvert; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component // @RabbitListener除了能夠做用在方法,也能夠做用在類上。在後者的狀況下,須要在處理的方法使用@RabbitHandler。一個類能夠配置多個@RabbitHandler @RabbitListener(queues = RabbitMsgConvertConfigure.SPRING_BOOT_QUEUE) public class ReceiveMsgConvertMsg { /** * 獲取信息: * queue也能夠支持RabbitMQ中對隊列的模糊匹配 * @param content */ @RabbitHandler public void receiveMsgContent1(MsgContent1 content) { // ... System.out.println("[ReceiveMsgConvertMsg-MsgContent1] receive receiveMsgContent1 msg: " + content); } @RabbitHandler public void receiveMsgContent2(MsgContent2 msgContent2) { // ... System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive receiveMsgContent2 msg: " + msgContent2); } // @RabbitHandler // public void receiveString(@Payload String content) { // // ... // System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content); // } // // @RabbitHandler // public void receiveStringb(byte[] content) { // // ... // System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content); // } }
七、最後我們編寫測試類:
package com.hry.spring.boot.simple;

import com.hry.spring.rabbitmq.boot.msgconvert.SendMsgConvertMsg;
import com.hry.spring.rabbitmq.boot.msgconvert.SpringBootRabbitMsgConvertApplication;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
import com.hry.spring.rabbitmq.boot.raw.SendRawMsg;
import com.hry.spring.rabbitmq.boot.raw.SpringBootRabbitRawApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ThreadLocalRandom;

/**
 * Test class: sends one {@link MsgContent1} and one {@link MsgContent2}
 * through {@link SendMsgConvertMsg}, then waits before the test returns.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitMsgConvertApplication.class, value = "spring.profiles.active=boot")
public class MsgConvertTest {

    /** How long the test waits after sending, in milliseconds (10 seconds). */
    private static final long WAIT_AFTER_SEND_MILLIS = 1000 * 10;

    @Autowired
    private SendMsgConvertMsg sendMsgConvertMsg;

    /**
     * Sends both payload types and then pauses.
     *
     * @throws Exception if sending fails or the waiting thread is interrupted
     */
    @Test
    public void sendMsgContent() throws Exception {
        // Send a MsgContent1 message object
        MsgContent1 msgContent1 = new MsgContent1();
        msgContent1.setName("send msg via spring boot - msg convert - MsgContent1");
        msgContent1.setAge("" + ThreadLocalRandom.current().nextInt(100));
        sendMsgConvertMsg.sendMsgContent1(msgContent1);

        // Send a MsgContent2 message object
        MsgContent2 msgContent2 = new MsgContent2();
        msgContent2.setId(ThreadLocalRandom.current().nextInt(100) + "");
        // Label fixed: the text previously said "MsgContent1" (copy-paste slip)
        msgContent2.setContent("send msg via spring boot - msg convert - MsgContent2");
        sendMsgConvertMsg.sendMsgContent2(msgContent2);

        // Presumably gives the listener time to consume and print both messages
        // before the test ends - confirm against the listener setup.
        // InterruptedException is allowed to propagate (the method declares
        // `throws Exception`) instead of being swallowed by printStackTrace().
        Thread.sleep(WAIT_AFTER_SEND_MILLIS);
    }
}
在控制檯我們可以看到輸出的結果:
說明springboot整合rabbitmq成功,能夠實現業務功能了。
注意:這裏要注意com.fasterxml.jackson的版本兼容問題,當springboot使用2.1.4.RELEASE版本時,就會報錯。
這裏一定要將springboot的版本設置爲1.5.6.RELEASE,功能才能正常實現。
本博客的demo以下:https://download.csdn.net/download/weixin_38340967/11180464
另:本來是想上傳demo供大家一起學習的,但是上傳資源到CSDN上的時候默認要5積分,還改不了,所以如果有同學想要demo的可以私信我,我郵箱發你就行了!包括erlang和rabbitmq安裝包。