從0到1實現本身的阻塞隊列(上)

阻塞隊列不止是一道熱門的面試題,同時也是許多併發處理模型的基礎,好比經常使用的線程池類ThreadPoolExecutor內部就使用了阻塞隊列來保存等待被處理的任務。並且在大多數經典的多線程編程資料中,阻塞隊列都是其中很是重要的一個實踐案例。甚至能夠說只有本身動手實現了一個阻塞隊列才能真正掌握多線程相關的API。面試

在這篇文章中,咱們會從一個最簡單的原型開始一步一步完善爲一個相似於JDK中阻塞隊列實現的真正實用的阻塞隊列。在這個過程當中,咱們會一路涉及synchronized關鍵字、條件變量、顯式鎖ReentrantLock等等多線程編程的關鍵技術,最終掌握Java多線程編程的完整理論和實踐知識。編程

閱讀本文須要瞭解基本的多線程編程概念與互斥鎖的使用,還不瞭解的讀者能夠參考一下這篇文章《多線程中那些看不見的陷阱》中到ReentrantLock部分爲止的內容。數組

什麼是阻塞隊列?

阻塞隊列是這樣的一種數據結構,它是一個隊列(相似於一個List),能夠存放0到N個元素。咱們能夠對這個隊列執行插入或彈出元素操做,彈出元素操做就是獲取隊列中的第一個元素,而且將其從隊列中移除;而插入操做就是將元素添加到隊列的末尾。當隊列中沒有元素時,對這個隊列的彈出操做將會被阻塞,直到有元素被插入時纔會被喚醒;當隊列已滿時,對這個隊列的插入操做就會被阻塞,直到有元素被彈出後纔會被喚醒。安全

在線程池中,每每就會用阻塞隊列來保存那些暫時沒有空閒線程能夠直接執行的任務,等到線程空閒以後再從阻塞隊列中彈出任務來執行。一旦隊列爲空,那麼線程就會被阻塞,直到有新任務被插入爲止。數據結構

一個最簡單的版本

代碼實現

咱們先來實現一個最簡單的隊列,在這個隊列中咱們不會添加任何線程同步措施,而只是實現了最基本的隊列與阻塞特性。 那麼首先,一個隊列能夠存放必定量的元素,並且能夠執行插入元素和彈出元素的操做。而後由於這個隊列仍是一個阻塞隊列,那麼在隊列爲空時,彈出元素的操做將會被阻塞,直到隊列中被插入新的元素可供彈出爲止;而在隊列已滿的狀況下,插入元素的操做將會被阻塞,直到隊列中有元素被彈出爲止。多線程

下面咱們會將這個最初的阻塞隊列實現類拆解爲獨立的幾塊分別講解和實現,到最後就能拼裝出一個完整的阻塞隊列類了。爲了在阻塞隊列中保存元素,咱們首先要定義一個數組來保存元素,也就是下面代碼中的items字段了,這是一個Object數組,因此能夠保存任意類型的對象。在最後的構造器中,會傳入一個capacity參數來指定items數組的大小,這個值也就是咱們的阻塞隊列的大小了。併發

takeIndexputIndex就是咱們插入和彈出元素的下標位置了,爲何要分別用兩個整型來保存這樣的位置呢?由於阻塞隊列在使用的過程當中會不斷地被插入和彈出元素,因此能夠認爲元素在數組中是像貪吃蛇同樣一步一步往前移動的,每次彈出的都是隊列中的第一個元素,而插入的元素則會被添加到隊列的末尾。當下標到達末尾時會被設置爲0,從數組的第一個下標位置從新開始向後增加,造成一個不斷循環的過程。工具

那麼若是隊列中存儲的個數超過items數組的長度時,新插入的元素豈不是會覆蓋隊列開頭尚未被彈出的元素了嗎?這時咱們的最後一個字段count就能派上用場了,當count等於items.length時,插入操做就會被阻塞,直到隊列中有元素被彈出時爲止。那麼這種阻塞是如何實現的呢?咱們接下來來看一下put()方法如何實現。post

/** 存放元素的數組 */
    private final Object[] items;
    
    /** 彈出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;
    
    /** 隊列中的元素總數 */
    private int count;
    
    /**
     * 指定隊列大小的構造器
     *
     * @param capacity  隊列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        // putIndex, takeIndex和count都會被默認初始化爲0
        items = new Object[capacity];
    }

下面是put()take()方法的實現,put()方法向隊列末尾添加新元素,而take()方法從隊列中彈出最前面的一個元素,咱們首先來看一下咱們目前最關心的put()方法。在put()方法的開頭,咱們能夠看到有一個判斷count是否達到了items.length(隊列大小)的if語句,若是count不等於items.length,那麼就表示隊列尚未滿,隨後就直接調用了enqueue方法對元素進行了入隊。enqueue方法的實現會在稍後介紹,這裏咱們只須要知道這個入隊方法會將元素放入到隊列中並對count加1就能夠了。在成功插入元素以後咱們就會經過break語句跳出最外層的無限while循環,從方法中返回。性能

可是若是這時候隊列已滿,那麼count的值就會等於items.length,這將會致使咱們調用Thread.sleep(200L)使當前線程休眠200毫秒。當線程從休眠中恢復時,又會進入下一次循環,從新判斷條件count != items.length。也就是說,若是隊列沒有彈出元素使咱們能夠完成插入操做,那麼線程就會一直處於「判斷 -> 休眠」的循環而沒法從put()方法中返回,也就是進入了「阻塞」狀態。

隨後的take()方法也是同樣的道理,只有在隊列不爲空的狀況下才能順利彈出元素完成任務並返回,若是隊列一直爲空,調用線程就會在循環中一直等待,直到隊列中有元素插入爲止。

/**
     * 將指定元素插入隊列
     *
     * @param e 待插入的對象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            // 直到隊列未滿時才執行入隊操做並跳出循環
            if (count != items.length) {
                // 執行入隊操做,將對象e實際放入隊列中
                enqueue(e);
                break;
            }

            // 隊列已滿的狀況下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 從隊列中彈出一個元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            // 直到隊列非空時才繼續執行後續的出隊操做並返回彈出的元素
            if (count != 0) {
                // 執行出隊操做,將隊列中的第一個元素彈出
                return dequeue();
            }

            // 隊列爲空的狀況下休眠200ms
            Thread.sleep(200L);
        }
    }

在上面的put()take()方法中分別調用了入隊方法enqueue和出隊方法dequeue,那麼這兩個方法到底須要如何實現呢?下面是這兩個方法的源代碼,咱們能夠看到,在入隊方法enqueue()中,總共有三步操做:

  1. 首先把指定的對象e保存到items[putIndex]中,putIndex指示的就是咱們插入元素的位置。
  2. 以後,咱們會將putIndex向後移一位,來肯定下一次插入元素的下標位置,若是已經到了隊列末尾咱們就會把putIndex設置爲0,回到隊列的開頭。
  3. 最後,入隊操做會將count值加1,讓count值和隊列中的元素個數一致。

而出隊方法dequeue中執行的操做則與入隊方法enqueue相反。

/**
     * 入隊操做
     *
     * @param e 待插入的對象
     */
    private void enqueue(Object e) {
        // 將對象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增長元素總數
        count++;
    }

    /**
     * 出隊操做
     *
     * @return  被彈出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 並將該位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 減小元素總數
        count--;

        // 返回以前代碼中取出的元素e
        return e;
    }

到這裏咱們就能夠將這個三個模塊拼接爲一個完整的阻塞隊列類BlockingQueue了。完整的代碼以下,你們能夠拷貝到IDE中,或者本身從新實現一遍,而後咱們就能夠開始上手用一用咱們剛剛完成的阻塞隊列了。

public class BlockingQueue {

    /** 存放元素的數組 */
    private final Object[] items;

    /** 彈出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;

    /** 隊列中的元素總數 */
    private int count;

    /**
     * 指定隊列大小的構造器
     *
     * @param capacity  隊列大小
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * 入隊操做
     *
     * @param e 待插入的對象
     */
    private void enqueue(Object e) {
        // 將對象e放入putIndex指向的位置
        items[putIndex] = e;

        // putIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
        if (++putIndex == items.length)
            putIndex = 0;

        // 增長元素總數
        count++;
    }

    /**
     * 出隊操做
     *
     * @return  被彈出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 並將該位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;

        // takeIndex向後移一位,若是已到末尾則返回隊列開頭(位置0)
        if (++takeIndex == items.length)
            takeIndex = 0;

        // 減小元素總數
        count--;

        // 返回以前代碼中取出的元素e
        return e;
    }

    /**
     * 將指定元素插入隊列
     *
     * @param e 待插入的對象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            // 直到隊列未滿時才執行入隊操做並跳出循環
            if (count != items.length) {
                // 執行入隊操做,將對象e實際放入隊列中
                enqueue(e);
                break;
            }

            // 隊列已滿的狀況下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 從隊列中彈出一個元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            // 直到隊列非空時才繼續執行後續的出隊操做並返回彈出的元素
            if (count != 0) {
                // 執行出隊操做,將隊列中的第一個元素彈出
                return dequeue();
            }

            // 隊列爲空的狀況下休眠200ms
            Thread.sleep(200L);
        }
    }

}

測驗阻塞隊列實現

既然已經有了阻塞隊列的實現,那麼咱們就寫一個測試程序來測試一下吧。下面是一個對阻塞隊列進行併發的插入和彈出操做的測試程序,在這個程序中,會建立2個生產者線程向阻塞隊列中插入數字0~19;同時也會建立2個消費者線程從阻塞隊列中彈出20個數字,並打印這些數字。並且在程序中也統計了整個程序的耗時,會在全部子線程執行完成以後打印出程序的總耗時。

這裏咱們指望這個測驗程序可以以任意順序輸出0~19這20個數字,而後打印出程序的總耗時,那麼實際執行狀況會如何呢?

public class BlockingQueueTest {

    public static void main(String[] args) throws Exception {

        // 建立一個大小爲2的阻塞隊列
        final BlockingQueue q = new BlockingQueue(2);

        // 建立2個線程
        final int threads = 2;
        // 每一個線程執行10次
        final int times = 10;

        // 線程列表,用於等待全部線程完成
        List<Thread> threadList = new ArrayList<>(threads * 2);
        long startTime = System.currentTimeMillis();

        // 建立2個生產者線程,向隊列中併發放入數字0到19,每一個線程放入10個數字
        for (int i = 0; i < threads; ++i) {
            final int offset = i * times;
            Thread producer = new Thread(() -> {
                try {
                    for (int j = 0; j < times; ++j) {
                        q.put(new Integer(offset + j));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(producer);
            producer.start();
        }

        // 建立2個消費者線程,從隊列中彈出20次數字並打印彈出的數字
        for (int i = 0; i < threads; ++i) {
            Thread consumer = new Thread(() -> {
                try {
                    for (int j = 0; j < times; ++j) {
                        Integer element = (Integer) q.take();
                        System.out.println(element);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            threadList.add(consumer);
            consumer.start();
        }

        // 等待全部線程執行完成
        for (Thread thread : threadList) {
            thread.join();
        }

        // 打印運行耗時
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3));
    }
}

在個人電腦上運行這段程序的輸出爲:

0
1
2
3
4
5
null
10
8
7
14
9
16
15
18
17
null

不只是打印出了不少個null,並且打印出17行以後就再也不打印更多數據,並且程序也就一直沒有打印總耗時並結束了。爲何會發生這種狀況呢?

緣由就是在咱們實現的這個阻塞隊列中徹底沒有線程同步機制,因此同時併發進行的4個線程(2個生產者和2個消費者)會同時執行阻塞隊列的put()take()方法。這就可能會致使各類各樣併發執行順序致使的問題,好比兩個生產者同時對阻塞隊列進行插入操做,有可能就會在putIndex沒更新的狀況下對同一下標位置又插入了一次數據,致使了數據還沒被消費就被覆蓋了;而兩個消費者也可能會在takeIndex沒更新的狀況下又獲取了一次已經被清空的位置,致使打印出了null。最後由於這些緣由都有可能會致使消費者線程最後尚未彈出20個數字count就已經爲0了,這時消費者線程就會一直處於阻塞狀態沒法退出了。

那麼咱們應該如何給阻塞隊列加上線程同步措施,使它的運行不會發生錯誤呢?

一個線程安全的版本

使用互斥鎖來保護隊列操做

以前碰到的併發問題的核心就是多個線程同時對阻塞隊列進行插入或彈出操做,那麼咱們有沒有辦法讓同一時間只能有一個線程對阻塞隊列進行操做呢?

也許不少讀者已經想到了,咱們最經常使用的一種併發控制方式就是synchronized關鍵字。經過synchronized,咱們可讓一段代碼同一時間只能有一個線程進入;若是在同一個對象上經過synchronized加鎖,那麼put()take()兩個方法能夠作到同一時間只能有一個線程調用兩個方法中的任意一個。好比若是有一個線程調用了put()方法插入元素,那麼其餘線程再調用put()方法或者take()就都會被阻塞直到前一個線程完成對put()方法的調用了。

在這裏,咱們只修改put()take()方法,把這兩個方法中對enqueuedequeue的調用都包裝到一個synchronized (this) {...}的語句塊中,保證了同一時間只能有一個線程進入這兩個語句塊中的任意一個。若是對synchronized之類的線程同步機制還不熟悉的讀者,建議先看一下這篇介紹多線程同步機制的文章《多線程中那些看不見的陷阱》再繼續閱讀以後的內容,相信會有事半功倍的效果。

/**
     * 將指定元素插入隊列
     *
     * @param e 待插入的對象
     */
    public void put(Object e) throws InterruptedException {
        while (true) {
            synchronized (this) {
                // 直到隊列未滿時才執行入隊操做並跳出循環
                if (count != items.length) {
                    // 執行入隊操做,將對象e實際放入隊列中
                    enqueue(e);
                    break;
                }
            }

            // 隊列已滿的狀況下休眠200ms
            Thread.sleep(200L);
        }
    }

    /**
     * 從隊列中彈出一個元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                // 直到隊列非空時才繼續執行後續的出隊操做並返回彈出的元素
                if (count != 0) {
                    // 執行出隊操做,將隊列中的第一個元素彈出
                    return dequeue();
                }
            }

            // 隊列爲空的狀況下休眠200ms
            Thread.sleep(200L);
        }
    }

再次測試

咱們再來試一試這個新的阻塞隊列實現,在個人電腦上測試程序的輸出以下:

0
1
2
3
10
11
4
5
6
12
13
14
15
7
8
9
16
17
18
19
總耗時:1.81s

這下看起來結果就對了,並且多跑了幾回也都能穩定輸出全部0~19的20個數字。看起來很是棒,咱們成功了,來給本身鼓個掌吧!

可是仔細那麼一看,好像最後的耗時是否是有一些高了?雖然「1.81秒」也不是太長的時間,可是好像通常計算機程序作這麼一點事情只要一眨眼的功夫就能完成纔對呀。爲何這個阻塞隊列會這麼慢呢?

一個更快的阻塞隊列

讓咱們先來診斷一下以前的阻塞隊列中究竟是什麼致使了效率的下降,由於put()take()方法是阻塞隊列的核心,因此咱們天然從這兩個方法看起。在這兩個方法裏,咱們都看到了同一段代碼Thread.sleep(200L),這段代碼會讓put()take()方法分別在隊列已滿和隊列爲空的狀況下進入一次固定的200毫秒的休眠,防止線程佔用過多的CPU資源。可是若是隊列在這200毫秒裏發生了變化,那麼線程也仍是在休眠狀態沒法立刻對變化作出響應。好比若是一個調用put()方法的線程由於隊列已滿而進入了200毫秒的休眠,那麼即便隊列已經被消費者線程清空了,它也仍然會忠實地等到200毫秒以後纔會從新嘗試向隊列中插入元素,中間的這些時間就都被浪費了。

可是若是咱們去掉這段休眠的代碼,又會致使CPU的使用率太高的問題。那麼有沒有一種方法能夠平衡二者的利弊,同時獲得兩種狀況的好處又沒有各自的缺點呢?

使用條件變量優化阻塞喚醒

爲了完成上面這個困難的任務,既要馬兒跑又要馬兒不吃草。那麼咱們就須要有一種方法,既讓線程進入休眠狀態再也不佔用CPU,可是在隊列發生改變時又能及時地被喚醒來重試以前的操做了。既然用了對象鎖synchronized,那麼咱們就找找有沒有與之相搭配的同步機制能夠實現咱們的目標。

Object類,也就是全部Java類的基類裏,咱們找到了三個有意思的方法Object.wait()Object.notify()Object.notifyAll()。這三個方法是須要搭配在一塊兒使用的,其功能與操做系統層面的條件變量相似。條件變量是這樣的一種線程同步工具:

  1. 每一個條件變量都會有一個對應的互斥鎖,要調用條件變量的wait()方法,首先須要持有條件變量對應的這個互斥鎖。以後,在調用條件變量的wait()方法時,首先會釋放已持有的這個互斥鎖,而後當前線程進入休眠狀態,等待被Object.notify()或者Object.notifyAll()方法喚醒;
  2. 調用Object.notify()或者Object.notifyAll()方法能夠喚醒由於Object.wait()進入休眠狀態的線程,區別是Object.notify()方法只會喚醒一個線程,而Object.notifyAll()會喚醒全部線程。

由於咱們以前的代碼中經過synchronized獲取了對應於this引用的對象鎖,因此天然也就要用this.wait()this.notify()this.notifyAll()方法來使用與這個對象鎖對應的條件變量了。下面是使用條件變量改造後的put()take()方法。仍是和以前同樣,咱們首先以put()方法爲例分析具體的改動。首先,咱們去掉了最外層的while循環,而後咱們把Thread.sleep替換爲了this.wait(),以此在隊列已滿時進入休眠狀態,等待隊列中的元素被彈出後再繼續。在隊列知足條件,入隊操做成功後,咱們經過調用this.notifyAll()喚醒了可能在等待隊列非空條件的調用take()的線程。take()方法的實現與put()也基本相似,只是操做相反。

/**
     * 將指定元素插入隊列
     *
     * @param e 待插入的對象
     */
    public void put(Object e) throws InterruptedException {
        synchronized (this) {
            if (count == items.length) {
                // 隊列已滿時進入休眠
                this.wait();
            }

            // 執行入隊操做,將對象e實際放入隊列中
            enqueue(e);

            // 喚醒全部休眠等待的進程
            this.notifyAll();
        }
    }

    /**
     * 從隊列中彈出一個元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            if (count == 0) {
                // 隊列爲空時進入休眠
                this.wait();
            }

            // 執行出隊操做,將隊列中的第一個元素彈出
            Object e = dequeue();

            // 喚醒全部休眠等待的進程
            this.notifyAll();

            return e;
        }
    }

可是咱們在測試程序運行以後發現結果好像又出現了問題,在我電腦上的輸出以下:

0
19
null
null
null
null
null
null
null
null
null
18
null
null
null
null
null
null
null
null
總耗時:0.10s

雖然咱們解決了耗時問題,如今的耗時已經只有0.10s了,可是結果中又出現了大量的null,咱們的阻塞隊列好像又出現了正確性問題。那麼問題出在哪呢?建議讀者能夠先本身嘗試分析一下,這樣有助於你們積累解決多線程併發問題的能力。

while循環判斷條件是否知足

通過分析,咱們看到,在調用this.wait()後,若是線程被this.notifyAll()方法喚醒,那麼就會直接開始直接入隊/出隊操做,而不會再次檢查count的值是否知足條件。而在咱們的程序中,當隊列爲空時,可能會有不少消費者線程在等待插入元素。此時,若是有一個生產者線程插入了一個元素並調用了this.notifyAll(),則全部消費者線程都會被喚醒,而後依次執行出隊操做,那麼第一個消費者線程以後的全部線程拿到的都將是null值。並且同時,在這種狀況下,每個執行完出隊操做的消費者線程也一樣會調用this.notifyAll()方法,這樣即便隊列中已經沒有元素了,後續進入等待的消費者線程仍然會被本身的同類所喚醒,消費根本不存在的元素,最終只能返回null

因此要解決這個問題,核心就是在線程從this.wait()中被喚醒時也仍然要從新檢查一遍count值是否知足要求,若是count不知足要求,那麼當前線程仍然調用this.wait()回到等待狀態當中去繼續休眠。而咱們是沒辦法預知程序在第幾回判斷條件時能夠獲得知足條件的count值從而繼續執行的,因此咱們必須讓程序循環執行「判斷條件 -> 不知足條件繼續休眠」這樣的流程,直到count知足條件爲止。那麼咱們就可使用一個while循環來包裹this.wait()調用和對count的條件判斷,以此達到這個目的。

下面是具體的實現代碼,咱們在其中把count條件(隊列未滿/非空)做爲while條件,而後在count值還不知足要求的狀況下調用this.wait()方法使當前線程進入等待狀態繼續休眠。

/**
     * 將指定元素插入隊列
     *
     * @param e 待插入的對象
     */
    public void put(Object e) throws InterruptedException {
        synchronized (this) {
            while (count == items.length) {
                // 隊列已滿時進入休眠
                this.wait();
            }

            // 執行入隊操做,將對象e實際放入隊列中
            enqueue(e);

            // 喚醒全部休眠等待的進程
            this.notifyAll();
        }
    }

    /**
     * 從隊列中彈出一個元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        synchronized (this) {
            while (count == 0) {
                // 隊列爲空時進入休眠
                this.wait();
            }

            // 執行出隊操做,將隊列中的第一個元素彈出
            Object e = dequeue();

            // 喚醒全部休眠等待的進程
            this.notifyAll();

            return e;
        }
    }

再次運行咱們的測試程序,在個人電腦上獲得了以下的輸出:

0
10
1
2
11
12
13
3
4
14
5
6
15
16
7
17
8
18
9
19
總耗時:0.11s

耗時只有0.11s,並且結果也是正確的,看來咱們獲得了一個又快又好的阻塞隊列實現。這是一個里程碑式的版本,咱們實現了一個真正能夠在程序代碼中使用的阻塞隊列,到這裏能夠說你已經學會了如何實現一個阻塞隊列了,讓咱們爲本身鼓個掌吧。

當時進度條出賣了我,這篇文章還有很多內容。既然咱們已經學會如何實現一個真正可用的阻塞隊列了,咱們爲何還要繼續看這麼多內容呢?別慌,雖然咱們已經實現了一個真正可用的版本,可是若是咱們更進一步的話就能夠實現一個JDK級別的高強度版本了,這聽起來是否是很是的誘人?讓咱們繼續咱們的旅程吧。

一個更安全的版本

咱們以前的版本中使用這些同步機制:synchronized (this)this.wait()this.notifyAll(),這些同步機制都和當前對象this有關。由於synchronized (obj)可使用任意對象對應的對象鎖,而Object.wati()Object.notifyAll()方法又都是public方法。也就是說不止在阻塞隊列類內部可使用這個阻塞隊列對象的對象鎖及其對應的條件變量,在外部的代碼中也能夠任意地獲取阻塞隊列對象上的對象鎖和對應的條件變量,那麼就有可能發生外部代碼濫用阻塞隊列對象上的對象鎖致使阻塞隊列性能降低甚至是發生死鎖的狀況。那咱們有沒有什麼辦法可讓阻塞隊列在這方面變得更安全呢?

使用顯式鎖

最直接的方式固然是請出JDK在1.5以後引入的代替synchronized關鍵字的顯式鎖ReentrantLock類了。ReentrantLock類是一個可重入互斥鎖,互斥指的是和synchronized同樣,同一時間只能有一個線程持有鎖,其餘獲取鎖的線程都必須等待持有鎖的線程釋放該鎖。而可重入指的就是同一個線程能夠重複獲取同一個鎖,若是在獲取鎖時這個鎖已經被當前線程所持有了,那麼這個獲取鎖的操做仍然會直接成功。

通常咱們使用ReentrantLock的方法以下:

lock.lock();
try {
    作一些操做
}
finally {
    lock.unlock();
}

上面的lock變量就是一個ReentrantLock類型的對象。在這段代碼中,釋放鎖的操做lock.unlock()被放在了finally塊中,這是爲了保證線程在獲取到鎖以後,不論出現異常或者什麼特殊狀況都能保證正確地釋放互斥鎖。若是不這麼作就可能會致使持有鎖的線程異常退出後仍然持有該鎖,其餘須要獲取同一個鎖的線程就永遠運行不了。

那麼在咱們的阻塞隊列中應該如何用ReentrantLock類來改寫呢?

首先,咱們顯然要爲咱們的阻塞隊列類添加一個實例變量lock來保存用於在不一樣線程間實現互斥訪問的ReentrantLock鎖。而後咱們要將原來的synchronized(this) {...}格式的代碼修改成上面使用ReentrantLock進行互斥訪問保護的實現形式,也就是lock.lock(); try {...} finally {lock.unlock();}這樣的形式。

可是原來與synchronized所加的對象鎖相對應的條件變量使用方法this.wait()this.notifyAll()應該如何修改呢?ReentrantLock已經爲你作好了準備,咱們能夠直接調用lock.newCondition()方法來建立一個與互斥鎖lock相對應的條件變量。而後爲了在不一樣線程中都能訪問到這個條件變量,咱們一樣要新增一個實例變量condition來保存這個新建立的條件變量對象。而後咱們原來使用的this.wait()就須要修改成condition.await(),而this.notifyAll()就修改成了condition.signalAll()

/** 顯式鎖 */
    private final ReentrantLock lock = new ReentrantLock();

    /** 鎖對應的條件變量 */
    private final Condition condition = lock.newCondition();
    
    /**
     * 將指定元素插入隊列
     *
     * @param e 待插入的對象
     */
    public void put(Object e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                // 隊列已滿時進入休眠
                // 使用與顯式鎖對應的條件變量
                condition.await();
            }

            // 執行入隊操做,將對象e實際放入隊列中
            enqueue(e);

            // 經過條件變量喚醒休眠線程
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 從隊列中彈出一個元素
     *
     * @return  被彈出的元素
     */
    public Object take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                // 隊列爲空時進入休眠
                // 使用與顯式鎖對應的條件變量
                condition.await();
            }

            // 執行出隊操做,將隊列中的第一個元素彈出
            Object e = dequeue();

            // 經過條件變量喚醒休眠線程
            condition.signalAll();

            return e;
        } finally {
            lock.unlock();
        }
    }

到這裏,咱們就完成了使用顯式鎖ReentrantLock所須要作的全部改動了。整個過程當中並不涉及任何邏輯的變動,咱們只是把synchronized (this) {...}修改成了lock.lock() try {...} finally {lock.unlock();},把this.wait()修改成了condition.await(),把this.notifyAll()修改成了condition.signalAll()。就這樣,咱們的鎖和條件變量由於是private字段,因此外部的代碼就徹底沒法訪問了,這讓咱們的阻塞隊列變得更加安全,是時候能夠提供給其餘人使用了。

可是這個版本的阻塞隊列仍然還有很大的優化空間,繼續閱讀下一篇文章,相信你就能夠實現出JDK級別的阻塞隊列了。

相關文章
相關標籤/搜索