LinkedBlockingQueue源碼解析

上一篇博客,咱們介紹了ArrayBlockQueue,知道了它是基於數組實現的有界阻塞隊列,既然有基於數組實現的,那麼必定有基於鏈表實現的隊列了,沒錯,固然有,這就是咱們今天的主角:LinkedBlockingQueue。ArrayBlockQueue是有界的,那麼LinkedBlockingQueue是有界仍是無界的呢?我以爲能夠說是有界的,也能夠說是無界的,爲何這麼說呢?看下去你就知道了。node

和上篇博客同樣,咱們仍是先看下LinkedBlockingQueue的基本應用,而後解析LinkedBlockingQueue的核心代碼。數組

LinkedBlockingQueue基本應用

public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue();

        linkedBlockingQueue.add(15);
        linkedBlockingQueue.add(60);
        linkedBlockingQueue.offer(50);
        linkedBlockingQueue.put(100);

        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.size());

        System.out.println(linkedBlockingQueue.take());
        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.poll());
        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.peek());
        System.out.println(linkedBlockingQueue);

        System.out.println(linkedBlockingQueue.remove(50));
        System.out.println(linkedBlockingQueue);
    }
複製代碼

運行結果:安全

[15, 60, 50, 100]
4
15
[60, 50, 100]
60
[50, 100]
50
[50, 100]
true
[100]
複製代碼

代碼比較簡單,先試着分析下:bash

  1. 建立了一個LinkedBlockingQueue 。
  2. 分別使用add/offer/put方法向LinkedBlockingQueue中添加元素,其中add方法執行了兩次。
  3. 打印出LinkedBlockingQueue:[15, 60, 50, 100]。
  4. 打印出LinkedBlockingQueue的size:4。
  5. 使用take方法彈出第一個元素,並打印出來:15。
  6. 打印出LinkedBlockingQueue:[60, 50, 100]。
  7. 使用poll方法彈出第一個元素,並打印出來:60。
  8. 打印出LinkedBlockingQueue:[50, 100]。
  9. 使用peek方法彈出第一個元素,並打印出來:50。
  10. 打印出LinkedBlockingQueue:[50, 100]。
  11. 使用remove方法,移除值爲50的元素,返回true。
  12. 打印出LinkedBlockingQueue:100。

代碼比較簡單,可是仍是有些細節不明白:源碼分析

  • 底層是如何保證線程安全性的?
  • 數據保存在哪裏,以什麼形式保存的?
  • offer/add/put都是往隊列裏面添加元素,區別是什麼?
  • poll/take/peek都是彈出隊列的元素,區別是什麼?

要解決上面的疑問,最好的途徑仍是看源碼,下面咱們就來看看LinkedBlockingQueue的核心源碼。ui

LinkedBlockingQueue源碼解析

構造方法

LinkedBlockingQueue提供了三個構造方法,以下圖所示: this

image.png
咱們一個一個來分析。

LinkedBlockingQueue()
public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
複製代碼

無參的構造方法居然直接把「鍋」甩出去了,甩給了另一個構造方法,可是咱們要注意傳的參數:Integer.MAX_VALUE。spa

LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
複製代碼
  1. 判斷傳入的capacity是否合法,若是不大於0,直接拋出異常。
  2. 把傳入的capacity賦值給capacity。
  3. 新建一個Node節點,而且把此節點賦值給head和last字段。

這個capacity是什麼呢?若是你們對代碼有必定的感受的話,應該很容易猜到這是LinkedBlockingQueue的最大容量。若是咱們調用無參的構造方法來建立LinkedBlockingQueue的話,那麼它的最大容量就是Integer.MAX_VALUE,咱們把它稱爲「無界」,可是咱們也能夠指定最大容量,那麼此隊列又是一個「有界」隊列了,因此有些博客很草率的說LinkedBlockingQueue是有界隊列,或者是無界隊列,我的認爲這是不嚴謹的。線程

咱們再來看看這個Node是個什麼鬼:指針

static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }
複製代碼

是否是有一種莫名的親切感,很明顯,這是單向鏈表的實現呀,next指向的就是下一個Node。

LinkedBlockingQueue(Collection<? extends E> c)
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);//調用第二個構造方法,傳入的capacity是Int的最大值,能夠說 是一個無界隊列。
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); //開啓排他鎖
        try {
            int n = 0;//用於記錄LinkedBlockingQueue的size
            //循環傳入的c集合
            for (E e : c) {
                if (e == null)//若是e==null,則拋出空指針異常
                    throw new NullPointerException();
                if (n == capacity)//若是n==capacity,說明到了最大的容量,則拋出「Queue full」異常
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));//入隊操做
                ++n;//n自增
            }
            count.set(n);//設置count
        } finally {
            putLock.unlock();//釋放排他鎖
        }
    }
複製代碼
  1. 調用第二個構造方法,傳入了int的最大值,因此能夠說此時LinkedBlockingQueue是無界隊列。
  2. 開啓排他鎖putLock 。
  3. 定義了一個變量n,用來記錄當前LinkedBlockingQueue的size。
  4. 循環傳入的集合,若是其中的元素爲null,則拋出空指針異常,若是n==capacity,說明到了最大的容量,則拋出「Queue full」異常,不然執行enqueue操做來進行入隊,而後n進行自增。
  5. 設置count爲n,由此可知,count就是LinkedBlockingQueue的size了。
  6. 在finally中釋放排他鎖putLock 。

offer

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();//若是傳入的元素爲NULL,拋出異常
        final AtomicInteger count = this.count;//取出count
        if (count.get() == capacity)//若是count==capacity,說明到了最大容量,直接返回false
            return false;
        int c = -1;//表示size
        Node<E> node = new Node<E>(e);//新建Node節點
        final ReentrantLock putLock = this.putLock;
        putLock.lock();//開啓排他鎖
        try {
            if (count.get() < capacity) {//若是count<capacity,說明尚未達到最大容量
                enqueue(node);//入隊操做
                c = count.getAndIncrement();//得到count,賦值給c後完成自增操做
                if (c + 1 < capacity)//若是c+1 <capacity,說明還有剩餘的空間,喚醒由於調用notFull的await方法而被阻塞的線程
                    notFull.signal();
            }
        } finally {
            putLock.unlock();//在finally中釋放排他鎖
        }
        if (c == 0)//若是c==0,說明釋放putLock的時候,隊列中有一個元素,則調用signalNotEmpty
            signalNotEmpty();
        return c >= 0;
    }
複製代碼
  1. 若是傳進來的元素爲null,則拋出異常。
  2. 把本類實例的count賦值給局部變量count。
  3. 若是count==capacity,說明到了最大的容量,直接返回false。
  4. 定義局部變量c,用來表示size,初始值是-1。
  5. 新建Node節點。
  6. 開啓排他鎖putLock。
  7. 若是count>=capacity,說明到了最大的容量,釋放排他鎖後,返回false,由於此時c=-1,c>=0爲false;若是count<capacity,說明還有剩餘空間,繼續往下執行。這裏須要思考一個問題,爲何第三步已經判斷過了是否還有剩餘空間,這裏還要再判斷一次呢?由於可能有多個線程都在執行add/offer/put方法,當隊列沒有滿的時候,多個線程同時執行到第三步(第三步的時候尚未開啓排他鎖),而後同時往下走,因此開啓排他鎖後,還須要從新判斷下。
  8. 執行入隊操做。
  9. 得到count,而且賦值給c後,完成自增的操做。注意,是先賦值後自增,賦值和自增的前後順序會直接影響到後面的判斷邏輯。
  10. 若是c+1<capacity,說明還有剩餘的空間,喚醒由於調用notFull的await方法而被阻塞的線程。這裏爲何要+1再進行判斷?由於在第9步中,是先賦值後自增,也就是說局部變量c保存的仍是入隊以前LinkedBlockingQueue的size,因此要先進行+1操做,獲得的纔是當前LinkedBlockingQueue的size。
  11. 在finally中,釋放排他鎖putLock。
  12. 若是c==0,說明在釋放putLock排他鎖的時候,隊列中有且只有一個元素,則調用signalNotEmpty方法。讓咱們來看看signalNotEmpty方法:
private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
複製代碼

代碼比較簡單,就是開啓排他鎖,喚醒由於調用notEmpty的await方法而被阻塞的線程,可是這裏須要注意,這裏得到的排他鎖已經再也不是putLock,而是takeLock。

add

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
複製代碼

add方法直接調用了offer方法,可是add和offer還不徹底同樣,當隊列滿了,若是調用offer方法,會直接返回false,可是調用add方法,會拋出"Queue full"的異常。

put

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();//若是傳入的元素爲NULL,拋出異常
        int c = -1;//表示size
        Node<E> node = new Node<E>(e);//新建Node節點
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;//得到count
        putLock.lockInterruptibly();//開啓排他鎖
        try {
            //若是到了最大容量,調用notFull的await方法,等待喚醒,用while循環,是爲了防止虛假喚醒
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);//入隊
            c = count.getAndIncrement();//count先賦值給c後,再進行自增操做
            if (c + 1 < capacity)//若是c+1<capacity,調用notFull的signal方法,喚醒由於調用notFull的await方法而被阻塞的線程
                notFull.signal();
        } finally {
            putLock.unlock();//釋放排他鎖
        }
        if (c == 0)//若是隊列中有一個元素,喚醒由於調用notEmpty的await方法而被阻塞的線程
            signalNotEmpty();
    }
複製代碼
  1. 若是傳入的元素爲NULL,則拋出異常。
  2. 定義一個局部變量c,來表示size,初始值是-1。
  3. 新建Node節點。
  4. 把本類實例中的count賦值給局部變量count。
  5. 開啓排他鎖putLock。
  6. 若是到了最大容量,則調用notFull的await方法,阻塞當前線程,等待其餘線程調用notFull的signal方法來喚醒本身,這裏用while循環是爲了防止虛假喚醒。
  7. 執行入隊操做。
  8. count先賦值給c後,再進行自增操做。
  9. 若是c+1<capacity,說明還有剩餘的空間,則調用notFull的signal方法,喚醒由於調用notFull的await方法而被阻塞的線程。
  10. 釋放排他鎖putLock。
  11. 若是隊列中有且只有一個元素,喚醒由於調用notEmpty的await方法而被阻塞的線程。

enqueue

private void enqueue(Node<E> node) {
        last = last.next = node;
    }
複製代碼

入隊操做是否是特別簡單,就是把傳入的Node節點,賦值給last節點的next字段,再賦值給last字段,從而造成一個單向鏈表。

小總結

至此offer/add/put的核心源碼已經分析完畢,咱們來作一個小總結,offer/add/put都是添加元素的方法,不過他們之間仍是有所區別的,當隊列滿了,調用以上三個方法會出現不一樣的狀況:

  • offer:直接返回false。
  • add:雖然內部也調用了offer方法,可是隊列滿了,會拋出異常。
  • put:線程會阻塞住,等待喚醒。

size

public int size() {
        return count.get();
    }
複製代碼

沒什麼好說的,count記錄着LinkedBlockingQueue的size,得到後返回就是了。

take

public E take() throws InterruptedException {
        E x;
        int c = -1;//size
        final AtomicInteger count = this.count;//得到count
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//開啓排他鎖
        try {
            while (count.get() == 0) {//說明目前隊列中沒有數據
                notEmpty.await();//阻塞,等待喚醒
            }
            x = dequeue();//出隊
            c = count.getAndDecrement();//先賦值,後自減
            if (c > 1)//若是size>1,說明在出隊以前,隊列中有至少兩個元素
                notEmpty.signal();//喚醒由於調用notEmpty的await方法而被阻塞的線程
        } finally {
            takeLock.unlock();//釋放排他鎖
        }
        if (c == capacity)//若是隊列中還有一個剩餘空間
            signalNotFull();
        return x;
    }
複製代碼
  1. 定義局部變量c,用來表示size,初始值是-1。
  2. 把本類實例的count字段賦值給臨時變量count。
  3. 開啓響應中斷的排他鎖takeLock 。
  4. 若是count==0,說明目前隊列中沒有數據,就阻塞當前線程,等待喚醒,直到其餘線程調用了notEmpty的signal方法喚醒了當前線程。用while循環是爲了防止虛假喚醒。
  5. 進行出隊操做。
  6. count先賦值給c後,在進行自減操做,這裏須要注意是先賦值,後自減。
  7. 若是c>1,也就是size>1,結合上面的先賦值,後自減,可知若是知足條件,說明在出隊以前,隊列中至少有兩個元素,則調用notEmpty的signal方法,喚醒由於調用notEmpty的await方法而被阻塞的線程。
  8. 釋放排他鎖takeLock 。
  9. 若是執行出隊後,隊列中有且只有一個剩餘空間,換個說法,就是執行出隊操做前,隊列是滿的,則調用signalNotFull方法。

咱們再來看下signalNotFull方法:

private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
複製代碼
  1. 開啓排他鎖,注意這裏的排他鎖是putLock 。
  2. 調用notFull的signal方法,喚醒由於調用notFull的await方法而被阻塞的線程。
  3. 釋放排他鎖putLock 。

poll

public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
複製代碼

相比take方法,最大的區別就若是隊列爲空,執行take方法會阻塞當前線程,直到被喚醒,而poll方法,直接返回null。

peek

public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
複製代碼

peek方法,只是拿到頭節點的值,可是不會移除該節點。

dequeue

private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
複製代碼

沒什麼好說的,就是彈出元素,而且移除彈出的元素。

小總結

至此take/poll/peek的核心源碼已經分析完畢,咱們來作一個小總結,take/poll/peek都是得到頭節點值的方法,不過他們之間仍是有所區別的:

  • take:當隊列爲空,會阻塞當前線程,直到被喚醒。會進行出隊操做,移除得到的節點。
  • poll:當隊列爲空,直接返回null。會進行出隊操做,移除得到的節點。
  • peek:當隊列爲空,直接返回null。不會移除節點。

LinkedBlockingQueue的核心源碼分析到這裏完畢了,謝謝你們。

相關文章
相關標籤/搜索