Disruptor是一個用於在線程間通訊的高效低延時的消息組件,它像個加強的隊列,使用柵欄(barrier)+序號(Sequencing)機制協調生產者與消費者,從而避免使用鎖和CAS,同時還組合使用預分配內存機制、緩存行機制(cache line)、批處理效應(batch effect)來達到高吞吐量和低時延的目標。java
代碼併發執行主要考慮兩個方面:互斥和變化的可見性。互斥主要用來管理對某些資源的競爭訪問;變化的可見性主要用來控制何時這些變化對其餘線程可見。併發環境中最耗時的操做其實就是併發寫操做。多線程對同一個資源近與須要複雜昂貴的協調,一般會經過某種鎖來實現資源協調:git
1.鎖的代價github
鎖提供了互斥,並可以確保變化可以以一個肯定的順序讓其它的線程可見鎖會涉及到操做系統的上下文切換,操做系統會掛起全部在等待這把鎖的線程,直到鎖持有者釋放該鎖數組
2.CAS的代價緩存
CAS依賴於處理器的支持,CAS相對於鎖是很是高效的,CAS並非免費的,通常是採用CPU空轉的方式數據結構
Disruptor的設計初衷就是爲了解決鎖代價和CAS代價過大等問題,以求最優化內存分配,使用緩存友好的方式來最佳使用硬件資源,Disruptor地核心機制在於:以RingBuffer的形式預分配有界的數據結構,單個或多個生產者能夠抽RingBuffer中寫入數據,單個或多個消費者能夠從RingBuffer讀取數據。多線程
BlockingWaitStrategy:經過線程阻塞的方式,等待生產者喚醒併發
BusySpinWaitStrategy:線程一直自旋等待,比較耗CPU。dom
LiteBlockingWaitStrategy:經過線程阻塞的方式,等待生產者喚醒,比BlockingWaitStrategy要輕,某些狀況下能夠減小阻塞的次數。高併發
PhasedBackoffWaitStrategy:根據指定的時間段參數和指定的等待策略決定採用哪一種等待策略。
SleepingWaitStrategy:可經過參數設置,使線程經過Thread.yield()主動放棄執行,經過線程調度器從新調度;或一直自旋等待。
TimeoutBlockingWaitStrategy:經過參數設置阻塞時間,若是超時則拋出異常。
YieldingWaitStrategy: 經過Thread.yield()主動放棄執行,經過線程調度器從新調度。
public E get(long sequence) { return elementAt(sequence); }
protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); }
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
package com.scgaopan.disruptor.pakingdemo; /** * Author:scgaopan * Date:18/10/25 * Description: 汽車信息 */ public class MyInParkingDataEvent { private String carLicense;//車牌號 public String getCarLicense() { return carLicense; } public void setCarLicense(String carLicense) { this.carLicense = carLicense; } }
package com.scgaopan.disruptor.pakingdemo.handler; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent; /** * Author:scgaopan * Date:18/10/25 * Description:第一個消費者,負責保存進場汽車的信息 */ public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> { public void onEvent(MyInParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("this is Eventhandler executed.........data="+event.getCarLicense()); this.onEvent(event); } public void onEvent(MyInParkingDataEvent event) throws Exception { System.out.println("this is WorkHandler executed.........data="+event.getCarLicense()); Thread.sleep(3000); System.out.println(" 成功保存車牌信息到database中,currentThread="+Thread.currentThread().getId()+",data="+event.getCarLicense()); } }
package com.scgaopan.disruptor.pakingdemo.handler; import com.lmax.disruptor.EventHandler; import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent; /** * Author:scgaopan * Date:18/10/25 * Description: */ public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> { public void onEvent(MyInParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception { Thread.sleep(2000); System.out.println("經過kafka成功,當前線程="+Thread.currentThread().getId()+" data="+event.getCarLicense()); } }
package com.scgaopan.disruptor.pakingdemo.handler; import com.lmax.disruptor.EventHandler; import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent; /** * Author:scgaopan * Date:18/10/25 * Description: */ public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> { public void onEvent(MyInParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception { //Thread.sleep(1000); System.out.println("發送消息成功,當前線程="+Thread.currentThread().getId()+" data="+event.getCarLicense()); } }
package com.scgaopan.disruptor.pakingdemo.producer; import com.lmax.disruptor.EventTranslator; import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent; /** * Author:scgaopan * Date:18/10/25 * Description: */ public class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> { public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) { this.generateData(myInParkingDataEvent); } private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) { myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int) (Math.random() * 100000)); // 隨機生成一個車牌號 // System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event"); return myInParkingDataEvent; } }
package com.scgaopan.disruptor.pakingdemo.producer; import com.lmax.disruptor.dsl.Disruptor; import com.scgaopan.disruptor.pakingdemo.DateUtil; import com.scgaopan.disruptor.pakingdemo.MyInParkingDataEvent; import java.util.concurrent.CountDownLatch; /** * Author:scgaopan * Date:18/10/25 * Description: */ public class MyInParkingDataEventPublisher implements Runnable { private CountDownLatch countDownLatch; // 用於監聽初始化操做,等初始化執行完畢後,通知主線程繼續工做 private Disruptor<MyInParkingDataEvent> disruptor; private static final Integer NUM = 1; // 1,10,100,1000 public MyInParkingDataEventPublisher(CountDownLatch countDownLatch, Disruptor<MyInParkingDataEvent> disruptor) { this.countDownLatch = countDownLatch; this.disruptor = disruptor; } public void run() { MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator(); try { for (int i = 0; i < NUM; i++) { disruptor.publishEvent(eventTranslator); Thread.sleep(1000); // 假設一秒鐘進一輛車 System.out.println(DateUtil.getCurrentTime()+ "有一輛車進入..............."); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); // 執行完畢後通知 await()方法 System.out.println(NUM + "輛車已經所有進入進入停車場!"); } } }
package com.scgaopan.disruptor.pakingdemo; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import com.scgaopan.disruptor.pakingdemo.handler.MyParkingDataInDbHandler; import com.scgaopan.disruptor.pakingdemo.handler.MyParkingDataSmsHandler; import com.scgaopan.disruptor.pakingdemo.handler.MyParkingDataToKafkaHandler; import com.scgaopan.disruptor.pakingdemo.producer.MyInParkingDataEventPublisher; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Author:scgaopan * Date:18/10/25 * Description: */ public class MyInParkingDataEventMain { public static void main(String[] args) { long beginTime=System.currentTimeMillis(); int bufferSize = 2048; // 2的N次方 try { // 建立消費者線程池,負責處理Disruptor的四個消費者 ExecutorService executor = Executors.newFixedThreadPool(4); // 初始化一個 Disruptor Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() { public MyInParkingDataEvent newInstance() { return new MyInParkingDataEvent(); // Event 初始化工廠 } }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 使用disruptor建立消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith( new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler()); // 當上面兩個消費者處理(這裏是處理結束,而不是通知)結束後再消耗 smsHandler MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler(); handlerGroup.then(myParkingDataSmsHandler); // 啓動Disruptor disruptor.start(); CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程準備好了就能夠通知主線程繼續工做了 // 生產者生成數據 executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor)); countDownLatch.await(); // 等待生產者結束 disruptor.shutdown(); executor.shutdown(); } catch (Exception e) { e.printStackTrace(); } System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime)); } }
Disruptor源碼地址: https://github.com/LMAX-Exchange/disruptor