官方文檔:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqp
引入依賴:
<!-- Spring Boot AMQP starter. No <version> is given here; presumably it is managed by the Spring Boot parent/BOM (confirm against the project's pom.xml). -->
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
發送消息代碼:
/**
 * REST controller that publishes a short burst of JSON test messages through
 * the injected {@code AmqpTemplate}.
 */
@RestController
@RequestMapping("/")
public class SenderMsgController {

    /** Exchange every test message is published to. */
    private static final String EXCHANGE_NAME = "DirectExchange";

    /** Number of messages sent per request. */
    private static final int MESSAGE_COUNT = 10;

    /** Pause, in milliseconds, taken before each message is sent. */
    private static final long SEND_INTERVAL_MILLIS = 500L;

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends {@value #MESSAGE_COUNT} JSON messages to the exchange, sleeping
     * {@value #SEND_INTERVAL_MILLIS} ms before each one. Messages with an odd
     * index use routing key {@code test.1}; even ones use {@code test.2}.
     *
     * @param str request value; it is not used by the method body
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    // NOTE(review): the mapping declares a "{str}" path segment but the argument is bound
    // with @RequestParam, which reads a query/form parameter. @PathVariable looks like
    // the intended annotation; confirm before changing.
    @RequestMapping(value = "/{str}")
    public void testSend(@RequestParam("str") String str) throws InterruptedException {
        // The post-processor holds no state, so a single instance serves every message.
        MyMessageConverter jsonContentType = new MyMessageConverter();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Thread.sleep(SEND_INTERVAL_MILLIS);

            // Only the routing key differs between odd and even messages.
            String routingKey = (i % 2 == 1) ? "test.1" : "test.2";
            String payload = "{\n" + "\t\"dd\":" + i + "}";

            amqpTemplate.convertAndSend(EXCHANGE_NAME, routingKey, payload, jsonContentType);
            System.out.println("第" + i + "次發送");
        }
    }
}
MyMessageConverter:
/**
 * Post-processor applied to a message before it is sent.
 *
 * <p>The {@code MessagePostProcessor} interface allows the {@code Message} to be
 * modified ahead of the send; this implementation sets its content type to JSON.
 */
public class MyMessageConverter implements MessagePostProcessor {

    /**
     * Marks the given message as JSON.
     *
     * @param message the outgoing message
     * @return the same message instance, with its content type set to
     *         {@code MessageProperties.CONTENT_TYPE_JSON}
     * @throws AmqpException declared by the interface; not thrown explicitly here
     */
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        MessageProperties properties = message.getMessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        return message;
    }
}
建立交換機、隊列並互相綁定,設置路由 key:
@Component public class AmqpAdminConfig { @Autowired public AmqpAdmin amqpAdmin; @Bean public DirectExchange createDirectExchange(){ DirectExchange directExchange = new DirectExchange("DirectExchange", false, false); amqpAdmin.declareExchange(directExchange); return directExchange; } // @Bean // public void createFanoutExchange(){ // amqpAdmin.declareExchange(new FanoutExchange("FanoutExchange",false,false)); // } @Bean public Queue createQueue1(){ Queue queue = new Queue("queue-1", false, false, false); amqpAdmin.declareQueue(queue); return queue; } @Bean public Queue createQueue2(){ Queue queue = new Queue("queue-2", false, false, false); amqpAdmin.declareQueue(queue); return queue; } @Bean public void createBinding1(){ Binding bind = BindingBuilder.bind(createQueue1()).to(createDirectExchange()).with("test.1"); amqpAdmin.declareBinding(bind); } @Bean public void createBinding2(){ Binding bind = BindingBuilder.bind(createQueue2()).to(createDirectExchange()).with("test.2"); amqpAdmin.declareBinding(bind); } }
根據官方文檔可知,AmqpTemplate 和 AmqpAdmin 已經自動配置,可直接注入使用。AmqpTemplate 封裝了發送與接收的各類操作,AmqpAdmin 封裝了針對交換機和消息隊列的各類操作。