【AQS】隊列和【Disruptor】環形隊列

1:使用場景

咱們在編碼的過程當中,常常會碰到這樣一種場景:java

  • 須要使用多線程的任務
  • 這個任務須要同步,不能併發
  • 咱們想要這些線程一個個乖乖的排隊去執行,別串來串去賊煩

這個時候,你可使用synchronized關鍵字加鎖,固然咱們也可使用jdk1.5以後的juc的各類工具,固然這些juc的工具其實都是基於咱們的aqs隊列。git



2:aqs隊列

不少好用的juc的工具,咱們這裏就不寫了,咱們這裏就來分析aqs隊列github

咱們來本身實現一個aqs隊列的模板:bash

/**
     * aqs隊列使用模板模式
     */
    private static class AqsSync extends AbstractQueuedSynchronizer {
    
        private static final int LOCK = 1;
        private static final int UNLOCK = 0;

        /**
         * 判斷是否處於lock狀態
         *
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == LOCK;
        }

        @Override
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(UNLOCK, LOCK)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;

        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == UNLOCK) {
                throw new IllegalMonitorStateException();
            }
            //沒有線程擁有這個鎖
            setExclusiveOwnerThread(null);
            setState(UNLOCK);
            return true;
        }
    }
複製代碼

而後咱們來使用一下多線程

public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 1; i <= 5; i++) {
            int count = i;
            executorService.execute(() -> {
                try {
                    doTask(count);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }


    private static void doTask(int i) throws InterruptedException {
        AqsSync aqs = new AqsSync();
        aqs.acquire(AqsSync.LOCK);
        //這個方法能夠超時就退出阻塞,這裏是非公平競爭
        //aqs.tryAcquireNanos(AqsSync.LOCK,TimeUnit.MILLISECONDS.toNanos(2000L));
        System.out.println("開始執行任務" + i);
        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        aqs.release(AqsSync.UNLOCK);
        System.out.println("結束執行任務" + i);
    }
複製代碼

那麼aqs隊列這麼強力,究竟是怎麼實現的呢?原理是什麼呢,我給個比較不錯的博客你們本身按需看吧 :瞭解aqs隊列併發



2:終極大招—Disruptor隊列

留心過的同窗都知道,log4j實現異步日誌的關鍵組件就是這個Disruptor,Disruptor可讓咱們的線程排隊執行,而且執行高效,內存節省異步

添加maven依賴maven

<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
</dependency>
複製代碼

隨便定義個事件對象pojoide

@Data
public class Book{
    private int bookId;
    private String bookName;
    private String bookType;
}
複製代碼

定義一個消費者工具

public class BookEventHander implements EventHandler<Book> {
    @Override
    public void onEvent(Book book, long sequence, boolean endOfBatch) throws InterruptedException {
        String threandName = Thread.currentThread().getName();
        String resultT = "consume one ->thread name : {0} , event :{1}";
        String result = MessageFormat.format(resultT,threandName,book);
        System.out.println(result);
        //模擬業務處理時間
        Thread.sleep(5000L);
    }
}
複製代碼

定義一個相似生產者的東西(也能夠不用定義直接用lambda,官方給出了三種方式,官方文檔

public class BookEventProducer {
    private final RingBuffer<Book> ringBuffer;

    public BookEventProducer(RingBuffer<Book> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void loadBook(Book booksource) {
        long sequence = ringBuffer.next();
        try {
            Book book = ringBuffer.get(sequence);
            book.setBookId(booksource.getBookId());
            book.setBookName(booksource.getBookName());
            book.setBookType(booksource.getBookType());
        } finally {
            //最終的生產實際上靠的是這行代碼
            ringBuffer.publish(sequence);
        }
    }
}
複製代碼

直接來測試下

public static void main(String[] args) throws InterruptedException {
        //建立線程池  用於建立多個線程消費者
        ExecutorService executor = Executors.newCachedThreadPool();
        //定義環形隊列大小 2的n次方只能是
        int ringBufferSize = 2048;

        //生成 disruptor 實例
        Disruptor<Book> disruptor = new Disruptor<>(Book::new, ringBufferSize, executor);

        //鏈接到消費者
        // Connect the handler
        disruptor.handleEventsWith(new BookEventHander(), new BookEventHanderTwo());

        // 啓動 disruptor 而且獲取生產者
        RingBuffer<Book> ringBuffer = disruptor.start();
        BookEventProducer producer = new BookEventProducer(ringBuffer);

        //開始進行生產
        System.out.println("開始進行生產");
        for (int l = 0; l <= 10; l++) {
            System.out.println("生產第" + l + "條記錄");
            Book booksource = new Book();
            booksource.setBookId(l);
            booksource.setBookType("測試類型" + l);
            booksource.setBookName("測試之書" + l);
            producer.loadBook(booksource);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
複製代碼

你們能夠看到咱們的任務就會乖乖排隊去執行了,因此log4j能夠用它來異步的完成日誌的有序輸出。

最後推薦你們使用單線程做爲producer,能夠大大提升disruptor的吞吐量!



3:總結

在玩java的時候,不免遇到多線程啊,同步啊什麼的問題,這個時候要合理利用咱們的vilatile,sync,juc各類工具以及像咱們的Disruptor工具去完成咱們所需的業務。

相關文章
相關標籤/搜索