Spring Boot RabbitMQ 發送消息

官方文檔:https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-amqp

引入依賴:

<!-- Spring Boot starter that brings in Spring AMQP and the RabbitMQ client -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

發送消息代碼:

/**
 * Demo REST controller that publishes a short burst of JSON messages to RabbitMQ.
 *
 * <p>Each request sends {@value #MESSAGE_COUNT} messages to the exchange named
 * {@value #EXCHANGE}, alternating between the routing keys {@code test.2} (even
 * counter values) and {@code test.1} (odd counter values).
 */
@RestController
@RequestMapping("/")
public class SenderMsgController {

    /** Name of the exchange every message is published to. */
    private static final String EXCHANGE = "DirectExchange";

    /** Number of messages sent per request. */
    private static final int MESSAGE_COUNT = 10;

    /** Pause, in milliseconds, taken before each message is sent. */
    private static final long SEND_INTERVAL_MILLIS = 500L;

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends {@value #MESSAGE_COUNT} small JSON documents of the form
     * <code>{"dd": i}</code>, pausing before each one.
     *
     * <p>NOTE(review): the mapping declares a {@code {str}} path segment, but the
     * argument is bound with {@code @RequestParam}, so a caller apparently has to
     * supply {@code ?str=...} as a query parameter as well. Binding it as a path
     * variable is probably what was intended - confirm before relying on it.
     *
     * @param str request value; not used by the method body
     * @throws InterruptedException if the thread is interrupted while pausing between sends
     */
    @RequestMapping(value = "/{str}")
    public void testSend(@RequestParam("str") String str) throws InterruptedException {
        // One post-processor instance is enough for the whole burst; the loop only
        // needs it to tag each outgoing message.
        MyMessageConverter jsonContentType = new MyMessageConverter();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            // Primitive long: no boxing through the deprecated Long constructor.
            Thread.sleep(SEND_INTERVAL_MILLIS);

            // Odd counters go to "test.1", even counters to "test.2".
            String routingKey = (i % 2 == 1) ? "test.1" : "test.2";
            String payload = "{\n\t\"dd\":" + i + "}";

            amqpTemplate.convertAndSend(EXCHANGE, routingKey, payload, jsonContentType);
            System.out.println("第" + i + "次發送");
        }
    }
}
MyMessageConverter:
/**
 * Post-processor applied to a {@link Message} before it is sent.
 *
 * <p>A {@link MessagePostProcessor} can modify the message ahead of the send
 * request; this one sets the content type to JSON.
 */
public class MyMessageConverter implements MessagePostProcessor {

    /**
     * Sets the content type of the given message to JSON.
     *
     * @param message the message about to be sent
     * @return the same message instance, with its content type updated
     * @throws AmqpException declared by the interface; not thrown explicitly here
     */
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        MessageProperties properties = message.getMessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        return message;
    }
}

建立交換機、隊列並互相綁定,設置路由 key:

/**
 * Declares the RabbitMQ topology used by the demo: one direct exchange, two
 * queues, and one binding per queue.
 *
 * <p>Every {@code @Bean} method both declares its object through the injected
 * {@link AmqpAdmin} and returns it, so the object is also available as a bean.
 *
 * <p>NOTE(review): the class is annotated with {@code @Component} rather than
 * {@code @Configuration}, so the calls from the binding methods to
 * {@link #createQueue1()}, {@link #createQueue2()} and
 * {@link #createDirectExchange()} are presumably plain Java calls that build and
 * declare a fresh object each time instead of reusing the bean - confirm whether
 * that is intended.
 */
@Component
public class AmqpAdminConfig {

    @Autowired
    public AmqpAdmin amqpAdmin;

    /**
     * Declares the exchange named {@code DirectExchange}, with both boolean
     * constructor flags set to {@code false}.
     *
     * @return the declared exchange
     */
    @Bean
    public DirectExchange createDirectExchange() {
        DirectExchange directExchange = new DirectExchange("DirectExchange", false, false);
        amqpAdmin.declareExchange(directExchange);
        return directExchange;
    }

    // Disabled fanout-exchange example, kept from the original:
    // @Bean
    // public void createFanoutExchange(){
    //     amqpAdmin.declareExchange(new FanoutExchange("FanoutExchange",false,false));
    // }

    /**
     * Declares the queue named {@code queue-1}.
     *
     * @return the declared queue
     */
    @Bean
    public Queue createQueue1() {
        return declareQueue("queue-1");
    }

    /**
     * Declares the queue named {@code queue-2}.
     *
     * @return the declared queue
     */
    @Bean
    public Queue createQueue2() {
        return declareQueue("queue-2");
    }

    /**
     * Binds {@code queue-1} to the direct exchange with routing key {@code test.1}.
     *
     * <p>Returns the binding instead of {@code void}: a {@code @Bean} method that
     * returns nothing contributes no usable bean to the context.
     *
     * @return the declared binding
     */
    @Bean
    public Binding createBinding1() {
        return declareBinding(createQueue1(), "test.1");
    }

    /**
     * Binds {@code queue-2} to the direct exchange with routing key {@code test.2}.
     *
     * <p>Returns the binding instead of {@code void}, for the same reason as
     * {@link #createBinding1()}.
     *
     * @return the declared binding
     */
    @Bean
    public Binding createBinding2() {
        return declareBinding(createQueue2(), "test.2");
    }

    /** Builds a queue with all three boolean constructor flags off and declares it. */
    private Queue declareQueue(String name) {
        Queue queue = new Queue(name, false, false, false);
        amqpAdmin.declareQueue(queue);
        return queue;
    }

    /** Binds the queue to the direct exchange under the routing key and declares the binding. */
    private Binding declareBinding(Queue queue, String routingKey) {
        Binding bind = BindingBuilder.bind(queue).to(createDirectExchange()).with(routingKey);
        amqpAdmin.declareBinding(bind);
        return bind;
    }
}

根據官方文檔可知,AmqpTemplate 和 AmqpAdmin 已經自動配置,可直接注入使用。AmqpTemplate 封裝了發送與接收的各種操作,AmqpAdmin 封裝了針對交換機和消息隊列的各種操作。

相關文章
相關標籤/搜索