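I've been reading Java Threads, Third Edition lately and getting a lot out of it. Reading it in the original English has been a real pleasure, and I recommend it to everyone.
In this post I want to walk systematically through two models of communication between threads: wait-notify and Condition.
Let's start with a producer-consumer example.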
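package waitnotify;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Data {
    private final List<Integer> data = new ArrayList<Integer>(5);
    private final Random random = new Random();

    public void put() {
        synchronized (data) {
            try {
                // Re-check in a loop: a woken thread may find the list still full.
                while (data.size() >= 5) {
                    data.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                return;
            }
            // Add after waiting, so that a put is never silently skipped.
            data.add(random.nextInt(100));
            data.notify(); // wakes one arbitrary thread waiting on data
        }
    }

    public void get() {
        synchronized (data) {
            try {
                // Re-check in a loop: a woken thread may find the list still empty.
                while (data.size() < 1) {
                    data.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                return;
            }
            data.remove(0);
            data.notify(); // wakes one arbitrary thread waiting on data
        }
    }
}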
package waitnotify;

public class Producer extends Thread {
    private Data data;

    public Producer(Data data, String name) {
        this.data = data;
        this.setName(name);
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            data.put();
        }
    }
}
package waitnotify;

public class Consumer extends Thread {
    private Data data;

    public Consumer(Data data, String name) {
        this.data = data;
        this.setName(name);
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            data.get();
        }
    }
}
package waitnotify;

public class Test {
    public static void main(String[] args) {
        Data data = new Data();
        new Producer(data, "put---1").start();
        new Consumer(data, "get---1").start();
        new Consumer(data, "get---2").start();
        new Producer(data, "put---2").start();
    }
}
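This example is simple and easy to read. It first locks the shared resource (data) with synchronized, and then the producers and consumers communicate through wait and notify so that access to the shared resource is mutually exclusive. Mutual exclusion is the key point: threads that synchronize on the same object can only run the guarded code one at a time.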
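1. Consider a database-style problem. A write and a read must exclude each other, and so must two writes, but do two reads need to? No, because two reads running at the same time cannot produce dirty data. synchronized has no way to express this.
2. When several threads reach synchronized code, only one of them runs and the others can only block indefinitely. Some flexibility would help here: for example, a waiting thread could give up after a certain time, or respond to an interrupt, and go do something else.
3. notify wakes up one of the threads waiting on the object, and you cannot choose which one. Picture several producer threads and consumer threads. A producer finds data full and waits. When a thread is later woken by notify, is it a producer or a consumer? If another producer is woken while data is still full, it simply goes back to waiting. No dirty data results, but time and performance are wasted. This can be optimized too.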
Let's look at how to solve the problems above.
First, look at the Lock interface and the behavior it defines:
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
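The two methods that throw InterruptedException, lockInterruptibly() and tryLock(long, TimeUnit), address the second problem. newCondition() can create several Conditions for one lock, which addresses the third. Another lock interface, ReadWriteLock, addresses the first; its implementation uses a shared lock for reads.
Here is a Condition version of the producer-consumer (mainly the Data class changes).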
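As an illustration of the second point, here is a minimal sketch of a thread that stops waiting after a timeout instead of blocking forever. TimedLockDemo, runIfAvailable and attemptWhileHeldElsewhere are hypothetical names made up for this example; only tryLock(long, TimeUnit) and unlock() come from the Lock interface above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    /** Runs task under the lock if it can be acquired within timeoutMillis; otherwise gives up. */
    static boolean runIfAvailable(ReentrantLock lock, long timeoutMillis, Runnable task) {
        try {
            if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;                       // timed out: free to do something else
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // keep the interrupt status
            return false;                           // interrupted while waiting
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Lets a helper thread hold the lock, then makes a timed attempt from the caller. */
    static boolean attemptWhileHeldElsewhere() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();                   // tell the caller the lock is taken
                release.await();                    // keep holding until told to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            return runIfAvailable(lock, 50, () -> { });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("acquired while held elsewhere: " + attemptWhileHeldElsewhere());
        System.out.println("acquired when free: "
                + runIfAvailable(new ReentrantLock(), 50, () -> { }));
    }
}
```

With synchronized the second thread would have no way out; here it simply gets false back and can move on.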
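package condition;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Data {
    private final List<Integer> data = new ArrayList<Integer>(5);
    private final Random random = new Random();
    Lock lock = new ReentrantLock();
    Condition full = lock.newCondition();   // producers wait here while data is full
    Condition empty = lock.newCondition();  // consumers wait here while data is empty

    public void put() {
        lock.lock();
        try {
            // Re-check in a loop: await() may return while data is still full.
            while (data.size() >= 5) {
                full.await();
            }
            // Add after waiting, so that a put is never silently skipped.
            data.add(random.nextInt(100));
            empty.signal(); // wakes a consumer, never a producer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            // Re-check in a loop: await() may return while data is still empty.
            while (data.size() < 1) {
                empty.await();
            }
            data.remove(0);
            full.signal(); // wakes a producer, never a consumer
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status
        } finally {
            lock.unlock();
        }
    }
}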
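Here one lock creates two Conditions, which are then awaited and signalled separately. Suppose a producer finds data full and calls full.await(). An empty.signal() issued by another producer then wakes a thread that is certain to be a consumer, never the waiting producer. As you may have guessed by now, full and empty each maintain their own wait queue: the queue of full holds only producers and the queue of empty holds only consumers. You will also notice that the code looks more complicated, because the lock must be released in a finally block. synchronized is lock semantics implemented by the JVM, which releases the lock for you automatically, even when an exception is thrown. Lock is implemented at the Java code level, so if an exception occurs you have to release it yourself.
After all that, it feels like I have only described the surface without touching the substance. Nothing for it: on to the source code.
Starting with the most important class, ReentrantLock, you find that almost every operation eventually ends up calling another core class, AbstractQueuedSynchronizer. This class is the masterpiece of a Java concurrency master and the soul of the Lock mechanism. There are still parts of it I have not fully understood. A master is a master.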
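The separate wait queues can be observed from the outside. The sketch below parks one thread on full and then asks the lock how many threads wait on each Condition. ConditionQueuesDemo and countWaiters are hypothetical names for this example; hasWaiters(Condition) and getWaitQueueLength(Condition) are existing ReentrantLock methods that must be called while holding the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionQueuesDemo {
    /** Parks one thread on 'full' and returns {waiters on full, waiters on empty}. */
    static int[] countWaiters() {
        ReentrantLock lock = new ReentrantLock();
        Condition full = lock.newCondition();
        Condition empty = lock.newCondition();
        boolean[] proceed = {false};            // guarded by lock

        Thread producer = new Thread(() -> {
            lock.lock();
            try {
                while (!proceed[0]) {
                    full.awaitUninterruptibly(); // joins the wait queue of 'full' only
                }
            } finally {
                lock.unlock();
            }
        });
        producer.start();

        int[] counts = new int[2];
        for (;;) {
            lock.lock();
            try {
                if (lock.hasWaiters(full)) {     // the producer has reached await
                    counts[0] = lock.getWaitQueueLength(full);
                    counts[1] = lock.getWaitQueueLength(empty);
                    proceed[0] = true;
                    full.signal();               // let the producer finish
                    break;
                }
            } finally {
                lock.unlock();
            }
            Thread.yield();                      // producer not waiting yet, try again
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] c = countWaiters();
        System.out.println("waiting on full: " + c[0] + ", waiting on empty: " + c[1]);
    }
}
```

The thread that awaited full shows up only in the queue of full, which is why a signal on empty can never wake it.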
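Of course, the master also stands on the shoulders of giants. The core idea of AbstractQueuedSynchronizer is the CLH queue, of which it uses a variant. Put simply, every thread waiting on the same lock is treated as a node, and each node waits for a signal from its predecessor before trying to acquire the lock. In the classic CLH lock each node spins on its predecessor's state; in AbstractQueuedSynchronizer a node retries briefly and then parks until its predecessor unparks it. AbstractQueuedSynchronizer also wraps all the threads waiting on the same Condition into a queue of nodes.
So first let's see what kind of data structure this node is.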
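For reference, here is a minimal sketch of the classic CLH spin lock that the idea comes from. This is the textbook algorithm, not the code inside AbstractQueuedSynchronizer, and ClhSpinLock and countWithTwoThreads are names invented for this example. Each thread appends its own node to the tail and spins on the locked flag of its predecessor.

```java
import java.util.concurrent.atomic.AtomicReference;

class ClhSpinLock {
    private static final class Node {
        volatile boolean locked;                // true while the owner holds or wants the lock
    }

    // The tail starts as a dummy node that is already "released".
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());
    private final ThreadLocal<Node> current = ThreadLocal.withInitial(Node::new);
    private final ThreadLocal<Node> predecessor = new ThreadLocal<>();

    void lock() {
        Node node = current.get();
        node.locked = true;
        Node pred = tail.getAndSet(node);       // enqueue atomically at the tail
        predecessor.set(pred);
        while (pred.locked) {                   // spin on the predecessor only
            Thread.yield();
        }
    }

    void unlock() {
        Node node = current.get();
        node.locked = false;                    // lets the successor stop spinning
        current.set(predecessor.get());         // reuse the predecessor's node next time
    }

    /** Two threads increment a plain int under the lock; returns the final value. */
    static int countWithTwoThreads(int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < perThread; i++) {
                lock.lock();
                try {
                    counter[0]++;
                } finally {
                    lock.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10000)); // prints 20000
    }
}
```

AbstractQueuedSynchronizer keeps the queue-of-nodes shape but adds prev and next links and blocks threads with park instead of spinning.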
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. 
It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. 
*/ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
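This Node is an inner class of AbstractQueuedSynchronizer, and most of the English comments explain it clearly. A Node wraps a thread and has two modes, exclusive (EXCLUSIVE) and shared (SHARED); the read-read case mentioned earlier relies on the shared mode. As for a node's wait status: CANCELLED means the node has been cancelled, for example because of a timeout or an interrupt; SIGNAL means the node's successor is (or soon will be) parked, so this node must unpark it when it releases or cancels; CONDITION means the node is waiting on a Condition and sits in that Condition's wait queue.
Now look at the structure of the Condition wait queue.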
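The effect of shared mode is easy to observe with ReentrantReadWriteLock. In this small sketch the calling thread holds the read lock while a second thread probes both locks with tryLock(). SharedReadDemo and probeWhileReading are hypothetical names for this example.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedReadDemo {
    /** Returns {second reader admitted, writer admitted} while the caller holds the read lock. */
    static boolean[] probeWhileReading() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[2];
        rw.readLock().lock();
        try {
            Thread other = new Thread(() -> {
                result[0] = rw.readLock().tryLock();    // shared: readers do not exclude readers
                if (result[0]) {
                    rw.readLock().unlock();
                }
                result[1] = rw.writeLock().tryLock();   // exclusive: blocked by the active reader
                if (result[1]) {
                    rw.writeLock().unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            rw.readLock().unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = probeWhileReading();
        System.out.println("second reader admitted: " + r[0] + ", writer admitted: " + r[1]);
    }
}
```

The second reader gets in and the writer does not, which is the read-read behavior that synchronized cannot express.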
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } // Internal methods /** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. 
So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // public methods /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. 
* </ol> */ public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */ /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. 
* </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. 
* </ol> */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } /** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. 
* </ol> */ public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. 
* </ol> */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // support for instrumentation /** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } /** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. 
* * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
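ConditionObject is also an inner class of AbstractQueuedSynchronizer. It is the data structure that handles communication between threads, and it maintains a queue of nodes for the threads waiting to be signalled.
The member variables of AbstractQueuedSynchronizer, and a few CAS operations: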
/**
 * Head of the wait queue, lazily initialized. Except for
 * initialization, it is modified only via method setHead. Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized. Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state.
 */
private volatile int state;

// ----------------------------- CAS operations -----------------------------

/**
 * Setup to support compareAndSet. We need to natively implement
 * this here: For the sake of permitting future enhancements, we
 * cannot explicitly subclass AtomicInteger, which would be
 * efficient and useful otherwise. So, as the lesser of evils, we
 * natively implement using hotspot intrinsics API. And while we
 * are at it, we do the same for other CASable fields (which could
 * otherwise be done with atomic field updaters).
 */
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));
    } catch (Exception ex) { throw new Error(ex); }
}

/**
 * CAS head field. Used only by enq.
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * CAS tail field. Used only by enq.
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
 * CAS waitStatus field of a node.
 */
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}

/**
 * CAS next field of a node.
 */
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
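AbstractQueuedSynchronizer links a logical queue through head and tail. I call it the sync queue, to distinguish it from the Condition queue. I will cover the CAS operations in a separate post later.
Now let's re-examine the Lock version of the producer-consumer.
lock.lock() calls the lock method of the nonfair lock, because new ReentrantLock() creates a nonfair lock by default. (The source quoted in this post appears to be from JDK 8; later versions are organized differently.)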
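To make the role of state concrete, here is a minimal sketch of a non-reentrant mutex built on AbstractQueuedSynchronizer, modeled on the example in the class's Javadoc. SimpleMutex and Sync are illustrative names, not JDK classes. State 0 means unlocked and 1 means locked; the subclass only decides how state changes, and AbstractQueuedSynchronizer queues and parks the threads that fail to acquire.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // One CAS on state decides who gets the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;                       // not reentrant: the owner fails too
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);                        // volatile write publishes the release
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);                        // queues and parks the thread if tryAcquire fails
    }

    boolean tryLock() {
        return sync.tryAcquire(1);              // single attempt, never blocks
    }

    void unlock() {
        sync.release(1);                        // unparks the first queued thread, if any
    }

    public static void main(String[] args) {
        SimpleMutex mutex = new SimpleMutex();
        System.out.println("first tryLock: " + mutex.tryLock());
        System.out.println("second tryLock: " + mutex.tryLock());
        mutex.unlock();
    }
}
```

ReentrantLock follows the same pattern, except that its tryAcquire also lets the owning thread increment state again.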
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
The first thread to arrive gets the lock as hoped. The other threads call acquire(1), a method defined in AbstractQueuedSynchronizer:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
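These few lines are short and sharp! tryAcquire is left for subclasses to implement with their own logic, so we are back in the nonfair lock, which then calls: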
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // In our scenario the first thread has already set state to 1
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // Reentrancy is handled here: each re-acquire by the owner adds 1
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
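The reentrant branch can be watched through the public API. In this small sketch the same thread locks twice and reads getHoldCount(), an existing ReentrantLock method that reports how many holds the current thread has. ReentrancyDemo and holdCounts are names made up for the example.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrancyDemo {
    /** Returns the hold count after one lock, after a nested lock, and after the nested unlock. */
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] seen = new int[3];
        lock.lock();
        try {
            seen[0] = lock.getHoldCount();      // 1: first acquisition
            lock.lock();                        // same thread again: state goes up by one
            try {
                seen[1] = lock.getHoldCount();  // 2: nested acquisition
            } finally {
                lock.unlock();
            }
            seen[2] = lock.getHoldCount();      // 1: one hold left
        } finally {
            lock.unlock();
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] s = holdCounts();
        System.out.println(s[0] + " " + s[1] + " " + s[2]); // prints 1 2 1
    }
}
```

The lock is only free for other threads once every lock() has been matched by an unlock() and the count is back to 0.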
If tryAcquire fails, addWaiter is called next. It wraps the thread in a Node and appends it to the sync queue.
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
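enq is the enqueue operation. It looks like an infinite loop, but without contention it returns after at most two iterations: the first initializes head and tail if the queue is empty, and the second appends the node. It only keeps looping when a CAS loses a race with another thread.
Once the node has been added to the sync queue, acquireQueued starts running.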
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // This is the real retry loop: it exits only by acquiring the lock or by an exception
        for (;;) {
            final Node p = node.predecessor();
            // Only try to acquire the lock when the predecessor is the head node
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// After the current node fails to acquire the lock, decide whether its thread should park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // The predecessor's status is SIGNAL, so the current thread can park
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // The predecessor is CANCELLED: walk backwards and skip every cancelled predecessor
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // Set the predecessor's status to SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    // Park the current thread
    LockSupport.park(this);
    // Report (and clear) whether the thread was interrupted
    return Thread.interrupted();
}
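park ultimately relies on the operating system to block the current thread, and the matching unpark wakes it up.
That is the general logic of acquiring the lock with lock.lock(). I won't go through the release logic of lock.unlock(); interested readers should read it carefully.
Now back to the Condition queue.
full.await() is a method of ConditionObject: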
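A minimal sketch of park and unpark on their own, outside any lock, may help here. ParkDemo and parkThenUnpark are hypothetical names; LockSupport.park() and LockSupport.unpark(Thread) are the real calls. The flag is re-checked in a loop because park is allowed to return without a matching unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    /** Parks a helper thread, wakes it with unpark, and reports whether it resumed. */
    static boolean parkThenUnpark() {
        AtomicBoolean go = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            while (!go.get()) {
                LockSupport.park();             // blocks until unpark, interrupt, or spurious return
            }
            resumed.set(true);
        });
        waiter.start();

        // A parked thread is reported as WAITING; wait until the helper gets there.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        go.set(true);                           // change the condition first
        LockSupport.unpark(waiter);             // then wake the thread

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resumed.get();
    }

    public static void main(String[] args) {
        System.out.println("helper resumed after unpark: " + parkThenUnpark());
    }
}
```

acquireQueued follows the same shape: park inside a loop, and re-check the real condition (can I acquire now?) every time park returns.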
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current node to the Condition wait queue
    Node node = addConditionWaiter();
    // Release the lock
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Keep looping until the node has been transferred to the sync queue
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // After being woken up, try to acquire the lock again
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
addConditionWaiter is fairly simple, so I won't analyze it here.
Then comes full.signal(). signal itself does very little; the real work is done by doSignal:
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
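signal takes the head node of the Condition wait queue and moves it to the sync queue through transferForSignal, so that the thread in that node can compete for the lock and get a chance to run. One thing is worth noting: after signal is called, the thread in that node normally does not resume right away. When it does depends on how the nodes ahead of it in the sync queue progress, and in any case it cannot return from await until it has reacquired the lock. This is the same as wait-notify: calling notify does not release the lock immediately. The lock is released only when the synchronized block finishes, and only then can the woken thread acquire it.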
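This ordering can be checked with a small experiment. In the sketch below the signalling thread records two events after calling signal() but before unlocking, and the waiting thread records one event when await returns. SignalOrderDemo and run are names made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SignalOrderDemo {
    /** Returns the events in the order they happened. */
    static List<String> run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        List<String> events = new ArrayList<>();   // guarded by lock
        boolean[] ready = {false};                 // guarded by lock
        CountDownLatch waiterHasLock = new CountDownLatch(1);

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHasLock.countDown();
                while (!ready[0]) {
                    cond.awaitUninterruptibly();   // releases the lock while waiting
                }
                events.add("waiter resumed");      // runs only after reacquiring the lock
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        try {
            waiterHasLock.await();                 // the waiter got the lock first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        lock.lock();                               // possible once the waiter is inside await
        try {
            ready[0] = true;
            cond.signal();                         // moves the waiter to the sync queue
            events.add("signal called");
            events.add("signaller still holds the lock");
        } finally {
            lock.unlock();                         // only now can the waiter proceed
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

"waiter resumed" always comes last: the signalled thread has to win the lock back before await can return.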