版權聲明:原創做品,謝絕轉載!不然將追究法律責任。html
Disruptor是一個優秀的併發框架,能夠實現單個或多個生產者生產消息,單個或多個消費者消息,且消費者之間能夠存在消費消息的依賴關係。網上其餘博客每每僅針對框架的一部分使用示例進行了介紹,對於某些場景下介紹並不徹底:如多生產者間複雜的依賴關係的使用編碼。java
本文儘量對Disruptor的全部使用場景進行總結,若有不全之處歡迎指出,請諒解。多線程
具體關於Disruptor的原理,參見:http://ifeve.com/disruptor/,本文不在贅述。併發
在disruptor框架調用start方法以前,每每須要將消息的消費者指定給disruptor框架。框架
經常使用的方法是:disruptor.handleEventsWith(EventHandler ... handlers),將多個EventHandler的實現類傳入方法,封裝成一個EventHandlerGroup,實現多消費者消費。ide
disruptor的另外一個方法是:disruptor.handleEventsWithWorkerPool(WorkHandler ... handlers),將多個WorkHandler的實現類傳入方法,封裝成一個EventHandlerGroup實現多消費者消費。源碼分析
二者共同點都是,將多個消費者封裝到一塊兒,供框架消費消息。post
不一樣點在於,this
1. 對於某一條消息m,handleEventsWith方法返回的EventHandlerGroup,Group中的每一個消費者都會對m進行消費,各個消費者之間不存在競爭。handleEventsWithWorkerPool方法返回的EventHandlerGroup,Group的消費者對於同一條消息m不重複消費;也就是,若是c0消費了消息m,則c1再也不消費消息m。編碼
2. 傳入的形參不一樣。對於獨立消費的消費者,應當實現EventHandler接口。對於不重複消費的消費者,應當實現WorkHandler接口。
所以,根據消費者集合是否獨立消費消息,能夠對不一樣的接口進行實現。也能夠對兩種接口同時實現,具體消費流程由disruptor的方法調用決定。
在進行場景分析以前,首先定義公共的生產者Producer,消費者OrderHandler1,消息Order,消息工廠OrderFactory。定義分別以下:
package liuqiang.complex.common; public class Order { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } }
package liuqiang.complex.common; import com.lmax.disruptor.EventFactory; public class OrderFactory implements EventFactory<Order> { @Override public Order newInstance() { return new Order(); } }
package liuqiang.complex.common; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; //EventHandler用於EventHandlerGroup,WorkHandler用於WorkPool。同時實現兩接口,該類對象可同時用於EventHandlerGroup和WorkPool public class OrderHandler1 implements EventHandler<Order>, WorkHandler<Order> { private String consumerId; public OrderHandler1(String consumerId){ this.consumerId = consumerId; } //EventHandler的方法 @Override public void onEvent(Order order, long sequence, boolean endOfBatch) throws Exception { System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId()); } //WorkHandler的方法 @Override public void onEvent(Order order) throws Exception { System.out.println("OrderHandler1 " + this.consumerId + ",消費信息:" + order.getId()); } }
package liuqiang.complex.common; import com.lmax.disruptor.RingBuffer; public class Producer { private final RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer){ this.ringBuffer = ringBuffer; } public void onData(String data){ long sequence = ringBuffer.next(); try { Order order = ringBuffer.get(sequence); order.setId(data); } finally { ringBuffer.publish(sequence); } } }
下面定義兩種不一樣的消費者集合關係:
package liuqiang.complex.single; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main1 { //單生產者模式,單消費者模式 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //設置一個消費者 disruptor.handleEventsWith(new OrderHandler1("1")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
這種狀況最爲簡單,單生產者,僅需在Disruptor初始化時,傳入ProducerType.SINGLE便可。使用disruptor.handleEventsWith傳入單消費者。Thread.sleep方法調用是爲了保證,在調用disruptor.shutdown方法前,全部的消費者線程都已經啓動,防止shutdown失效的問題。具體問題詳見本人另外一篇博客:Disruptor中shutdown方法失效,及產生的不肯定性源碼分析。
輸出結果以下:
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
package liuqiang.complex.single; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.*; import java.util.concurrent.Executors; public class Main2 { //單生產者,多消費者,但多消費者間造成依賴關係,每一個依賴節點單線程。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //多個消費者間造成依賴關係,每一個依賴節點的消費者爲單線程。 disruptor.handleEventsWith(new OrderHandler1("1")).then(new OrderHandler1("2"), new OrderHandler1("3")).then(new OrderHandler1("4")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
四個消費者之間的依賴圖以下:
消費者C二、C3只有在C1消費完消息m後,才能消費m。消費者C4只有在C二、C3消費完m後,才能消費該消息。
可能的輸出結果以下(可能由於線程執行前後順序不一樣略有區別,但輸出必定知足相關依賴約束):
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:0
OrderHandler1 3,消費信息:0
OrderHandler1 1,消費信息:2
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 4,消費信息:2
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.*; import java.util.concurrent.Executors; public class Main3 { //單生產者,多消費者模式。多消費者對於消息不重複消費。例如:1線程消費了消息0,則2線程只能從0後面的消息消費,不能對消息0進行消費。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); /* * 該方法傳入的消費者須要實現WorkHandler接口,方法的內部實現是:先建立WorkPool,而後封裝WorkPool爲EventHandlerPool返回。 * 消費者一、2對於消息的消費有時有競爭,保證同一消息只能有一個消費者消費 */ disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
調用handleEventsWithWorkerPool造成WorkerPool,並進一步封裝成EventHandlerGroup。對於同一條消息,兩消費者不重複消費。
可能輸出結果以下:
OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 1,消費信息:2
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main4 { //單生產者,多消費者模式。多消費者對於消息獨立消費。例如:對於消息m,兩個消費者都要對其進行消費。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); /* * 兩個消費者建立EventHandlerGroup。該消費者須要實現EventHandler類。兩個消費者對於RingBuffer中的每一個消息,都獨立消費一次。 * 兩個消費者在消費消息的過程當中,各自獨立,不產生競爭。 */ disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
可能輸出結果以下:
OrderHandler1 1,消費信息:0
OrderHandler1 2,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main5 { //單生產者,多消費者間存在依賴關係的模式。消費者一、2組成EventHandlerGroup,消息獨立消費。消費者三、4僅能消費一、2均消費過的消息,且獨立消費。消費者5僅能消費三、4均消費過的消息 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //至關於在各個EventHandlerGroup之間進行級聯,造成依賴關係。 disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")).then(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者之間的依賴關係以下:
可能的輸出結果以下:
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 4,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main6 { /* * 單生產者,多消費者。多消費者之間不重複消費,且不一樣的消費者WorkPool之間存在依賴關係。 * 消費者一、2不重複消費消息,消費者三、4不重複消費1或者2消費過的消息,消費者5消費消費者3或4消費過的消息。 */ public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")).thenHandleEventsWithWorkerPool(new OrderHandler1("3"), new OrderHandler1("4")).thenHandleEventsWithWorkerPool(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者之間的依賴圖以下所示:
可能的輸出結果以下:
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:2
OrderHandler1 4,消費信息:1
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main7 { //單生產者,多消費者模式。消費者一、2不重複消費消息,消費者三、4消費消費者1或2消費過的消息,且獨立重複消費。消費者5消費消費者三、4均消費過的消息。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWithWorkerPool(new OrderHandler1("1"), new OrderHandler1("2")).then(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (long l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者之間的依賴圖以下:
可能的輸出結果以下:
OrderHandler1 1,消費信息:1
OrderHandler1 2,消費信息:0
OrderHandler1 1,消費信息:2
OrderHandler1 4,消費信息:0
OrderHandler1 4,消費信息:1
OrderHandler1 3,消費信息:0
OrderHandler1 4,消費信息:2
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.Executors; public class Main8 { //單生產者,多消費者模式。消費者一、2獨立消費每一條消息,消費者三、4不重複消費消費者一、2均處理過的消息,消費者5消費消費者3或4消費過的消息 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new OrderHandler1("1"), new OrderHandler1("2")).thenHandleEventsWithWorkerPool(new OrderHandler1("3"), new OrderHandler1("4")).then(new OrderHandler1("5")); disruptor.start(); RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { producer.onData(l + ""); } //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
消費者間的依賴圖以下:
可能的輸出結果以下:
OrderHandler1 2,消費信息:0
OrderHandler1 2,消費信息:1
OrderHandler1 2,消費信息:2
OrderHandler1 1,消費信息:0
OrderHandler1 1,消費信息:1
OrderHandler1 1,消費信息:2
OrderHandler1 3,消費信息:0
OrderHandler1 3,消費信息:1
OrderHandler1 3,消費信息:2
OrderHandler1 5,消費信息:0
OrderHandler1 5,消費信息:1
OrderHandler1 5,消費信息:2
場景九:多生產者,單消費者模式
該場景較爲簡單,只需將ProducerType.SINGLE改成ProducerType.MULTI,而且編寫多線程生產者的相關代碼便可。
package liuqiang.complex.multi; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import liuqiang.complex.common.Order; import liuqiang.complex.common.OrderFactory; import liuqiang.complex.common.OrderHandler1; import liuqiang.complex.common.Producer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; public class Main9 { //多生產者,單消費者版本。三個生產者獨立生產消息。 public static void main(String[] args) throws Exception { EventFactory<Order> factory = new OrderFactory(); int ringBufferSize = 1024 * 1024; //ProducerType要設置爲MULTI,後面纔可使用多生產者模式 Disruptor<Order> disruptor = new Disruptor<Order>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); //簡化問題,設置爲單消費者模式,也能夠設置爲多消費者及消費者間多重依賴。 disruptor.handleEventsWith(new OrderHandler1("1")); disruptor.start(); final RingBuffer<Order> ringBuffer = disruptor.getRingBuffer(); //判斷生產者是否已經生產完畢 final CountDownLatch countDownLatch = new CountDownLatch(3); //單生產者,生產3條數據 for (int l = 0; l < 3; l++) { Thread thread = new Thread() { @Override public void run() { for(int i = 0; i < 3; i++) { new Producer(ringBuffer).onData(Thread.currentThread().getName() + "'s " + i + "th message"); } countDownLatch.countDown(); } }; thread.setName("producer thread " + l); thread.start(); } countDownLatch.await(); //爲了保證消費者線程已經啓動,留足足夠的時間。具體緣由詳見另外一篇博客:disruptor的shutdown失效問題 Thread.sleep(1000); disruptor.shutdown(); } }
以上是,對disruptor的各個使用場景的簡單介紹。
後面會寫博客針對Disruptor的各部分源碼作一分析,詳細介紹其消費者之間依賴關係的實現機制、單生產者、多生產者之間的不一樣實現方式等。