【Java併發編程】—–「J.U.C」:LinkedBlockingQueue

前言

在前面的文章ArrayBlockingQueue源碼分析中,已經對JDK中的BlockingQueue中的作了一個回顧,同時對ArrayBlockingQueue中的核心方法做了說明,而LinkedBlockingQueue做爲JDK中BlockingQueue家族系列中一員,因爲其做爲固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊列,分析它的目的主要在於2點:
(1) 與ArrayBlockingQueue進行類比學習,加深各類數據結構的理解
(2) 瞭解底層實現,可以更好地理解每一種阻塞隊列對線程池性能的影響,作到真正的知其然,且知其因此然node

  • 源碼分析LinkedBlockingQueue的實現
  • 與ArrayBlockingQueue進行比較
  • 說明爲何選擇LinkedBlockingQueue做爲固定大小的線程池的阻塞隊列
    如發現有分析不對或不許確的地方,請您及時糾正(在此謝過)

1.LinkedBlockingQueue深刻分析

LinkedBlockingQueue,見名之意,它是由一個基於鏈表的阻塞隊列,首先看一下的核心組成:數組

// 全部的元素都經過Node這個靜態內部類來進行存儲,這與LinkedList的處理方式徹底同樣
    static class Node<E> {
        //使用item來保存元素自己
        E item;
        //保存當前節點的後繼節點
        Node<E> next;
        Node(E x) { item = x; }
    }
    /**
        阻塞隊列所能存儲的最大容量
        用戶能夠在建立時手動指定最大容量,若是用戶沒有指定最大容量
        那麼最默認的最大容量爲Integer.MAX_VALUE.
    */
    private final int capacity;

    /** 
        當前阻塞隊列中的元素數量
        PS:若是你看過ArrayBlockingQueue的源碼,你會發現
        ArrayBlockingQueue底層保存元素數量使用的是一個
        普通的int類型變量。其緣由是在ArrayBlockingQueue底層
        對於元素的入隊列和出隊列使用的是同一個lock對象。而數
        量的修改都是在處於線程獲取鎖的狀況下進行操做,所以不
        會有線程安全問題。
        而LinkedBlockingQueue卻不是,它的入隊列和出隊列使用的是兩個    
        不一樣的lock對象,所以不管是在入隊列仍是出隊列,都會涉及對元素數
        量的併發修改,(以後經過源碼能夠更加清楚地看到)所以這裏使用了一個原子操做類
        來解決對同一個變量進行併發修改的線程安全問題。
    */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * 鏈表的頭部
     * LinkedBlockingQueue的頭部具備一個不變性:
     * 頭部的元素老是爲null,head.item==null   
     */
    private transient Node<E> head;

    /**
     * 鏈表的尾部
     * LinkedBlockingQueue的尾部也具備一個不變性:
     * 即last.next==null
     */
    private transient Node<E> last;

    /**
     元素出隊列時線程所獲取的鎖
     當執行take、poll等操做時線程須要獲取的鎖
    */
    private final ReentrantLock takeLock = new ReentrantLock();

    /**
    當隊列爲空時,經過該Condition讓從隊列中獲取元素的線程處於等待狀態
    */
    private final Condition notEmpty = takeLock.newCondition();

    /** 
      元素入隊列時線程所獲取的鎖
      當執行add、put、offer等操做時線程須要獲取鎖
    */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 
     當隊列的元素已經達到capactiy,經過該Condition讓元素入隊列的線程處於等待狀態
    */
    private final Condition notFull = putLock.newCondition();

經過上面的分析,咱們能夠發現LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味着它們之間的操做不會存在互斥操做。在多個CPU的狀況下,它們能夠作到真正的在同一時刻既消費、又生產,可以作到並行處理。安全

下面讓咱們看下LinkedBlockingQueue的構造方法:數據結構

/**
     * 若是用戶沒有顯示指定capacity的值,默認使用int的最大值
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    /**
     能夠看到,當隊列中沒有任何元素的時候,此時隊列的頭部就等於隊列的尾部,
     指向的是同一個節點,而且元素的內容爲null
    */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /*
    在初始化LinkedBlockingQueue的時候,還能夠直接將一個集合
    中的元素所有入隊列,此時隊列最大容量依然是int的最大值。
    */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        //獲取鎖
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            //迭代集合中的每個元素,讓其入隊列,而且更新一下當前隊列中的元素數量
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                //參考下面的enqueue分析        
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            //釋放鎖
            putLock.unlock();
        }
    }

    /**
     * 我去,這代碼其實可讀性不怎麼樣啊。
     * 其實下面的代碼等價於以下內容:
     * last.next=node;
     * last = node;
     * 其實也沒有什麼花樣:
       就是讓新入隊列的元素成爲原來的last的next,讓進入的元素稱爲last
     *
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

在分析完LinkedBlockingQueue的核心組成以後,下面讓咱們再看下核心的幾個操做方法,首先分析一下元素入隊列的過程:併發

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        
        /*注意上面這句話,約定全部的put/take操做都會預先設置本地變量,
        能夠看到下面有一個將putLock賦值給了一個局部變量的操做
        */
        int c = -1;
        Node<E> node = new Node(e);
        /* 
         在這裏首先獲取到putLock,以及當前隊列的元素數量
         即上面所描述的預設置本地變量操做
        */
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        /*
            執行可中斷的鎖獲取操做,即意味着若是線程因爲獲取
            鎖而處於Blocked狀態時,線程是能夠被中斷而再也不繼
            續等待,這也是一種避免死鎖的一種方式,不會由於
            發現到死鎖以後而因爲沒法中斷線程最終只能重啓應用。
        */
        putLock.lockInterruptibly();
        try {
            /*
            當隊列的容量到底最大容量時,此時線程將處於等待狀
            態,直到隊列有空閒的位置才繼續執行。使用while判
            斷依舊是爲了放置線程被"僞喚醒」而出現的狀況,即當
            線程被喚醒時而隊列的大小依舊等於capacity時,線
            程應該繼續等待。
            */
            while (count.get() == capacity) {
                notFull.await();
            }
            //讓元素進行隊列的末尾,enqueue代碼在上面分析過了
            enqueue(node);
            //首先獲取原先隊列中的元素個數,而後再對隊列中的元素個數+1.
            c = count.getAndIncrement();
            /*注:c+1獲得的結果是新元素入隊列以後隊列元素的總和。
            當前隊列中的總元素個數小於最大容量時,此時喚醒其餘執行入隊列的線程
            讓它們能夠放入元素,若是新加入元素以後,隊列的大小等於capacity,
            那麼就意味着此時隊列已經滿了,也就沒有必需要喚醒其餘正在等待入隊列的線程,由於喚醒它們以後,它們也仍是繼續等待。
            */
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //完成對鎖的釋放
            putLock.unlock();
        }
        /*當c=0時,即意味着以前的隊列是空隊列,出隊列的線程都處於等待狀態,
        如今新添加了一個新的元素,即隊列再也不爲空,所以它會喚醒正在等待獲取元素的線程。
        */
        if (c == 0)
            signalNotEmpty();
    }
    
    /*
    喚醒正在等待獲取元素的線程,告訴它們如今隊列中有元素了
    */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //經過notEmpty喚醒獲取元素的線程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

看完put方法,下面再看看下offer是如何處理的方法:源碼分析

/**
    在BlockingQueue接口中除了定義put方法外(當隊列元素滿了以後就會阻塞,
    直到隊列有新的空間能夠方法線程纔會繼續執行),還定義一個offer方法,
    該方法會返回一個boolean值,當入隊列成功返回true,入隊列失敗返回false。
    該方法與put方法基本操做基本一致,只是有細微的差別。
    */
     public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        /*
            當隊列已經滿了,它不會繼續等待,而是直接返回。
            所以該方法是非阻塞的。
        */
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            /*
            當獲取到鎖時,須要進行二次的檢查,由於可能當隊列的大小爲capacity-1時,
            兩個線程同時去搶佔鎖,而只有一個線程搶佔成功,那麼此時
            當線程將元素入隊列後,釋放鎖,後面的線程搶佔鎖以後,此時隊列
            大小已經達到capacity,因此將它沒法讓元素入隊列。
            下面的其他操做和put都同樣,此處再也不詳述
            */
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

BlockingQueue還定義了一個限時等待插入操做,即在等待必定的時間內,若是隊列有空間能夠插入,那麼就將元素入隊列,而後返回true,若是在過完指定的時間後依舊沒有空間能夠插入,那麼就返回false,下面是限時等待操做的分析:性能

/**
         經過timeout和TimeUnit來指定等待的時長
         timeout爲時間的長度,TimeUnit爲時間的單位
        */
        public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        //將指定的時間長度轉換爲毫秒來進行處理
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //若是等待的剩餘時間小於等於0,那麼直接返回
                if (nanos <= 0)
                    return false;
            /*
              經過condition來完成等待,此時當前線程會完成鎖的,而且處於等待狀態
              直到被其餘線程喚醒該線程、或者當前線程被中斷、
              等待的時間截至纔會返回,該返回值爲從方法調用到返回所經歷的時長。
              注意:上面的代碼是condition的awitNanos()方法的通用寫法,
              能夠參看Condition.awaitNaos的API文檔。
              下面的其他操做和put都同樣,此處再也不詳述
            */
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

經過上面的分析,咱們應該比較清楚地知道了LinkedBlockingQueue的入隊列的操做,其主要是經過獲取到putLock鎖來完成,當隊列的數量達到最大值,此時會致使線程處於阻塞狀態或者返回false(根據具體的方法來看);若是隊列還有剩餘的空間,那麼此時會新建立出一個Node對象,將其設置到隊列的尾部,做爲LinkedBlockingQueue的last元素。學習

在分析完入隊列的過程以後,咱們接下來看看LinkedBlockingQueue出隊列的過程;因爲BlockingQueue的方法都具備對稱性,此處就只分析take方法的實現,其他方法的實現都一模一樣:this

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //經過takeLock獲取鎖,而且支持線程中斷
        takeLock.lockInterruptibly();
        try {
            //當隊列爲空時,則讓當前線程處於等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            //完成元素的出隊列
            x = dequeue();
            /*            
               隊列元素個數完成原子化操做-1,能夠看到count元素會
               在插入元素的線程和獲取元素的線程進行併發修改操做。
            */
            c = count.getAndDecrement();
            /*
              當一個元素出隊列以後,隊列的大小依舊大於1時
              當前線程會喚醒其餘執行元素出隊列的線程,讓它們也
              能夠執行元素的獲取
            */
            if (c > 1)
                notEmpty.signal();
        } finally {
            //完成鎖的釋放
            takeLock.unlock();
        }
        /*
            當c==capaitcy時,即在獲取當前元素以前,
            隊列已經滿了,而此時獲取元素以後,隊列就會
            空出一個位置,故當前線程會喚醒執行插入操做的線
            程通知其餘中的一個能夠進行插入操做。
        */
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * 讓頭部元素出隊列的過程
     * 其最終的目的是讓原來的head被GC回收,讓其的next成爲head
     * 而且新的head的item爲null.
     * 由於LinkedBlockingQueue的頭部具備一致性:即元素爲null。
     */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

LinkedBlockingQueue出隊列大體過程.pngspa

對於LinkedBlockingQueue的源碼分析就到這裏,下面讓咱們將LinkedBlockingQueue與ArrayBlockingQueue進行一個比較。

2.LinkedBlockingQueue與ArrayBlockingQueue的比較

ArrayBlockingQueue因爲其底層基於數組,而且在建立時指定存儲的大小,在完成後就會當即在內存分配固定大小容量的數組元素,所以其存儲一般有限,故其是一個「有界「的阻塞隊列;而LinkedBlockingQueue能夠由用戶指定最大存儲容量,也能夠無需指定,若是不指定則最大存儲容量將是Integer.MAX_VALUE,便可以看做是一個「無界」的阻塞隊列,因爲其節點的建立都是動態建立,而且在節點出隊列後能夠被GC所回收,所以其具備靈活的伸縮性。可是因爲ArrayBlockingQueue的有界性,所以其可以更好的對於性能進行預測,而LinkedBlockingQueue因爲沒有限制大小,當任務很是多的時候,不停地向隊列中存儲,就有可能致使內存溢出的狀況發生。

其次,ArrayBlockingQueue中在入隊列和出隊列操做過程當中,使用的是同一個lock,因此即便在多核CPU的狀況下,其讀取和操做的都沒法作到並行,而LinkedBlockingQueue的讀取和插入操做所使用的鎖是兩個不一樣的lock,它們之間的操做互相不受干擾,所以兩種操做能夠並行完成,故LinkedBlockingQueue的吞吐量要高於ArrayBlockingQueue。

3.選擇LinkedBlockingQueue的理由

/**
        下面的代碼是Executors建立固定大小線程池的代碼,其使用了
        LinkedBlockingQueue來做爲任務隊列。
    */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

JDK中選用LinkedBlockingQueue做爲阻塞隊列的緣由就在於其無界性。由於線程大小固定的線程池,其線程的數量是不具有伸縮性的,當任務很是繁忙的時候,就勢必會致使全部的線程都處於工做狀態,若是使用一個有界的阻塞隊列來進行處理,那麼就很是有可能很快致使隊列滿的狀況發生,從而致使任務沒法提交而拋出RejectedExecutionException,而使用無界隊列因爲其良好的存儲容量的伸縮性,能夠很好的去緩衝任務繁忙狀況下場景,即便任務很是多,也能夠進行動態擴容,當任務被處理完成以後,隊列中的節點也會被隨之被GC回收,很是靈活。

至此,LinkedBlockingQueue的分析就到這裏,若是您發現有任何編寫不對的地方,請指出(萬分感謝!)。

相關文章
相關標籤/搜索