Fixing the Ordering Problem Between Transaction Commit and Message Sending

We recently found a problem in production: the MQ listener frequently threw a "message does not exist" exception. The key code is as follows:

public void sendMessage(MessageData message) throws Exception {
    if (message == null) return;

    // Persist the message ①
    String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
    log.info("create transaction control messageid = " + messageId);
    message.setMessageId(messageId);

    // Send the message ② (this runs while the transaction from ① is still open)
    String sendMessage = JSONObject.toJSONString(message);
    ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
}

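The cause: step ② had already sent the message while the transaction for step ① had not yet committed, so the consumer looked up a record that was not yet visible to it.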
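The race described above can be reproduced without a database or a broker. The following is a minimal sketch, assuming a hypothetical in-memory `FakeTransactionalStore` that stands in for the transactional table; none of these class names come from the original project. Writes stay invisible to other readers until `commit()` is called, which mimics what the MQ consumer sees on its own connection.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a transactional table (illustration only). */
class FakeTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> pending = new HashMap<>();

    /** Write inside the open transaction: invisible to other readers until commit(). */
    void insert(String id, String body) {
        pending.put(id, body);
    }

    /** Make all pending writes visible to other readers. */
    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    /** What a separate connection (here, the MQ consumer) is able to see. */
    String findCommitted(String id) {
        return committed.get(id);
    }
}

class CommitOrderDemo {
    /** Simulates the consumer: true if the record is visible when the message arrives. */
    static boolean consume(FakeTransactionalStore store, String messageId) {
        return store.findCommitted(messageId) != null;
    }

    /** Buggy order: the message is delivered before the transaction commits. */
    static boolean sendBeforeCommit() {
        FakeTransactionalStore store = new FakeTransactionalStore();
        store.insert("m-1", "payload");         // step 1: persist, not committed yet
        boolean found = consume(store, "m-1");  // step 2: consumer runs immediately
        store.commit();                         // the commit arrives too late
        return found;
    }

    /** Fixed order: the message is delivered only after the commit. */
    static boolean sendAfterCommit() {
        FakeTransactionalStore store = new FakeTransactionalStore();
        store.insert("m-1", "payload");
        store.commit();
        return consume(store, "m-1");
    }

    public static void main(String[] args) {
        System.out.println("send before commit, record found: " + sendBeforeCommit()); // false
        System.out.println("send after commit, record found: " + sendAfterCommit());   // true
    }
}
```

In the real system the consumer does not always run before the commit, which is why the exception appeared only some of the time rather than on every message.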

There are two solutions:

1. Delay the message delivery.

2. Add a transaction listener, so the message is sent only after the commit.

For approach 1, if you are using ActiveMQ there is one thing to watch out for: you need to modify activemq.xml

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

That is, add the schedulerSupport="true" attribute to the broker element, which enables the broker's scheduler for delayed delivery.

For approach 2, first create a TransactionalMessageListener class:

@Component
@Slf4j
public class TransactionalMessageListener {

    @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
    public void afterCommit(EventMessage eventMessage) throws Exception {

        if (eventMessage == null) return;

        if (eventMessage.getEventType() == null) throw new NullPointerException("eventType");

        if (eventMessage.getEventType() == EventType.ACTIVE_MQ) { // Send only after commit, so the MQ listener cannot receive the message before the transaction has committed

            if (eventMessage.getData() instanceof MessageData) {
                MessageData message = (MessageData) eventMessage.getData();
                String sendMessage = JSONObject.toJSONString(message);
                log.info("push msg to activeMq, context :{ " + sendMessage + "}");
                try {
                    ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
                } catch (Exception e) {
                    log.error("push msg error to activeMq:{" + sendMessage + "}", e);
                }
            }

        }
    }

}

public class EventMessage<T> {

    public EventMessage(EventType eventType, T data) {
        this.eventType = eventType;
        this.data = data;
    }

    private EventType eventType;

    private T data;

    public EventType getEventType() {
        return eventType;
    }

    public void setEventType(EventType eventType) {
        this.eventType = eventType;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}

Then change the original logic as follows:

 
 
@Autowired
ApplicationEventPublisher publisher;

public void sendMessage(MessageData message) throws Exception {
    if (message == null) return;

    String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
    log.info("create transaction control messageid = " + messageId);
    message.setMessageId(messageId);

    // Publish an event instead of sending directly; TransactionalMessageListener sends the message after commit.
    // The event type must match the one the listener checks for (EventType.ACTIVE_MQ).
    publisher.publishEvent(new EventMessage<MessageData>(EventType.ACTIVE_MQ, message));
}

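The annotation needs some explanation:   @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)

fallbackExecution = true ensures the listener still receives the event when no transaction is active. Without it, an event published outside a transaction is discarded.

phase = TransactionPhase.AFTER_COMMIT means the listener runs after the transaction has committed successfully.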
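To make the two attributes concrete, here is a simplified plain-Java model of the behavior described above. It is only a sketch under stated assumptions: `MiniTransactionalPublisher` and its methods are hypothetical names made up for illustration, it handles a single non-nested "transaction", and it is not how Spring implements the annotation internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of after-commit event delivery (not Spring's code). */
class MiniTransactionalPublisher {
    private final boolean fallbackExecution;
    private final List<String> delivered = new ArrayList<>();
    private List<String> deferred = null; // non-null only while a "transaction" is active

    MiniTransactionalPublisher(boolean fallbackExecution) {
        this.fallbackExecution = fallbackExecution;
    }

    /** Start a transaction: events published from now on are held back. */
    void begin() {
        deferred = new ArrayList<>();
    }

    void publish(String event) {
        if (deferred != null) {
            deferred.add(event);   // transaction active: hold the event until commit
        } else if (fallbackExecution) {
            delivered.add(event);  // no transaction: deliver to the listener right away
        }
        // Otherwise: no transaction and no fallback, so the event is dropped.
    }

    /** AFTER_COMMIT: held events reach the listener only once the commit succeeds. */
    void commit() {
        if (deferred == null) throw new IllegalStateException("no active transaction");
        delivered.addAll(deferred);
        deferred = null;
    }

    /** On rollback the held events are thrown away; the listener never sees them. */
    void rollback() {
        if (deferred == null) throw new IllegalStateException("no active transaction");
        deferred = null;
    }

    /** Events the listener has actually received so far. */
    List<String> delivered() {
        return delivered;
    }
}
```

With `fallbackExecution` set to `true`, calling `publish` outside `begin()`/`commit()` delivers the event immediately, which matches the case where `sendMessage` is called from code that is not running in a transaction. With it set to `false`, that same call silently delivers nothing, and no MQ message would ever be sent.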
