注意
java
SimpleMessageListenerContainer
能夠進行動態設置,例如在運行中可以動態修改消費者數量、接收消息的模式等。不少基於 RabbitMQ 的定制化後端管理控制檯在進行動態設置時,也是根據這一特性實現的。
package com.wyg.rabbitmq.springamqp; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.rabbitmq.client.Channel; /** * RabbitAdmin * * @author wyg0405@gmail.com * @date 2019-11-25 15:11 * @since JDK1.8 * @version V1.0 */ @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses("localhost:5672"); cachingConnectionFactory.setUsername("guest"); cachingConnectionFactory.setPassword("guest"); cachingConnectionFactory.setVirtualHost("/"); return cachingConnectionFactory; } /** * SimpleMessageListenerContainer注入 * * @param connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 17:16 */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 監聽多個queue container.addQueueNames("test01", "test02", "test03"); // 設置當前消費者數量 container.setConcurrentConsumers(1); // 設置最大的消費者數量 container.setMaxConcurrentConsumers(5); // 設置不要重回隊列 container.setDefaultRequeueRejected(false); // 設置自動簽收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 設置消費端tag策略 container.setConsumerTagStrategy(new 
ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + System.currentTimeMillis(); } }); // 設置監聽 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息處理 String msg = new String(message.getBody(), "UTF-8"); System.out.println("---消費者---隊列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg + ",deliveryTag:" + message.getMessageProperties().getDeliveryTag()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);; } }); return container; } }
package com.wyg.rabbitmq.springamqp; import java.io.*; import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.wyg.rabbitmq.springamqp.convert.Order; import com.wyg.rabbitmq.springamqp.convert.User; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitConfigTest { @Autowired RabbitAdmin rabbitAdmin; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private SimpleMessageListenerContainer simpleMessageListenerContainer; @Test public void testSimpleMessageListenerContainerSendMsg() { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey for (int i = 0; i < 3; i++) { rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", ("第" + i + "條消息").getBytes()); } } }