版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。數據庫
預發送消息失敗,消息未進存儲,業務操做未執行(可能的緣由:主動方應用、網絡、消息中間件、消息存儲),數據此時保持一致。網絡
預發送消息後,主動方業務沒有收到返回消息存儲結果,分爲兩種可能session
(1)消息未進存儲,業務操做未執行 數據此時保持一致。
(2)消息已進存儲(待確認),業務操做未執行 數據此時保持不一致。
複製代碼
收到消息存儲成功的返回結果,但未執行業務操做就失敗併發
消息已進存儲(待確認),業務操做未執行 數據此時保持不一致。
複製代碼
消息中間件沒有收到主動方應用的業務操做處理結果異步
(1)消息已進存儲(待確認),業務操做未執行(或 業務操做出錯回滾了) 數據此時保持不一致
(2)消息已進存儲(待確認),業務操做成功 數據此時保持不一致
複製代碼
消息中間件收到業務操做結果(成功/失敗),但處理消息存儲中的消息狀態失敗高併發
(1)消息已進存儲(待確認),業務操做未執行(或業務操做出錯回滾了) 數據此時保持不一致
(2)消息已進存儲(待確認),業務操做成功 數據此時保持不一致
複製代碼
消息中間件返回消息持久化結果(成功/失敗),主動方應用根據返 回結果進行判斷如何進行業務操做處理:大數據
a) 失敗:放棄業務操做處理,結束(必要時向上層返回失敗結果);
b) 成功:執行業務操做處理;
複製代碼
消息中間件收到業務操做結果後,根據業務結果進行處理;優化
a) 失敗:刪除消息存儲中的消息,結束;
b) 成功:更新消息存儲中的消息狀態爲「待發送(可發送)」;
複製代碼
消息服務子系統接口規範spa
public interface RpTransactionMessageService {
/**
* 預存儲消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 確認併發送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;
/**
* 存儲併發送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 直接發送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 重發消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 根據messageId重發某條消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;
/**
* 將消息標記爲死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
/**
* 根據消息ID獲取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
/**
* 根據消息ID刪除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;
/**
* 重發某個消息隊列中的所有已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
複製代碼
}code
消息類型及核心字段
public class RpTransactionMessage extends BaseEntity {
private static final long serialVersionUID = 1757377457814546156L;
private String messageId;
private String messageBody;
private String messageDataType;
private String consumerQueue;
private Integer messageSendTimes;
private String areadlyDead;
private String field1;
private String field2;
private String field3;
}
複製代碼
消息服務子系統核心實現
預發送消息
public int saveMessageWaitingConfirm(RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
}
message.setEditTime(new Date());
message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
message.setAreadlyDead(PublicEnum.NO.name());
message.setMessageSendTimes(0);
return rpTransactionMessageDao.insert(message);
}
消息確認併發送
public void confirmAndSendMessage(String messageId) {
final RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息爲空");
}
message.setStatus(MessageStatusEnum.SENDING.name());
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
保存併發送不進行預發送
public int saveAndSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
}
message.setStatus(MessageStatusEnum.SENDING.name());
message.setAreadlyDead(PublicEnum.NO.name());
message.setMessageSendTimes(0);
message.setEditTime(new Date());
int result = rpTransactionMessageDao.insert(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
return result;
}
直接發送
public void directSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
}
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
重複發送
public void reSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息爲空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能爲空 ");
}
message.addSendTimes();
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
public void reSendMessageByMessageId(String messageId) {
final RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息爲空");
}
int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
if (message.getMessageSendTimes() >= maxTimes) {
message.setAreadlyDead(PublicEnum.YES.name());
}
message.setEditTime(new Date());
message.setMessageSendTimes(message.getMessageSendTimes() + 1);
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
標記死亡
public void setMessageToAreadlyDead(String messageId) {
RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息爲空");
}
message.setAreadlyDead(PublicEnum.YES.name());
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
}
複製代碼
業務活動的主動方,在完成業務處理以後,向業務活動的被動方發送消息,容許消息丟失。
業務活動的被動方根據定時策略,向業務活動主動方查詢,恢復丟失的業務消息。
適用範圍
• 對業務最終一致性的時間敏感度低
• 跨企業的業務活動
複製代碼
本文結合工業大數據高併發數據接入場景,經過弱化MQ的消息一致性,增強業務系統的一致性保證,實現了工業級大數據的數據倉庫建設。
秦凱新 於深圳