實現內容
RabbitMQ + springboot 實現消息的發送和監聽 springboot版本2.1.8spring
直接上代碼
配置類springboot
@Configuration public class RabbitConfig { // mq地址 @Bean(value = "connectionFactory") @Primary public ConnectionFactory connectionFactory( @Value("${spring.rabbitmq.host}") String host, @Value("${spring.rabbitmq.port}") int port, @Value("${spring.rabbitmq.username}") String username, @Value("${spring.rabbitmq.password}") String password, @Value("${spring.rabbitmq.virtual-host}") String virtualHost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); return retryTemplate; } // mq發送 @Bean public AmqpTemplate myMQTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setRetryTemplate(retryTemplate()); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } }
MQ生產者服務器
@Component public class MQSender { @Autowired private AmqpTemplate myMQTemplate; public void send(String exchangeName, String routingKey, Object object) { myMQTemplate.convertAndSend(exchangeName, routingKey, object); } }
須要注意, 這裏沒有生成exchange, 須要手動建立, 若是須要程序自動建立, 則須要將exchange聲明爲bean便可. 執行測試代碼測試
@Autowired private MQSender mqSender; @Test public void send() { mqSender.send("e.send", "r.send", "send a message"); }
若是你觀察的及時, 估計還能看見exchange收到消息的曲線波動~ 由於沒有消費者, 因此exchange在接收到信息後直接將消息丟棄了, 如今咱們建立對應的隊列, 並綁定. 再次執行就能夠看到隊列中有消息了. code
下面是消費者 首先咱們要在配置類中增長監聽配置, 一個自動ack, 一個手動ackblog
// 自動ack @Bean(value = "listenerFactoryWithAutoAck") public SimpleRabbitListenerContainerFactory listenerFactoryWithAutoAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(newRentConnectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } // 手動ack @Bean(value = "listenerFactoryWithManualAck") public SimpleRabbitListenerContainerFactory listenerFactoryWithManualAck(@Qualifier("connectionFactory") ConnectionFactory newRentConnectionFactory) { SimpleRabbitListenerContainerFactory factory = listenerFactory(newRentConnectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; }
若是你使用的springboot版本是2.2.*的話, 能夠更方便的在監聽的註解上設置ackMode, 而且會覆蓋在配置監聽工廠的配置方式, 這裏使用的版本是2.1.8因此只能在配置工廠時設置.rabbitmq
@Component public class MQListener { @RabbitListener(containerFactory = "listenerFactoryWithManualAck", queues = "q.send") public void consume(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { System.out.println("message: " + message.toString()); String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("message body: " + msg); } }
執行後看到消息並無被消費, 這是由於咱們使用手動應答監聽, 可是沒有發送應答, 服務器將消息從新入隊列. 在監聽中加入代碼, false表示只是響應這條信息, true表示全部信息. channel.basicAck(tag, false);
隊列
啓動程序能夠看到信息被消費了. 在程序拋異常時, 可能須要手動處理異常, 拒絕消息. true表示消息從新入隊列, 還能夠被消費; false表示直接丟棄消息 channel.basicReject(tag, true);
ip
以上就是RabbitMQ在springboot中的簡單實用get