場景:業務需要——當消費者端出現異常或其他特殊狀況時,需要讓消息回到隊列,以便再次被消費。
複製代碼
在配置類(原文寫作 redisConfig)中設置 RabbitMQ 的消息監聽容器,並開啓手動應答:
// Listener that handles messages from the queue; it is registered on the
// container built by qrcodeListenerContainer.
@Autowired
private TestReceiver testReceiver ;
/**
 * Builds the listener container that consumes the queue declared by {@link #test()}.
 *
 * <p>The container runs three concurrent consumers, converts payloads with a
 * Jackson JSON converter, exposes the channel to the listener and uses manual
 * acknowledgement, so the registered listener is responsible for acking or
 * nacking every delivery itself.
 *
 * @param connectionFactory connection factory the container is created with
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer qrcodeListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

    // What to listen on and how many consumers to run.
    container.setQueues(test());
    container.setConcurrentConsumers(3);

    // The listener needs the channel because it acknowledges messages by hand.
    container.setExposeListenerChannel(true);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

    container.setMessageConverter(new Jackson2JsonMessageConverter());
    container.setMessageListener(testReceiver);
    return container;
}
/**
 * Declares the queue named by {@code Constants.QUEUE_NAME_ALI_DYNAMIC_QR_CODE}.
 *
 * @return a queue definition created with {@code durable = false}
 */
@Bean
public Queue test() {
    final boolean durable = false;
    return new Queue(Constants.QUEUE_NAME_ALI_DYNAMIC_QR_CODE, durable);
}
複製代碼
接收者類
/**
 * Listener that acknowledges each delivery manually.
 *
 * <p>Exactly one acknowledgement is sent per delivery: {@code basicAck} when
 * processing succeeds, or {@code basicNack} with {@code requeue = true} when it
 * throws, so the broker puts the message back on the queue for another attempt.
 *
 * <p>NOTE(review): a message that always fails is requeued every time it is
 * delivered; consider a retry limit or dead-letter queue if that can happen.
 */
@Component
public class TestReceiver implements ChannelAwareMessageListener {

    /** Converter for incoming message bodies; created once instead of per message. */
    private final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();

    /**
     * Converts and processes one delivery, then acknowledges or rejects it.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on, used for ack/nack
     * @throws Exception the processing failure (rethrown after the message has been
     *         nacked and requeued), or any error raised while acking/nacking
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // Message conversion
            Object payload = converter.fromMessage(message);
            // Business code...
        } catch (Exception e) {
            // Manual reject: multiple = false, requeue = true sends this one message
            // back to the queue so it can be consumed again.
            channel.basicNack(deliveryTag, false, true);
            // Rethrow so the failure is reported rather than silently dropped.
            throw e;
        }

        // Manual acknowledgement, sent only after processing succeeded. It sits outside
        // the try block so a failing ack is never followed by a nack on the same tag.
        channel.basicAck(deliveryTag, false);
    }
}
複製代碼