無鎖編程筆記

一 簡單壓測

1 ArrayBlockingQueue

// 單生產者單消費者
public static void main(String[] args) {
    ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(10000000);
    long startTime = System.currentTimeMillis();

    new Thread(() -> {
        long i = 0;
        while (i < Constrants.EVENT_NUM_OM) {
            Data data = new Data(i, "c" + i);
            try {
                queue.put(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
        }
    }).start();

    new Thread(() -> {
        int k = 0;
        while (k < Constrants.EVENT_NUM_OM) {
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            k++;
        }
        long endTime = System.currentTimeMillis();
        System.out.println("runTime:" + (endTime - startTime));
    }).start();
}

// Data
public class Data implements Serializable {
    private Long id;
    private String name;
}

2 Disruptor

public class DataConsumer implements EventHandler<Data> {
    private long startTime;
    private int i;

    public DataConsumer() {
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        i++;
        if (i == Constrants.EVENT_NUM_OM) {
            long endTime = System.currentTimeMillis();
            System.out.println("runTime:" + (endTime - startTime));
        }
    }
}

public static void main(String[] args) {
    int ringBufferSize = 65536;// 必需爲2的倍數

    Disruptor<Data> disruptor = new Disruptor<>(() -> new Data(), ringBufferSize, Executors.newSingleThreadExecutor(), ProducerType.SINGLE, new YieldingWaitStrategy());
    DataConsumer dataConsumer = new DataConsumer();
    disruptor.handleEventsWith(dataConsumer);
    disruptor.start();

    new Thread(() -> {
        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        for (long i = 0; i < Constrants.EVENT_NUM_OM; i++) {
            long seq = ringBuffer.next();
            Data data = ringBuffer.get(seq);
            data.setId(i);
            data.setName("c" + i);
            ringBuffer.publish(seq);
        }
    }).start();
}

二 併發編程框架核心講解

1 簡介

  • 一個線程執行6百萬訂單
  • 業務邏輯處理器運行在內存中
  • 基於事件驅動方式

2 Quick Start

(1)核心類

  • Event工廠類:建立Event實例
  • 監聽事件類:處理Event
  • Disruptor實例:配置參數,編寫核心組件
  • 生產者組件:向Disruptor中投遞數據

(2)demo

@Data
public class OrderEvent {
    private long value;
}

public class OrderEventFactory implements EventFactory<OrderEvent> {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}

public class OrderEventHandler implements EventHandler<OrderEvent> {
    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消費者:" + event.getValue());
    }
}

public class OrderEventProducer {
    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(ByteBuffer byteBuffer) {
        // 1. 在生產者發送消息時,從RingBuffer獲取一個序號
        long seq = ringBuffer.next();
        try {
            // 2. 根據須要找到具體的OrderEvent元素,此時爲未填充的對象
            OrderEvent orderEvent = ringBuffer.get(seq);
            // 3. 進行實際的賦值
            orderEvent.setValue(byteBuffer.getLong(0));
        } finally {
            // 4. 發佈提交
            ringBuffer.publish(seq);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        int ringBufferSize = 1024 * 8;

        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        /**
         * 1 eventFactory: 消息工廠類
         * 2 ringBufferSize: 容器長度
         * 3 executor: 線程池(自定義線程池) RejectedExecutionHandler
         * 4 ProducerType: 生產者數量
         * 5 WaitStrategy: 等待策略
         */

        // 1. 實例化disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(), ringBufferSize, executorService, ProducerType.SINGLE, new BlockingWaitStrategy());

        // 2.添加消息消費者監聽
        disruptor.handleEventsWith(new OrderEventHandler());

        // 3.啓動disruptor
        disruptor.start();

        // 4.獲取實際存儲數據的容器:RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);

        ByteBuffer byteBuffer = ByteBuffer.allocate(8);

        for (long i = 0; i < 100; i++) {
            byteBuffer.putLong(0,i);
            producer.sendData(byteBuffer);
        }

        disruptor.shutdown();
        executorService.shutdown();
    }
}

3 RingBuffer

  • 它是一個環。
  • 它在作不一樣上下文(線程)間傳遞數據的Buffer。
  • 它擁有一個序號,序號指向數組中下一個可用的元素(取模操做)。
  • 它的容量通常爲2的N次方,由於2的N次方有利於計算機計算。
  • 它是基於數組的緩存實現,也是建立Sequencer與定義WaitStrategy的入口

4 Disruptor

  • 持有RingBuffer、消費者線程池Executor、消費者集合ConsumerRepository等引用

5 Sequence

  • 經過順序遞增的序號來編號,管理進行交換的數據
  • 對數據進行處理過程老是沿着序號逐個遞增處理
  • 一個Sequence用於追蹤標識某個特定的事件處理者(RingBuffer/Producer/Consumer)的處理進度
  • Sequence能夠當作一個AtomicLong用於標識進度,防止Sequence之間CPU緩存僞共享(Flase Sharing)的問題

6 Sequencer

  • 它是Disruptor的真正核心
  • 兩個實現:
    • SingleProducerSequencer
    • MultiProducerSequencer
  • 主要實現生產者和消費者之間快速、正確地傳遞數據的併發算法

7 Sequence Barrier

  • 保持對RingBuffer的Main Pushlished Sequence(Producer)和Consumer之間的平衡關係
  • Sequence Barrier還定義了決定Consumer是否還有可處理的事件的邏輯

8 EventProcessor

  • 主要事件循環,處理Disruptor的Event,擁有消費者的Sequence。
  • 它有一個實現類BatchEventProcessor,包含了event loop有效的實現,並將回調到一個EventHandler接口的實現對象。

三 Disruptor的高級應用

1 核心鏈路場景應用講解

  • 特色:相當重要但業務複雜
  • 實現1:傳統徹底解耦模式(copy)
  • 實現2:模板模式
  • 解決手段:
    • 領域模型的高度抽象。
    • 尋找更好的框架幫助咱們進行編碼。
  • 框架:
    • 有限狀態機框架:如Spring-StateMachine
    • 使用Disruptor

2 並行計算—串、並行操做、多邊形高端操做

(1)概述

EventHandlerGroup<T> handleEvnetsWith(final EventHandler<? super T>... handlers);

// 串行操做:鏈式調用

// 並行操做:單獨調用

(2)操做

@Data
public class Trade {
    private String id;
    private String name;
    private double price;
    private AtomicInteger count = new AtomicInteger(0);
}

public class TradePublisher implements Runnable {

    private CountDownLatch latch;
    private Disruptor disruptor;

    private static final int PUBLISH_COUNT = 2;

    public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
        this.latch = latch;
        this.disruptor = disruptor;
    }

    @Override
    public void run() {
        EventTranslator<Trade> eventTranslator = new EventTranslator<Trade>() {
            private Random random = new Random();

            private void generateTrade(Trade trade) {
                trade.setPrice(random.nextDouble() * 9999);
            }

            @Override
            public void translateTo(Trade trade, long sequence) {
                this.generateTrade(trade);
            }
        };

        for (int i = 0; i < PUBLISH_COUNT; i++)
            disruptor.publishEvent(eventTranslator);

        latch.countDown();
    }
}

public static void main(String[] args) throws InterruptedException {
    ExecutorService es1 = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ExecutorService es2 = Executors.newFixedThreadPool(8);

    Disruptor<Trade> disruptor = new Disruptor<Trade>(
        (EventFactory<Trade>) () -> new Trade(),
        1024 * 1024,
        es2,
        ProducerType.SINGLE,
        new BusySpinWaitStrategy());
    
    Handler1 handler1 = new Handler1();
    Handler2 handler2 = new Handler2();
    Handler3 handler3 = new Handler3();     
    Handler3 handler4 = new Handler4();     
    Handler3 handler5 = new Handler5();     
    
    // 串行操做1
    // disruptor.handleEventsWith(handler1)
    //            .then(handler2)
    //            .then(handler3);
    //
    
    // 串行操做2
    // disruptor.handleEventsWith(handler1)
    //            .handleEventsWith(handler2)
    //            .handleEventsWith(handler3);
    //

    // 並行操做1
    // disruptor.handleEventsWith(handler1);
    // disruptor.handleEventsWith(handler3);
    // disruptor.handleEventsWith(handler2);

    // 並行操做2
    // disruptor.handleEventsWith(handler1,handler3,handler2);

    // 惟一操做: 多消費者
    // disruptor.handleEventsWithWorkerPool(handler1,handler2,handler3);
    
    // 多邊形
    // disruptor.handleEventsWith(handler1,handler2).then(handler3);
        
    // 複雜多邊形操做
    // disruptor.handleEventsWith(handler1, handler4);
    // disruptor.after(handler1).then(handler2);
    // disruptor.after(handler4).then(handler5);
    // disruptor.after(handler2, handler5).then(handler3);
    
    RingBuffer<Trade> ringBuffer = disruptor.start();

    CountDownLatch latch = new CountDownLatch(1);

    es1.submit(new TradePublisher(latch, disruptor));

    latch.await();

    disruptor.shutdown();
    es1.shutdown();
    es2.shutdown();
}

3 多生產者、多消費者模型

@Data
public class Order {
    private String id;
    private String name;
    private double price;
    private AtomicInteger count = new AtomicInteger(0);
}

public class Consumer implements WorkHandler<Order> {
    private String consumerId;
    private AtomicInteger atomicInteger = new AtomicInteger(0);
    private Random random = new Random();

    public Consumer(String consumerId) {
        this.consumerId = consumerId;
    }

    @Override
    public void onEvent(Order event) throws Exception {
        Thread.sleep(1 * random.nextInt(5));
        System.out.println("當前消費者:" + this.consumerId + ",消費:" + event);
        atomicInteger.incrementAndGet();
    }

    public int getCount() {
        return atomicInteger.get();
    }
}

public class Producer {
    private RingBuffer<Order> ringBuffer;
    
    public Producer(RingBuffer<Order> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(String uuid) {
        long sequence = ringBuffer.next();
        try {
            Order order = ringBuffer.get(sequence);
            order.setId(uuid);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

// Main1
public static void main(String[] args) throws InterruptedException {
    // 1.建立RingBuffer
    RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, () -> new Order(), 1024 * 1024, new YieldingWaitStrategy());

    // 2.RingBuffer建立屏障
    SequenceBarrier barrier = ringBuffer.newBarrier();

    // 3.構建多消費者
    int count = 10;
    Consumer[] consumers = new Consumer[count];
    for (int i = 0; i < count; i++) {
        consumers[i] = new Consumer("c-" + i);
    }

    // 4.多消費者工做池
    WorkerPool<Order> workerPool = new WorkerPool<>(
        ringBuffer,
        barrier,
        new ExceptionHandler<Order>() {
            @Override
            public void handleEventException(Throwable ex, long sequence, Order event) { }
            @Override
            public void handleOnStartException(Throwable ex) { }
            @Override
            public void handleOnShutdownException(Throwable ex) { }
        },
        consumers
    );
    //5 設置多個消費者的sequence序號 用於單獨統計消費進度, 而且設置到ringbuffer中
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

    //6 啓動workerPool
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    workerPool.start(executorService);

    final CountDownLatch latch = new CountDownLatch(1);

    for (int i = 0; i < 100; i++) {
        final Producer producer = new Producer(ringBuffer);
        new Thread(() -> {
            try {
                latch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            for (int j = 0; j < 100; j++) {
                producer.sendData(UUID.randomUUID().toString());
            }
        }).start();
    }

    Thread.sleep(2000);
    System.err.println("----------線程建立完畢,開始生產數據----------");
    latch.countDown();

    Thread.sleep(10000);

    for (int i = 0; i < consumers.length; i++) {
        System.err.println("任務總數:" + consumers[i].getCount());
    }
    executorService.shutdown();
}

// Main2
public static void main(String[] args) throws InterruptedException {
    Consumer[] consumers = new Consumer[10];
    for (int i = 0; i < consumers.length; i++) {
        consumers[i] = new Consumer("c-" + i);
    }

    Disruptor<Order> disruptor = new Disruptor<Order>(
        (EventFactory<Order>) () -> new Order(),
        1024 * 1024,
        Executors.defaultThreadFactory(),
        ProducerType.SINGLE,
        new BlockingWaitStrategy());
    disruptor.handleEventsWithWorkerPool(consumers);
    disruptor.start();

    final CountDownLatch latch = new CountDownLatch(1);

    for (int i = 0; i < 100; i++) {
        final Producer producer = new Producer(disruptor.getRingBuffer());
        new Thread(() -> {
            try {
                latch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            for (int j = 0; j < 100; j++) {
                producer.sendData(UUID.randomUUID().toString());
            }
        }).start();
    }

    Thread.sleep(2000);
    System.err.println("----------線程建立完畢,開始生產數據----------");
    latch.countDown();
    Thread.sleep(10000);

    for (int i = 0; i < consumers.length; i++) {
        System.err.println("任務總數:" + consumers[i].getCount());
    }
    disruptor.shutdown();
}

四 Disruptor源碼分析

1 UML圖

2 Disruptro性能爲何這麼優秀?

  • 數據結構層面:環形結構、數組、內存預加載
  • 單線程寫方式、內存屏障
  • 消除僞共享(填充緩存行)
  • 序號柵欄和序號配合使用來消除鎖和CAS

3 數據結構層面

  • RingBuffer使用Object[] entries做爲存儲容器,環形較少內存消費,防止內存溢出,不會無限增加;
  • 內存預加載:一開始就將對象建立(new)出來,元素在內存中一直存在,減小GC頻率,空間換時間。
abstract class RingBufferFields<E> extends RingBufferPad{
    private final Object[] entries;
    
    RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1){
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }
    
    private void fill(EventFactory<E> eventFactory){
        for (int i = 0; i < bufferSize; i++){
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>{
    
    RingBuffer(EventFactory<E> eventFactory,Sequencer sequencer){
        super(eventFactory, sequencer);
    }
}

4 內核——單線程寫

  • RingBuffer徹底無鎖,由於它是單線程。
  • Redis、Netty、Nodejs都使用這個核心思想(事件驅動)。

5 系統內存優化——內存屏障

  • 正確實現無鎖:內存屏障
  • 實際就是volatile和happens before語義
  • 內存屏障—Linux的smp_wmb()/smp_rmb()
  • 系統內核:Linux的kfifo舉例:使用了smp_wmb()

6 系統緩存優化——消除僞共享

  • 緩存系統中以緩存行(cache line)爲單位存儲的
  • 緩存行爲2的整數冪個連續字節,通常爲32~256個字節
  • 最多見的緩存行大小爲64字節
  • 當多線程修改互相獨立的變量時,若是這些變量共享在一個緩存行中,就無心中影響了彼此的性能,這就是僞共享。
// (左邊填充7個) value (右邊填充7個),一個緩存行通常爲64字節,即訪問value時,能夠保證value獨佔一個緩存行,不存在僞共享問題,空間換時間。
class LhsPadding{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding{
    protected volatile long value;
}

class RhsPadding extends Value{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding{
    
    // ...
}

7 算法優化—序號柵欄

  • ringBuffer.next()
  • SequenceBarrier和Sequence結合使用來協調和管理消費者與生產者的工做節奏,避免了鎖和CAS的使用。
  • 各個消費者和生產者都持有本身的序號
    • 消費者序號必須小於生產者序號
    • 消費者序號必須小於其前置消費者的序號
    • 生產者序號不能大於消費者中最小的序號(防止覆蓋)
// SingleProducerSequencer.next()
public long next(){
    next(1);
}

// SingleProducerSequencer.next(int n)
public long next(int n){
    if (n < 1){
        throw new IllegalArgumentException("n must be > 0");
    }

    long nextValue = this.nextValue;
    
    // nextSequence默認爲-1.
    long nextSequence = nextValue + n;
    // 判斷是否繞過整個RingBuffer環
    long wrapPoint = nextSequence - bufferSize;
    // 緩存值:緩存上次的最小消費者序號,無需每次都從新獲取
    long cachedGatingSequence = this.cachedValue;

    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue){
        cursor.setVolatile(nextValue);  // StoreLoad fence
    
        long minSequence;
        // Util.getMinimumSequence(gatingSequences, nextValue)找到消費者中最小的序號(初始值爲-1)
        // 自旋鎖: 生產者序號不能大於消費者中最小的序號(防止覆蓋)
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))){
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        this.cachedValue = minSequence;
    }

    // 賦值nextValue
    this.nextValue = nextSequence;
    return nextSequence;
}

8 WaitStrategy等待策略

(1)各個實現

  • WaitStrategy
    • BlockingWaitStrategy:當吞吐量和低延遲不如CPU資源那麼重要時,可使用此策略。
    • BusySpinWaitStrategy:此策略將使用CPU資源來避免可能引入延遲抖動的系統調用。 最好在線程綁定到特定CPU內核時使用。
    • LiteBlockingWaitStrategy:當吞吐量和低延遲不如CPU資源那麼重要時,可使用此策略。至關於BlockingWaitStrategy的優化,但處於實驗階段。
    • PhasedBackoffWaitStrategy:當吞吐量和低延遲不如CPU資源那麼重要時,可使用此策略。自旋,必定時間自旋失敗則採用回退策略。
    • SleepingWaitStrategy:此策略是性能和CPU資源之間的良好折衷。
      • 自旋策略:默認嘗試200次,前100次爲瘋狂嘗試,後100次爲下降該線程的CPU資源。若是200次都沒結束,則進行休眠,默認休眠100ms後。
    • TimeoutBlockingWaitStrategy:相似BlockWaitStrategy,只是加個超時。
    • YieldWaitStrategy:此策略將使用100%CPU,可是當其餘線程須要CPU資源,它比忙碌旋轉策略更容易放棄CPU。(性能最高)

(2)使用

// RingBuffer publish
public void publish(long sequence){
    sequencer.publish(sequence);
}


// Sequencer#publish(long)
// 
// AbstractSequencer#Sequence cursor 
// AbstractSequencer#WaitStrategy waitStrategy
public void publish(long sequence){
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}

// BlockingWaitStrategy
public final class BlockingWaitStrategy implements WaitStrategy{
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    /**
    * sequence: 消費者下一個想要獲得的序號
    * cursorSequence: 當前容器中最大序號
    * 
    **/
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException{
        long availableSequence;
        if (cursorSequence.get() < sequence){
            lock.lock();
            try{
                while (cursorSequence.get() < sequence){
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }finally{
                lock.unlock();
            }
        }

        while ((availableSequence = dependentSequence.get()) < sequence){
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
        lock.lock();
        try{
            processorNotifyCondition.signalAll();
        }finally {
            lock.unlock();
        }
    }
}

9 EventProcessor核心機制深度分析

public interface EventProcessor extends Runnable{
    /**
     * Get a reference to the {@link Sequence} being used by this {@link EventProcessor}.
     */
    Sequence getSequence();

    /**
     * Signal that this EventProcessor should stop when it has finished consuming at the next clean break.
     */
    void halt();

    boolean isRunning();
}

public final class BatchEventProcessor<T>implements EventProcessor{
    private static final int IDLE = 0;
    private static final int HALTED = IDLE + 1;
    private static final int RUNNING = HALTED + 1;

    // 是否正在運行
    private final AtomicInteger running = new AtomicInteger(IDLE);
    // 運行異常處理
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    // 獲取數據
    private final DataProvider<T> dataProvider;
    // 生產者與消費者的進度管理
    private final SequenceBarrier sequenceBarrier;
    // 消費者接口
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // 超時處理
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;
    
    @Override
    public void run(){
        if (running.compareAndSet(IDLE, RUNNING)){
            sequenceBarrier.clearAlert();// 清空柵欄
            notifyStart(); // 喚醒線程工做
            try{
                // 實際工做
                if (running.get() == RUNNING) processEvents();   
            }finally{
                notifyShutdown();
                running.set(IDLE);
            }
        }else{
            // This is a little bit of guess work.  The running state could of changed to HALTED by this point.  However, Java does not have compareAndExchange which is the only way to get it exactly correct.
            if (running.get() == RUNNING){
                throw new IllegalStateException("Thread is already running");
            }else{
                earlyExit();
            }
        }
    }
    
    private void processEvents(){
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true){
            try{
                // 真實可用序號
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                    
                // batchStart鉤子
                if (batchStartAware != null){
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }

                // 當下一個但願消費的Sequence小於當前可消費的Sequence,則把全部可消費的Sequence都消費掉。
                while (nextSequence <= availableSequence){
                    event = dataProvider.get(nextSequence);
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }

                sequence.set(availableSequence);
            }catch (final TimeoutException e){
                notifyTimeout(sequence.get());
            }catch (final AlertException ex){
                if (running.get() != RUNNING) break;
            }catch (final Throwable ex){
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
}

五 Netty整合Disruptor

1 Netty Common(Netty 公共項目)

@Data
// Netty傳輸類
public class TranslatorData implements Serializable {
    private String id;
    private String name;
    private String message; //傳輸消息體內容
}

@Data
// Disruptor傳輸類
public class TranslatorDataWapper {
    private TranslatorData data;
    private ChannelHandlerContext ctx;
}

@Data
// 公共消費者
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> {
    protected String consumerId;
}

@Data
// 公共生產者
public class MessageProducer {
    private String producerId;
    private RingBuffer<TranslatorDataWapper> ringBuffer;
    
    public void onData(TranslatorData data, ChannelHandlerContext ctx) {
        long sequence = ringBuffer.next();
        try {
            TranslatorDataWapper wapper = ringBuffer.get(sequence);
            wapper.setData(data);
            wapper.setCtx(ctx);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

// 單例實現
public class RingBufferWorkerPoolFactory {
    
    private static class SingletonHolder {
        static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
    }

    private RingBufferWorkerPoolFactory() {
    }

    public static RingBufferWorkerPoolFactory getInstance() {
        return SingletonHolder.instance;
    }

    // 生產者池化
    private static Map<String, MessageProducer> producers = new ConcurrentHashMap<String, MessageProducer>();

    // 消費者池化
    private static Map<String, MessageConsumer> consumers = new ConcurrentHashMap<String, MessageConsumer>();

    private RingBuffer<TranslatorDataWapper> ringBuffer;

    private SequenceBarrier sequenceBarrier;

    private WorkerPool<TranslatorDataWapper> workerPool;

    public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy,
            MessageConsumer[] messageConsumers) {
        // 1. 構建ringBuffer對象
        this.ringBuffer = RingBuffer.create(type, new EventFactory<TranslatorDataWapper>() {
            public TranslatorDataWapper newInstance() {
                return new TranslatorDataWapper();
            }
        }, bufferSize, waitStrategy);
        // 2.設置序號柵欄
        this.sequenceBarrier = this.ringBuffer.newBarrier();
        // 3.設置工做池
        this.workerPool = new WorkerPool<TranslatorDataWapper>(this.ringBuffer, this.sequenceBarrier,new EventExceptionHandler(), messageConsumers);
        // 4 把所構建的消費者置入池中
        for (MessageConsumer mc : messageConsumers) {
            this.consumers.put(mc.getConsumerId(), mc);
        }
        // 5 添加咱們的sequences
        this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());
        // 6 啓動咱們的工做池
        this.workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2));
    }

    public MessageProducer getMessageProducer(String producerId) {
        MessageProducer messageProducer = this.producers.get(producerId);
        if (null == messageProducer) {
            messageProducer = new MessageProducer(producerId, this.ringBuffer);
            this.producers.put(producerId, messageProducer);
        }
        return messageProducer;
    }

    /**
     * 異常靜態類
     * 
     * @author Alienware
     *
     */
    static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
        public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
        }

        public void handleOnStartException(Throwable ex) {
        }

        public void handleOnShutdownException(Throwable ex) {
        }
    }
}

/**
 * Marshalling工廠
 * Marshalling是JBoss的序列化框架,Netty提供了Marshalling的編碼和解碼方便用戶使用。
 */
public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //建立了MarshallingConfiguration對象,配置了版本號爲5 
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根據marshallerFactory和configuration建立provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
        return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}

2 Netty Server

// Netty服務器
public class NettyServer {
    public NettyServer() {
        // 1. 建立兩個工做線程組: 一個用於接受網絡請求的線程組. 另外一個用於實際處理業務的線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 2 輔助類
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 表示緩存區動態調配(自適應)
                .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                // 緩存區 池化操做
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    Pipeline pipeline = sc.pipeline();
                    pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    pipeline.addLast(new ServerHandler());
                }
            });
            // 綁定端口,同步等等請求鏈接
            ChannelFuture cf = serverBootstrap.bind(8765).sync();
            System.err.println("Server Startup...");
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 優雅停機
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            System.err.println("Sever ShutDown...");
        }
    }
}

// 實際處理器
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        TranslatorData request = (TranslatorData) msg;
        // 自已的應用服務應該有一個ID生成規則
        String producerId = "code:sessionId:001";
        MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance()
                                                .getMessageProducer(producerId);    
        messageProducer.onData(request, ctx);
    }
}

// 具體消費者
public class MessageConsumerImpl4Server extends MessageConsumer {

    public MessageConsumerImpl4Server(String consumerId) {
        super(consumerId);
    }

    public void onEvent(TranslatorDataWapper event) throws Exception {
        TranslatorData request = event.getData();
        ChannelHandlerContext ctx = event.getCtx();
        //1.業務處理邏輯:
        System.err.println("Sever端: id= " + request.getId() 
                           + ", name= " + request.getName() 
                           + ", message= " + request.getMessage());
        
        //2.回送響應信息:
        TranslatorData response = new TranslatorData();
        response.setId("resp: " + request.getId());
        response.setName("resp: " + request.getName());
        response.setMessage("resp: " + request.getMessage());
        //寫出response響應信息:
        ctx.writeAndFlush(response);
    }

}

// 啓動類
public class NettyServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyServerApplication.class, args);
        
        MessageConsumer[] conusmers = new MessageConsumer[4];
        for(int i =0; i < conusmers.length; i++) {
            MessageConsumer messageConsumer = new MessageConsumerImpl4Server("code:serverId:" + i);
            conusmers[i] = messageConsumer;
        }
        RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
                1024*1024,
                //new YieldingWaitStrategy(),
                new BlockingWaitStrategy(),
                conusmers);
        new NettyServer();
    }
}

3 Netty Client

// Netty客戶端
public class NettyClient {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8765;
    
    //擴展 完善 池化: ConcurrentHashMap<KEY -> String, Value -> Channel> 
    private Channel channel;    
    
    //1. 建立工做線程組: 用於實際處理業務的線程組
    private EventLoopGroup workGroup = new NioEventLoopGroup();
    
    private ChannelFuture cf;
    
    public NettyClient() {
        this.connect(HOST, PORT);
    }

    private void connect(String host, int port) {
        //2 輔助類(注意Client 和 Server 不同)
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(workGroup)
            .channel(NioSocketChannel.class)
            //表示緩存區動態調配(自適應)
            .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
            //緩存區 池化操做
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(new LoggingHandler(LogLevel.INFO))
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                    sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            //綁定端口,同步等等請求鏈接
            this.cf = bootstrap.connect(host, port).sync();
            System.err.println("Client connected...");
            //接下來就進行數據的發送, 可是首先咱們要獲取channel:
            this.channel = cf.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public void sendData(){
        
        for(int i =0; i <10; i++){
            TranslatorData request = new TranslatorData();
            request.setId("" + i);
            request.setName("請求消息名稱 " + i);
            request.setMessage("請求消息內容 " + i);
            this.channel.writeAndFlush(request);
        }
    }
    
    public void close() throws Exception {
        cf.channel().closeFuture().sync();
        //優雅停機
        workGroup.shutdownGracefully();
        System.err.println("Sever ShutDown...");        
    }
}

// 客戶端處理器
public class ClientHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        TranslatorData response = (TranslatorData) msg;
        String producerId = "code:seesionId:002";
        MessageProducer messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
        messageProducer.onData(response, ctx);
    }
}

// 具體的消費者
public class MessageConsumerImpl4Client extends MessageConsumer {
    public MessageConsumerImpl4Client(String consumerId) {
        super(consumerId);
    }

    public void onEvent(TranslatorDataWapper event) throws Exception {
        TranslatorData response = event.getData();
        ChannelHandlerContext ctx = event.getCtx();
        //業務邏輯處理:
        try {
            System.err.println("Client端: id= " + response.getId() 
            + ", name= " + response.getName()
            + ", message= " + response.getMessage());
        } finally {
            ReferenceCountUtil.release(response);
        }
    }
}

// Netty客戶端啓動類
public class NettyClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(NettyClientApplication.class, args);
        MessageConsumer[] conusmers = new MessageConsumer[4];
        for(int i =0; i < conusmers.length; i++) {
            MessageConsumer messageConsumer = new MessageConsumerImpl4Client("code:clientId:" + i);
            conusmers[i] = messageConsumer;
        }
        RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
                                                               1024*1024,
                                                               //new YieldingWaitStrategy(),
                                                               new BlockingWaitStrategy(),
                                                               conusmers);
        //創建鏈接併發送消息
        new NettyClient().sendData();
    }
}

六 分佈式統一ID生成抗壓策略

1 ID生成策略

  • 最基本
    • java.util.UUID:保證惟一性,但沒有排序,須要額外加時間字段
    • 雪花算法、數據庫sequence序列、自增ID
  • 順序ID生成工具
    • KeyUtil生成ID,減小索引,減小硬盤佔用。
  • 業務ID生成方式
    • 維度1(區域)+維度2(類型)+UUID:10010+001+UUID

2 順序ID生成工具

// 1. 引入jar包,com.fasterxml.uuid java-uuid-generator

// 2. 編寫keytool類
public class KeyUtil {
    public static String generatorUUID(){
        TimeBasedGenerator timeBasedGenerator = Generators.timeBasedGenerator(EthernetAddress.fromInterface());
        return timeBasedGenerator.generate().toString();
    }
}

3 業務ID生成方式

  • 節省索引,節省字段,節省佔用
select * from goods_shelf gs where id > '1001000000000000000000000000000000000' 
                           and id < '2000000000000000000000000000000000000'

4 高併發統一ID生成策略抗壓

(1)統一ID生成策略服務

  • 如何解決ID生成在併發下的重複生成問題
  • 如何承載高併發ID生成的性能瓶頸問題

(2)如何落地

  • 使用Zookeeper的分佈式鎖實現
  • 使用Redis緩存,利用Redis分佈鎖實現,存在超時重試,可是QPS降低

(3)業界主流的分佈式ID生成器策略:

  • 實現1:提早加載
    • 提早加載,放到內存中
    • 併發獲取,採用Disruptor框架去提高性能
  • 實現2:單點生成方式(保底策略)
    • 固定一個機器節點來生成一個惟一ID,全局惟一
    • 須要根據業務規則拼接:機器碼+時間戳+自增序列

5 經典NTP問題

  • 爲何須要拼接自增序列?由於高併發場景下,生成的ID十分巨大,會暴露出NTP問題
  • NTP問題:NTP是網絡時間協議,用來同步網絡中各個計算機的時間的協議。NTP是服務器系統的時間會定時去獲取,而後進行更新校準,致使時間戳重複。

6 實現架構

相關文章
相關標籤/搜索