原文:https://blog.csdn.net/qq_38439885/article/details/88982373java
進入正題,本文會介紹兩種實現rabbitmq的ack模式的方法,分別爲:spring
1、經過配置文件配置。springboot
2、經過手動註冊 SimpleMessageListenerContainer容器實現。服務器
先介紹方法一:
經過配置文件配置。
此類實現起來較爲方便,經過springboot的配置文件以及註解的形式便可完成。app
1.首先引入依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.編寫配置文件
# rabbitmq基本配置
spring.rabbitmq.host=***
spring.rabbitmq.port=5672
spring.rabbitmq.username=***
spring.rabbitmq.password=***
spring.rabbitmq.virtual-host=/框架
# 開啓發送確認
spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退回
spring.rabbitmq.publisher-returns=true
# 全局開啓ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在配置文件中使用函數
spring.rabbitmq.listener.simple.acknowledge-mode
來配置ack模式,這個配置有三種配置方式,分別爲NONE、MANUAL、AUTO。spring-boot
I:NONE:默認爲NONE,也就是自動ack模式,在消費者接受到消息後無需手動ack,消費者會自動將消息ack掉。工具
II:MANUAL:即爲手動ack模式,消費者在接收到消息後須要手動ack消息,否則消息將一直處於uncheck狀態,在應用下次啓動的時候會再次對消息進行消費。使用該配置須要注意的是,配置開啓後即項目全局開啓手動ack模式,全部的消費者都須要在消費信息後手動ack消息,不然在重啓應用的時候將會有大量的消息沒法被消費掉而重複消費。測試
III:AUTO:自動確認ack 若是此時消費者拋出異常,不一樣的異常會有不一樣的處理方式。
3.編寫MQConfig的代碼,實現相應的queue和exchange的註冊以及綁定。
/**
* ACK 測試
*/
public static final String ACK_QUEUE_A = "ack.test.queue.A";
public static final String ACK_QUEUE_B = "ack.test.queue.B";
public static final String ACK_EXCHANGE = "ack.test.exchange";
/**
* ACK TEST
*/
@Bean
public Queue ackQueueA() {
return new Queue(ACK_QUEUE_A);
}
@Bean
public Queue ackQueueB() {
return new Queue(ACK_QUEUE_B);
}
@Bean
public FanoutExchange ackFanoutExchange() {
return new FanoutExchange(ACK_EXCHANGE);
}
@Bean
public Binding ackBindingA() {
return BindingBuilder.bind(ackQueueA()).to(ackFanoutExchange());
}
@Bean
public Binding ackBindingB() {
return BindingBuilder.bind(ackQueueB()).to(ackFanoutExchange());
}
上方代碼中作了三件事:
I.註冊了兩個queue,分別爲ackQueueA以及ackQueueB。
II.註冊了一個fanout類型的exchange。
III.將兩個queue和其綁定。
4. 生產者代碼編寫
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author hsw
* @since 9:26 2019/4/2
*/
@Slf4j
@Service
public class MQAckSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void ackMQSender(String msg) {
log.info("send ack message :" + msg);
// 生產者發送消息到exchange後沒有綁定的queue時將消息退回
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ackMQSender 發送消息被退回" + exchange + routingKey);
});
// 生產者發送消息confirm檢測
this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.info("ackMQSender 消息發送失敗" + cause + correlationData.toString());
} else {
log.info("ackMQSender 消息發送成功 ");
}
});
this.rabbitTemplate.convertAndSend(MQConfig.ACK_EXCHANGE, "", msg);
}
}
這裏使用了RabbitTemplate而沒有使用AmqpTemplate,能夠將RabbitTemplate看做一個實現了AmqpTemplate的工具類,其中定義了更多方法供開發者使用。
在第一步的配置文件中定義了MANUAL的ack模式的同時,也配置了發送確認以及發送失敗退回,因此在上述生產者代碼中,分別配置了這兩項。具體回調時間見註釋。
5.消費者代碼編寫
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @author hsw
* @since 9:39 2019/4/2
*/
@Slf4j
@Service
public class MQAckReceive {
@RabbitListener(queues = MQConfig.ACK_QUEUE_A)
public void process(String msg, Channel channel, Message message) throws IOException {
log.info("ACK_QUEUE_A 收到 : " + msg);
try {
// 框架容器,是否開啓手動ack按照框架配置
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("ACK_QUEUE_A 接受信息成功");
} catch (Exception e) {
e.printStackTrace();
//丟棄這條消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("ACK_QUEUE_A 接受信息異常");
}
}
@RabbitListener(queues = MQConfig.ACK_QUEUE_B)
public void process2(String msg, Channel channel, Message message) throws IOException {
log.info("ACK_QUEUE_B 收到 : " + msg);
try {
//告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉 這樣之後就不會再發了 不然消息服務器覺得這條消息沒處理掉 重啓應用後還會在發
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("ACK_QUEUE_B 接受信息成功");
} catch (Exception e) {
e.printStackTrace();
//丟棄這條消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.info("ACK_QUEUE_B 接受信息異常");
}
}
}
上述代碼定義了兩個消費者,即爲以前定義的ackQueueA以及ackQueueB的消費者。
與默認ack模式的消費者不一樣的是,在消費者消費信息的時候,須要手動ack掉信息,即爲上述代碼中的:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
該方法有兩個參數,分別爲long類型和boolean類型:
/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
第一個deliveryTag參數爲每條信息帶有的tag值,第二個multiple參數爲布爾類型,爲true時會將小於等於這次tag的全部消息都確認掉,若是爲false則只確認當前tag的信息,可根據實際狀況進行選擇。
再看下另外兩個拒絕消息的函數:
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
第一個方法 basicNack 有三個參數,分別爲long類型、boolean類型和boolean類型:
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
前兩個參數和接受方法 basicAck 的參數類似,第一個deliveryTag參數爲每條信息帶有的tag值,第二個multiple參數爲true時會將小於等於這次tag的全部消息都拒絕掉,若是爲false則只拒絕當前tag的信息,可根據實際狀況進行選擇。
第三個參數爲requeue,爲true的時候會將消息從新發送到當前隊列。可根據具體業務需求中不一樣的異常捕捉實現不一樣的拒絕方式。
第二個方法 basicReject 和 basicAck 方法相似,可是隻能拒絕/重發當前tag的信息。
6.項目測試
@GetMapping("/ack")
public void springAck() {
try {
mqAckSender.ackMQSender("this is a ack msg");
} catch (Exception e) {
e.printStackTrace();
}
}
調用接口後返回:
2019-04-03 10:18:07.018 INFO 7352 --- [nio-8081-exec-3] c.h.a.rabbitmq.amqp.MQAckSender : send ack message :this is a ack msg
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-9] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_B 收到 : this is a ack msg
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-9] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_B 接受信息成功
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-1] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_A 收到 : this is a ack msg
2019-04-03 10:18:07.028 INFO 7352 --- [cTaskExecutor-1] c.h.a.rabbitmq.amqp.MQAckReceive : ACK_QUEUE_A 接受信息成功
2019-04-03 10:18:07.035 INFO 7352 --- [2.20.4.100:5672] c.h.a.rabbitmq.amqp.MQAckSender : ackMQSender 消息發送成功
若在queueA消費者ack消息前打上斷點,可在rabbitmq管理後臺看到:
第一種方式的手動ack模式開啓成功!
接下來介紹方法二:
經過手動註冊 SimpleMessageListenerContainer容器實現。
方法一經過註解方式開啓ack模式當然方便,但經過註解方式開啓後,項目全局的ack模式都將被修改,那怎麼樣作到只修改單個消費者的ack模式呢?這裏就須要手動註冊相應容器來修改ack模式。話很少說,先上代碼。
MQConfig和MQSender端代碼不變。
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ACK_QUEUE_A);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
log.info(ACK_QUEUE_A + "get msg:" +new String(message.getBody()));
if(message.getMessageProperties().getHeaders().get("error") == null){
// 消息手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息確認");
}else {
// 消息從新回到隊列
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
// 拒絕消息(刪除)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("消息拒絕");
}
});
return container;
}
與第一種方法的不一樣點:
一、配置文件配置的ack模式不會影響。
二、消費者須要配置在setMessageListener中。
上述代碼中,手動註冊了一個SimpleMessageListenerContainer容器,並將對應的queueName、須要修改的ack模式以及消費者收到消息後的處理一併注入到spring中。
因爲是手動註冊容器,不受到配置文件的影響,因此能夠實現對單個queue的ack模式修改。
須要注意的是,若是消費者依舊使用@RabbitListener註解進行消費信息,手動註冊容器中修改的ack模式是無效的。