版權聲明:原創做品,謝絕轉載!不然將追究法律責任。java
Disruptor框架是一個優秀的併發框架,利用RingBuffer中的預分配內存實現內存的可重複利用,下降了GC的頻率。數組
具體關於Disruptor的原理,參見:http://ifeve.com/disruptor/,本文不在贅述。併發
在Disruptor的使用中,偶爾會出現調用了shutdown函數但程序並未終止的現象。在網上已有的文章中並無對該問題的分析,本文對此現象進行總結和說明:app
Order用於生產者生產事件,消費者消費事件。是RingBuffer的槽中存儲的數據類型。其定義以下:框架
package liuqiang.instance; public class Order { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } }
Order類中由一個id成員,用於生產者生產和消費者消費。ide
定義一個事件處理器OrderHandler1。函數
package liuqiang.instance; import com.lmax.disruptor.EventHandler; import java.util.concurrent.atomic.AtomicInteger; public class OrderHandler1 implements EventHandler<Order>{ private String consumerId; private static AtomicInteger count = new AtomicInteger(0); public OrderHandler1(String consumerId){ this.consumerId = consumerId; } public static int getCount(){ return count.get(); } @Override public void onEvent(Order order, long sequence, boolean endOfBatch) throws Exception { System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId()); count.incrementAndGet(); } }
每次消費一個事件,count靜態變量自增1次。this
生產者用於在ringBuffer放入生產的order信息。atom
package liuqiang.instance; import com.lmax.disruptor.RingBuffer; public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } public void onData(String data){ long sequence = ringBuffer.next(); try { Order order = ringBuffer.get(sequence); order.setId(data); } finally { ringBuffer.publish(sequence); } } }
onData方法首先得到Order的id屬性值data,而後經過ringBuffer獲取下一個sequence,並將data值填充進Order對象當中。spa
ringBuffer中預先分配了order數組,生產過程僅僅是將數組中的對象屬性更改,所以大大減小了gc過程。
ringBuffer中預生成order對象數組,須要一個工廠方法。
package liuqiang.instance; import com.lmax.disruptor.EventFactory; public class OrderFactory implements EventFactory<Order> { @Override public Order newInstance() { return new Order(); } }
該工廠方法重寫了父類的newInstance方法,該方法僅僅是new一個還沒有填充屬性信息的Order對象,用於在RingBuffer中預先佔據內存。
package liuqiang.instance; import com.lmax.disruptor.*; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.Executors; public class Main1 { public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")); //兩個獨立的消費者,各自對消費ringBuffer中的全部事件。 disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer);
//producer生產1個對象,並存入ringBuffer數組當中 for (long l = 0; l < 1; l++) { producer.onData(l + ""); } disruptor.shutdown(); //方法會阻塞,但該方法執行結束後,並不必定會使程序關閉,後面詳細分析緣由。 } }
最終打印結果以下:
OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:0
兩個消費者都已經消費掉了生產者生產的一個事件。然而,該程序屢次運行,會發現存在某些次運行過程消費完事件後,程序並無終止。調用jconsole查看線程相關信息,以下:
BatchEventProcess的run方法中,調用消費者Handler方法進行消費。
顯然,shutdown關閉以後,該消費者依然在等待RingBuffer中新的event進行消費。這個問題是如何產生的呢?
disruptor的start方法以下:
public RingBuffer<T> start() { //開啓Disruptor,保證只被開啓一次 checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.start(executor); } return ringBuffer; }
因而可知,start方法非阻塞的。將消費者線程放入一個線程池以後,即返回。start方法執行結束,此時的消費者線程未必會馬上運行!
咱們繼續看,disruptor.shutdown方法。
public void shutdown() { try { shutdown(-1, TimeUnit.MILLISECONDS); //超時shutdown,此時超時時間設置爲-1,表示一直阻塞,直到關閉 } catch (final TimeoutException e) { exceptionHandler.handleOnShutdownException(e); } } public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException { final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);// 此處可能積壓事件處理完了,但生產者尚未生產完 while (hasBacklog()) { if (timeout >= 0 && System.currentTimeMillis() > timeOutAt) { throw TimeoutException.INSTANCE; } // Busy spin } halt(); }
// 此處可能積壓事件處理完了,但生產者尚未生產完 private boolean hasBacklog() { final long cursor = ringBuffer.getCursor(); //獲取全部除去已經中止的customer以外的全部的sequence for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false)) //false參數。代表已經中止或未啓動的消費者,再也不考慮 { //說明有消費者尚未消費完cursor if (cursor > consumer.get()) { return true; } } //全部消費者都已經消費完全部事件,不存在積壓的未處理事件 return false; } public void halt() { for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.halt(); } }
此處關閉過程主要分爲兩部分:
1. 判斷RingBuffer當中是否還有未消費完的事件。
2. 全部事件都消費完以後,調用halt終止各個Customer。
consumerRepository.getLastSequenceInChain(false)
false參數表示只取消費者線程集合中正在運行的消費者線程,並將對應消費者線程的alert標記置爲true。還沒有啓動的消費者線程不受影響。
此處爲什麼要設置參數爲false呢?我的理解爲:消費者線程在運行中可能會拋出異常形成線程退出。若是shutdown方法考慮到這些消費者線程,則該消費者線程將永遠不可能消費RingBuffer中的event,形成阻塞。
另外解釋一下getLastSequenceInChain的意義,咱們在編寫消費者的時候,經常會存在消費者之間的依賴關係。例如:消費者A先消費event,而後消費者B、C同時消費該event,最後消費者D在B、C以後最後消費該event。這樣的結構,有利於編寫出流水線式的處理。具體以下圖所示:
一個事件event,最後處理該事件的event必定是D,D處理過的事件,前驅消費者A、B、C必定都已經消費過了。在判斷消費者集合中是否全部消費者都已經消費完某event,能夠直接取依賴的末端消費者集合,進行判斷便可。
getLastSequenceInChain方法便是完成該操做。
public Sequence[] getLastSequenceInChain(boolean includeStopped) //返回集合是否要包含未啓動或已中止的消費者線程 { List<Sequence> lastSequence = new ArrayList<>(); for (ConsumerInfo consumerInfo : consumerInfos) {
//includeStopped爲true,則不管消費者線程是否已經在執行,都返回。 if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain()) { final Sequence[] sequences = consumerInfo.getSequences(); Collections.addAll(lastSequence, sequences); } } return lastSequence.toArray(new Sequence[lastSequence.size()]); }
因爲disruptor的shutdown方法中,最終調用的getLastSequenceInChain方法,inclueStopped爲false。所以,若是消費者線程在調用shutown的時候還沒有開啓,此時就會致使返回的Sequence序列漏掉了這部分線程。致使shutdown方法失效。
第二部分操做:halt,該方法最終調用BatchEventProcessor線程的halt方法。BatchEventProcessor線程負責對消費者OrderHandler1進行循環調用。其halt方法:
public void halt() { running.set(false); //運行狀態設置爲false sequenceBarrier.alert(); //Barrier調用alert方法,在後面消費者消費的時候,會查看該狀態,以決定是否阻塞在ringBuffer上等待事件。 }
BatchEventProcessor線程的run方法以下。disruptor的shutdown方法最終的效果是,設置了正在運行的消費者線程的BatchEventProcessor的alert狀態。而即使咱們將shutdown調用過程當中的getLastSequenceInChain方法的includeStopped設置爲true,獲取到未開啓的消費者線程,在消費者線程執行的第一步也會將alert狀態清除(sequenceBarrier.clearAlert();調用重置了alert的狀態爲false)。
@Override public void run() { //確保一次只有一個線程執行這個方法 if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } //清除sequenceBarrier的通知狀態!!!該代碼是致使shutdown不起效果的緣由 sequenceBarrier.clearAlert(); //通知生命週期eventHandler,事件處理開始 notifyStart(); T event = null; //獲取下一個應當進行消費的序號 long nextSequence = sequence.get() + 1L; try { while (true) { try { //準備獲取下一個可用於消費者線程的sequence。拋出AlertException,說明disruptor已經運行結束了。 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } //針對從nextSequence到availableSequence的每個event,調用相應的事件處理器進行處理 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } //當前消費到的序號寫入sequence的value變量 sequence.set(availableSequence); } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } finally { //事件處理完畢,發送關閉通知。 notifyShutdown(); //線程運行完畢,設置運行狀態爲false running.set(false); } }
線程中會調用SequenceBarrier的waitFor方法等待下一個可消費事件的序號。該方法以下:
@Override public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
//檢查alert狀態是否改變。若是alert了,則拋出異常,消費者線程執行結束 checkAlert(); //若是狀態沒有改變,繼續根據後面的等待策略等待下一個可消費event的最大可用序號。
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } return sequencer.getHighestPublishedSequence(sequence, availableSequence); }
在具體的WaitStrategy中,會繼續檢查alert狀態。以YieldWaitStrategy爲例,其waitFor方法以下:
private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException { //判斷是否在這一步驟時disruptor已經中止 barrier.checkAlert(); if (0 == counter) { Thread.yield(); } else { --counter; } return counter; }
在BatchEventProcessor消費的過程當中,會屢次檢查alert狀態。若是alert狀態爲true,則說明shutdown方法已經改變了該狀態,程序須要中止。但若是shutdown線程先於消費者線程執行,則alert永遠爲false,消費者線程永遠阻塞。
場景2、disruptor的shutdown過程先於生產者的生產過程執行。
package liuqiang.instance; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import java.util.concurrent.Executors; public class Main2 { public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); //生產者單獨經過一個線程進行生產 new Thread(new ProducerTask(ringBuffer)).start(); //要想disruptor能夠正常的關閉,還須要消費者線程在執行該方法時,已經所有正常啓動。 disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理; } } class ProducerTask implements Runnable { Producer producer; public ProducerTask(RingBuffer<Order> ringBuffer) { producer = new Producer(ringBuffer); } @Override public void run() { producer.onData("1"); System.out.println("producer event finished"); } }
在這個例子中,咱們將生產者的生產過程單獨做爲一個線程運行。
屢次嘗試以後,可能出現的結果以下:
輸出結果1:(輸出後執行結束)
producer event finished
輸出結果2:(輸出後程序不中止運行,三條打印信息的順序可能不一樣)
producer event finished
OrderHandler1 2,消費信息:1
OrderHandler1 1,消費信息:1
輸出結果3:(輸出後程序中止運行,三條打印信息的順序可能不一樣)
producer event finished
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:1
爲什麼會出現三種不一樣的打印結果呢?
緣由在於,這段程序的運行過程當中,存在三個線程:生產者線程、shutdown線程、消費者線程。三個線程的執行順序不一樣,會致使結果產生不一樣的效果。按照線程執行的順序不一樣,以下咱們分別進行分析:
1. 消費者線程->shutdown線程->生產者線程,輸出結果1
緣由在於,調用shutdown線程時,消費者線程已經在運行。而此時生產者線程還沒有填充event數據,在執行hasBackLog判斷時,會發現消費者無數據可消費,所以直接關掉消費者線程,結束shutdown調用。最終生產者線程執行,放入event數據到RingBuffer中,程序運行結束。
2. shutdown線程->生產者線程->消費者線程(或者,shutdown線程->消費者線程->生產者線程),輸出結果2
調用shutdown線程會給正在運行的消費者線程設置alert,但因爲消費者線程還沒有開啓,因此此步驟跳過。隨後消費者線程啓動,線程持續執行。不管生產者、消費者線程執行的順序如何,消息都最終能夠被消費(只是輸出的信息順序不一樣),但最終消費者阻塞在等待RingBuffer的過程上。
3. 生產者線程->消費者線程->shutdown線程,輸出結果3
生產者先生產好數據放入ringBuffer,隨後消費者線程開啓,shutdown線程發現消費者線程還沒有消費完全部數據時,會在hasBackLog方法上循環等待。最終,消費者線程消費完數據,shutdown關閉,程序正常結束。
4. 生產者線程->shutdown線程->消費者線程,輸出結果2
生產者生產好數據放入ringbuffer,shutdown取出此時正在運行的依賴尾節點消費者集合。但因爲消費者線程還沒有啓動,此操做無效。隨後消費者線程啓動,消費數據集並阻塞
5. 消費者線程->生產者線程->shutdown線程,輸出結果3
消費者線程運行並等待數據,生產者線程生產數據,shutdown線程等待消費者線程消費完畢,並關閉消費者線程。程序正常結束。
由上可見,disruptor.shutdown方法僅僅能關閉當前已經啓動了的消費者線程,對於調用時還沒有啓動的消費者線程不起做用。在disruptor.shutdown若是能正確的關閉程序,須要知足兩個條件:
1. 生產者的生產線程必須執行在disruptor.shutdown方法以前。
2. disruptor.shutdown方法必須執行在全部消費者線程啓動以前。
在實際使用中,第二個條件產生的disruptor.shutdown失效問題也許並不常見。緣由在於:線上環境中,生產者線程每每已經運行了一段時間,這段時間內,足夠線程池調用全部的消費者線程並運行。但若是生產者運行的時間太短,致使shutdown提早調用在消費者線程啓動以前,則會產生問題。