RabbitMq + Spring 實現ACK機制

概念性解讀(Ack的靈活)

 

         首先啊,有的人不是太理解這個Ack是什麼,講的接地氣一點,其實就是一個通知,怎麼說呢,當我監聽消費者,正常狀況下,不會出異常,可是若是是出現了異常,甚至是沒有獲取的異常,那是否是這條數據就會做廢,可是咱們確定不但願這樣的狀況出現,咱們想要的是,若是在出現異常的時候,咱們識別到,若是確實是一個不良異常,確定但願數據從新返回隊列中,再次執行咱們的業務邏輯代碼,此時我就須要一個Ack的通知,告訴隊列服務,我是否已經成功處理了這條數據,而若是不配置Ack的話呢,我測試過他會自動的忽略,也就是說此時的服務是no_ack=true的模式,就是說只要我發現你是消費了這個數據,至於異常不異常的,我無論了。通知Ack機制就是這麼來的,更加靈活的,咱們須要Ack不自動,而是手動,這樣作的好處,就是使得咱們開發人員更加人性化或者靈活的來處理咱們的業務羅傑代碼,更加方便的處理異常的問題以及數據的返回處理等。下面是通話機制的四條原則:php

  • Basic.Ack 發回給 RabbitMQ 以告知,能夠將相應 message 從 RabbitMQ 的消息緩存中移除。
  • Basic.Ack 未被 consumer 發回給 RabbitMQ 前出現了異常,RabbitMQ 發現與該 consumer 對應的鏈接被斷開,以後將該 message 以輪詢方式發送給其餘 consumer (假設存在多個 consumer 訂閱同一個 queue)。
  • 在 no_ack=true 的狀況下,RabbitMQ 認爲 message 一旦被 deliver 出去了,就已被確認了,因此會當即將緩存中的 message 刪除。因此在 consumer 異常時會致使消息丟失。
  • 來自 consumer 側的 Basic.Ack 與 發送給 Producer 側的 Basic.Ack 沒有直接關係。

 

正題部分(配置手動Ack,實現異常消息回滾)

 

A. 在消費者端的mq配置文件上添加,配置  關鍵代碼爲 acknowledeg = "manual",意爲表示該消費者的ack方式爲手動(此時的queue已經和生產者的exchange經過某個routeKey綁定了)css

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="queue_xxx" ref="MqConsumer"/> <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/> </rabbit:listener-container>

B. 新建一個類 MqConsumer ,並實現接口  ChannelAwareMessageListener ,實現onMessage方法,不須要指定方法。html

springAMQP中已經實現了一個功能,若是該監聽器已經實現了下面2個接口,則直接調用onMessage方法java

C. 關鍵點在實現了ChannelAwareMessageListener的onMessage方法後,會有2個參數。redis

一個是message(消息實體),一個是channel就是當前的通道,不少地方都沒有說清楚怎麼去手動ack,其實手動ack就是在當前channel裏面調用basicAsk的方法,並傳入當前消息的tagId就能夠了。spring

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

其中deliveryTag是tag的id,由生產者生成。第二個參數我其實也沒理解用途,暫時尚未模擬出場景,因此先不討論。apache

一樣的,若是要Nack或者拒絕消息(reject)的時候,也是調用channel裏面的basicXXX方法就能夠了(固然要制定tagId)。注意若是拋異常或Nack(而且requeue爲true),消息會一直從新入隊列,一不當心就會重複一大堆消息不斷出現~。json

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,並從新回到隊列,api裏面解釋得很清楚 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息

D. 針對上面所描述的狀況,咱們在搭建一個消息隊列時候,咱們的思路應該是這樣的,首先,咱們要啓動ack的手動方式,緊接着,咱們處理代碼邏輯,若是發生了異常信息,咱們首先通知到ack,我已經表示接受到這條數據了,你能夠進行刪除了,不須要讓他自動的從新進入隊列中,而後,咱們啓用一個錯誤處理,手動將其從新插入隊列中,在此以前,有幾個類和Api一塊兒來看一下。api

    1. SimpleMessageListenerContainer緩存

    這個是咱們的基礎監聽,他的做用就是隊列的總監聽,能夠爲其配置ack模式,異常處理類等。。

    2. org.springframework.amqp.support.converter.SimpleMessageConverter

    這個類和下面的Converter類很容易搞混淆,這個類的做用是能夠解析隊列中的 message 轉 obj

    3. org.springframework.amqp.rabbit.retry.MessageRecoverer

    這個接口,須要咱們開發者自定義實現,其中的一個方法recover(Message message, Throwable cause),就能夠看出來他是幹嗎的,就是說在監聽出錯,也就是沒有抓取的異常而是拋出的異常會觸發該方法,咱們就會在這個接口的實現中,將消息從新入隊列

    4. org.springframework.util.ErrorHandler

    這個接口也是在出現異常時候,會觸發他的方法

E.  完整實例

    1. spring配置隊列xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <!-- 鏈接服務配置 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" channel-cache-size="${rabbitmq.channel.cache.size}" /> <!-- 設置Ack模式爲手動 --> <bean id="ackManual" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean"> <property name="staticField" value="org.springframework.amqp.core.AcknowledgeMode.MANUAL" /> </bean> <!-- 異常處理,記錄異常信息 --> <bean id="mqErrorHandler" class="com.zefun.wechat.utils.MQErrorHandler"/> <!-- 將類自動注入,可解析msg信息 --> <bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" /> <!-- 建立rabbitAdmin 代理類 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory" /> <!-- 建立SimpleMessageListenerContainer的理想通道,主要實現異常事件處理邏輯 --> <bean id="retryOperationsInterceptorFactoryBean" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean"> <property name="messageRecoverer"> <bean class="com.zefun.wechat.utils.MQRepublishMessageRecoverer"/> </property> <property name="retryOperations"> <bean class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="500" /> <property name="multiplier" value="10.0" /> <property name="maxInterval" value="10000" /> </bean> </property> </bean> </property> </bean> <!-- 定義隊列,在下面的交換機中引用次隊列,實現綁定 --> <rabbit:queue id="queue_system_error_logger_jmail" name="${rabbitmq.system.out.log.error.mail}" durable="true" auto-delete="false" exclusive="false" /> <!--路由設置 將隊列綁定,屬於direct類型 --> <rabbit:direct-exchange id="directExchange" name="directExchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queue_system_error_logger_jmail" key="${rabbitmq.system.out.log.error.mail}" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- logger 日誌發送功能 --> <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="acknowledgeMode" ref="ackManual" /> <property name="queueNames" value="${rabbitmq.system.out.log.error.mail}" /> <property name="messageListener"> <bean class="com.zefun.wechat.listener.SystemOutLogErrorMessageNoitce" /> </property> <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers}" /> <property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" /> <property name="errorHandler" ref="mqErrorHandler" /> </bean> </beans>

    2. MessageRecoverer 配置,將當心從新入隊列

package com.zefun.wechat.utils; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Map; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; public class MQRepublishMessageRecoverer implements MessageRecoverer { private static final Logger logger = Logger.getLogger(MQRepublishMessageRecoverer.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MessageConverter msgConverter; @Override public void recover(Message message, Throwable cause) { Map<String, Object> headers = message.getMessageProperties().getHeaders(); headers.put("x-exception-stacktrace", getStackTraceAsString(cause)); headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage()); headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange()); headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey()); this.rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message); logger.error("handler msg (" + msgConverter.fromMessage(message) + ") err, republish to mq.", cause); } private String getStackTraceAsString(Throwable cause) { StringWriter stringWriter = new StringWriter(); PrintWriter printWriter = new PrintWriter(stringWriter, true); cause.printStackTrace(printWriter); return stringWriter.getBuffer().toString(); } } 

    3. MQErrorHandler 寫法,在出現異常時,記錄異常

package com.zefun.wechat.utils;

import java.lang.reflect.Field;
import java.util.Date;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ErrorHandler;

import com.zefun.wechat.service.RedisService;

public class MQErrorHandler implements ErrorHandler {

    private static final Logger logger = Logger.getLogger(MQErrorHandler.class);

    @Autowired
    private RedisService redisService;
    @Autowired
    private MessageConverter msgConverter;

    @Override
    public void handleError(Throwable cause) {
        Field mqMsgField = FieldUtils.getField(MQListenerExecutionFailedException.class, "mqMsg", true);
        if (mqMsgField != null) {
            try {
                Message mqMsg = (Message) mqMsgField.get(cause);
                Object msgObj = msgConverter.fromMessage(mqMsg);
                logger.error("handle MQ msg: " + msgObj + " failed, record it to redis.", cause);
                redisService.zadd(App.MsgErr.MQ_MSG_ERR_RECORD_KEY, new Double(new Date().getTime()), msgObj.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            logger.error("An error occurred.", cause);
        }
    }

}

    4. SystemOutLogErrorMessageNoitce 實現 ChannelAwareMessageListener接口,處理郵件服務

package com.zefun.wechat.listener; import javax.mail.internet.MimeMessage; import org.apache.log4j.Logger; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.mail.javamail.JavaMailSenderImpl; import org.springframework.mail.javamail.MimeMessageHelper; import com.rabbitmq.client.Channel; import com.zefun.wechat.utils.App; import net.sf.json.JSONObject; public class SystemOutLogErrorMessageNoitce implements ChannelAwareMessageListener { private static final Logger logger = Logger.getLogger(MemberWechatMessageTextNoitce.class); @Autowired private MessageConverter msgConverter; /** logger b */ @Autowired private JavaMailSenderImpl senderImpl; @Override public void onMessage(Message message, Channel channel) throws Exception { Object obj = null; try { obj = msgConverter.fromMessage(message); } catch (MessageConversionException e) { logger.error("convert MQ message error.", e); } finally { long deliveryTag = message.getMessageProperties().getDeliveryTag(); if (deliveryTag != App.DELIVERIED_TAG) { channel.basicAck(deliveryTag, false); message.getMessageProperties().setDeliveryTag(App.DELIVERIED_TAG); logger.info("revice and ack msg: " + (obj == null ? message : new String((byte[]) obj))); } } if (obj == null) { return; } JSONObject map = JSONObject.fromObject(obj); sendMailSystemLoggerError(map.getString("date"), map.getString("subject"), map.getString("domain"), map.getString("requestURL"), map.getString("message")); } /** * jmail logger  * @param date 日期 * @param subject 主題帳戶 * @param domain 域名環境 * @param message logger日誌 * @param requestURL 請求路徑 * @throws Exception 異常信息 */ public void sendMailSystemLoggerError(String date, String subject, String domain, String requestURL, String message) throws Exception{ MimeMessage mailMessage = this.senderImpl.createMimeMessage(); MimeMessageHelper messageHelper = new MimeMessageHelper(mailMessage, true); messageHelper.setTo("1043851832@qq.com"); messageHelper.setFrom("18734911338@163.com"); messageHelper.setSubject(date + " 系統異常"); String msg = "<p>異常時間:" + date + "</p><p>門店企業:" + subject + "</p>" + "<p>部署環境:" + domain + "</p><p>異常鏈接:" + requestURL + "</p>" + "<p>異常內容:</p>" + message; messageHelper.setText("<html><head></head><body>" + msg + "</body></html>", true); senderImpl.send(mailMessage); logger.info("jmail push message success"); } } 

 

E. rabbitMq中文文檔,方便查閱API http://rabbitmq.mr-ping.com/AMQP/amqp-0-9-1-quickref.html

相關文章
相關標籤/搜索