工業級大數據接入MQ消息發送異常性及最終一致性解決方案-DW商業環境實戰

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。數據庫

1 工業級高併發數據接入消息一致性保障

  • 消息中間件在高併發數據接入大數據平臺時的的主要做用: 異步通信、解耦、併發緩衝
  • 消息發送一致性:是指產生消息的業務動做與消息發送的一致。 (也就是說,若是業務操做成功,那麼由這個業務操做所產生的消息必定要成功投遞出去,不然就丟消息)

2 工業級MQ數據接入消息一致性理論模型

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。

2.1 從主動方業務來分析

  • 預發送消息失敗,消息未進存儲,業務操做未執行(可能的緣由:主動方應用、網絡、消息中間件、消息存儲),數據此時保持一致。網絡

  • 預發送消息後,主動方業務沒有收到返回消息存儲結果,分爲兩種可能session

    (1)消息未進存儲,業務操做未執行             數據此時保持一致。
          (2)消息已進存儲(待確認),業務操做未執行    數據此時保持不一致。
    複製代碼
  • 收到消息存儲成功的返回結果,但未執行業務操做就失敗併發

    消息已進存儲(待確認),業務操做未執行         數據此時保持不一致。
    複製代碼

2.2 從消息中間件的角度來分析:

  • 消息中間件沒有收到主動方應用的業務操做處理結果異步

    (1)消息已進存儲(待確認),業務操做未執行(或 業務操做出錯回滾了)         數據此時保持不一致
      (2)消息已進存儲(待確認),業務操做成功                                  數據此時保持不一致
    複製代碼
  • 消息中間件收到業務操做結果(成功/失敗),但處理消息存儲中的消息狀態失敗高併發

    (1)消息已進存儲(待確認),業務操做未執行(或業務操做出錯回滾了)            數據此時保持不一致
    (2)消息已進存儲(待確認),業務操做成功                                    數據此時保持不一致
    複製代碼

2.3 可靠消息的總體流程

    1. 主動方應用先把消息發給消息中間件,消息狀態標記爲「待確認」;
    1. 消息中間件收到消息後,把消息持久化到消息存儲中,但並不向被動方應用投遞消息;
    1. 消息中間件返回消息持久化結果(成功/失敗),主動方應用根據返 回結果進行判斷如何進行業務操做處理:大數據

      a) 失敗:放棄業務操做處理,結束(必要時向上層返回失敗結果);
         b) 成功:執行業務操做處理;
      複製代碼
    1. 業務操做完成後,把業務操做結果(成功/失敗)發送給消息中間件;
    1. 消息中間件收到業務操做結果後,根據業務結果進行處理;優化

      a) 失敗:刪除消息存儲中的消息,結束;
         b) 成功:更新消息存儲中的消息狀態爲「待發送(可發送)」;
      複製代碼
    1. 被動方應用監聽並接收「待發送」狀態的消息,執行業務處理;
    1. 業務處理完成後,向消息中間件發送ACK,確認消息已經收到(消息 主動方應用主機/主機集羣 消息中間件主機/主機集羣 被動方應用主機/主機集羣 中間件將從隊列中刪除該消息)。

2.4 消息重複發送異常分析

  • 1 被動方應用接收到消息,業務處理完成後應用出問題,消息中間件不知道消息處理結果,會從新投遞消息。
  • 2 被動方應用接收到消息,業務處理完成後網絡出問題,消息中間件收不到消息處理結果,會從新投遞消息。
  • 3 被動方應用接收到消息,業務處理時間過長,消息中間件因消息超時未確認,會再次投遞消息。
  • 4 被動方應用接收到消息,業務處理完成,消息中間件問題致使收不到消息處理結果,消息會從新投遞。
  • 5 被動方應用接收到消息,業務處理完成,消息中間件收到了消息處理結果,但因爲消息存儲故障致使消息沒能成功確認, 消息會再次投遞。

3 工業級MQ數據接入消息一致性實際處理方案

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。

3.1 優勢準則

  • 消息確認及恢復系統
  • 消費者流控確認系統進行消息消費確認,並刪除原生消息。
  • 從應用層弱化對MQ消息可靠性的依賴,經過應用端消息重試及恢復系統和消費者流控確認系統來增強消息的一致性。
  • 經過應用層的監聽和回調,保證了消息的時效性。

3.2 弊端準則

  • 消息系統的可靠性服務(消息重試及恢復系統和消費者流控確認系統)與具體業務場景進行綁定,耦合性強,不可共用
  • 消息數據和業務數據同一數據庫,佔用了業務系統資源。

4 工業級MQ數據接入消息一致性實際處理優化演進方案

4.1 優勢準則

  • 消息服務子系統能夠獨立部署,獨立維護及彈性伸縮
  • 減輕與應用層的強耦合,並弱化對MQ消息可靠性的依賴。

4.2 弊端準則

  • 消息的發送須要兩次請求
  • 產品線主動方系統須要提供操做狀態校驗查詢接口查詢

4.3 核心代碼剖析

  • 消息服務子系統接口規範spa

    public interface RpTransactionMessageService {
      
      /**
       * 預存儲消息. 
       */
      public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
      
      
      /**
       * 確認併發送消息.
       */
      public void confirmAndSendMessage(String messageId) throws MessageBizException;
    
      
      /**
       * 存儲併發送消息.
       */
      public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
    
      
      /**
       * 直接發送消息.
       */
      public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
      
      
      /**
       * 重發消息.
       */
      public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
      
      
      /**
       * 根據messageId重發某條消息.
       */
      public void reSendMessageByMessageId(String messageId) throws MessageBizException;
      
      
      /**
       * 將消息標記爲死亡消息.
       */
      public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
    
    
      /**
       * 根據消息ID獲取消息
       */
      public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
    
      /**
       * 根據消息ID刪除消息
       */
      public void deleteMessageByMessageId(String messageId) throws MessageBizException;
      
      
      /**
       * 重發某個消息隊列中的所有已死亡的消息.
       */
      public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
    複製代碼

    }code

  • 消息類型及核心字段

    public class RpTransactionMessage extends BaseEntity {
      
      	private static final long serialVersionUID = 1757377457814546156L;
      
      	private String messageId;
      
      	private String messageBody;
      
      	private String messageDataType;
      
      	private String consumerQueue;
      
      	private Integer messageSendTimes;
      
      	private String areadlyDead;
      
      	private String field1;
      
      	private String field2;
      
      	private String field3;
      }
    複製代碼
  • 消息服務子系統核心實現

    預發送消息
      public int saveMessageWaitingConfirm(RpTransactionMessage message) {
      		
      		if (message == null) {
      			throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
      		}
      
      		if (StringUtil.isEmpty(message.getConsumerQueue())) {
      			throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
      		}
      		
      		message.setEditTime(new Date());
      		message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
      		message.setAreadlyDead(PublicEnum.NO.name());
      		message.setMessageSendTimes(0);
      		return rpTransactionMessageDao.insert(message);
      	}
      
      消息確認併發送	
      public void confirmAndSendMessage(String messageId) {
      	final RpTransactionMessage message = getMessageByMessageId(messageId);
      	if (message == null) {
      		throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息爲空");
      	}
      	
      	message.setStatus(MessageStatusEnum.SENDING.name());
      	message.setEditTime(new Date());
      	rpTransactionMessageDao.update(message);
      	
      	notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
      	notifyJmsTemplate.send(new MessageCreator() {
      		public Message createMessage(Session session) throws JMSException {
      			return session.createTextMessage(message.getMessageBody());
      		}
      	});
      }
      
      
      保存併發送不進行預發送
      public int saveAndSendMessage(final RpTransactionMessage message) {
      	if (message == null) {
      		throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
      	}
    
      	if (StringUtil.isEmpty(message.getConsumerQueue())) {
      		throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
      	}
    
      	message.setStatus(MessageStatusEnum.SENDING.name());
      	message.setAreadlyDead(PublicEnum.NO.name());
      	message.setMessageSendTimes(0);
      	message.setEditTime(new Date());
      	int result = rpTransactionMessageDao.insert(message);
    
      	notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
      	notifyJmsTemplate.send(new MessageCreator() {
      		public Message createMessage(Session session) throws JMSException {
      			return session.createTextMessage(message.getMessageBody());
      		}
      	});
      
          return result;
      }
     
     直接發送 
      public void directSendMessage(final RpTransactionMessage message) {
    
      	if (message == null) {
      		throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
      	}
    
      	if (StringUtil.isEmpty(message.getConsumerQueue())) {
      		throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
      	}
    
      	notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
      	notifyJmsTemplate.send(new MessageCreator() {
      		public Message createMessage(Session session) throws JMSException {
      			return session.createTextMessage(message.getMessageBody());
      		}
      	});
      }
    
      重複發送
      public void reSendMessage(final RpTransactionMessage message) {
      
      		if (message == null) {
      			throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
      		}
      		
      		if (StringUtil.isEmpty(message.getConsumerQueue())) {
      			throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
      		}
      		
      		message.addSendTimes();
      		message.setEditTime(new Date());
      		rpTransactionMessageDao.update(message);
      
      		notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
      		notifyJmsTemplate.send(new MessageCreator() {
      			public Message createMessage(Session session) throws JMSException {
      				return session.createTextMessage(message.getMessageBody());
      			}
      		});
      	}
      	
      
      public void reSendMessageByMessageId(String messageId) {
      	final RpTransactionMessage message = getMessageByMessageId(messageId);
      	if (message == null) {
      		throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息爲空");
      	}
      	
      	int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
      	if (message.getMessageSendTimes() >= maxTimes) {
      		message.setAreadlyDead(PublicEnum.YES.name());
      	}
      	
      	message.setEditTime(new Date());
      	message.setMessageSendTimes(message.getMessageSendTimes() + 1);
      	rpTransactionMessageDao.update(message);
      	
      	notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
      	notifyJmsTemplate.send(new MessageCreator() {
      		public Message createMessage(Session session) throws JMSException {
      			return session.createTextMessage(message.getMessageBody());
      		}
      	});
      }	
      	
      標記死亡
      public void setMessageToAreadlyDead(String messageId) {
      	RpTransactionMessage message = getMessageByMessageId(messageId);
      	if (message == null) {
      		throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息爲空");
      	}
      	
      	message.setAreadlyDead(PublicEnum.YES.name());
      	message.setEditTime(new Date());
      	rpTransactionMessageDao.update(message);
      }
    複製代碼

5 最大努力通知型業務邏輯實現工業預警

  • 業務活動的主動方,在完成業務處理以後,向業務活動的被動方發送消息,容許消息丟失。

  • 業務活動的被動方根據定時策略,向業務活動主動方查詢,恢復丟失的業務消息。

  • 適用範圍

    • 對業務最終一致性的時間敏感度低
      • 跨企業的業務活動
    複製代碼

6 總結

本文結合工業大數據高併發數據接入場景,經過弱化MQ的消息一致性,增強業務系統的一致性保證,實現了工業級大數據的數據倉庫建設。

秦凱新 於深圳

相關文章
相關標籤/搜索