Disruptor.shutdown 方法阻塞至全部事件獲得處理。
循環調用hasBacklog()斷定當前 生產分配的遊標: ringBuffer.getCursor() > 消費者序號: consumer的lastSequence,表示還未處理完結。java
public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException { final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); while (hasBacklog()) { if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) { throw TimeoutException.INSTANCE; } // Busy spin try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } halt(); } private boolean hasBacklog() { final long cursor = ringBuffer.getCursor(); for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false)) { System.out.println(DateTimeFormatter.ISO_INSTANT.format(Instant.now()) + " " + "cursor=:"+ cursor +"consumer:"+ consumer.get()); if (cursor > consumer.get()) { return true; } } return false; }
準確的說:
hasBacklog只是比對瞬時狀態下的是否處理完當前已入RingBuffer的事件。
高併發時大量事件阻塞置入RingBuffer時,當消費效率大於生產效率時,可能並非理想中下的全部事件獲得處理,慎用。併發
測試用例:高併發