高性能的生產者-消費者新選擇:無鎖的緩存框架 Disruptor

<dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>

/**
 * Mutable event object that carries a single {@code long} value.
 *
 * <p>Instances are created by {@code PCDataFactory}, filled in by
 * {@code Producer} via {@link #setValue(long)} and read by {@code Consumer}
 * via {@link #getValue()}.
 */
public class PCData {

    /** The payload carried by this event; defaults to {@code 0}. */
    private long value;

    /**
     * Returns the payload currently stored in this event.
     *
     * @return the stored value
     */
    public long getValue() {
        return value;
    }

    /**
     * Replaces the payload stored in this event.
     *
     * @param value the new value to store
     */
    public void setValue(long value) {
        this.value = value;
    }

}

public class PCDataFactory implements EventFactory<PCData> {code

@Override
public PCData newInstance() {
    return new PCData();
}

}get

public class Producer {it

private final RingBuffer<PCData> ringBuffer;

public Producer(RingBuffer<PCData> ringBuffer) {
    this.ringBuffer = ringBuffer;
}

public void pushData(ByteBuffer byteBuffer){
    long sequence = ringBuffer.next();
    try{
        PCData event = ringBuffer.get(sequence);
        event.setValue(byteBuffer.getLong(0));
    }finally {
        ringBuffer.publish(sequence);
    }
}

}io

public class Consumer implements WorkHandler<PCData> {event

@Override
public void onEvent(PCData pcData) throws Exception {
    System.out.println(Thread.currentThread().getName()+"Event:--"+pcData.getValue()*pcData.getValue()+"--");
}

}class

public class App {im

public static void main(String[] args) throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    PCDataFactory factory = new PCDataFactory();

    int bufferSize = 1024;
    Disruptor<PCData> dataDisruptor = new Disruptor<PCData>(factory,bufferSize,executorService,
            ProducerType.MULTI,new BlockingWaitStrategy());
    dataDisruptor.handleEventsWithWorkerPool(
            new Consumer(),
            new Consumer(),
            new Consumer(),
            new Consumer()
    );

    dataDisruptor.start();

    RingBuffer<PCData> ringBuffer = dataDisruptor.getRingBuffer();
    Producer producer = new Producer(ringBuffer);
    ByteBuffer byteBuffer = ByteBuffer.allocate(8);
    for(long l=0;true;l++){
        byteBuffer.putLong(0,l);
        producer.pushData(byteBuffer);
        Thread.sleep(100);
        System.out.println("add data"+l);
    }
}

}next

相關文章
相關標籤/搜索