今天在QQ羣上拋出來一個問題,以下html
我以Java自帶的數據結構爲例,用源碼的形式說明,如何阻塞線程、通知線程的。node
1、Lock & Condition
ArrayBlockingQueue以可重入鎖和兩個Condition對象來控制併發。數據結構
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
構造函數中初始化了notEmpty和notFull.併發
/** * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) * capacity and the specified access policy. * @param capacity the capacity of this queue * @param fair if <tt>true</tt> then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if <tt>false</tt> the access order is unspecified. * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
2、線程阻塞
當ArrayBlockingQueue存儲的元素是0個的時候,take()方法會阻塞.less
public Object take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } Object x = extract(); return x; } finally { lock.unlock(); } }
這裏take方法首先得到可重入鎖lock,而後判斷若是元素爲空就執行notEmpty.await();
這個時候線程掛起。函數
3、通知線程
好比使用put放入一個新元素,ui
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
在enqueue
方法中,this
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
對剛纔的notEmpty
Condition進行通知。
4、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock來控制併發,同時也使用ArrayBlockingQueue的Condition對象來與線程交互。notEmpty
和notFull
都是由
ReentrantLock的成員變量sync
生成的,spa
public Condition newCondition() { return sync.newCondition(); }
sync
能夠認爲是一個抽象類類型,Sync
,它是在ReentrantLock內部定義的靜態抽象類,抽象類實現了newCondition
方法,.net
final ConditionObject newCondition() { return new ConditionObject(); }
返回的類型是實現了Condition
接口的ConditionObject
類,這是在AbstractQueuedSynchronizer
內部定義的類。在ArrayBlockingQueue
中的notEmpty
就是ConditionObject
實例。
阻塞:
當ArrayBlockingQueue
爲空時,notEmpty.await()
將本身掛起,如ConditionObject的await方法,
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter是將當前線程做爲一個node加入到ConditionObject的隊列中,隊列是用鏈表實現的。
若是是初次加入隊列的狀況,node.waitStatus == Node.CONDITION
成立,方法isOnSyncQueue
返回false,那麼就將線程park。
while (!isOnSyncQueue(node)) { LockSupport.park(this); .... }
至此線程被掛起,LockSupport.park(this)
;這裏this是指ConditionObject,是notEmpty.
通知:
當新的元素put進入ArrayBlockingQueue
後,notEmpty.signal()
通知在這上面等待的線程,如ConditionObject的signal方法,
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal方法,
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal一開始接收到的參數就是firstWaiter這個參數,在內部實現中用了do..while的形式,首先將first的的nextWaiter找出來保存到firstWaiter此時(first和firstWaiter不是一回事),在while的比較條件中可調用了transferForSignal方法,
整個while比較條件能夠看着短路邏輯,若是transferForSignal結果爲true,後面的first = firstWaiter就不執行了,整個while循環就結束了。
參照註釋,看
transferForSignal方法,
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
首先確保想要被signal的等待node仍是處於Node.CONDITION狀態,而後調整狀態爲Node.SIGNAL,這兩個都是採用CAS方法,最後調用的是
LockSupport.unpark(node.thread);
5、LockSupport
至此,咱們已經知道了線程的掛起和通知都是使用LockSupport來完成的,併發數據結構與線程直接的交互最終也是須要LockSupport。那麼關於LockSupport,咱們又能夠了解多少呢?
Ref:
Java中的ReentrantLock和synchronized兩種鎖定機制的對比
Java的LockSupport.park()實現分析