Disruptor設計特色html
隊裏 | 有界性 | 鎖 | 底層結構 |
---|---|---|---|
ArrayBlockingQueue | 有界 | 有鎖 | 數組 |
LinkedBlockingQueue | 有界 | 有鎖 | 鏈表 |
ConcurrentLinkedQueue | 無界 | 無鎖 | 鏈表 |
在高併發且要求較高的穩定性的系統場景下,非了防止生產者速度過快,只能選有界隊列;同時,爲了減小Java的垃圾回收對系統性能的影響儘可能選擇「數組」做爲隊列的底層結構,符合條件只有一個:ArrayBlockingQueuejava
加鎖:不加鎖的性能 > CAS操做的性能 > 加鎖的性能。數組
僞共享:緩存系統中是以緩存行(cache line)爲單位存儲的,當多線程修改互相獨立的變量時,若是這些變量共享同一個緩存行,就會無心中影響彼此的性能,
ArrayBlockingQueue有三個成員變量:緩存
這三個變量很容易放到一個緩存行中,可是之間修改沒有太多的關聯。因此每次修改,都會使以前緩存的數據失效,從而不能徹底達到共享的效果。安全
public class ArrayBlockingQueue<E> { /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; }
// value1和value2可能會產生僞共享 class ValueNoPadding { protected volatile long value1 = 0L; protected volatile long value2 = 0L; } // value1和value2中間插入無用值 p1~p14 class ValuePadding { protected long p1, p2, p3, p4, p5, p6, p7; protected volatile long value1 = 0L; protected long p9, p10, p11, p12, p13, p14; protected volatile long value2 = 0L; }
準備數據容器數據結構
// 數據容器,存放生產和消費的數據內容 public class LongEvent { private long value; }
準備數據容器的生產工廠,用於RingBuffer初始化時的數據填充多線程
// 數據容器生產工廠 public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
準備消費者併發
//消費者 public class LongEventConsumer implements EventHandler<LongEvent> { /** * * @param longEvent * @param sequence 當前的序列 * @param endOfBatch 是不是最後一個數據 * @throws Exception */ @Override public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception { String str = String.format("long event : %s l:%s b:%s", longEvent.getValue(), sequence, endOfBatch); System.out.println(str); } }
生產線程、主線程框架
public class Main { public static void main(String[] args) throws Exception { // 線程工廠 ThreadFactory threadFactory = (r) -> new Thread(r); // disruptor-建立一個disruptor // 設置數據容器的工廠類,ringBuffer的初始化大小,消費者線程的工廠類 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(new LongEventFactory(), 8, threadFactory); // disruptor-設置消費者 disruptor.handleEventsWith(new LongEventConsumer()); disruptor.start(); // 獲取disruptor的RingBuffer RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 主線程開始生產 for (long l = 0; l <= 8; l++) { long nextIndex = ringBuffer.next(); LongEvent event = ringBuffer.get(nextIndex); event.setValue(l); ringBuffer.publish(nextIndex); Thread.sleep(1000); } } }
// 數據左右兩邊插入多餘變量隔離真正的變量 class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { protected volatile long value; } class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15; }
public class Sequence extends RhsPadding { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET; public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); } public long get() { return value; } // 使用UNSAFE操做直接修改內存值 public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); } }