架構師養成記--15.Disruptor併發框架

1、概述java

disruptor對於處理併發任務很擅長,曾有人測過,一個線程裏1s內能夠處理六百萬個訂單,性能至關感人。緩存

這個框架的結構大概是:數據生產端 --> 緩存 --> 消費端併發

緩存中的數據是主動發給消費端的,而不是像通常的生產者消費者模式那樣,消費端去緩存中取數據。框架

能夠將disruptor理解爲,基於事件驅動的高效隊列、輕量級的JMS異步

disruptor學習網站:http://ifeve.com/disruptor-getting-startedide

2、開發流程性能

1.建Event類(數據對象)學習

2.創建一個生產數據的工廠類,EventFactory,用於生產數據;網站

3.監聽事件類(處理Event數據)this

4.實例化Disruptor,配置參數,綁定事件;

5.建存放數據的核心 RingBuffer,生產的數據放入 RungBuffer。

3、HelloWord 

1.入口

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

    public static void main(String[] args) throws Exception {
        //建立緩衝池
        ExecutorService  executor = Executors.newCachedThreadPool();
        //建立工廠
        LongEventFactory factory = new LongEventFactory();
        //建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
        int ringBufferSize = 1024 * 1024; // 

        /**
        //BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小而且在各類不一樣部署環境中能提供更加一致的性能表現
        WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
        //SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差很少,對CPU的消耗也相似,但其對生產者線程的影響最小,適合用於異步日誌相似的場景
        WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
        //YieldingWaitStrategy 的性能是最好的,適合用於低延遲的系統。在要求極高性能且事件處理線數小於CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啓超線程的特性
        WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        */
        
        //建立disruptor
        Disruptor<LongEvent> disruptor = 
                new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 鏈接消費事件方法
        disruptor.handleEventsWith(new LongEventHandler());
        
        // 啓動
        disruptor.start();
        
        //Disruptor 的事件發佈過程是一個兩階段提交的過程:
        //發佈事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        
        LongEventProducer producer = new LongEventProducer(ringBuffer); 
        //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for(long l = 0; l<100; l++){
            byteBuffer.putLong(0, l);
            producer.onData(byteBuffer);
            //Thread.sleep(1000);
        }

        
        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至全部的事件都獲得處理;
        executor.shutdown();//關閉 disruptor 使用的線程池;若是須要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;        
        
        
    }
}

 

 

2.數據對象:

public class LongEvent { 
    private long value;
    public long getValue() { 
        return value; 
    } 
 
    public void setValue(long value) { 
        this.value = value; 
    } 
} 

3.Event工廠

import com.lmax.disruptor.EventFactory;
// 須要讓disruptor爲咱們建立事件,咱們同時還聲明瞭一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory { 

    @Override 
    public Object newInstance() { 
        return new LongEvent(); 
    } 
} 

4.生產者

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;
/**
 * 很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是由於事件對象還須要預先建立。
 * 發佈事件最少須要兩步:獲取下一個事件槽併發布事件(發佈事件的時候要使用try/finnally保證事件必定會被髮布)。
 * 若是咱們使用RingBuffer.next()獲取一個事件槽,那麼必定要發佈對應的事件。
 * 若是不能發佈事件,那麼就會引發Disruptor狀態的混亂。
 * 尤爲是在多個事件生產者的狀況下會致使事件消費者失速,從而不得不重啓應用才能會恢復。
 */
public class LongEventProducer {

    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
        this.ringBuffer = ringBuffer;
    }
    
    /**
     * onData用來發布事件,每調用一次就發佈一次事件
     * 它的參數會用過事件傳遞給消費者
     */
    public void onData(ByteBuffer bb){
        //1.能夠把ringBuffer看作一個事件隊列,那麼next就是獲得下面一個事件槽
        long sequence = ringBuffer.next();
        try {
            //2.用上面的索引取出一個空的事件用於填充(獲取該序號對應的事件對象)
            LongEvent event = ringBuffer.get(sequence);
            //3.獲取要經過事件傳遞的業務數據
            event.setValue(bb.getLong(0));
        } finally {
            //4.發佈事件
            //注意,最後的 ringBuffer.publish 方法必須包含在 finally 中以確保必須獲得調用;若是某個請求的 sequence 未被提交,將會堵塞後續的發佈操做或者其它的 producer。
            ringBuffer.publish(sequence);
        }
    }
    
    
}

 

5.消費者

import com.lmax.disruptor.EventHandler;

//咱們還須要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
public class LongEventHandler implements EventHandler<LongEvent>  {

    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        System.out.println(longEvent.getValue());         
    }

}
相關文章
相關標籤/搜索