<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.4</version> </dependency>
/**
 * Mutable event object that carries a single {@code long} payload.
 *
 * <p>Instances are handed out by {@code PCDataFactory}, filled in by
 * {@code Producer} and read by {@code Consumer}.
 */
public class PCData {

    /** The payload; defaults to {@code 0} until {@link #setValue(long)} is called. */
    private long value;

    /**
     * Returns the payload currently stored in this event.
     *
     * @return the stored value
     */
    public long getValue() {
        return value;
    }

    /**
     * Replaces the payload stored in this event.
     *
     * @param value the new value
     */
    public void setValue(long value) {
        this.value = value;
    }
}
public class PCDataFactory implements EventFactory<PCData> {code
@Override public PCData newInstance() { return new PCData(); }
}get
public class Producer {it
private final RingBuffer<PCData> ringBuffer; public Producer(RingBuffer<PCData> ringBuffer) { this.ringBuffer = ringBuffer; } public void pushData(ByteBuffer byteBuffer){ long sequence = ringBuffer.next(); try{ PCData event = ringBuffer.get(sequence); event.setValue(byteBuffer.getLong(0)); }finally { ringBuffer.publish(sequence); } }
}io
public class Consumer implements WorkHandler<PCData> {event
@Override public void onEvent(PCData pcData) throws Exception { System.out.println(Thread.currentThread().getName()+"Event:--"+pcData.getValue()*pcData.getValue()+"--"); }
}class
public class App {im
public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); PCDataFactory factory = new PCDataFactory(); int bufferSize = 1024; Disruptor<PCData> dataDisruptor = new Disruptor<PCData>(factory,bufferSize,executorService, ProducerType.MULTI,new BlockingWaitStrategy()); dataDisruptor.handleEventsWithWorkerPool( new Consumer(), new Consumer(), new Consumer(), new Consumer() ); dataDisruptor.start(); RingBuffer<PCData> ringBuffer = dataDisruptor.getRingBuffer(); Producer producer = new Producer(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for(long l=0;true;l++){ byteBuffer.putLong(0,l); producer.pushData(byteBuffer); Thread.sleep(100); System.out.println("add data"+l); } }
}next