Disruptor Quick Start

建立步驟:java

  1. 簡歷一個工廠Event類,用於建立Event類實例對象
  2. 須要有一個監聽事件類,用於處理數據(Event類)
  3. 實例化Disruptor實例,配置一系列參數,編寫Disruptor核心組件
  4. 編寫生產者組件,向Disruptor容器中去投遞數據

簡歷一個工廠Event類,OrderEvent.javaui

package com.bfxy.disruptor.quickstart;

public class OrderEvent {
   //訂單的價格 
   private long value; 

   public long getValue() {
      return value;
   }

   public void setValue(long value) {
      this.value = value;
   }
   
}

OrderEventFactory.javathis

package com.bfxy.disruptor.quickstart;

import com.lmax.disruptor.EventFactory;

public class OrderEventFactory implements EventFactory<OrderEvent>{

   public OrderEvent newInstance() {
      return new OrderEvent();      //這個方法就是爲了返回空的數據對象(Event)
   }

}

二、須要有一個監聽事件類:OrderEventHandler.java線程

package com.bfxy.disruptor.quickstart;

import com.lmax.disruptor.EventHandler;

public class OrderEventHandler implements EventHandler<OrderEvent>{

   public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
      Thread.sleep(Integer.MAX_VALUE);
      System.err.println("消費者: " + event.getValue());
   }

}

三、實例化Disruptor實例:Main.java對象

package com.bfxy.disruptor.quickstart;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {

   public static void main(String[] args) {
      // 參數準備工做
      OrderEventFactory orderEventFactory = new OrderEventFactory();
      int ringBufferSize = 4;
      ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
      
      /**
       * 1 eventFactory: 消息(event)工廠對象
       * 2 ringBufferSize: 容器的長度
       * 3 executor: 線程池(建議使用自定義線程池) RejectedExecutionHandler
       * 4 ProducerType: 單生產者 仍是 多生產者
       * 5 waitStrategy: 等待策略
       */
      //1. 實例化disruptor對象
      Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy());
      
      //2. 添加消費者的監聽 (構建disruptor 與 消費者的一個關聯關係)
      disruptor.handleEventsWith(new OrderEventHandler());
      
      //3. 啓動disruptor
      disruptor.start();
      
      //4. 獲取實際存儲數據的容器: RingBuffer
      RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
      
      OrderEventProducer producer = new OrderEventProducer(ringBuffer);
      
      ByteBuffer bb = ByteBuffer.allocate(8);
      
      for(long i = 0 ; i < 5; i ++){
         bb.putLong(0, i);
         producer.sendData(bb);
      }
      
      disruptor.shutdown();
      executor.shutdown();
      
   }
}

四、編寫生產者組件:OrderEventProducer.java 事件

package com.bfxy.disruptor.quickstart;

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {

   private RingBuffer<OrderEvent> ringBuffer;
   
   public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
      this.ringBuffer = ringBuffer;
   }
   
   public void sendData(ByteBuffer data) {
      //1 在生產者發送消息的時候, 首先 須要從咱們的ringBuffer裏面 獲取一個可用的序號
      long sequence = ringBuffer.next(); //0    
      try {
         //2 根據這個序號, 找到具體的 "OrderEvent" 元素 注意:此時獲取的OrderEvent對象是一個沒有被賦值的"空對象"
         OrderEvent event = ringBuffer.get(sequence);
         //3 進行實際的賦值處理
         event.setValue(data.getLong(0));         
      } finally {
         //4 提交發布操做
         ringBuffer.publish(sequence);        
      }
   }

 }
相關文章
相關標籤/搜索