Disruptor框架中生產者、消費者的各類複雜依賴場景下的使用總結

 版權聲明:原創做品,謝絕轉載!不然將追究法律責任。html

Disruptor是一個優秀的併發框架,能夠實現單個或多個生產者生產消息,單個或多個消費者消息,且消費者之間能夠存在消費消息的依賴關係。網上其餘博客每每僅針對框架的一部分使用示例進行了介紹,對於某些場景下介紹並不徹底:如多生產者間複雜的依賴關係的使用編碼。java

本文儘量對Disruptor的全部使用場景進行總結,若有不全之處歡迎指出,請諒解。多線程

具體關於Disruptor的原理,參見:http://ifeve.com/disruptor/,本文不在贅述。併發

Disruptor類的handleEventsWith,handleEventsWithWorkerPool方法的聯繫及區別

在disruptor框架調用start方法以前,每每須要將消息的消費者指定給disruptor框架。框架

經常使用的方法是:disruptor.handleEventsWith(EventHandler ... handlers),將多個EventHandler的實現類傳入方法,封裝成一個EventHandlerGroup,實現多消費者消費。ide

disruptor的另外一個方法是:disruptor.handleEventsWithWorkerPool(WorkHandler ... handlers),將多個WorkHandler的實現類傳入方法,封裝成一個EventHandlerGroup實現多消費者消費。源碼分析

二者共同點都是,將多個消費者封裝到一塊兒,供框架消費消息。post

不一樣點在於,this

1. 對於某一條消息m,handleEventsWith方法返回的EventHandlerGroup,Group中的每一個消費者都會對m進行消費,各個消費者之間不存在競爭。handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消費者對於同一條消息m不重複消費;也就是,若是c0消費了消息m,則c1再也不消費消息m。編碼

2. 傳入的形參不一樣。對於獨立消費的消費者,應當實現EventHandler接口。對於不重複消費的消費者,應當實現WorkHandler接口。

所以,根據消費者集合是否獨立消費消息,能夠對不一樣的接口進行實現。也能夠對兩種接口同時實現,具體消費流程由disruptor的方法調用決定。

 

在進行場景分析以前,首先定義公共的生產者Producer,消費者OrderHandler1,消息Order,消息工廠OrderFactory。定義分別以下:

package liuqiang.complex.common;

public class Order {

    private String id;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}
package liuqiang.complex.common;

import com.lmax.disruptor.EventFactory;

public class OrderFactory implements EventFactory<Order> {
    @Override
    public Order newInstance() {
        return new Order();
    }
}
package liuqiang.complex.common;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

//EventHandler用於EventHandlerGroup,WorkHandler用於WorkPool。同時實現兩接口,該類對象可同時用於EventHandlerGroup和WorkPool
public class OrderHandler1 implements EventHandler<Order>, WorkHandler<Order> {
    private String consumerId;
    public OrderHandler1(String consumerId){
        this.consumerId = consumerId;
    }

    //EventHandler的方法
    @Override
    public void onEvent(Order order, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId());
    }

    //WorkHandler的方法
    @Override
    public void onEvent(Order order) throws Exception {
        System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId());
    }
}
package liuqiang.complex.common;

import com.lmax.disruptor.RingBuffer;

public class Producer {

    private final RingBuffer<Order> ringBuffer;
    public Producer(RingBuffer<Order> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    public void onData(String data){
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(data);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

 

下面定義兩種不一樣的消費者集合關係:

 

場景一:單生產者單消費者

package liuqiang.complex.single;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.Executors;

public class Main1 {

    //單生產者模式,單消費者模式
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        //設置一個消費者
        disruptor.handleEventsWith(new OrderHandler1("1"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

這種狀況最爲簡單,單生產者,僅需在Disruptor初始化時,傳入ProducerType.SINGLE便可。使用disruptor.handleEventsWith傳入單消費者。Thread.sleep方法調用是爲了保證,在調用disruptor.shutdown方法前,全部的消費者線程都已經啓動,防止shutdown失效的問題。具體問題詳見本人另外一篇博客:Disruptor中shutdown方法失效,及產生的不肯定性源碼分析。

輸出結果以下:

OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2

 

場景二:單生產者多消費者,多消費者間造成依賴關係,每一個依賴節點只有一個消費者。

package liuqiang.complex.single;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.*;

import java.util.concurrent.Executors;

public class Main2 {

    //單生產者,多消費者,但多消費者間造成依賴關係,每一個依賴節點單線程。
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        //多個消費者間造成依賴關係,每一個依賴節點的消費者爲單線程。
        disruptor.handleEventsWith(new OrderHandler1("1")).then(new OrderHandler1("2"), new OrderHandler1("3")).then(new OrderHandler1("4"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

四個消費者之間的依賴圖以下:

消費者C二、C3只有在C1消費完消息m後,才能消費m。消費者C4只有在C二、C3消費完m後,才能消費該消息。

可能的輸出結果以下(可能由於線程執行前後順序不一樣略有區別,但輸出必定知足相關依賴約束):

OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:0
OrderHandler1 3,消費信息:0
OrderHandler1 1,消費信息:2
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 4,消費信息:2

場景三:單生產者,多消費者模式。多消費者對於消息不重複消費。

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.*;

import java.util.concurrent.Executors;

public class Main3 {

    //單生產者,多消費者模式。多消費者對於消息不重複消費。例如:1線程消費了消息0,則2線程只能從0後面的消息消費,不能對消息0進行消費。
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        /*
         * 該方法傳入的消費者須要實現WorkHandler接口,方法的內部實現是:先建立WorkPool,而後封裝WorkPool爲EventHandlerPool返回。
         * 消費者一、2對於消息的消費有時有競爭,保證同一消息只能有一個消費者消費
         */
        disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

調用handleEventsWithWorkerPool造成WorkerPool,並進一步封裝成EventHandlerGroup。對於同一條消息,兩消費者不重複消費。

可能輸出結果以下:

OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 1,消費信息:2

場景四:單生產者多消費者,多消費者對於消息m獨立消費。

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.Executors;

public class Main4 {

    //單生產者,多消費者模式。多消費者對於消息獨立消費。例如:對於消息m,兩個消費者都要對其進行消費。
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        /*
         * 兩個消費者建立EventHandlerGroup。該消費者須要實現EventHandler類。兩個消費者對於RingBuffer中的每一個消息,都獨立消費一次。
         * 兩個消費者在消費消息的過程當中,各自獨立,不產生競爭。
         */
        disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

可能輸出結果以下:

OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2

 

場景五:單生產者,多消費者間存在依賴關係的模式。消費者一、2消息獨立消費。消費者三、4僅能消費一、2均消費過的消息,消費者5僅能消費三、4均消費過的消息

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.Executors;

public class Main5 {

    //單生產者,多消費者間存在依賴關係的模式。消費者一、2組成EventHandlerGroup,消息獨立消費。消費者三、4僅能消費一、2均消費過的消息,且獨立消費。消費者5僅能消費三、4均消費過的消息
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        //至關於在各個EventHandlerGroup之間進行級聯,造成依賴關係。
        disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")).then(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

消費者之間的依賴關係以下:

可能的輸出結果以下:

OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 4,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2

 

場景六:單生產者,多消費者。多消費者之間不重複消費,且不一樣的消費者WorkPool之間存在依賴關係。

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.Executors;

public class Main6 {

    /*
     * 單生產者,多消費者。多消費者之間不重複消費,且不一樣的消費者WorkPool之間存在依賴關係。
     * 消費者一、2不重複消費消息,消費者三、4不重複消費1或者2消費過的消息,消費者5消費消費者3或4消費過的消息。
     */
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")).thenHandleEventsWithWorkerPool(new OrderHandler1("3"), new OrderHandler1("4")).thenHandleEventsWithWorkerPool(new OrderHandler1("5"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

消費者之間的依賴圖以下所示:

可能的輸出結果以下:

OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:1
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2

 

場景七:單生產者,多消費者模式。消費者一、2不重複消費消息,消費者三、4消費消費者1或2消費過的消息,且獨立重複消費。消費者5消費消費者三、4均消費過的消息。

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.Executors;

public class Main7 {

    //單生產者,多消費者模式。消費者一、2不重複消費消息,消費者三、4消費消費者1或2消費過的消息,且獨立重複消費。消費者5消費消費者三、4均消費過的消息。
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")).then(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (long l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

 

消費者之間的依賴圖以下:

可能的輸出結果以下:

OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 3,消費信息:0
OrderHandler1 4,消費信息:2
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2

 

場景八:單生產者,多消費者模式。消費者一、2獨立消費每一條消息,消費者三、4不重複消費消費者一、2均處理過的消息,消費者5消費消費者3或4消費過的消息

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.Executors;

public class Main8 {

    //單生產者,多消費者模式。消費者一、2獨立消費每一條消息,消費者三、4不重複消費消費者一、2均處理過的消息,消費者5消費消費者3或4消費過的消息
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
        disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")).thenHandleEventsWithWorkerPool(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5"));
        disruptor.start();
        RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            producer.onData(l + "");
        }
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

消費者間的依賴圖以下:

可能的輸出結果以下:

OrderHandler1 2,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2

場景九:多生產者,單消費者模式

該場景較爲簡單,只需將ProducerType.SINGLE改成ProducerType.MULTI,而且編寫多線程生產者的相關代碼便可。

package liuqiang.complex.multi;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import liuqiang.complex.common.Order;
import liuqiang.complex.common.OrderFactory;
import liuqiang.complex.common.OrderHandler1;
import liuqiang.complex.common.Producer;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

public class Main9 {

    //多生產者,單消費者版本。三個生產者獨立生產消息。
    public static void main(String[] args) throws Exception {
        EventFactory<Order> factory = new OrderFactory();
        int ringBufferSize = 1024 * 1024;
        //ProducerType要設置爲MULTI,後面纔可使用多生產者模式
        Disruptor<Order> disruptor =
                new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
        //簡化問題,設置爲單消費者模式,也能夠設置爲多消費者及消費者間多重依賴。
        disruptor.handleEventsWith(new OrderHandler1("1"));
        disruptor.start();
        final RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();
        //判斷生產者是否已經生產完畢
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        //單生產者,生產3條數據
        for (int l = 0; l < 3; l++) {
            Thread thread = new Thread() {
                @Override
                public void run() {
                    for(int i = 0; i < 3; i++) {
                        new Producer(ringBuffer).onData(Thread.currentThread().getName() + "'s " + i + "th message");
                    }
                    countDownLatch.countDown();
                }
            };
            thread.setName("producer thread " + l);
            thread.start();
        }
        countDownLatch.await();
        //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題
        Thread.sleep(1000);
        disruptor.shutdown();
    }
}

以上是,對disruptor的各個使用場景的簡單介紹。

後面會寫博客針對Disruptor的各部分源碼作一分析,詳細介紹其消費者之間依賴關係的實現機制、單生產者、多生產者之間的不一樣實現方式等。

相關文章
相關標籤/搜索