Disruptor (4) - shutdown

Disruptor.shutdown 方法阻塞至全部事件獲得處理。
循環調用hasBacklog()斷定當前 生產分配的遊標: ringBuffer.getCursor()  > 消費者序號: consumer的lastSequence,表示還未處理完結。java

public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
    {
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        while (hasBacklog())
        {
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
            {
                throw TimeoutException.INSTANCE;
            }
            // Busy spin
            try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
        }
        halt();
    }

    private boolean hasBacklog()
    {
        final long cursor = ringBuffer.getCursor();
        for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false))
        {
         System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + "  " + "cursor=:"+ cursor +"consumer:"+ consumer.get());

            if (cursor > consumer.get())
            {
                return true;
            }
        }
        return false;
    }

準確的說:
hasBacklog只是比對瞬時狀態下的是否處理完當前已入RingBuffer的事件。
高併發時大量事件阻塞置入RingBuffer時,當消費效率大於生產效率時,可能並非理想中下的全部事件獲得處理,慎用併發


測試用例:高併發

  1. 不阻塞消費者, 此時處理到第4個接直接shutdown了。
  2. 消費者sleep(100),  發現: 會一直往下處理。
相關文章
相關標籤/搜索