這篇主要來說解多線程中一個很是經典的設計模式
包括它的基礎到拓展
但願你們可以有所收穫設計模式
此設計模式中主要分兩類線程:生產者線程和消費者線程
生產者提供數據和任務
消費者處理數據和任務
該模式的核心就是數據和任務的交互點:共享內存緩存區
下面給出簡單易懂的一張圖:數組
使用BlockingQueue來作緩衝區是很是合適的
經過BlockingQueue來理解生產者消費者模式
首先咱們要知道BlockingQueue是什麼?
它是一個實現接口,有不少實現類,好比:
ArrayBlockingQueue:前面講過,這個隊列適合作有界隊列,固定線程數
LinkedBlockingQueue:它適合作無界隊列
......緩存
以ArrayBlockingQueue爲例
它在內部放置了一個對象數組:多線程
final Object[] items;
經過items數組來進行元素的存取
1(存).向隊列中壓入一個元素:.offer()
:若是隊列滿了,返回false.put()
:將元素壓入隊列末尾,若是隊列滿了,它就會一直等待併發
2(取).向隊列中彈出元素(從頭部彈出):.poll()
:若是隊列爲空,返回null.take()
:若是隊列爲空,繼續等待,知道隊列中有元素框架
瞭解了上面這些基礎後,咱們來看下實際操做是怎樣的
在開始以前咱們要有一個Entity類,只存一個long類型的value值進去:異步
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
有了這個數據模型,看下最後的執行main方法:ide
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); //創建線程池 BlockingQueue<MyData> blockingQueue = new ArrayBlockingQueue<MyData>(10); //創建緩存隊列 for (int i=0;i<3;i++){ Producer i = new Producer(queue); executor.execute(i); } //製造三個生產線程 for (int j=0;j<3;j++){ Consumer j = new Consumer(queue); executor.execute(j); } //製造三個消費線程 Thread.sleep(10000); for (int i=0;i<3;i++){ i.stop(); } //中止生產 Thread.sleep(5000); executor.shutdown(); }
這裏只給出Main,你們能夠經過代碼簡單理解使用BlockingQueue作緩衝區的過程
沒有給出生產者和消費者的具體線程實現類,除了博主比較懶以外,還有是由於使用BlockingQueue作緩衝區並不推薦使用
雖然BlockingQueue是個不錯的選擇,但它使用了鎖和阻塞來保證線程間的同步,並不具有良好的併發性能
下面講解一種具備高性能的共享緩衝區函數
咱們知道BlockingQueue隊列的性能不是特別優越
而以前講到過ConcurrentLinkedQueue是一個高性能隊列,由於它使用了大量的CAS操做
同理,若是咱們利用CAS操做實現生產者-消費者模式,性能就能夠獲得客觀的提高
可是大量的CAS操做本身實現起來很是困難
因此推薦使用Disruptor框架高併發
實際工做仍是得使用成熟的框架,Disruptor是一款高效的無鎖內存隊列
它不像傳統隊列有head和tail指針來操控入列和出列
而是實現了一個固定大小的環形隊列(RingBuffer),來看下實際模型圖:
生產者向緩衝區寫入數據,消費者從緩衝區讀取數據,你們都使用了CAS操做
並且因爲是環形隊列的緣由,能夠作到徹底的內存複用
從而大大減小系統分配空間以及回收空間的額外開銷
那麼這個框架怎麼使用呢?
1.導入包(博主使用了Maven依賴,不一樣版本大同小異):
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.2</version> </dependency>
2.依舊建立一個entity類:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
3.還要寫一個Factory類,細心的同窗會看到環形隊列是固定大小的
這個Factory會在Disruptor實例對象構造時,構造全部緩衝區中的對象實例
public class DataFactory implements EventFactory{ @Override public Object newInstance() { return new MyData(); } }
4.生產者(具體每行代碼的做用都已經註釋):
public class Producers { private final RingBuffer<MyData> ringBuffer; //建立環形隊列(環形緩衝區) public Producers(RingBuffer<MyData> ringBuffer) { this.ringBuffer = ringBuffer; //將ringBuffer與Producers綁定 } public void putData(ByteBuffer byteBuffer){ //此方法將產生的數據推入緩衝區 long sequeue = ringBuffer.next(); //經過.next()方法獲得ringBuffer的下一個節點,而且賦值給sequeue MyData event = ringBuffer.get(sequeue); //將mydata數據存入到下一個節點 event.setValue(byteBuffer.getLong(0)); //mydata的值有ByteBuffer參數帶入 ringBuffer.publish(sequeue); //將sequeue節點內的數據發佈 } }
5.消費者:
public class Consumers implements WorkHandler<MyData>{ @Override public void onEvent(MyData myData) throws Exception { System.out.println("當前線程爲:"+Thread.currentThread().getId()+"線程,它處理的數據是:"+myData.getValue()); } }
6.執行函數:
public class RunTest { public static void main(String[] args) throws InterruptedException { Executor executor = Executors.newCachedThreadPool(); //建立線程池 DataFactory dataFactory = new DataFactory(); //建立Factory實例 int bufferSize = 1024; //設置緩存區大小爲1024(必須是2的整數次冪) Disruptor<MyData> disruptor = new Disruptor<MyData>( dataFactory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy() ); disruptor.handleEventsWithWorkerPool( new Consumers(), new Consumers(), new Consumers(), new Consumers() ); disruptor.start(); //Disruptor啓動 RingBuffer<MyData> ringBuffer = disruptor.getRingBuffer(); //實例化環形隊列並與Disruptor綁定 Producers producers = new Producers(ringBuffer); //實例化生產者並綁定ringBuffer ByteBuffer byteBuffe = ByteBuffer.allocate(8); //建立一個容量爲256字節的ByteBuffer for (long n = 0;true;n++){ byteBuffe.putLong(0,n); producers.putData(byteBuffe); Thread.sleep(100); System.out.println("add data "+n); } } }
咱們來看下執行結果:
當前線程爲:13線程,它處理的數據是:1059 add data 1059 當前線程爲:11線程,它處理的數據是:1060 add data 1060 當前線程爲:10線程,它處理的數據是:1061 add data 1061 當前線程爲:12線程,它處理的數據是:1062 add data 1062 當前線程爲:13線程,它處理的數據是:1063 add data 1063 當前線程爲:11線程,它處理的數據是:1064 add data 1064 當前線程爲:10線程,它處理的數據是:1065
能夠看出,由於我無限的讓生產線程生產數據,而RingBuffer中那十幾條消費線程不停的消費數據
此外Disruptor不止CAS操做,還提供了四種等待策略讓消費者監控緩衝區的信息:
1.BlockingWaitStrategy:默認策略,最節省CPU,但在高併發下性能表現最糟糕
2.SleepingWaitStrategy:等待數據時自旋等待,不成功會使用LockSupport方法阻塞本身,一般用於異步日誌
3.YieldWaitStrategy:用於低延時場合,在內部執行Thread.yield()死循環
4.BusySpinWaitStrategy:消費線程進行死循環監控緩衝區,吃掉全部CPU資源
除了CAS操做,消費者等待策略,Disruptor還使用CPU Cache的優化來進行優化
根據Disruptor官方報道:Disruptor的性能比BlockingQueuez至少高一倍以上!
以上即是生產者消費者模式的應用
謝謝閱讀,記得點關注看更新