Disruptor中shutdown方法失效,及產生的不肯定性源碼分析

 版權聲明:原創做品,謝絕轉載!不然將追究法律責任。java

Disruptor框架是一個優秀的併發框架,利用RingBuffer中的預分配內存實現內存的可重複利用,下降了GC的頻率。數組

具體關於Disruptor的原理,參見:http://ifeve.com/disruptor/,本文不在贅述。併發

在Disruptor的使用中,偶爾會出現調用了shutdown函數但程序並未終止的現象。在網上已有的文章中並無對該問題的分析,本文對此現象進行總結和說明:app

例子:相關的Event、EventHandler、Producer及OrderFactory定義

1.1 消費事件Event類Order

Order用於生產者生產事件,消費者消費事件。是RingBuffer的槽中存儲的數據類型。其定義以下:框架

package liuqiang.instance;

public class Order {

    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}

Order類中由一個id成員,用於生產者生產和消費者消費。ide

1.2 事件處理器OrderHandler1

定義一個事件處理器OrderHandler1。函數

package liuqiang.instance;

import com.lmax.disruptor.EventHandler;

import java.util.concurrent.atomic.AtomicInteger;

public class OrderHandler1 implements EventHandler<Order>{
    private String consumerId;
    private static AtomicInteger count = new AtomicInteger(0);
    public OrderHandler1(String consumerId){
        this.consumerId = consumerId;
    }
    public static int getCount(){
        return count.get();
    }
    @Override
    public void onEvent(Order order, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId());
        count.incrementAndGet();
    }
}

每次消費一個事件,count靜態變量自增1次。this

1.3 生產者Producer

生產者用於在ringBuffer放入生產的order信息。atom

package liuqiang.instance;

import com.lmax.disruptor.RingBuffer;

public class Producer {

    private final RingBuffer<Order> ringBuffer;
    public Producer(RingBuffer<Order> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    public void onData(String data){
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(data);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

onData方法首先得到Order的id屬性值data,而後經過ringBuffer獲取下一個sequence,並將data值填充進Order對象當中。spa

ringBuffer中預先分配了order數組,生產過程僅僅是將數組中的對象屬性更改,所以大大減小了gc過程。

ringBuffer中預生成order對象數組,須要一個工廠方法。

 1.4 消費事件的工廠方法OrderFactory

package liuqiang.instance;

import com.lmax.disruptor.EventFactory;

public class OrderFactory implements EventFactory<Order> {
    @Override
    public Order newInstance() {
        return new Order();
    }
}

該工廠方法重寫了父類的newInstance方法,該方法僅僅是new一個還沒有填充屬性信息的Order對象,用於在RingBuffer中預先佔據內存。

場景一:disruptor.shutdown操做先於BatchEventProcessor的線程執行

package liuqiang.instance;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;

public class Main1 {

    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2"));  //兩個獨立的消費者,各自對消費ringBuffer中的全部事件。
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
     //producer生產1個對象,並存入ringBuffer數組當中
for (long l = 0; l < 1; l++) { producer.onData(l + ""); } disruptor.shutdown();  //方法會阻塞,但該方法執行結束後,並不必定會使程序關閉,後面詳細分析緣由。 } }

最終打印結果以下:

OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:0

兩個消費者都已經消費掉了生產者生產的一個事件。然而,該程序屢次運行,會發現存在某些次運行過程消費完事件後,程序並無終止。調用jconsole查看線程相關信息,以下:

BatchEventProcess的run方法中,調用消費者Handler方法進行消費。

顯然,shutdown關閉以後,該消費者依然在等待RingBuffer中新的event進行消費。這個問題是如何產生的呢?

 

disruptor的start方法以下:

    public RingBuffer<T> start()
    {
        //開啓Disruptor,保證只被開啓一次
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

因而可知,start方法非阻塞的。將消費者線程放入一個線程池以後,即返回。start方法執行結束,此時的消費者線程未必會馬上運行!

咱們繼續看,disruptor.shutdown方法。

    public void shutdown()
    {
        try
        {
            shutdown(-1, TimeUnit.MILLISECONDS);   //超時shutdown,此時超時時間設置爲-1,表示一直阻塞,直到關閉
        }
        catch (final TimeoutException e)
        {
            exceptionHandler.handleOnShutdownException(e);
        }
    }
    public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
    {
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);// 此處可能積壓事件處理完了,但生產者尚未生產完
        while (hasBacklog())
        {
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
            {
                throw TimeoutException.INSTANCE;
            }
            // Busy spin
        }
        halt();
    }
// 此處可能積壓事件處理完了,但生產者尚未生產完
private boolean hasBacklog()   { final long cursor = ringBuffer.getCursor(); //獲取全部除去已經中止的customer以外的全部的sequence for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false))  //false參數。代表已經中止或未啓動的消費者,再也不考慮 { //說明有消費者尚未消費完cursor if (cursor > consumer.get()) { return true; } } //全部消費者都已經消費完全部事件,不存在積壓的未處理事件 return false; } public void halt() { for (final ConsumerInfo consumerInfo : consumerRepository) { consumerInfo.halt(); } }

 此處關閉過程主要分爲兩部分:

1. 判斷RingBuffer當中是否還有未消費完的事件。

2. 全部事件都消費完以後,調用halt終止各個Customer。

consumerRepository.getLastSequenceInChain(false)

false參數表示只取消費者線程集合中正在運行的消費者線程,並將對應消費者線程的alert標記置爲true。還沒有啓動的消費者線程不受影響。

此處爲什麼要設置參數爲false呢?我的理解爲:消費者線程在運行中可能會拋出異常形成線程退出。若是shutdown方法考慮到這些消費者線程,則該消費者線程將永遠不可能消費RingBuffer中的event,形成阻塞。

另外解釋一下getLastSequenceInChain的意義,咱們在編寫消費者的時候,經常會存在消費者之間的依賴關係。例如:消費者A先消費event,而後消費者B、C同時消費該event,最後消費者D在B、C以後最後消費該event。這樣的結構,有利於編寫出流水線式的處理。具體以下圖所示:

一個事件event,最後處理該事件的event必定是D,D處理過的事件,前驅消費者A、B、C必定都已經消費過了。在判斷消費者集合中是否全部消費者都已經消費完某event,能夠直接取依賴的末端消費者集合,進行判斷便可。

getLastSequenceInChain方法便是完成該操做。

    public Sequence[] getLastSequenceInChain(boolean includeStopped)  //返回集合是否要包含未啓動或已中止的消費者線程
    {
        List<Sequence> lastSequence = new ArrayList<>();
        for (ConsumerInfo consumerInfo : consumerInfos)
        {
       //includeStopped爲true,則不管消費者線程是否已經在執行,都返回。
if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain()) { final Sequence[] sequences = consumerInfo.getSequences(); Collections.addAll(lastSequence, sequences); } } return lastSequence.toArray(new Sequence[lastSequence.size()]); }

因爲disruptor的shutdown方法中,最終調用的getLastSequenceInChain方法,inclueStopped爲false。所以,若是消費者線程在調用shutown的時候還沒有開啓,此時就會致使返回的Sequence序列漏掉了這部分線程。致使shutdown方法失效。

第二部分操做:halt,該方法最終調用BatchEventProcessor線程的halt方法。BatchEventProcessor線程負責對消費者OrderHandler1進行循環調用。其halt方法:

    public void halt()
    {
        running.set(false);  //運行狀態設置爲false
        sequenceBarrier.alert();  //Barrier調用alert方法,在後面消費者消費的時候,會查看該狀態,以決定是否阻塞在ringBuffer上等待事件。
    }

BatchEventProcessor線程的run方法以下。disruptor的shutdown方法最終的效果是,設置了正在運行的消費者線程的BatchEventProcessor的alert狀態。而即使咱們將shutdown調用過程當中的getLastSequenceInChain方法的includeStopped設置爲true,獲取到未開啓的消費者線程,在消費者線程執行的第一步也會將alert狀態清除(sequenceBarrier.clearAlert();調用重置了alert的狀態爲false)。

@Override
    public void run()
    {
        //確保一次只有一個線程執行這個方法
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        //清除sequenceBarrier的通知狀態!!!該代碼是致使shutdown不起效果的緣由
        sequenceBarrier.clearAlert();
        //通知生命週期eventHandler,事件處理開始
        notifyStart();
        T event = null;
        //獲取下一個應當進行消費的序號
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    //準備獲取下一個可用於消費者線程的sequence。拋出AlertException,說明disruptor已經運行結束了。
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    if (batchStartAware != null)
                    {
                        batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                    }
                    //針對從nextSequence到availableSequence的每個event,調用相應的事件處理器進行處理
                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }
                    //當前消費到的序號寫入sequence的value變量
                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            //事件處理完畢,發送關閉通知。
            notifyShutdown();
            //線程運行完畢,設置運行狀態爲false
            running.set(false);
        }
    }

線程中會調用SequenceBarrier的waitFor方法等待下一個可消費事件的序號。該方法以下:

    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
    //檢查alert狀態是否改變。若是alert了,則拋出異常,消費者線程執行結束 checkAlert();  //若是狀態沒有改變,繼續根據後面的等待策略等待下一個可消費event的最大可用序號。
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); if (availableSequence < sequence) { return availableSequence; } return sequencer.getHighestPublishedSequence(sequence, availableSequence); }

在具體的WaitStrategy中,會繼續檢查alert狀態。以YieldWaitStrategy爲例,其waitFor方法以下:

    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        //判斷是否在這一步驟時disruptor已經中止
        barrier.checkAlert();
        if (0 == counter)
        {
            Thread.yield();
        }
        else
        {
            --counter;
        }
        return counter;
    }

 

在BatchEventProcessor消費的過程當中,會屢次檢查alert狀態。若是alert狀態爲true,則說明shutdown方法已經改變了該狀態,程序須要中止。但若是shutdown線程先於消費者線程執行,則alert永遠爲false,消費者線程永遠阻塞。

 

場景2、disruptor的shutdown過程先於生產者的生產過程執行。

package liuqiang.instance;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;

public class Main2 {

    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        //生產者單獨經過一個線程進行生產
        new Thread(new ProducerTask(ringBuffer)).start();
        //要想disruptor能夠正常的關閉,還須要消費者線程在執行該方法時,已經所有正常啓動。
        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;
    }
}

class ProducerTask implements Runnable {
    Producer producer;
    public ProducerTask(RingBuffer<Order> ringBuffer) {
        producer = new Producer(ringBuffer);
    }
    @Override
    public void run() {
        producer.onData("1");
        System.out.println("producer event finished");
    }
}

 

在這個例子中,咱們將生產者的生產過程單獨做爲一個線程運行。

屢次嘗試以後,可能出現的結果以下:

輸出結果1:(輸出後執行結束)

producer event finished

 

輸出結果2:(輸出後程序不中止運行,三條打印信息的順序可能不一樣)

producer event finished
OrderHandler1 2,消費信息:1
OrderHandler1 1,消費信息:1

 

輸出結果3:(輸出後程序中止運行,三條打印信息的順序可能不一樣)

producer event finished
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:1

 

爲什麼會出現三種不一樣的打印結果呢?

緣由在於,這段程序的運行過程當中,存在三個線程:生產者線程、shutdown線程、消費者線程。三個線程的執行順序不一樣,會致使結果產生不一樣的效果。按照線程執行的順序不一樣,以下咱們分別進行分析:

1. 消費者線程->shutdown線程->生產者線程,輸出結果1

緣由在於,調用shutdown線程時,消費者線程已經在運行。而此時生產者線程還沒有填充event數據,在執行hasBackLog判斷時,會發現消費者無數據可消費,所以直接關掉消費者線程,結束shutdown調用。最終生產者線程執行,放入event數據到RingBuffer中,程序運行結束。

2. shutdown線程->生產者線程->消費者線程(或者,shutdown線程->消費者線程->生產者線程),輸出結果2

調用shutdown線程會給正在運行的消費者線程設置alert,但因爲消費者線程還沒有開啓,因此此步驟跳過。隨後消費者線程啓動,線程持續執行。不管生產者、消費者線程執行的順序如何,消息都最終能夠被消費(只是輸出的信息順序不一樣),但最終消費者阻塞在等待RingBuffer的過程上。

3. 生產者線程->消費者線程->shutdown線程,輸出結果3

生產者先生產好數據放入ringBuffer,隨後消費者線程開啓,shutdown線程發現消費者線程還沒有消費完全部數據時,會在hasBackLog方法上循環等待。最終,消費者線程消費完數據,shutdown關閉,程序正常結束。

4. 生產者線程->shutdown線程->消費者線程,輸出結果2

生產者生產好數據放入ringbuffer,shutdown取出此時正在運行的依賴尾節點消費者集合。但因爲消費者線程還沒有啓動,此操做無效。隨後消費者線程啓動,消費數據集並阻塞

5. 消費者線程->生產者線程->shutdown線程,輸出結果3

消費者線程運行並等待數據,生產者線程生產數據,shutdown線程等待消費者線程消費完畢,並關閉消費者線程。程序正常結束。

 

總結

由上可見,disruptor.shutdown方法僅僅能關閉當前已經啓動了的消費者線程,對於調用時還沒有啓動的消費者線程不起做用。在disruptor.shutdown若是能正確的關閉程序,須要知足兩個條件:

1. 生產者的生產線程必須執行在disruptor.shutdown方法以前。

2. disruptor.shutdown方法必須執行在全部消費者線程啓動以前。

 

在實際使用中,第二個條件產生的disruptor.shutdown失效問題也許並不常見。緣由在於:線上環境中,生產者線程每每已經運行了一段時間,這段時間內,足夠線程池調用全部的消費者線程並運行。但若是生產者運行的時間太短,致使shutdown提早調用在消費者線程啓動以前,則會產生問題。

相關文章
相關標籤/搜索