安裝我是按照這個哥們安裝的,cdns 過客幽影星風java
在pom.xml配置文件中加入以下依賴。spring
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.yml文件中配置rabbitmqapp
spring: rabbitmq: host: 127.0.0.1 # 主機ip port: 5672 # 主機端口 username: guest # 訪問用戶名 password: guest # 訪問密碼 publisher-confirms: true # 啓用消息確認,發送失敗會異常 virtual-host: / # 定義虛擬主機的路徑,默認爲根目錄
rabbitmq默認使用字節方式進行發送數據,爲了在管理界面方便查看,咱們能夠使用JSON字符串的方式傳輸spring-boot
// 添加配置文件 RabbitConfig.java import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 默認狀況下RabbitMQ發送的消息是轉換爲字節碼,這裏改成發送JSON數據。 * * @author lixingwu */ @Configuration public class RabbitConfig { @Bean public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } }
注意:若是使用JSON數據發送數據,就只能使用convertAndSend方法,不能使用send方法了,具體怎麼解決還不知道。測試
建立測試文件RabbitMQTest.javaui
import cn.hutool.core.lang.Console; import org.junit.Test; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import java.util.HashMap; import java.util.Map; /** * RabbitMQ 測試 */ public class RabbitMQTest extends BaseSpringBootTest { @Autowired private AmqpTemplate mq; @Autowired AmqpAdmin amqpAdmin; //建立交換機 @Test public void declareExchange() { // 建立 Direct 一對一的交換器,一個交換器對應一個隊列 // 該交換機裏面的三個參數分別爲: 名字,持久化,是否自動刪除 DirectExchange directExchange = new DirectExchange("direct-exchange-001", false, false); amqpAdmin.declareExchange(directExchange); // 建立FANOUT,發佈訂閱模式(不存在路由鍵 將被投放到exchange對應的隊列中) // 可綁定多個隊列,消息會給所有的隊列每人發送一份 FanoutExchange fanoutExchange = new FanoutExchange("fanout-exchange-001", false, false); amqpAdmin.declareExchange(fanoutExchange); // 建立Topic,能夠使得不一樣源頭的數據投放到一個隊列中 // 經過路由鍵的命名分類來進行篩選,其中 // * 表示:能夠(只能)匹配一個單詞, // # 表示:能夠匹配多個單詞(或者零個) TopicExchange topicExchange = new TopicExchange("topic-exchange-001", false, false); amqpAdmin.declareExchange(topicExchange); } //建立隊列 @Test public void declareQueue() { amqpAdmin.declareQueue(new Queue("direct-queue")); amqpAdmin.declareQueue(new Queue("fanout-queue-001")); amqpAdmin.declareQueue(new Queue("fanout-queue-002")); amqpAdmin.declareQueue(new Queue("fanout-queue-003")); amqpAdmin.declareQueue(new Queue("topic-queue-001")); amqpAdmin.declareQueue(new Queue("topic-queue-002")); amqpAdmin.declareQueue(new Queue("topic-queue-003")); } //綁定交換機隊列 @Test public void binding() { // 綁定Direct Binding directBinding = BindingBuilder.bind(new Queue("direct-queue")).to(new DirectExchange("direct-exchange-001", false, false)).with("direct-rout-key"); amqpAdmin.declareBinding(directBinding); // 綁定FANOUT Binding fanoutBinding1 = BindingBuilder.bind(new Queue("fanout-queue-001")).to(new FanoutExchange("fanout-exchange-001", false, false)); Binding fanoutBinding2 = BindingBuilder.bind(new Queue("fanout-queue-002")).to(new FanoutExchange("fanout-exchange-001", false, false)); Binding fanoutBinding3 = BindingBuilder.bind(new Queue("fanout-queue-003")).to(new FanoutExchange("fanout-exchange-001", false, false)); amqpAdmin.declareBinding(fanoutBinding1); amqpAdmin.declareBinding(fanoutBinding2); amqpAdmin.declareBinding(fanoutBinding3); // 綁定Topic Binding topicBinding1 = BindingBuilder.bind(new Queue("topic-queue-001")).to(new TopicExchange("topic-exchange-001", false, false)).with("*.to"); Binding topicBinding2 = BindingBuilder.bind(new Queue("topic-queue-002")).to(new TopicExchange("topic-exchange-001", false, false)).with("log.*"); Binding topicBinding3 = BindingBuilder.bind(new Queue("topic-queue-003")).to(new TopicExchange("topic-exchange-001", false, false)).with("log1.*"); amqpAdmin.declareBinding(topicBinding1); amqpAdmin.declareBinding(topicBinding2); amqpAdmin.declareBinding(topicBinding3); } //發送者消息 @Test public void send() { Map<String, Object> param = new HashMap<>(3); param.put("name", "李興武"); param.put("age", 24); param.put("sex", false); // 發送自動轉換後的消息,test爲消息隊列的名字 // mq.convertAndSend("test", param); // 根據路由鍵發送 // mq.convertAndSend("direct-rout-key", param); // 根據交換器+路由鍵發送 // mq.convertAndSend("topic-exchange-001", "log.bak", param); // 自定義消息頭 mq.convertAndSend("direct-exchange-001", "direct-rout-key", param, message -> { message.getMessageProperties().getHeaders().put("token", "123456789"); return message; }); } // 接收消息 @Test public void process() { Message message1 = mq.receive("topic-queue-001"); Console.log(message1); Message message2 = mq.receive("topic-queue-002"); Console.log(message2); // 獲取消息頭 Message message3 = mq.receive("direct-queue"); MessageProperties messageProperties = message3.getMessageProperties(); Console.log(messageProperties.getHeaders().get("token")); } }
未完待續....net