Recently we found a problem in production: the MQ listener frequently threw a "message does not exist" exception. The key code is as follows:
public void sendMessage(MessageData message) throws Exception {
    if (message == null) {
        return;
    }
    // ① Persist the message
    String messageId = rpTransactionControlService.createTransactionControl(
            message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime,
            message.getField1(), message.getField2(), message.getField3());
    log.info("create transaction control messageid = " + messageId);
    message.setMessageId(messageId);
    // ② Send the message (serialized to JSON first)
    String sendMessage = JSONObject.toJSONString(message);
    ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
}
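The cause is that the message has already been sent at ② while the transaction that persists it at ① has not yet been committed. The consumer receives the message, looks up the record, and cannot find it.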
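The ordering problem can be reproduced without a broker or a database. The following is a minimal sketch in plain Java, assuming a hypothetical in-memory store (`FakeTransactionalStore`) whose writes stay invisible to other readers until `commit()` is called. None of these names come from the original project; they only illustrate why sending before commit fails.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a transactional table; not the project's real service.
class FakeTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> pending = new HashMap<>();

    // Writes are buffered and stay invisible to other readers until commit().
    void insert(String id, String body) {
        pending.put(id, body);
    }

    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // What a consumer on another connection would see.
    boolean existsForOtherReaders(String id) {
        return committed.containsKey(id);
    }
}

class SendBeforeCommitDemo {
    // Returns what the consumer observed for each ordering.
    static List<String> run() {
        List<String> observed = new ArrayList<>();

        // Problem ordering: persist (1), send (2), commit afterwards.
        FakeTransactionalStore store = new FakeTransactionalStore();
        store.insert("m1", "payload");
        observed.add(consume(store, "m1")); // consumer runs before the commit
        store.commit();

        // Fixed ordering: persist, commit, then send.
        FakeTransactionalStore fixed = new FakeTransactionalStore();
        fixed.insert("m2", "payload");
        fixed.commit();
        observed.add(consume(fixed, "m2")); // consumer runs after the commit
        return observed;
    }

    // Simulates the MQ listener looking up the persisted record.
    private static String consume(FakeTransactionalStore store, String id) {
        return store.existsForOtherReaders(id) ? "found" : "message does not exist";
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the first ordering the consumer reports "message does not exist"; in the second it finds the record. Both solutions in this post try to reach the second ordering, either by waiting long enough or by sending only once the commit has happened.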
Solutions:
1. Delay the send.
2. Add a transaction listener.
For approach 1, if you are using ActiveMQ, there is one thing to watch out for: you need to modify activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
That is, add the schedulerSupport="true" attribute to the broker element.
For approach 2, first create a TransactionalMessageListener class
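import com.alibaba.fastjson.JSONObject; // assumption: JSONObject.toJSONString comes from fastjson
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
@Slf4j
public class TransactionalMessageListener {

    @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
    public void afterCommit(EventMessage eventMessage) throws Exception {
        if (eventMessage == null) {
            return;
        }
        if (eventMessage.getEventType() == null) {
            throw new NullPointerException("eventType");
        }
        if (eventMessage.getEventType() == EventType.ACTIVE_MQ) {
            // Send only after commit, so the MQ listener cannot receive the message
            // before the transaction has been committed
            if (eventMessage.getData() instanceof MessageData) {
                MessageData message = (MessageData) eventMessage.getData();
                String sendMessage = JSONObject.toJSONString(message);
                log.info("push msg to activeMq, context :{ " + sendMessage + "}");
                try {
                    ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
                } catch (Exception e) {
                    // The transaction is already committed here, so a failed send is only logged
                    log.error("push msg error to activeMq:{" + sendMessage + "}", e);
                }
            }
        }
    }
}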
public class EventMessage<T> {

    private EventType eventType;
    private T data;

    public EventMessage(EventType eventType, T data) {
        this.eventType = eventType;
        this.data = data;
    }

    public EventType getEventType() {
        return eventType;
    }

    public void setEventType(EventType eventType) {
        this.eventType = eventType;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}
Then change the original logic as follows
@Autowired
ApplicationEventPublisher publisher;

public void sendMessage(MessageData message) throws Exception {
    if (message == null) {
        return;
    }
    String messageId = rpTransactionControlService.createTransactionControl(
            message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime,
            message.getField1(), message.getField2(), message.getField3());
    log.info("create transaction control messageid = " + messageId);
    message.setMessageId(messageId);
    // Publish an event instead of sending directly; the event type must match the one
    // checked in TransactionalMessageListener (EventType.ACTIVE_MQ)
    publisher.publishEvent(new EventMessage<MessageData>(EventType.ACTIVE_MQ, message));
}
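A few words on this annotation: @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
fallbackExecution = true ensures the listener is still invoked when the event is published outside of a transaction. Without it, such an event would be silently ignored.
phase = TransactionPhase.AFTER_COMMIT means the listener is invoked only after the transaction has committed successfully.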
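To make these two settings concrete, here is a small plain-Java model of the behaviour. It is only a sketch and not Spring's implementation: `MiniTransaction` and `MiniPublisher` are hypothetical names made up for illustration. The publisher defers a listener until commit when a transaction is active, drops it on rollback, and runs it immediately outside a transaction only if `fallbackExecution` is true.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature transaction that collects after-commit callbacks.
class MiniTransaction {
    private final List<Runnable> afterCommit = new ArrayList<>();
    private boolean active;

    void begin() {
        active = true;
    }

    boolean isActive() {
        return active;
    }

    void registerAfterCommit(Runnable callback) {
        afterCommit.add(callback);
    }

    // Commit first, then run the deferred callbacks.
    void commit() {
        active = false;
        List<Runnable> callbacks = new ArrayList<>(afterCommit);
        afterCommit.clear();
        callbacks.forEach(Runnable::run);
    }

    // On rollback the deferred callbacks are discarded.
    void rollback() {
        active = false;
        afterCommit.clear();
    }
}

// Hypothetical publisher modelling phase = AFTER_COMMIT plus fallbackExecution.
class MiniPublisher {
    private final MiniTransaction tx;
    private final boolean fallbackExecution;

    MiniPublisher(MiniTransaction tx, boolean fallbackExecution) {
        this.tx = tx;
        this.fallbackExecution = fallbackExecution;
    }

    void publish(Runnable listener) {
        if (tx.isActive()) {
            tx.registerAfterCommit(listener); // deferred until commit
        } else if (fallbackExecution) {
            listener.run();                   // no transaction: run right away
        }
        // Otherwise the event is dropped.
    }
}

class AfterCommitDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniTransaction tx = new MiniTransaction();
        MiniPublisher publisher = new MiniPublisher(tx, true);

        tx.begin();
        log.add("persist");
        publisher.publish(() -> log.add("send")); // published before commit
        log.add("commit");
        tx.commit();                              // "send" runs here

        tx.begin();
        publisher.publish(() -> log.add("send-after-rollback"));
        tx.rollback();                            // listener never runs

        publisher.publish(() -> log.add("send-without-tx")); // fallback path
        return log;
    }

    static List<String> runWithoutFallback() {
        List<String> log = new ArrayList<>();
        MiniPublisher strict = new MiniPublisher(new MiniTransaction(), false);
        strict.publish(() -> log.add("send-without-tx")); // dropped
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(runWithoutFallback());
    }
}
```

In this model the send is recorded after the commit even though the event was published before it, the rolled-back event is never delivered, and the event published outside a transaction is delivered only when the fallback flag is set. That last case is why the listener above sets fallbackExecution = true: callers of sendMessage that run without a transaction would otherwise never get their message sent.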