java高併發之從零到放棄(五)

前言

這篇主要來說解多線程中一個很是經典的設計模式
包括它的基礎到拓展
但願你們可以有所收穫設計模式

生產者-消費者模式簡述

此設計模式中主要分兩類線程:生產者線程和消費者線程
生產者提供數據和任務
消費者處理數據和任務
該模式的核心就是數據和任務的交互點:共享內存緩存區
下面給出簡單易懂的一張圖:數組

clipboard.png

阻塞隊列緩存區

使用BlockingQueue來作緩衝區是很是合適的
經過BlockingQueue來理解生產者消費者模式
首先咱們要知道BlockingQueue是什麼?
它是一個實現接口,有不少實現類,好比:
ArrayBlockingQueue:前面講過,這個隊列適合作有界隊列,固定線程數
LinkedBlockingQueue:它適合作無界隊列
......緩存

以ArrayBlockingQueue爲例
它在內部放置了一個對象數組:多線程

final Object[] items;

經過items數組來進行元素的存取
1(存).向隊列中壓入一個元素:
.offer():若是隊列滿了,返回false
.put():將元素壓入隊列末尾,若是隊列滿了,它就會一直等待併發

2(取).向隊列中彈出元素(從頭部彈出):
.poll():若是隊列爲空,返回null
.take():若是隊列爲空,繼續等待,知道隊列中有元素框架

實例流程

瞭解了上面這些基礎後,咱們來看下實際操做是怎樣的
在開始以前咱們要有一個Entity類,只存一個long類型的value值進去:異步

public class MyData {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

有了這個數據模型,看下最後的執行main方法:ide

public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();      //創建線程池
        BlockingQueue<MyData> blockingQueue = new ArrayBlockingQueue<MyData>(10);       //創建緩存隊列
        for (int i=0;i<3;i++){ Producer i = new Producer(queue); executor.execute(i); }      //製造三個生產線程
        for (int j=0;j<3;j++){ Consumer j = new Consumer(queue); executor.execute(j); }      //製造三個消費線程
        Thread.sleep(10000);
        for (int i=0;i<3;i++){ i.stop(); }      //中止生產
        Thread.sleep(5000);
        executor.shutdown();
        }

這裏只給出Main,你們能夠經過代碼簡單理解使用BlockingQueue作緩衝區的過程
沒有給出生產者和消費者的具體線程實現類,除了博主比較懶以外,還有是由於使用BlockingQueue作緩衝區並不推薦使用
雖然BlockingQueue是個不錯的選擇,但它使用了鎖和阻塞來保證線程間的同步,並不具有良好的併發性能
下面講解一種具備高性能的共享緩衝區函數

Disruptor

咱們知道BlockingQueue隊列的性能不是特別優越
而以前講到過ConcurrentLinkedQueue是一個高性能隊列,由於它使用了大量的CAS操做
同理,若是咱們利用CAS操做實現生產者-消費者模式,性能就能夠獲得客觀的提高
可是大量的CAS操做本身實現起來很是困難
因此推薦使用Disruptor框架高併發

實際工做仍是得使用成熟的框架,Disruptor是一款高效的無鎖內存隊列
它不像傳統隊列有head和tail指針來操控入列和出列
而是實現了一個固定大小的環形隊列(RingBuffer),來看下實際模型圖:

clipboard.png

生產者向緩衝區寫入數據,消費者從緩衝區讀取數據,你們都使用了CAS操做
並且因爲是環形隊列的緣由,能夠作到徹底的內存複用
從而大大減小系統分配空間以及回收空間的額外開銷

Disruptor具體實現

那麼這個框架怎麼使用呢?
1.導入包(博主使用了Maven依賴,不一樣版本大同小異):

<dependency>
     <groupId>com.lmax</groupId>
     <artifactId>disruptor</artifactId>
     <version>3.3.2</version>
</dependency>

2.依舊建立一個entity類:

public class MyData {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

3.還要寫一個Factory類,細心的同窗會看到環形隊列是固定大小的
這個Factory會在Disruptor實例對象構造時,構造全部緩衝區中的對象實例

public class DataFactory implements EventFactory{
    @Override
    public Object newInstance() {
        return new MyData();
    }
}

4.生產者(具體每行代碼的做用都已經註釋):

public class Producers {
    private final RingBuffer<MyData> ringBuffer;        //建立環形隊列(環形緩衝區)

    public Producers(RingBuffer<MyData> ringBuffer) {
        this.ringBuffer = ringBuffer;           //將ringBuffer與Producers綁定
    }

    public void putData(ByteBuffer byteBuffer){         //此方法將產生的數據推入緩衝區

    long sequeue = ringBuffer.next();       //經過.next()方法獲得ringBuffer的下一個節點,而且賦值給sequeue

    MyData event = ringBuffer.get(sequeue);     //將mydata數據存入到下一個節點

    event.setValue(byteBuffer.getLong(0));        //mydata的值有ByteBuffer參數帶入

    ringBuffer.publish(sequeue);        //將sequeue節點內的數據發佈
    }
}

5.消費者:

public class Consumers implements WorkHandler<MyData>{
    
    @Override
    public void onEvent(MyData myData) throws Exception {
        System.out.println("當前線程爲:"+Thread.currentThread().getId()+"線程,它處理的數據是:"+myData.getValue());
    }
}

6.執行函數:

public class RunTest {
        public static void main(String[] args) throws InterruptedException {
        Executor executor = Executors.newCachedThreadPool();        //建立線程池
        DataFactory dataFactory = new DataFactory();        //建立Factory實例
        int bufferSize = 1024;      //設置緩存區大小爲1024(必須是2的整數次冪)
        Disruptor<MyData> disruptor = new Disruptor<MyData>(
                dataFactory,
                bufferSize,
                executor,
                ProducerType.MULTI,
                new BlockingWaitStrategy()
                );
        disruptor.handleEventsWithWorkerPool(
                new Consumers(),
                new Consumers(),
                new Consumers(),
                new Consumers()
                );
        disruptor.start();      //Disruptor啓動
        RingBuffer<MyData> ringBuffer = disruptor.getRingBuffer();      //實例化環形隊列並與Disruptor綁定
        Producers producers = new Producers(ringBuffer);        //實例化生產者並綁定ringBuffer
        ByteBuffer byteBuffe = ByteBuffer.allocate(8);      //建立一個容量爲256字節的ByteBuffer
        for (long n = 0;true;n++){
            byteBuffe.putLong(0,n);
            producers.putData(byteBuffe);
            Thread.sleep(100);
            System.out.println("add data "+n);
        }
    }
}

咱們來看下執行結果:

當前線程爲:13線程,它處理的數據是:1059
add data 1059
當前線程爲:11線程,它處理的數據是:1060
add data 1060
當前線程爲:10線程,它處理的數據是:1061
add data 1061
當前線程爲:12線程,它處理的數據是:1062
add data 1062
當前線程爲:13線程,它處理的數據是:1063
add data 1063
當前線程爲:11線程,它處理的數據是:1064
add data 1064
當前線程爲:10線程,它處理的數據是:1065

能夠看出,由於我無限的讓生產線程生產數據,而RingBuffer中那十幾條消費線程不停的消費數據

此外Disruptor不止CAS操做,還提供了四種等待策略讓消費者監控緩衝區的信息:
1.BlockingWaitStrategy:默認策略,最節省CPU,但在高併發下性能表現最糟糕
2.SleepingWaitStrategy:等待數據時自旋等待,不成功會使用LockSupport方法阻塞本身,一般用於異步日誌
3.YieldWaitStrategy:用於低延時場合,在內部執行Thread.yield()死循環
4.BusySpinWaitStrategy:消費線程進行死循環監控緩衝區,吃掉全部CPU資源

除了CAS操做,消費者等待策略,Disruptor還使用CPU Cache的優化來進行優化

根據Disruptor官方報道:Disruptor的性能比BlockingQueuez至少高一倍以上!

以上即是生產者消費者模式的應用
謝謝閱讀,記得點關注看更新

相關文章
相關標籤/搜索