上一篇已經解釋了ReentrantLock,AQS基本框架html
AQS是JUC重要的同步器,全部的鎖基於這個同步器實現,先溫習下 主要由如下幾個重要部分組成java
那麼今天就是重點介紹AQS中的ConditionObject 功能和實現。node
在AQS中,Node節點經過nextWaiter指針串起來的就是條件隊列,中有ConditionObject對象,實現接口Condition,由具體的lock,如ReentrantLock.newCondition建立和調用。api
Condition主要接口有:緩存
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通訊方式,Condition均可以實現。多線程
**好比一個對象裏面能夠有多個Condition,能夠註冊在不一樣的condition,能夠有選擇性的調度線程,很靈活。而Synchronized只有一個condition(就是對象自己),全部的線程都註冊在這個conditon身上,線程調度不靈活。 ** Condition和傳統的線程通訊沒什麼區別,Condition的強大之處在於它能夠爲多個線程間創建不一樣的Condition,下面引入API中的一段代碼(原文地址,http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html),ArrayBlockingQueue 提供了相似的功能,實習原理相似。oracle
class BoundedBuffer { final Lock lock = new ReentrantLock();//鎖對象 final Condition notFull = lock.newCondition();//寫線程條件 final Condition notEmpty = lock.newCondition();//讀線程條件 final Object[] items = new Object[100];//緩存隊列 int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length)//若是隊列滿了 notFull.await();//阻塞寫線程 items[putptr] = x; if (++putptr == items.length) putptr = 0;//若是寫索引寫到隊列的最後一個位置了,那麼置爲0 ++count;//個數++ notEmpty.signal();//喚醒讀線程 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0)//若是隊列爲空 notEmpty.await();//阻塞讀線程 Object x = items[takeptr];//取值 if (++takeptr == items.length) takeptr = 0;//若是讀索引讀到隊列的最後一個位置了,那麼置爲0 --count;//個數-- notFull.signal();//喚醒寫線程 return x; } finally { lock.unlock(); } } }
在多線程環境下,緩存區提供了兩個方法,put and take,put是存數據的,take是取數據的,items充當緩存隊列,運行場景是有多個線程同時往調用put和take;這裏的讀和寫共用的實際上是一把鎖lock,實際讀寫不能同時並行;當寫入一個數據後,便通知讀線程,寫滿後調用notFull.await()阻塞寫;當讀入一個數據後,便通知寫線程,讀空後調用notEmpty.await()阻塞讀,等待寫條件通知信號;框架
接下來咱們講解AQS.ConditionObject 內部源碼。 主要分析如下幾個方法:less
這裏我先定義下兩個概念:ui
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //添加到Condition本身維護的一個鏈表中,經過nextWaiter鏈接起來,即條件等待隊列 int savedState = fullyRelease(node); //釋當前節點的鎖,並返回釋放以前的鎖狀態,執行await必定是先拿到了當前鎖的 int interruptMode = 0; while (!isOnSyncQueue(node)) { //判斷該節點是否在鎖等待隊列中,當釋放鎖後就不在鎖隊列中了,等待條件隊列的線程應當被繼續阻塞,若是在鎖等待隊列中,則說明有資格獲取鎖,執行下一步,聰明的你發現是在signal時被再次加入鎖等待隊列 LockSupport.park(this); //在條件隊列中阻塞該線程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被喚醒後,從新開始正式競爭鎖,一樣,若是競爭不到仍是會將本身沉睡,等待喚醒從新開始競爭。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // 存在下一個節點,從條件隊列中取消不是在CONDITION的線程節點 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
當前waiter線程加入條件等待隊列裏,從隊尾入隊,並將節點線程狀態設置爲CONDITION
/** *當前waiter線程加入條件等待隊列裏,從隊尾入隊,並將節點線程狀態設置爲CONDITION * [@return](https://my.oschina.net/u/556800) 返回新的節點 */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
釋當前節點的鎖,並返回釋放以前的鎖狀態,執行await必定是先拿到了當前鎖的
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */
//釋當前節點的鎖,並返回釋放以前的鎖狀態,執行await必定是先拿到了當前鎖的
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
判斷這個節點是否在鎖等待隊列中,顯然waitStatus爲CONDITION這種結點,只屬於條件隊列,不在鎖等待隊列中。
判斷這個節點是否在鎖等待隊列中,顯然waitStatus爲CONDITION這種結點,只屬於條件隊列,不在鎖等待隊列中。 final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); }
從條件隊列中的頭節點開始,釋放第一個節點線程,加入到鎖等待隊列
//從條件隊列中的頭節點開始,釋放第一個節點線程,加入到鎖等待隊列 public final void signal() { //當前線程不是擁有鎖線程這拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter;//條件隊列中第一個節點 if (first != null) doSignal(first); }
doSignal()方法是一個while循環,直到transferForSignal成功爲止, 不斷地完成這樣一個transfer(條件隊列--->鎖同步等待隊列)操做,直到有一個成功爲止。
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //將第一個節點的繼任變量設置空,意義是first與next節點斷掉,便於gc first.nextWaiter = null; // 將節點從條件隊列轉移到鎖等待隊列,若是成功則返回true,while循環終止,若是轉移失敗,則從first日後找,聰明的你確定就知道,若是是doSignalAll,這從first一直日後執行transferForSignal } while (!transferForSignal(first) && (first = firstWaiter) != null); }
將節點從條件隊列轉移到鎖等待隊列,若是成功則返回true transferForSignal作了兩件事
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal). */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //假設在條件隊列中,將node狀態設置0,若是waitStatus不能被更改,則該節點已經被取消 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //經過enq將節點加入到鎖等待隊列,從隊尾插入,並返回前驅節點。 Node p = enq(node); int ws = p.waitStatus; //若是該前驅節點被取消或者更改waitStatus狀態爲SIGNAL失敗,說明前驅失效,則喚醒在鎖同步等待隊列當前節點 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
。。。待續