併發容器之ArrayBlockingQueue和LinkedBlockingQueue實現原理詳解

1. ArrayBlockingQueue簡介

在多線程編程過程當中,爲了業務解耦和架構設計,常常會使用併發容器用於存儲多線程間的共享數據,這樣不只能夠保證線程安全,還能夠簡化各個線程操做。例如在「生產者-消費者」問題中,會使用阻塞隊列(BlockingQueue)做爲數據容器,關於BlockingQueue能夠看這篇文章。爲了加深對阻塞隊列的理解,惟一的方式是對其實驗原理進行理解,這篇文章就主要來看看ArrayBlockingQueue和LinkedBlockingQueue的實現原理。node

2. ArrayBlockingQueue實現原理

阻塞隊列最核心的功能是,可以可阻塞式的插入和刪除隊列元素。當前隊列爲空時,會阻塞消費數據的線程,直至隊列非空時,通知被阻塞的線程;當隊列滿時,會阻塞插入數據的線程,直至隊列未滿時,通知插入數據的線程(生產者線程)。那麼,多線程中消息通知機制最經常使用的是lock的condition機制,關於condition能夠看這篇文章的詳細介紹。那麼ArrayBlockingQueue的實現是否是也會採用Condition的通知機制呢?下面來看看。編程

2.1 ArrayBlockingQueue的主要屬性

ArrayBlockingQueue的主要屬性以下:數組

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;
複製代碼

從源碼中能夠看出ArrayBlockingQueue內部是採用數組進行數據存儲的(屬性items),爲了保證線程安全,採用的是ReentrantLock lock,爲了保證可阻塞式的插入刪除數據利用的是Condition,當獲取數據的消費者線程被阻塞時會將該線程放置到notEmpty等待隊列中,當插入數據的生產者線程被阻塞時,會將該線程放置到notFull等待隊列中。而notEmpty和notFull等中要屬性在構造方法中進行建立:安全

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
複製代碼

接下來,主要看看可阻塞式的put和take方法是怎樣實現的。數據結構

2.2 put方法詳解

put(E e)方法源碼以下:多線程

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//若是當前隊列已滿,將線程移入到notFull等待隊列中
        while (count == items.length)
            notFull.await();
		//知足插入數據的要求,直接進行入隊操做
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
複製代碼

該方法的邏輯很簡單,當隊列已滿時(count == items.length)將線程移入到notFull等待隊列中,若是當前知足插入數據的條件,就能夠直接調用enqueue(e)插入數據元素。enqueue方法源碼爲:架構

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
	//插入數據
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
	//通知消費者線程,當前隊列中有數據可供消費
    notEmpty.signal();
}
複製代碼

enqueue方法的邏輯一樣也很簡單,先完成插入數據,即往數組中添加數據(items[putIndex] = x),而後通知被阻塞的消費者線程,當前隊列中有數據可供消費(notEmpty.signal())。併發

2.3 take方法詳解

take方法源碼以下:less

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
		//若是隊列爲空,沒有數據,將消費者線程移入等待隊列中
        while (count == 0)
            notEmpty.await();
		//獲取數據
        return dequeue();
    } finally {
        lock.unlock();
    }
}
複製代碼

take方法也主要作了兩步:1. 若是當前隊列爲空的話,則將獲取數據的消費者線程移入到等待隊列中;2. 若隊列不爲空則獲取數據,即完成出隊操做dequeue。dequeue方法源碼爲:post

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
	//獲取數據
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //通知被阻塞的生產者線程
	notFull.signal();
    return x;
}
複製代碼

dequeue方法也主要作了兩件事情:1. 獲取隊列中的數據,即獲取數組中的數據元素((E) items[takeIndex]);2. 通知notFull等待隊列中的線程,使其由等待隊列移入到同步隊列中,使其可以有機會得到lock,並執行完成功退出。

從以上分析,能夠看出put和take方法主要是經過condition的通知機制來完成可阻塞式的插入數據和獲取數據。在理解ArrayBlockingQueue後再去理解LinkedBlockingQueue就很容易了。

3. LinkedBlockingQueue實現原理

LinkedBlockingQueue是用鏈表實現的有界阻塞隊列,當構造對象時爲指定隊列大小時,隊列默認大小爲Integer.MAX_VALUE。從它的構造方法能夠看出:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
複製代碼

3.1 LinkedBlockingQueue的主要屬性

LinkedBlockingQueue的主要屬性有:

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
複製代碼

能夠看出與ArrayBlockingQueue主要的區別是,LinkedBlockingQueue在插入數據和刪除數據時分別是由兩個不一樣的lock(takeLockputLock)來控制線程安全的,所以,也由這兩個lock生成了兩個對應的condition(notEmptynotFull)來實現可阻塞的插入和刪除數據。而且,採用了鏈表的數據結構來實現隊列,Node結點的定義爲:

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}
複製代碼

接下來,咱們也一樣來看看put方法和take方法的實現。

3.2 put方法詳解

put方法源碼爲:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
		//若是隊列已滿,則阻塞當前線程,將其移入等待隊列
        while (count.get() == capacity) {
            notFull.await();
        }
		//入隊操做,插入數據
        enqueue(node);
        c = count.getAndIncrement();
		//若隊列知足插入數據的條件,則通知被阻塞的生產者線程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
複製代碼

put方法的邏輯也一樣很容易理解,可見註釋。基本上和ArrayBlockingQueue的put方法同樣。take方法的源碼以下:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
		//當前隊列爲空,則阻塞當前線程,將其移入到等待隊列中,直至知足條件
        while (count.get() == 0) {
            notEmpty.await();
        }
		//移除隊頭元素,獲取數據
        x = dequeue();
        c = count.getAndDecrement();
        //若是當前知足移除元素的條件,則通知被阻塞的消費者線程
		if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
複製代碼

take方法的主要邏輯請見於註釋,也很容易理解。

4. ArrayBlockingQueue與LinkedBlockingQueue的比較

相同點:ArrayBlockingQueue和LinkedBlockingQueue都是經過condition通知機制來實現可阻塞式插入和刪除元素,並知足線程安全的特性;

不一樣點:1. ArrayBlockingQueue底層是採用的數組進行實現,而LinkedBlockingQueue則是採用鏈表數據結構; 2. ArrayBlockingQueue插入和刪除數據,只採用了一個lock,而LinkedBlockingQueue則是在插入和刪除分別採用了putLocktakeLock,這樣能夠下降線程因爲線程沒法獲取到lock而進入WAITING狀態的可能性,從而提升了線程併發執行的效率。

相關文章
相關標籤/搜索