構建高性能隊列,你不得不知道的底層知識!

前言

本文收錄於專輯:http://dwz.win/HjK,點擊解鎖更多數據結構與算法的知識。java

你好,我是彤哥。面試

上一節,咱們一塊兒學習瞭如何將遞歸改寫爲非遞歸,其中,用到的數據結構主要是棧。算法

棧和隊列,能夠說是除了數組和鏈表以外最基礎的數據結構了,在不少場景中都有用到,後面咱們也會陸陸續續的看到。數組

今天,我想介紹一下,在Java中,如何構建一個高性能的隊列,以及咱們須要掌握的底層知識。緩存

學習其餘語言的同窗,也能夠看看,在你的語言中,是如何構建高性能隊列的。安全

隊列

隊列,是一種先進先出(First In First Out,FIFO)的數據結構,相似於實際生活場景中的排隊,先到的人先得。
構建高性能隊列,你不得不知道的底層知識!數據結構

使用數組和鏈表實現簡單的隊列,咱們前面都介紹過了,這裏就再也不贅述了,有興趣的同窗能夠點擊如下連接查看:多線程

重溫四大基礎數據結構:數組、鏈表、隊列和棧架構

今天咱們主要來學習如何實現高性能的隊列。併發

提及高性能的隊列,固然是說在高併發環境下也可以工做得很好的隊列,這裏的很好主要是指兩個方面:併發安全、性能好。

併發安全的隊列

在Java中,默認地,也自帶了一些併發安全的隊列:

隊列 有界性 數據結構
ArrayBlockingQueue 有界 加鎖 數組
LinkedBlockingQueue 可選有界 加鎖 鏈表
ConcurrentLinkedQueue *** 無鎖 鏈表
SynchronousQueue *** 無鎖 隊列或棧
LinkedTransferQueue *** 無鎖 鏈表
PriorityBlockingQueue *** 加鎖
DelayQueue *** 加鎖

這些隊列的源碼解析快捷入口:死磕 Java併發集合之終結篇

總結起來,實現併發安全隊列的數據結構主要有:數組、鏈表和堆,堆主要用於實現優先級隊列,不具有通用性,暫且不討論。

從有界性來看,只有ArrayBlockingQueue和LinkedBlockingQueue能夠實現有界隊列,其它的都是***隊列。

從加鎖來看,ArrayBlockingQueue和LinkedBlockingQueue都採用了加鎖的方式,其它的都是採用的CAS這種無鎖的技術實現的。

從安全性的角度來講,咱們通常都要選擇有界隊列,防止生產者速度過快致使內存溢出。

從性能的角度來講,咱們通常要考慮無鎖的方式,減小線程上下文切換帶來的性能損耗。

從JVM的角度來講,咱們通常選擇數組的實現方式,由於鏈表會頻繁的增刪節點,致使頻繁的垃圾回收,這也是一種性能損耗。

因此,最佳的選擇就是:數組 + 有界 + 無鎖。

而JDK並無提供這樣的隊列,所以,不少開源框架都本身實現了高性能的隊列,好比Disruptor,以及Netty中使用的jctools。

高性能隊列

咱們這裏不討論具體的某一個框架,只介紹實現高性能隊列的通用技術,並本身實現一個。

環形數組

經過上面的討論,咱們知道實現高性能隊列使用的數據結構只能是數組,而數組實現隊列,必然要使用到環形數組。

環形數組,通常經過設置兩個指針實現:putIndex和takeIndex,或者叫writeIndex和readIndex,一個用於寫,一個用於讀。
構建高性能隊列,你不得不知道的底層知識!

當寫指針到達數組尾端時,會從頭開始,固然,不能越過讀指針,同理,讀指針到達數組尾端時,也會從頭開始,固然,不能讀取未寫入的數據。
構建高性能隊列,你不得不知道的底層知識!

而爲了防止寫指針和讀指針重疊的時候,沒法分清隊列究竟是滿了仍是空的狀態,通常會再添加一個size字段:
構建高性能隊列,你不得不知道的底層知識!

構建高性能隊列,你不得不知道的底層知識!

因此,使用環形數組實現隊列的數據結構通常爲:

public class ArrayQueue<T> {
    private T[] array;
    private long wrtieIndex;
    private long readIndex;
    private long size;
}

在單線程的狀況下,這樣不會有任何問題,可是,在多線程環境中,這樣會帶來嚴重的僞共享問題。

僞共享

什麼是共享?

在計算機中,有不少存儲單元,咱們接觸最多的就是內存,又叫作主內存,此外,CPU還有三級緩存:L一、L二、L3,L1最貼近CPU,固然,它的存儲空間也很小,L2比L1稍大一些,L3最大,能夠同時緩存多個核心的數據。CPU取數據的時候,先從L1緩存中讀取,若是沒有再從L2緩存中讀取,若是沒有再從L3中讀取,若是三級緩存都沒有,最後會從內存中讀取。離CPU核心越遠,則相對的耗時就越長,因此,若是要作一些很頻繁的操做,要儘可能保證數據緩存在L1中,這樣能極大地提升性能。
構建高性能隊列,你不得不知道的底層知識!

緩存行

而數據在三級緩存中,也不是說來一個數據緩存一下,而是一次緩存一批數據,這一批數據又稱做緩存行(Cache Line),一般爲64字節。
構建高性能隊列,你不得不知道的底層知識!

每一次,當CPU去內存中拿數據的時候,都會把它後面的數據一併拿過來(組成64字節),咱們以long型數組爲例,當CPU取數組中一個long的時候,同時會把後續的7個long一塊兒取到緩存行中。
構建高性能隊列,你不得不知道的底層知識!

這在必定程度上可以加快數據的處理,由於,此時在處理下標爲0的數據,下一個時刻可能就要處理下標爲1的數據了,直接從緩存中取要快不少。

可是,這樣又帶來了一個新的問題——僞共享。

僞共享

試想一下,兩個線程(CPU)同時在處理這個數組中的數據,兩個CPU都緩存了,一個CPU在對array[0]的數據加1,另外一個CPU在對array[1]的數據加1,那麼,回寫到主內存的時候,到底以哪一個緩存行的數據爲準(寫回主內存的時候也是以緩存行的形式寫回),因此,此時,就須要對這兩個緩存行「加鎖」了,一個CPU先修改數據,寫回主內存,另外一個CPU才能讀取數據並修改數據,再寫回主內存,這樣勢必會帶來性能的損耗,出現的這種現象就叫作僞共享,這種「加鎖」的方式叫作內存屏障,關於內存屏障的知識咱們就不展開敘述了。

那麼,怎麼解決僞共享帶來的問題呢?

以環形數組實現的隊列爲例,writeIndex、readIndex、size如今是這樣處理的:
構建高性能隊列,你不得不知道的底層知識!

因此,咱們只須要在writeIndex和readIndex之間加7個long就能夠把它們隔離開,同理,readIndex和size之間也是同樣的。
構建高性能隊列,你不得不知道的底層知識!

這樣就消除了writeIndex和readIndex之間的僞共享問題,由於writeIndex和readIndex確定是在兩個不一樣的線程中更新,因此,消除僞共享以後帶來的性能提高是很明顯的。

假若有多個生產者,writeIndex是確定會被爭用的,此時,要怎麼友好地修改writeIndex呢?即一個生產者線程修改了writeIndex,另外一個生產者線程要立馬可見。

你第一時間想到的確定是volatile,沒錯,但是光volatile還不行哦,volatile只能保證可見性和有序性,不能保證原子性,因此,還須要加上原子指令CAS,CAS是誰提供的?原子類AtomicInteger和AtomicLong都具備CAS的功能,那咱們直接使用他們嗎?確定不是,仔細觀察,發現他們最終都是調用Unsafe實現的。

OK,下面就輪到最牛逼的底層殺手登場了——Unsafe。

Unsafe

Unsafe不只提供了CAS的指令,還提供不少其它操做底層的方法,好比操做直接內存、修改私有變量的值、實例化一個類、阻塞/喚醒線程、帶有內存屏障的方法等。

關於Unsafe,能夠看這篇文章:死磕 java魔法類之Unsafe解析

固然,構建高性能隊列,主要使用的是Unsafe的CAS指令以及帶有內存屏障的方法等:

// 原子指令
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// 以volatile的形式獲取值,至關於給變量加了volatile關鍵字
public native long getLongVolatile(Object var1, long var2);
// 延遲更新,對變量的修改不會當即寫回到主內存,也就是說,另外一個線程不會當即可見
public native void putOrderedLong(Object var1, long var2, long var4);

好了,底層知識介紹的差很少了,是時候展示真正的技術了——手寫高性能隊列。

手寫高性能隊列

咱們假設這樣一種場景:有多個生產者(Multiple Producer),卻只有一個消費者(Single Consumer),這是Netty中的經典場景,這樣一種隊列該怎麼實現?

直接上代碼:

/**
 * 多生產者單消費者隊列
 *
 * @param <T>
 */
public class MpscArrayQueue<T> {

    long p01, p02, p03, p04, p05, p06, p07;
    // 存放元素的地方
    private T[] array;
    long p1, p2, p3, p4, p5, p6, p7;
    // 寫指針,多個生產者,因此聲明爲volatile
    private volatile long writeIndex;
    long p11, p12, p13, p14, p15, p16, p17;
    // 讀指針,只有一個消費者,因此不用聲明爲volatile
    private long readIndex;
    long p21, p22, p23, p24, p25, p26, p27;
    // 元素個數,生產者和消費者均可能修改,因此聲明爲volatile
    private volatile long size;
    long p31, p32, p33, p34, p35, p36, p37;

    // Unsafe變量
    private static final Unsafe UNSAFE;
    // 數組基礎偏移量
    private static final long ARRAY_BASE_OFFSET;
    // 數組元素偏移量
    private static final long ARRAY_ELEMENT_SHIFT;
    // writeIndex的偏移量
    private static final long WRITE_INDEX_OFFSET;
    // readIndex的偏移量
    private static final long READ_INDEX_OFFSET;
    // size的偏移量
    private static final long SIZE_OFFSET;

    static {
        Field f = null;
        try {
            // 獲取Unsafe的實例
            f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);

            // 計算數組基礎偏移量
            ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(Object[].class);
            // 計算數組中元素偏移量
            // 簡單點理解,64位系統中有壓縮指針佔用4個字節,沒有壓縮指針佔用8個字節
            int scale = UNSAFE.arrayIndexScale(Object[].class);
            if (4 == scale) {
                ARRAY_ELEMENT_SHIFT = 2;
            } else if (8 == scale) {
                ARRAY_ELEMENT_SHIFT = 3;
            } else {
                throw new IllegalStateException("未知指針的大小");
            }

            // 計算writeIndex的偏移量
            WRITE_INDEX_OFFSET = UNSAFE
                    .objectFieldOffset(MpscArrayQueue.class.getDeclaredField("writeIndex"));
            // 計算readIndex的偏移量
            READ_INDEX_OFFSET = UNSAFE
                    .objectFieldOffset(MpscArrayQueue.class.getDeclaredField("readIndex"));
            // 計算size的偏移量
            SIZE_OFFSET = UNSAFE
                    .objectFieldOffset(MpscArrayQueue.class.getDeclaredField("size"));
        } catch (Exception e) {
            throw new RuntimeException();
        }
    }

    // 構造方法
    public MpscArrayQueue(int capacity) {
        // 取整到2的N次方(未考慮越界)
        capacity = 1 << (32 - Integer.numberOfLeadingZeros(capacity - 1));
        // 實例化數組
        this.array = (T[]) new Object[capacity];
    }

    // 生產元素
    public boolean put(T t) {
        if (t == null) {
            return false;
        }
        long size;
        long writeIndex;
        do {
            // 每次循環都從新獲取size的大小
            size = this.size;
            // 隊列滿了直接返回
            if (size >= this.array.length) {
                return false;
            }

            // 每次循環都從新獲取writeIndex的值
            writeIndex = this.writeIndex;

            // while循環中原子更新writeIndex的值
            // 若是失敗了從新走上面的過程
        } while (!UNSAFE.compareAndSwapLong(this, WRITE_INDEX_OFFSET, writeIndex, writeIndex + 1));

        // 到這裏,說明上述原子更新成功了
        // 那麼,就把元素的值放到writeIndex的位置
        // 且更新size
        long eleOffset = calcElementOffset(writeIndex, this.array.length-1);
        // 延遲更新到主內存,讀取的時候才更新
        UNSAFE.putOrderedObject(this.array, eleOffset, t);

        // 往死裏更新直到成功
        do {
            size = this.size;
        } while (!UNSAFE.compareAndSwapLong(this, SIZE_OFFSET, size, size + 1));

        return true;
    }

    // 消費元素
    public T take() {
        long size = this.size;
        // 若是size爲0,表示隊列爲空,直接返回
        if (size <= 0) {
            return null;
        }
        // size大於0,確定有值
        // 只有一個消費者,不用考慮線程安全的問題
        long readIndex = this.readIndex;
        // 計算讀指針處元素的偏移量
        long offset = calcElementOffset(readIndex, this.array.length-1);
            // 獲取讀指針處的元素,使用volatile語法,強制更新生產者的數據到主內存
        T e = (T) UNSAFE.getObjectVolatile(this.array, offset);

        // 增長讀指針
        UNSAFE.putOrderedLong(this, READ_INDEX_OFFSET, readIndex+1);
        // 減少size
        do {
            size = this.size;
        } while (!UNSAFE.compareAndSwapLong(this, SIZE_OFFSET, size, size-1));

        return e;
    }

    private long calcElementOffset(long index, long mask) {
        // index & mask 至關於取餘數,表示index到達數組尾端了從頭開始
        return ARRAY_BASE_OFFSET + ((index & mask) << ARRAY_ELEMENT_SHIFT);
    }

}

是否是看不懂?那就對了,多看幾遍吧,面試又能吹一波了。

這裏使用的是每兩個變量之間加7個long類型的變量來消除僞共享,有的開源框架你可能會看到經過繼承的方式實現的,還有的是加15個long類型,另外,JDK8中也提供了一個註解@Contended來消除僞共享。

本例其實還有優化的空間,好比,size的使用,能不能不使用size?不使用size又該如何實現?

後記

本節,咱們一塊兒學習了在Java中如何構建高性能的隊列,並學習了一些底層的知識,絕不誇張地講,學會了這些底層知識,面試的時候光隊列就能跟面試官吹一個小時。

另外,最近收到一些同窗的反饋,說哈希、哈希表、哈希函數他們之間有關係嗎?有怎樣的關係?爲何Object中要放一個hash()方法?跟equals()方法怎麼又扯上關係了呢?

下一節,咱們就來看看關於哈希的一切,想及時獲取最新推文嗎?還不快點來關注我!

關注公號主「彤哥讀源碼」,解鎖更多源碼、基礎、架構知識。

相關文章
相關標籤/搜索