本文基於最新的3.4.2的版本文檔進行翻譯,翻譯自:java
github.com/LMAX-Exchan…github
最好的方法去理解Disruptor就是將它和容易理解而且類似的隊列,例如BlockingQueue。Disruptor其實就像一個隊列同樣,用於在不一樣的線程之間遷移數據,可是Disruptor也實現了一些其餘隊列沒有的特性,如:算法
這是Disruptor和隊列最大的區別。當你有多個消費者監聽了一個Disruptor,全部的事件將會被髮布到全部的消費者中,相比之下隊列的一個事件只能被髮到一個消費者中。Disruptor這一特性被用來須要對同一數據進行多個並行操做的狀況。如在LMAX系統中有三個操做能夠同時進行:日誌(將數據持久到日誌文件中),複製(將數據發送到其餘的機器上,以確保存在數據遠程副本),業務邏輯處理。也可使用WokrerPool來並行處理不一樣的事件。編程
爲了支持真實世界中的業務並行處理流程,Disruptor提供了多個消費者之間的協助功能。回到上面的LMAX的例子,咱們可讓日誌處理和遠程副本賦值先執行完以後再執行業務處理流程,這個功能被稱之爲gating。gating發生在兩種場景中。第一,咱們須要確保生產者不要超過消費者。經過調用RingBuffer.addGatingConsumers()增長相關的消費者至Disruptor來完成。第二,就是以前所說的場景,經過構造包含須要必須先完成的消費者的Sequence的SequenceBarrier來實現。緩存
引用上面的例子來講,有三個消費者監聽來自RingBuffer的事件。這裏有一個依賴關係圖。ApplicationConsumer依賴JournalConsumer和ReplicationConsumer。這個意味着JournalConsumer和ReplicationConsumer能夠自由的併發運行。依賴關係能夠當作是從ApplicationConsumer的SequenceBarrier到JournalConsumer和ReplicationConsumer的Sequence的鏈接。還有一點值得關注,Sequencer與下游的消費者之間的關係。它的角色是確保發佈不會包裹RingBuffer。爲此,全部下游消費者的Sequence不能比ring buffer的Sequence小且不能比ring buffer 的大小小。由於ApplicationConsumers的Sequence是確保比JournalConsumer和ReplicationConsumer的Sequence小或等於,因此Sequencer只須要檢查ApplicationConsumers的Sequence。在更爲廣泛的應用場景中,Sequencer只須要意識到消費者樹中的葉子節點的的Sequence便可。安全
Disruptor的一個目標之一是被用在低延遲的環境中。在一個低延遲系統中有必要去減小和下降內存的佔用。在基於Java的系統中,須要減小因爲GC致使的停頓次數(在低延遲的C/C++系統中,因爲內存分配器的爭用,大量的內存分配也會致使問題)。bash
爲了知足這點,用戶能夠在Disruptor中爲事件預分配內存。因此EventFactory是用戶來提供,而且Disruptor的Ring Buffer每一個entry中都會被調用。當將新的數據發佈到Disruptor中時,Disruptor的API將會容許用戶持有所構造的對象,以便用戶能夠調用這些對象的方法和更新字段到這些對象中。Disruptor將確保這些操做是線程安全。服務器
無鎖算法實現的Disruptor的全部內存可見性和正確性都使用內存屏障和CAS操做實現。只僅僅一個場景BlockingWaitStrategy中使用到了lock。而這僅僅是爲了使用Condition,以便消費者線程能被park住當在等待一個新的事件到來的時候。許多低延遲系統都使用自旋(busy-wait)來避免使用Condition形成的抖動。可是自旋(busy-wait)的數量變多時將會致使性能的降低,特別是CPU資源嚴重受限的狀況下。例如,在虛擬環境中的Web服務器。架構
咱們使用一個簡單的例子來體驗一下Disruptor。生產者會傳遞一個long類型的值到消費者,消費者接受到這個值後會打印出這個值。
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
複製代碼
爲了使用Disruptor的內存預分配event,咱們須要定義一個EventFactory:
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
複製代碼
爲了讓消費者處理這些事件,因此咱們這裏定義一個事件處理器,負責打印event:
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event);
}
}
複製代碼
在Disruptor的3.0版本中,因爲加入了豐富的Lambda風格的API,能夠用來幫組開發人員簡化流程。因此在3.0版本後首選使用Event Publisher/Event Translator來發布事件。
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
public class LongEventProducerWithTranslator {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb) {
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
複製代碼
這種方法的另外一個優勢是能夠將translator代碼放入一個單獨的類中,而且能夠輕鬆地對它們進行獨立的單元測試。
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer {
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb) {
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
複製代碼
這裏咱們須要把發佈包裹在try/finally代碼塊中。若是某個請求的 sequence 未被提交,將會堵塞後續的發佈操做或者其它的 producer。特別地在多生產中若是沒有提交Sequence,那麼會形成消費者停滯,致使只能重啓消費者才能恢復。
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain {
public static void main(String[] args) throws Exception {
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}
複製代碼
咱們也可使用Java 8的函數式編程來寫這個例子:
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain {
public static void main(String[] args) throws Exception {
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
複製代碼
使用函數式編程咱們能夠發現不少的類都不須要了,如:handler,translator等。 上面的代碼還能夠再簡化一下:
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
複製代碼
不過這樣將實例化一個對象去持有ByteBuffer bb
變量傳入lambda的值。這會產生沒必要要的垃圾。所以,若是要求低GC壓力,則應首選將參數傳遞給lambda的調用。
若是想要讓Disruptor擁有更好的性能這裏有兩個選項能夠調整,wait strategy 和 producer的類型。
最好的方法在併發環境下提升性能是堅持單獨寫原則( Single Writer Principle)。若是你的業務場景中只有一個線程寫入數據到Disruptor,那麼你能夠設置成單生產者來提高性能:
public class LongEventMain {
public static void main(String[] args) throws Exception {
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
//.....
}
}
複製代碼
性能測試: Multiple Producer
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
複製代碼
Single Producer
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
複製代碼
BlockingWaitStrategy Disruptor的默認策略是BlockingWaitStrategy。在BlockingWaitStrategy內部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小而且在各類不一樣部署環境中能提供更加一致的性能表現
SleepingWaitStrategy SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差很少,對 CPU 的消耗也相似,但其對生產者線程的影響最小,經過使用LockSupport.parkNanos(1)來實現循環等待。通常來講Linux系統會暫停一個線程約60µs,這樣作的好處是,生產線程不須要採起任何其餘行動就能夠增長適當的計數器,也不須要花費時間信號通知條件變量。可是,在生產者線程和使用者線程之間移動事件的平均延遲會更高。它在不須要低延遲而且對生產線程的影響較小的狀況最好。一個常見的用例是異步日誌記錄。
YieldingWaitStrategy YieldingWaitStrategy是可使用在低延遲系統的策略之一。YieldingWaitStrategy將自旋以等待序列增長到適當的值。在循環體內,將調用Thread.yield(),以容許其餘排隊的線程運行。在要求極高性能且事件處理線數小於 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
BusySpinWaitStrategy 性能最好,適合用於低延遲的系統。在要求極高性能且事件處理線程數小於CPU邏輯核心樹的場景中,推薦使用此策略;例如,CPU開啓超線程的特性。
經過Disruptor傳遞數據時,對象的生存期可能比預期的更長。爲避免發生這種狀況,可能須要在處理事件後清除事件。若是隻有一個事件處理程序,則須要在處理器中清除對應的對象。若是您有一連串的事件處理程序,則可能須要在該鏈的末尾放置一個特定的處理程序來處理清除對象。
class ObjectEvent<T> {
T val;
void clear() {
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>> {
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch) {
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args) {
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}
複製代碼