Java併發編程筆記之LinkedBlockingQueue源碼探究

摘要: LinkedBlockingQueue的實現是使用獨佔鎖實現的阻塞隊列。首先看一下LinkedBlockingQueue 的類圖結構,以下圖所示:       如類圖所示:LinkedBlockingQueue是使用單向鏈表實現,有兩個Node分別來存放首尾節點,而且裏面有個初始值爲0 的原子變量count,它用來記錄隊列元素個數。node

LinkedBlockingQueue的實現是使用獨佔鎖實現的阻塞隊列。首先看一下LinkedBlockingQueue 的類圖結構,以下圖所示:算法

 

 

 

如類圖所示:LinkedBlockingQueue是使用單向鏈表實現,有兩個Node分別來存放首尾節點,而且裏面有個初始值爲0 的原子變量count,它用來記錄隊列元素個數。函數

另外裏面有兩個ReentrantLock的實例,分別用來控制元素入隊和出隊的原子性,其中takeLock用來控制同時只有一個線程能夠從隊列獲取元素,其餘線程必須等待,this

putLock控制同時只能有一個線程能夠獲取鎖去添加元素,其餘線程必須等待。另外notEmpty 和 notFull 是信號量,內部分別有一個條件隊列用來存放進隊和出隊的時候被阻塞的線程,線程

說白了,這其實就是一個生產者 -  消費者模型。指針

 

咱們首先看一下獨佔鎖的源碼,以下所示:blog

  /** 執行take, poll等操做時候須要獲取該鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 當隊列爲空時候執行出隊操做(好比take)的線程會被放入這個條件隊列進行等待 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 執行put, offer等操做時候須要獲取該鎖*/
    private final ReentrantLock putLock = new ReentrantLock();

    /**當隊列滿時候執行進隊操做(好比put)的線程會被放入這個條件隊列進行等待 */
    private final Condition notFull = putLock.newCondition();

   /** 當前隊列元素個數 */
    private final AtomicInteger count = new AtomicInteger(0);

 

接着咱們要進入LinkedBlockingQueue 無參構造函數,源碼以下:隊列

public static final int   MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

  public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //初始化首尾節點,指向哨兵節點
    last = head = new Node<E>(null);
 }

從源碼中能夠看到,默認隊列的容量爲0x7fffffff; 用戶也能夠本身指定容量,因此必定程度上 LinkedBlockingQueue 能夠說是有界阻塞隊列。ci

 

接下來咱們主要看LinkedBlockingQueue 的幾個主要方法的源碼,以下:rem

  1.offer操做,向隊列尾部插入一個元素,若是隊列有空閒容量則插入成功後返回true,若是隊列已滿則丟棄當前元素而後返回false,若是 e元素爲null,則拋出空指針異常(NullPointerException ),還有一點就是,該方法是非阻塞的。源碼以下:

public boolean offer(E e) {

        //(1)空元素拋空指針異常
        if (e == null) throw new NullPointerException();

        //(2) 若是當前隊列滿了則丟棄將要放入的元素,而後返回false
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;

        //(3) 構造新節點,獲取putLock獨佔鎖
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            //(4)若是隊列不滿則進隊列,並遞增元素計數
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                //(5)
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            //(6)釋放鎖
            putLock.unlock();
        }
        //(7)
        if (c == 0)
            signalNotEmpty();
        //(8)
        return c >= 0;
}

private void enqueue(Node<E> node) {   
 last = last.next = node;
}

代碼(2)判斷的是若是當前隊列已滿則丟棄當前元素並返回false。

代碼(3)獲取到putLock鎖,當前線程獲取到該鎖後,則其餘調用put 和 offer 的線程將會被阻塞(阻塞的線程被放到 putLock 鎖的 AQS 阻塞隊列)。

代碼(4)這裏又從新判斷了一下當前隊列是否滿了,這是由於在執行代碼(2)和獲取到putLock鎖期間,有可能其餘線程經過put 或者 offer方法想隊列裏面添加了新的元素。從新判斷隊列確實不滿則新元素入隊,並遞增計數器。

代碼(5)判斷的是若是新元素入隊後還有空閒空間,則喚醒notFull的條件隊列裏面由於調用了notFull 的 await 操做(好比執行put方法而隊列滿了的時候)而被阻塞的一個線程,由於隊列如今有空閒,因此這裏能夠提早喚醒一個入隊線程。

代碼(6)則釋放獲取的putLock鎖,這裏要注意鎖的釋放必定要在finally裏面作,由於即便try塊拋出異常了,finally也是會被執行到的。另外釋放鎖後其餘由於調用put和offer而被阻塞的線程將會有一個獲取到改鎖。

代碼(7)c == 0說明在執行代碼(6)釋放鎖的時候隊列裏面至少有一個元素,隊列裏面有元素則執行signalNotEmpty,signalNotEmpty的源碼以下:

private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

經過上面代碼能夠看到其做用是激活notEmpty 的條件隊列中由於調用notEmpty的await方法(好比調用 take 方法而且隊列爲空的時候)而被阻塞的一個線程,這裏也說明了調用條件變量的方法前,要首先獲取對應的鎖。

offer的總結:offer方法中經過使用putLock鎖保證了在隊尾新增元素的原子性和隊列元素個數的比較和遞增操做的原子性。

 

  2.put操做,向隊列尾部插入一個元素,若是隊列有空閒則插入後直接返回true,若是隊列已經滿則阻塞當前線程知道隊列有空閒插入成功後返回true,若是在阻塞的時候被其餘線程設置了中斷標誌,

則被阻塞線程會拋出InterruptedException 異常而返回,另外若是 e 元素爲 null 則拋出 NullPointerException 異常。源碼以下:

  public void put(E e) throws InterruptedException {
        //(1)空元素拋空指針異常
        if (e == null) throw new NullPointerException();
        //(2) 構建新節點,並獲取獨佔鎖putLock
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //(3)若是隊列滿則等待
            while (count.get() == capacity) {
                notFull.await();
            }
            //(4)進隊列並遞增計數
            enqueue(node);
            c = count.getAndIncrement();
            //(5)
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //(6)
            putLock.unlock();
        }
        //(7)
        if (c == 0)
            signalNotEmpty();
    }

代碼(2)中使用 putLock.lockInterruptibly() 獲取獨佔鎖,相比 offer 方法中這個獲取獨佔鎖方法意味着能夠被中斷,具體說是當前線程在獲取鎖的過程當中,若是被其它線程設置了中斷標誌則當前線程會拋出 InterruptedException 異常,

因此put操做在獲取 鎖過程當中是可被中斷的。

代碼(3)若是當前隊列已經滿,則notFull 的 await() 把當前線程放入 notFull 的條件隊列,當前線程被阻塞掛起並釋放獲取到的 putLock 鎖,因爲putLock鎖被釋放了,因此如今其餘線程就有機會獲取到putLock鎖了。

代碼(3)判斷隊列是否爲空爲什麼使用 while 循環而不是 if 語句呢?

這是由於考慮到當前線程被虛假喚醒的問題,也就是其它線程沒有調用 notFull 的 singal 方法時候,notFull.await() 在某種狀況下會自動返回。

若是使用if語句簡單判斷一下,那麼虛假喚醒後會執行代碼(4),元素入隊,而且遞增計數器,而這時候隊列已是滿了的,致使隊列元素個數大於了隊列設置的容量,致使程序出錯。

而使用使用 while 循環假如 notFull.await() 被虛假喚醒了,那麼循環在檢查一下當前隊列是不是滿的,若是是則再次進行等待。

 

  3.poll操做,從隊列頭部獲取並移除一個元素,若是隊列爲空則返回 null,該方法是不阻塞的。源碼以下:

  public E poll() {
        //(1)隊列爲空則返回null
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        //(2)獲取獨佔鎖
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //(3)隊列不空則出隊並遞減計數
            if (count.get() > 0) {//3.1
                x = dequeue();//3.2
                c = count.getAndDecrement();//3.3
                //(4)
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            //(5)
            takeLock.unlock();
        }
        //(6)
        if (c == capacity)
            signalNotFull();
        //(7)返回
        return x;
    }
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

代碼(1) 若是當前隊列爲空,則直接返回 null。

代碼(2)獲取獨佔鎖 takeLock,當前線程獲取該鎖後,其它線程在調用 poll 或者 take 方法會被阻塞掛起。

代碼 (3) 若是當前隊列不爲空則進行出隊操做,而後遞減計數器。

代碼(4)若是 c>1 則說明當前線程移除掉隊列裏面的一個元素後隊列不爲空(c 是刪除元素前隊列元素個數),那麼這時候就能夠激活由於調用 poll 或者 take 方法而被阻塞到notEmpty 的條件隊列裏面的一個線程。

代碼(5)釋放鎖,必定要在finally裏面釋放鎖。

代碼(6)說明當前線程移除隊頭元素前當前隊列是滿的,移除隊頭元素後隊列當前至少有一個空閒位置,那麼這時候就能夠調用signalNotFull激活由於調用put 或者 offer 而被阻塞放到 notFull 的條件隊列裏的一個線程,signalNotFull 源碼以下:

  private void signalNotFull() {
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
              notFull.signal();
          } finally {
              putLock.unlock();
          }
   }

poll 代碼邏輯比較簡單,值得注意的是獲取元素時候只操做了隊列的頭節點。

 

  4.peek 操做,獲取隊列頭部元素可是不從隊列裏面移除,若是隊列爲空則返回 null,該方法是不阻塞的。源碼以下:

  public E peek() {
        //(1)
        if (count.get() == 0)
            return null;
        //(2)
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            //(3)
            if (first == null)
                return null;
            else
            //(4)
                return first.item;
        } finally {
           //(5)
            takeLock.unlock();
        }
    }

能夠看到代碼(3)這裏仍是須要判斷下 first 是否爲 null 的,不能直接執行代碼(4)。

正常狀況下執行到代碼(2)說明隊列不爲空,可是代碼(1)和(2)不是原子性操做,也就是在執行代碼(1)判斷隊列不爲空後,

在代碼(2)獲取到鎖前,有可能其餘線程執行了poll 或者 take 操做致使隊列變爲了空,而後當前線程獲取鎖後,直接執行 first.item 會拋出空指針異常。

 

  5.take 操做,獲取當前隊列頭部元素並從隊列裏面移除,若是隊列爲空則阻塞調用線程。若是隊列爲空則阻塞當前線程知道隊列不爲空,而後返回元素,若是在阻塞的時候被其餘線程設置了中斷標誌,則被阻塞線程會拋出InterruptedException 異常而返回。源碼以下:

  public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        //(1)獲取鎖
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            //(2)當前隊列爲空則阻塞掛起
            while (count.get() == 0) {
                notEmpty.await();
            }
            //(3)出隊並遞減計數
            x = dequeue();
            c = count.getAndDecrement();
            //(4)
            if (c > 1)
                notEmpty.signal();
        } finally {
           //(5)
            takeLock.unlock();
        }
        //(6)
        if (c == capacity)
            signalNotFull();
        //(7)
        return x;
    }

代碼(1)當前線程獲取到獨佔鎖,其餘調用take 或者 poll的線程將會被阻塞掛起。

代碼(2)若是隊列爲空則阻塞掛起當前線程,並把當前線程放入 notEmpty 的條件隊列。

代碼(3)進行出隊操做並遞減計數。

代碼(4)若是 c > 1 說明當前隊列不爲空,則喚醒notEmpty 的條件隊列的條件隊列裏面的一個由於調用 take 或者 poll 而被阻塞的線程。

代碼(5)釋放鎖。

代碼(6)若是 c == capacity 則說明當前隊列至少有一個空閒位置,則激活條件變量 notFull 的條件隊列裏面的一個由於調用 put 或者 offer 而被阻塞的線程。

 

  6.remove操做,刪除隊列裏面指定元素,有則刪除返回 true,沒有則返回 false,源碼以下:

public boolean remove(Object o) {
    if (o == null) return false;

    //(1)雙重加鎖
    fullyLock();
    try {

        //(2)遍歷隊列找則刪除返回true
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
             //(3)
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        //(4)找不到返回false
        return false;
    } finally {
        //(5)解鎖
        fullyUnlock();
    }
}

代碼(1)經過fullyLock獲取雙重鎖,當前線程獲取後,其餘線程進行入隊或者出隊的操做就會被阻塞掛起。雙重鎖方法fullyLock的源碼以下:

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

代碼(2)遍歷隊列尋找要刪除的元素,找不到則直接返回false,找到則執行unlink操做,unlink的源碼以下:

  void unlink(Node<E> p, Node<E> trail) {
      p.item = null;
      trail.next = p.next;
      if (last == p)
          last = trail;
      若是當前隊列滿,刪除後,也不忘記喚醒等待的線程
      if (count.getAndDecrement() == capacity)
          notFull.signal();
    }

能夠看到刪除元素後,若是發現當前隊列有空閒空間,則喚醒 notFull 的條件隊列中一個由於調 用 put 或者 offer 方法而被阻塞的線程。

代碼(5)調用 fullyUnlock 方法使用與加鎖順序相反的順序釋放雙重鎖,源碼以下:

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

 

 

  7.size操做,獲取當前隊列元素個數。源碼以下:

public int size() {
   return count.get();
}

 

 

總結:因爲在操做出隊入隊的時候操做Count的時候加了鎖,所以相比ConcurrentLinkedQueue 的size方法比較準確。

最後用一張圖來加深LinkedBlockingQueue的理解,以下圖:

 

 

所以咱們要思考一個問題:爲什麼 ConcurrentLinkedQueue 中須要遍歷鏈表來獲取 size 而不適用一個原子變量呢?

這是由於使用原子變量保存隊列元素個數須要保證入隊出隊操做和操做原子變量是原子操做,而ConcurrentLinkedQueue 是使用 CAS 無鎖算法的,因此沒法作到這個。

相關文章
相關標籤/搜索