QMQ Source Code Analysis: delay-server (Part 2)

Overall Structure

To get an overall picture of the delay-server source, we need to walk through the code starting from initialization. The initialization work lives in the startup package, which contains a single class, ServerWrapper; combined with the previous article, this class basically reveals the structure of the delay source. delay-server is built on Netty. The init method performs the initialization work (the default port 20801, heartbeat, wheel, and so on). The register method sends a request to the meta-server to obtain this node's role (delay) and starts heartbeating with the meta-server. The startServer method starts the HashWheel ticking, resumes message_log replay from where it last stopped, and starts the Netty server. From the earlier groundwork we also know that QMQ runs one master, one slave, and one backup: the sync method listens on a port to answer sync pull requests, and a slave node additionally starts issuing sync pulls to the master. Once all of this is done, the online method executes, meaning the delay server is online and serving traffic. To sum up the two key points: QMQ communicates over Netty, and it uses a one-master, one-slave, one-backup deployment.
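The startup sequence described above can be condensed into a rough sketch. All names here mirror the description (init, register, startServer, sync, online), but the class and method bodies are illustrative stand-ins, not QMQ's real code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ServerWrapper's startup ordering; the bodies
// are placeholders for the real work each step performs.
class ServerWrapperSketch {
    final List<String> steps = new ArrayList<>();
    boolean online = false;

    void init()        { steps.add("init"); }        // netty setup, default port 20801, heartbeat, wheel
    void register()    { steps.add("register"); }    // ask meta-server for the delay role, start heartbeating
    void startServer() { steps.add("startServer"); } // start wheel ticking, resume message_log replay, start netty
    void sync()        { steps.add("sync"); }        // answer sync pulls; a slave also pulls from the master
    void online()      { steps.add("online"); online = true; } // start serving traffic

    void start() { init(); register(); startServer(); sync(); online(); }
}
```

The real ServerWrapper wires Netty handlers, the meta-server client, and the wheel into these steps; the sketch only fixes their ordering.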

Storage

As discussed before, when delay-server receives a delayed message it appends it sequentially to message_log, then replays message_log to generate schedule_log. So for storage there are two things to look at: how message_log is stored, and how schedule_log is generated.

message_log

Generating message_log is actually simple: it is a sequential append. The main logic lives in the qunar.tc.qmq.delay.receiver.Receiver class; the rough flow is to deserialize QMQ's custom protocol, then store each deserialized message.

The main logic is in the doInvoke method:

private void doInvoke(ReceivedDelayMessage message) {
        // ... 

        try {
          // note: this is where the append happens
          ReceivedResult result = facade.appendMessageLog(message);
          offer(message, result);
        } catch (Throwable t) {
           error(message, t);
        }
    }

All delay storage-layer logic sits behind the facade class; work done at initialization, such as message validation, also happens there, and the message_log operations live in messageLog.

@Override
    public AppendMessageRecordResult append(RawMessageExtend record) {
        AppendMessageResult<Long> result;
        // note: the current latest segment
        LogSegment segment = logManager.latestSegment();
        if (null == segment) {
            segment = logManager.allocNextSegment();
        }

        if (null == segment) {
            return new AppendMessageRecordResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null);
        }

        // note: the actual append is performed by messageAppender
        result = segment.append(record, messageAppender);
        switch (result.getStatus()) {
            case MESSAGE_SIZE_EXCEEDED:
                return new AppendMessageRecordResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
            case END_OF_FILE:
                if (null == logManager.allocNextSegment()) {
                    return new AppendMessageRecordResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null);
                }
                return append(record);
            case SUCCESS:
                return new AppendMessageRecordResult(PutMessageStatus.SUCCESS, result);
            default:
                return new AppendMessageRecordResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }
    }
    
    // Take a look at this appender; it also shows QMQ's delay message format definition.
    private class DelayRawMessageAppender implements MessageAppender<RawMessageExtend, Long> {
        private final ReentrantLock lock = new ReentrantLock();
        private final ByteBuffer workingBuffer = ByteBuffer.allocate(1024);

        @Override
        public AppendMessageResult<Long> doAppend(long baseOffset, ByteBuffer targetBuffer, int freeSpace, RawMessageExtend message) {
            // the lock here has little performance impact
            lock.lock();
            try {
                workingBuffer.clear();

                final String messageId = message.getHeader().getMessageId();
                final byte[] messageIdBytes = messageId.getBytes(StandardCharsets.UTF_8);
                final String subject = message.getHeader().getSubject();
                final byte[] subjectBytes = subject.getBytes(StandardCharsets.UTF_8);

                final long startWroteOffset = baseOffset + targetBuffer.position();
                final int recordSize = recordSizeWithCrc(messageIdBytes.length, subjectBytes.length, message.getBodySize());

                if (recordSize > config.getSingleMessageLimitSize()) {
                    return new AppendMessageResult<>(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED, startWroteOffset, freeSpace, null);
                }

                workingBuffer.flip();
                if (recordSize != freeSpace && recordSize + MIN_RECORD_BYTES > freeSpace) {
                    // padding: not enough free space for a record, fill the rest of the file
                    workingBuffer.limit(freeSpace);
                    workingBuffer.putInt(MESSAGE_LOG_MAGIC_V1);
                    workingBuffer.put(MessageLogAttrEnum.ATTR_EMPTY_RECORD.getCode());
                    workingBuffer.putLong(System.currentTimeMillis());
                    targetBuffer.put(workingBuffer.array(), 0, freeSpace);
                    return new AppendMessageResult<>(AppendMessageStatus.END_OF_FILE, startWroteOffset, freeSpace, null);
                } else {
                    int headerSize = recordSize - message.getBodySize();
                    workingBuffer.limit(headerSize);
                    workingBuffer.putInt(MESSAGE_LOG_MAGIC_V2);
                    workingBuffer.put(MessageLogAttrEnum.ATTR_MESSAGE_RECORD.getCode());
                    workingBuffer.putLong(System.currentTimeMillis());
                    // note: this is schedule_time, i.e. the delay/delivery time
                    workingBuffer.putLong(message.getScheduleTime());
                    // sequence, unique within each brokerGroup
                    workingBuffer.putLong(sequence.incrementAndGet());
                    workingBuffer.putInt(messageIdBytes.length);
                    workingBuffer.put(messageIdBytes);
                    workingBuffer.putInt(subjectBytes.length);
                    workingBuffer.put(subjectBytes);
                    workingBuffer.putLong(message.getHeader().getBodyCrc());
                    workingBuffer.putInt(message.getBodySize());
                    targetBuffer.put(workingBuffer.array(), 0, headerSize);
                    targetBuffer.put(message.getBody().nioBuffer());

                    final long payloadOffset = startWroteOffset + headerSize;
                    return new AppendMessageResult<>(AppendMessageStatus.SUCCESS, startWroteOffset, recordSize, payloadOffset);
                }
            } finally {
                lock.unlock();
            }
        }
    }
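Reading the writes off doAppend above, the on-disk message_log record appears to be laid out as magic(4) + attr(1) + wroteTime(8) + scheduleTime(8) + sequence(8) + messageIdLen(4) + messageId + subjectLen(4) + subject + bodyCrc(8) + bodySize(4) + body. A minimal sketch of recordSizeWithCrc under that assumption (the helper class is hypothetical, not QMQ's own):

```java
// Sketch of the message_log record size, derived from the writes in
// DelayRawMessageAppender.doAppend (an assumption read off the code):
// magic(4) + attr(1) + wroteTime(8) + scheduleTime(8) + sequence(8)
// + messageIdLen(4) + messageId + subjectLen(4) + subject
// + bodyCrc(8) + bodySize(4) + body
class MessageLogRecord {
    static int headerSize(int messageIdLen, int subjectLen) {
        // fixed fields sum to 49 bytes
        return 4 + 1 + 8 + 8 + 8 + 4 + messageIdLen + 4 + subjectLen + 8 + 4;
    }

    static int recordSizeWithCrc(int messageIdLen, int subjectLen, int bodySize) {
        return headerSize(messageIdLen, subjectLen) + bodySize;
    }
}
```

This matches the code's invariant that headerSize = recordSize - bodySize.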

That covers the storage side of message_log. Next, let's look at how replaying message_log generates schedule_log.

schedule_log

MessageLogReplayer is the class that drives the replay. Now consider a question: after a restart, where should replay resume? QMQ keeps a replay offset that is flushed to disk periodically; on the next restart, replay resumes from that offset. The details are in the code block below.

final LogVisitor<LogRecord> visitor = facade.newMessageLogVisitor(iterateFrom.longValue());
adjustOffset(visitor);

while (true) {
    final Optional<LogRecord> recordOptional = visitor.nextRecord();
    if (recordOptional.isPresent() && recordOptional.get() == DelayMessageLogVisitor.EMPTY_LOG_RECORD) {
        break;
    }

    recordOptional.ifPresent((record) -> {
        // post the record so it gets stored
        dispatcher.post(record);
        long checkpoint = record.getStartWroteOffset() + record.getRecordSize();
        this.cursor.addAndGet(record.getRecordSize());
        facade.updateIterateOffset(checkpoint);
    });
}
iterateFrom.add(visitor.visitedBufferSize());

try {
    TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
    LOGGER.warn("message log iterate sleep interrupted");
}

Note that besides the offset there is also a cursor. It exists to handle replay failure: when replay retries after the 5 ms sleep, it resumes from the cursor position so that messages are not replayed twice. Now let's look at the dispatcher.post method:

@Override
    public void post(LogRecord event) {
        // this appends to schedule_log
        AppendLogResult<ScheduleIndex> result = facade.appendScheduleLog(event);
        int code = result.getCode();
        if (MessageProducerCode.SUCCESS != code) {
            LOGGER.error("appendMessageLog schedule log error,log:{} {},code:{}", event.getSubject(), event.getMessageId(), code);
            throw new AppendException("appendScheduleLogError");
        }

        // look at this first
        iterateCallback.apply(result.getAdditional());
    }

As shown above, let's skip over the schedule_log storage for a moment and see what that callback is about:

private boolean iterateCallback(final ScheduleIndex index) {
        // the delay (schedule) time
        long scheduleTime = index.getScheduleTime();
        // this offset is the startOffset, i.e. where this message begins within its delay segment
        long offset = index.getOffset();
        // whether to add it to the in-memory HashWheel
        if (wheelTickManager.canAdd(scheduleTime, offset)) {
            wheelTickManager.addWHeel(index);
            return true;
        }

        return false;
    }

The idea is that when delay-server receives a message, it checks whether the message needs to be added to the in-memory wheel, to avoid losing messages. Keep this in mind; we'll come back to it in the delivery section. Returning to facade.appendScheduleLog, the schedule_log operations live in scheduleLog:

@Override
    public RecordResult<T> append(LogRecord record) {
        long scheduleTime = record.getScheduleTime();
        // locate the DelaySegment corresponding to the schedule time
        DelaySegment<T> segment = locateSegment(scheduleTime);
        if (null == segment) {
            segment = allocNewSegment(scheduleTime);
        }

        if (null == segment) {
            return new NopeRecordResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED);
        }

				// 具體動做在append裏
        return retResult(segment.append(record, appender));
    }

Pay attention to the locateSegment method: it locates the DelaySegment by schedule time. For example, if the schedule time is 2019-03-03 16:00:00, it resolves to the DelaySegment 201903031600. (Note: the code quoted here is not the latest; in the latest code the DelaySegment granularity is configurable, down to minute level.) As before, the actual work is done by the appender:
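A minimal sketch of that mapping, assuming the segment name is simply the schedule time truncated to the configured scale (the class and method names are made up for illustration, not QMQ's API):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical locateSegment-style mapping: truncate a schedule time to
// the segment scale (in minutes) and render the segment's base-offset name.
class SegmentLocator {
    static String segmentName(long scheduleTimeMillis, int scaleMinutes, ZoneId zone) {
        LocalDateTime t = LocalDateTime.ofInstant(Instant.ofEpochMilli(scheduleTimeMillis), zone);
        // truncate the minute field down to a multiple of the scale
        int truncatedMinute = (t.getMinute() / scaleMinutes) * scaleMinutes;
        t = t.withMinute(truncatedMinute).withSecond(0).withNano(0);
        return t.format(DateTimeFormatter.ofPattern("yyyyMMddHHmm"));
    }
}
```

With an hourly scale, any schedule time inside 16:00–16:59 on 2019-03-03 maps to 201903031600, matching the example above.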

@Override
    public AppendRecordResult<ScheduleSetSequence> appendLog(LogRecord log) {
        workingBuffer.clear();
        workingBuffer.flip();
        final byte[] subjectBytes = log.getSubject().getBytes(StandardCharsets.UTF_8);
        final byte[] messageIdBytes = log.getMessageId().getBytes(StandardCharsets.UTF_8);
        int recordSize = getRecordSize(log, subjectBytes.length, messageIdBytes.length);
        workingBuffer.limit(recordSize);

        long scheduleTime = log.getScheduleTime();
        long sequence = log.getSequence();
        workingBuffer.putLong(scheduleTime);
        // the sequence carried over from message_log
        workingBuffer.putLong(sequence);
        workingBuffer.putInt(log.getPayloadSize());
        workingBuffer.putInt(messageIdBytes.length);
        workingBuffer.put(messageIdBytes);
        workingBuffer.putInt(subjectBytes.length);
        workingBuffer.put(subjectBytes);
        workingBuffer.put(log.getRecord());
        workingBuffer.flip();
        ScheduleSetSequence record = new ScheduleSetSequence(scheduleTime, sequence);
        return new AppendRecordResult<>(AppendMessageStatus.SUCCESS, 0, recordSize, workingBuffer, record);
    }

Here you can also see the schedule_log message format.
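Reading the writes off appendLog, the schedule_log record appears to be scheduleTime(8) + sequence(8) + payloadSize(4) + messageIdLen(4) + messageId + subjectLen(4) + subject + payload. A sketch of its size arithmetic under that assumption (the helper is hypothetical):

```java
// Sketch of the schedule_log record size implied by appendLog's writes:
// scheduleTime(8) + sequence(8) + payloadSize(4)
// + messageIdLen(4) + messageId + subjectLen(4) + subject + payload
class ScheduleLogRecord {
    static int recordSize(int messageIdLen, int subjectLen, int payloadSize) {
        // fixed fields sum to 28 bytes
        return 8 + 8 + 4 + 4 + messageIdLen + 4 + subjectLen + payloadSize;
    }
}
```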

Delivery

Delivery-related work lives in the WheelTickManager class: loading schedule_log ahead of time, and having the wheel deliver messages when their schedule time arrives. The classes that actually perform delivery are in the sender package.

wheel

The wheel package contains just three classes: HashWheelTimer, WheelLoadCursor, and WheelTickManager. WheelTickManager is the manager that loads files into the wheel and delivers the wheel's messages when due; WheelLoadCursor is the cursor, mentioned in the previous article, that marks how far the schedule_log files have been loaded; and HashWheelTimer is a utility class, which you can think of roughly as Java's ScheduledExecutorService: a timer that delivers delayed messages at their schedule time. We won't dig into that utility here; we care more about the MQ logic.

First, loading schedule_log ahead of time: how far ahead is that? It is configurable. For example, if the schedule_log granularity is configured as 1 hour and the advance-load window as 30 minutes, then at 2019-02-10 17:30 the segment 2019021018 should be loaded.
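That advance-load arithmetic can be sketched like this, assuming hourly segments as in the example (the names are illustrative, not QMQ's API):

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Toy version of resolveSegment(now + loadInAdvance): add the advance
// window to the current time and name the hourly segment it falls in.
class AdvanceLoad {
    static String segmentToLoad(LocalDateTime now, Duration advance) {
        LocalDateTime target = now.plus(advance);
        // an hourly segment is named by its hour, e.g. 2019021018
        return target.format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
    }
}
```

At 2019-02-10 17:30 with a 30-minute window, the target is 18:00, so segment 2019021018 is loaded, as the example says.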

@Override
    public void start() {
        if (!isStarted()) {
            sender.init();
            // the hash wheel timer, i.e. the in-memory wheel
            timer.start();
            started.set(true);
            // recover from the dispatch log: resume delivery where it last stopped
            recover();
            // the loader task, which loads schedule_log segments
            loadScheduler.scheduleWithFixedDelay(this::load, 0, config.getLoadSegmentDelayMinutes(), TimeUnit.MINUTES);
            LOGGER.info("wheel started.");
        }
    }

The recover method uses the delivery records in the dispatch log to find the last delivered position; when delay-server restarts, the wheel resumes delivery from that position.

private void recover() {
        LOGGER.info("wheel recover...");
        // the latest dispatch log segment
        DispatchLogSegment currentDispatchedSegment = facade.latestDispatchSegment();
        if (currentDispatchedSegment == null) {
            LOGGER.warn("load latest dispatch segment null");
            return;
        }

        int latestOffset = currentDispatchedSegment.getSegmentBaseOffset();
        DispatchLogSegment lastSegment = facade.lowerDispatchSegment(latestOffset);
        if (null != lastSegment) doRecover(lastSegment);

        // resume delivery based on the latest dispatch log segment
        doRecover(currentDispatchedSegment);
        LOGGER.info("wheel recover done. currentOffset:{}", latestOffset);
    }

     private void doRecover(DispatchLogSegment dispatchLogSegment) {
        int segmentBaseOffset = dispatchLogSegment.getSegmentBaseOffset();
        ScheduleSetSegment setSegment = facade.loadScheduleLogSegment(segmentBaseOffset);
        if (setSegment == null) {
            LOGGER.error("load schedule index error,dispatch segment:{}", segmentBaseOffset);
            return;
        }

        // build a set of already-dispatched records
        LongHashSet dispatchedSet = loadDispatchLog(dispatchLogSegment);
        // using that set, add the not-yet-dispatched messages of this segment into the wheel
        WheelLoadCursor.Cursor loadCursor = facade.loadUnDispatch(setSegment, dispatchedSet, this::refresh);
        int baseOffset = loadCursor.getBaseOffset();
        // record the cursors
        loadingCursor.shiftCursor(baseOffset, loadCursor.getOffset());
        loadedCursor.shiftCursor(baseOffset);
    }
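The core of doRecover is a set difference: everything in the schedule segment that is not in the dispatched set goes back into the wheel. A simplified sketch, with sequence numbers standing in for the real records:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified recovery: re-add only messages whose sequence is absent
// from the dispatch log's set of delivered records.
class WheelRecovery {
    static List<Long> undispatched(List<Long> scheduleSequences, Set<Long> dispatched) {
        List<Long> toReAdd = new ArrayList<>();
        for (long seq : scheduleSequences) {
            // missing from the dispatch log means it was never delivered
            if (!dispatched.contains(seq)) toReAdd.add(seq);
        }
        return toReAdd;
    }
}
```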

That covers the recovery path. Next, let's see how loading works:

private void load() {
        // load ahead of time, up to the delay segment that the advance window reaches
        long next = System.currentTimeMillis() + config.getLoadInAdvanceTimesInMillis();
        int prepareLoadBaseOffset = resolveSegment(next);
        try {
            // load up to the delay segment prepareLoadBaseOffset
            loadUntil(prepareLoadBaseOffset);
        } catch (InterruptedException ignored) {
            LOGGER.debug("load segment interrupted");
        }
    }

    private void loadUntil(int until) throws InterruptedException {
        // the base offset the wheel has already loaded up to
        int loadedBaseOffset = loadedCursor.baseOffset();
        // already loaded up to until, so return
        if (loadedBaseOffset > until) return;

        do {
            // loading failed: give up and wait for the next scheduled run
            if (!loadUntilInternal(until)) break;

            // loaded without error, but the until segment doesn't exist yet
            // (i.e. the loading cursor is still below until)
            if (loadingCursor.baseOffset() < until) {
                // block until thresholdTime: if no message destined for the until
                // segment arrives within blockingExitTime of its start, exit the loop
                long thresholdTime = System.currentTimeMillis() + config.getLoadBlockingExitTimesInMillis();
                if (resolveSegment(thresholdTime) >= until) {
                    loadingCursor.shiftCursor(until);
                    loadedCursor.shiftCursor(until);
                    break;
                }
            }

            // avoid burning CPU
            Thread.sleep(100);
        } while (loadedCursor.baseOffset() < until);

        LOGGER.info("wheel load until {} <= {}", loadedCursor.baseOffset(), until);
    }

According to the configured advance window, the in-memory wheel loads schedule_log ahead of time. Loading runs in a while loop that exits only once the until delay segment has been loaded; if that segment does not currently exist, the loop exits after the configured blockingExitTime, and to keep CPU load down each iteration sleeps 100 ms. Why does loading run in a while loop? Why sleep 100 ms, and would 500 ms or 1 s work? And why have a blockingExitTime at all? The analysis below should answer these questions. Two situations matter. First, earlier delay segments may not exist, or may be sparse: with a 1-hour granularity, for instance, 2019031002 and 2019031003 may be missing between 2019031001 and 2019031004. Second, while a delay segment is being loaded, delayed messages belonging to that same segment may still be appended to it, and that is where messages could be lost. This is why loading runs in a loop, and why there are two cursors, the loading cursor and the loaded cursor: one marks what is currently being loaded, the other what has already been loaded. As for the 100 ms sleep: it could indeed be 500 ms or 1 s; it just depends on whether your messages can tolerate that extra delay.

private boolean loadUntilInternal(int until) {
        int index = resolveStartIndex();
        if (index < 0) return true;

        try {
            while (index <= until) {
                ScheduleSetSegment segment = facade.loadScheduleLogSegment(index);
                if (segment == null) {
                    int nextIndex = facade.higherScheduleBaseOffset(index);
                    if (nextIndex < 0) return true;
                    index = nextIndex;
                    continue;
                }

                // this is where an individual segment actually gets loaded
                loadSegment(segment);
                int nextIndex = facade.higherScheduleBaseOffset(index);
                if (nextIndex < 0) return true;

                index = nextIndex;
            }
        } catch (Throwable e) {
            LOGGER.error("wheel load segment failed,currentSegmentOffset:{} until:{}", loadedCursor.baseOffset(), until, e);
            QMon.loadSegmentFailed();
            return false;
        }

        return true;
    }
    
    private void loadSegment(ScheduleSetSegment segment) {
        final long start = System.currentTimeMillis();
        try {
            int baseOffset = segment.getSegmentBaseOffset();
            long offset = segment.getWrotePosition();
            if (!loadingCursor.shiftCursor(baseOffset, offset)) {
                LOGGER.error("doLoadSegment error,shift loadingCursor failed,from {}-{} to {}-{}", loadingCursor.baseOffset(), loadingCursor.offset(), baseOffset, offset);
                return;
            }

            WheelLoadCursor.Cursor loadedCursorEntry = loadedCursor.cursor();
            // already loaded
            if (baseOffset < loadedCursorEntry.getBaseOffset()) return;

            long startOffset = 0;
            // if the last load failed, resume loading from the previous position
            if (baseOffset == loadedCursorEntry.getBaseOffset() && loadedCursorEntry.getOffset() > -1)
                startOffset = loadedCursorEntry.getOffset();

            LogVisitor<ScheduleIndex> visitor = segment.newVisitor(startOffset, config.getSingleMessageLimitSize());
            try {
                loadedCursor.shiftCursor(baseOffset, startOffset);

                long currentOffset = startOffset;
                // note: the current delay segment may still be receiving appends, hence the
                // while loop; the loaded cursor's offset is advanced after each loaded message
                while (currentOffset < offset) {
                    Optional<ScheduleIndex> recordOptional = visitor.nextRecord();
                    if (!recordOptional.isPresent()) break;
                    ScheduleIndex index = recordOptional.get();
                    currentOffset = index.getOffset() + index.getSize();
                    refresh(index);
                    loadedCursor.shiftOffset(currentOffset);
                }
                loadedCursor.shiftCursor(baseOffset);
                LOGGER.info("loaded segment:{} {}", loadedCursor.baseOffset(), currentOffset);
            } finally {
                visitor.close();
            }
        } finally {
            Metrics.timer("loadSegmentTimer").update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
        }
    }

Recall from the previous article: at storage time, if a message lands in the segment currently being loaded by the wheel, that message should still end up in the wheel.

private boolean iterateCallback(final ScheduleIndex index) {
        long scheduleTime = index.getScheduleTime();
        long offset = index.getOffset();
        // the interesting part is canAdd
        if (wheelTickManager.canAdd(scheduleTime, offset)) {
            wheelTickManager.addWHeel(index);
            return true;
        }

        return false;
    }
    
    // this is where the cursors come into play
    public boolean canAdd(long scheduleTime, long offset) {
        WheelLoadCursor.Cursor currentCursor = loadingCursor.cursor();
        int currentBaseOffset = currentCursor.getBaseOffset();
        long currentOffset = currentCursor.getOffset();

        // determine which segment the message belongs to from its schedule time
        int baseOffset = resolveSegment(scheduleTime);
        // earlier than the current loading cursor: that segment is already loaded, so put it in the wheel
        if (baseOffset < currentBaseOffset) return true;

        // this segment is currently being loaded
        if (baseOffset == currentBaseOffset) {
            // decide by the cursor's offset
            return currentOffset <= offset;
        }
        return false;
    }

sender

Messages are grouped by brokerGroup and sent in batches per group. Each group sends on its own threads, so groups do not affect one another, and when sending, the real-time weight of each broker is also taken into account when choosing a broker.
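The grouping idea can be sketched as follows. Note the routing function here (hashing the subject across broker groups) is a placeholder assumption; the real code resolves broker groups through brokerService:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified groupByBroker: batch items per broker group so that each
// group can be flushed independently by its own sender threads.
class GroupBySender {
    static Map<String, List<String>> groupByBroker(List<String> subjects, List<String> brokerGroups) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String subject : subjects) {
            // placeholder routing: hash the subject onto a broker group
            String broker = brokerGroups.get(Math.abs(subject.hashCode()) % brokerGroups.size());
            groups.computeIfAbsent(broker, k -> new ArrayList<>()).add(subject);
        }
        return groups;
    }
}
```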

@Override
    public void send(ScheduleIndex index) {
        if (!BrokerRoleManager.isDelayMaster()) {
            return;
        }

        boolean add;
        try {
            long waitTime = Math.abs(sendWaitTime);
            // enqueue
            if (waitTime > 0) {
                add = batchExecutor.addItem(index, waitTime, TimeUnit.MILLISECONDS);
            } else {
                add = batchExecutor.addItem(index);
            }
        } catch (InterruptedException e) {
            return;
        }
        if (!add) {
            reject(index);
        }
    }
    
    @Override
    public void process(List<ScheduleIndex> indexList) {
        try {
            // the send logic lives in senderExecutor
            senderExecutor.execute(indexList, this, brokerService);
        } catch (Exception e) {
            LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e);
            retry(indexList);
        }
    }

    // below is from senderExecutor
    void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
        // group by broker
        Map<SenderGroup, List<ScheduleIndex>> groups = groupByBroker(indexList, brokerService);
        for (Map.Entry<SenderGroup, List<ScheduleIndex>> entry : groups.entrySet()) {
            doExecute(entry.getKey(), entry.getValue(), handler);
        }
    }

    private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
        // send per group
        group.send(list, sender, handler);
    }

As you can see, delivery is grouped by server broker. Let's look at the SenderGroup class.

Each group delivers on its own threads, independently, so one group's server going down cannot block delivery for the other groups. If a group cannot deliver, the retry will pick another server broker. At the same time, when choosing a group, each server broker's weight is taken into consideration, i.e. how many messages that broker currently has queued to send.
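A toy version of that weight consideration, preferring the group with the fewest messages still queued (a stand-in for SenderGroup's real weighting, whose names are assumptions here):

```java
import java.util.Map;

// Pick the broker group with the smallest backlog of pending messages.
class WeightedPick {
    static String pickGroup(Map<String, Integer> pendingBySenderGroup) {
        return pendingBySenderGroup.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse("");
    }
}
```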

    // where the actual send happens
    private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
        try {
            long start = System.currentTimeMillis();
            // recover the message payloads from schedule_log
            List<ScheduleSetRecord> records = store.recoverLogRecord(list);
            QMon.loadMsgTime(System.currentTimeMillis() - start);
            // send the messages
            Datagram response = sendMessages(records, sender);
            release(records);
            monitor(list, groupName);
            if (response == null) {
                // retries and related handling happen here
                handler.fail(list);
            } else {
                final int responseCode = response.getHeader().getCode();
                final Map<String, SendResult> resultMap = getSendResult(response);
                if (resultMap == null || responseCode != CommandCode.SUCCESS) {
                    if (responseCode == CommandCode.BROKER_REJECT || responseCode == CommandCode.BROKER_ERROR) {
                      // circuit-break this broker group
                      groupInfo.markFailed();
                    }
                    monitorSendFail(list, groupInfo.getGroupName());
                    // retry
                    handler.fail(list);
                    return;
                }
                Set<String> failedMessageIds = new HashSet<>();
                boolean brokerRefreshed = false;
                for (Map.Entry<String, SendResult> entry : resultMap.entrySet()) {
                    int resultCode = entry.getValue().getCode();
                    if (resultCode != MessageProducerCode.SUCCESS) {
                        failedMessageIds.add(entry.getKey());
                    }
                    if (!brokerRefreshed && resultCode == MessageProducerCode.BROKER_READ_ONLY) {
                        groupInfo.markFailed();
                        brokerRefreshed = true;
                    }
                }
                if (!brokerRefreshed) groupInfo.markSuccess();

                // dispatch log entries are produced here
                handler.success(records, failedMessageIds);
            }
        } catch (Throwable e) {
            LOGGER.error("sender group send batch failed,broker:{},batch size:{}", groupName, list.size(), e);
            handler.fail(list);
        }
    }

That wraps up this source code analysis of QMQ's delay-server. If the opportunity arises, I'll analyze the source of QMQ's other modules as well. Thanks for reading.
