在Java中一般實現鎖有兩種方式,一種是synchronized關鍵字,另外一種是Lock。兩者其實並無什麼必然聯繫,可是各有各的特色,在使用中能夠進行取捨的使用。首先咱們先對比下二者。java
首先最大的不一樣:synchronized是基於JVM層面實現的,而Lock是基於JDK層面實現的。曾經反覆的找過synchronized的實現,惋惜最終無果。但Lock倒是基於JDK實現的,咱們能夠經過閱讀JDK的源碼來理解Lock的實現。node
對於使用者的直觀體驗上Lock是比較複雜的,須要lock和realse,若是忘記釋放鎖就會產生死鎖的問題,因此,一般須要在finally中進行鎖的釋放。可是synchronized的使用十分簡單,只須要對本身的方法或者關注的同步對象或類使用synchronized關鍵字便可。可是對於鎖的粒度控制比較粗,同時對於實現一些鎖的狀態的轉移比較困難。例如:安全
tips | synchronized | Lock |
---|---|---|
鎖獲取超時 | 不支持 | 支持 |
獲取鎖響應中斷 | 不支持 | 支持 |
在JDK1.5以後synchronized引入了偏向鎖,輕量級鎖和重量級鎖,從而大大的提升了synchronized的性能,同時對於synchronized的優化也在繼續進行。期待有一天能更簡單的使用java的鎖。併發
在之前不瞭解Lock的時候,感受Lock使用實在是太複雜,可是瞭解了它的實現以後就被深深吸引了。less
Lock的實現主要有ReentrantLock、ReadLock和WriteLock,後二者接觸的很少,因此簡單分析一下ReentrantLock的實現和運行機制。oop
ReentrantLock類在java.util.concurrent.locks包中,它的上一級的包java.util.concurrent主要是經常使用的併發控制類.性能
下面是ReentrantLock的UML圖,從圖中能夠看出,ReentrantLock實現Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子類,全部的同步操做都是依靠AbstractQueuedSynchronizer(隊列同步器)實現。學習
研究一個類,須要從一個類的靜態域,靜態類,靜態方法和成員變量開始。優化
private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes this lock instance from a stream. * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
從上面的代碼能夠看出來首先ReentrantLock是可序列化的,其次是ReentrantLock裏有一個對AbstractQueuedSynchronizer的引用。ui
看完了成員變量和靜態域,咱們須要瞭解下構造方法:
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
從上面代碼能夠看出,ReentrantLock支持兩種鎖模式,公平鎖和非公平鎖。默認的實現是非公平的。公平和非公平鎖的實現以下:
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
AbstractQueuedSynchronizer 是一個抽象類,因此在使用這個同步器的時候,須要經過本身實現預期的邏輯,Sync、FairSync和NonfairSync都是ReentrantLock爲了實現本身的需求而實現的內部類,之因此作成內部類,我認爲是隻在ReentrantLock使用上述幾個類,在外部沒有使用到。
咱們着重關注默認的非公平鎖的實現:
在ReentrantLock調用lock()的時候,調用的是下面的代碼:
/** * Acquires the lock. * * <p>Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * * <p>If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * * <p>If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); }
sync的實現是NonfairSync,因此調用的是NonfairSync的lock方法:
/** * Sync object for non-fair locks * tips:調用Lock的時候,嘗試獲取鎖,這裏採用的CAS去嘗試獲取鎖,若是獲取鎖成功 * 那麼,當前線程獲取到鎖,若是失敗,調用acquire處理。 * */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
接下來看看compareAndSetState方法是怎麼進行鎖的獲取操做的:
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a <tt>volatile</tt> read * and write. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that the actual * value was not equal to the expected value. * * tips: 1.compareAndSetState的實現主要是經過Unsafe類實現的。 * 2.之因此命名爲Unsafe,是由於這個類對於JVM來講是不安全的,咱們平時也是使用不了這個類的。 * 3.Unsafe類內封裝了一些能夠直接操做指定內存位置的接口,是否是感受和C有點像了? * 4.Unsafe類封裝了CAS操做,來達到樂觀的鎖的爭搶的效果 */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
主要的說明都在方法的註釋中,接下來簡單的看一下 compareAndSwapInt的實現:
/** * Atomically update Java variable to <tt>x</tt> if it is currently * holding <tt>expected</tt>. * @return <tt>true</tt> if successful */ public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
一個native方法,沮喪.....可是從註釋看意思是,以CAS的方式將制定字段設置爲指定的值。同時咱們也明白了這個方法多是用java實現不了,只能依賴JVm底層的C代碼實現。下面看看操做的stateOffset:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { //這個方法頗有意思,主要的意思是獲取AbstractQueuedSynchronizer的state成員的偏移量 //經過這個偏移量來更新state成員,另外state是volatile的來保證可見性。 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }
stateOffset 是AbstractQueuedSynchronizer內部定義的一個狀態量,AbstractQueuedSynchronizer是線程的競態條件,因此只要某一個線程CAS改變狀態成功,同時在沒有釋放的狀況下,其餘線程必然失敗(對於Unsafe類還不是很熟悉,後面還須要系統的學習)。
對於競爭成功的線程會調用 setExclusiveOwnerThread方法:
/** * The current owner of exclusive mode synchronization. */ private transient Thread exclusiveOwnerThread; /** * Sets the thread that currently owns exclusive access. A * <tt>null</tt> argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * <tt>volatile</tt> field accesses. */ protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; }
這個實現是比較簡單的,只是獲取當前線程的引用,令AbstractOwnableSynchronizer中的exclusiveOwnerThread引用到當前線程。競爭失敗的線程,會調用acquire方法,這個方法也是ReentrantLock設計的精華之處:
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * tips:此處主要是處理沒有獲取到鎖的線程 * tryAcquire:從新進行一次鎖獲取和進行鎖重入的處理。 * addWaiter:將線程添加到等待隊列中。 * acquireQueued:自旋獲取鎖。 * selfInterrupt:中斷線程。 * 三個條件的關係爲and,若是 acquireQueued返回true,那麼線程被中斷selfInterrupt會中斷線程 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
AbstractQueuedSynchronizer爲抽象方法,調用tryAcquire時,調用的爲NonfairSync的tryAcquire。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
/** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
nonfairTryAcquire方法主要是作重入鎖的實現,synchronized自己支持鎖的重入,而ReentrantLock則是經過此處實現。在鎖狀態爲0時,從新嘗試獲取鎖。若是已經被佔用,那麼作一次是否當前線程爲佔用鎖的線程的判斷,若是是同樣的那麼進行計數,固然在鎖的relase過程當中會進行遞減,保證鎖的正常釋放。
若是沒有從新獲取到鎖或者鎖的佔用線程和當前線程是一個線程,方法返回false。那麼把線程添加到等待隊列中,調用addWaiter:
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這裏主要是用當前線程構建一個Node的等待隊列雙向鏈表,這裏addWaiter中和enq中的部分邏輯是重複的,我的感受多是若是能一次成功就避免了enq中的死循環。由於tail節點是volatile的同時node也是不會發生競爭的因此node.prev = pred;是安全的。可是tail的next是不斷競爭的,因此利用compareAndSetTail保證操做的串行化。接下來調用acquireQueued方法:
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
此處是作Node節點線程的自旋過程,自旋過程主要檢查當前節點是否是head節點的next節點,若是是,則嘗試獲取鎖,若是獲取成功,那麼釋放當前節點,同時返回。至此一個非公平鎖的鎖獲取過程結束。
若是這裏一直不斷的循環檢查,實際上是很耗費性能的,JDK的實現確定不會這麼「弱智」,因此有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,這兩個方法就實現了線程的等待從而避免無限的輪詢:
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
首先,檢查一下當前Node的前置節點pred是不是SIGNAL,若是是SIGNAL,那麼證實前置Node的線程已經Park了,若是waitStatus>0,那麼當前節點已經Concel或者中斷。那麼不斷調整當前節點的前置節點,將已經Concel的和已經中斷的線程移除隊列。若是waitStatus<0,那麼設置waitStatus爲SIGNAL,由於調用shouldParkAfterFailedAcquire的方法爲死循環調用,因此終將返回true。接下來看parkAndCheckInterrupt方法,當shouldParkAfterFailedAcquire返回True的時候執行parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
此方法比較簡單,其實就是使當前的線程park,即暫停了線程的輪詢。當Unlock時會作後續節點的Unpark喚醒線程繼續爭搶鎖。
接下來看一下鎖的釋放過程,鎖釋放主要是經過unlock方法實現:
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
主要是調用AbstractQueuedSynchronizer同步器的release方法:
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease方法爲ReentrantLock中的Sync的tryRelease方法:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
tryRelease方法主要是作了一個釋放鎖的過程,將同步狀態state -1,直到減到0爲止,這主要是兼容重入鎖設計的,同時setExclusiveOwnerThread(null)清除當前佔用的線程。這些head節點後的線程和新進的線程就能夠開始爭搶。這裏須要注意的是對於同步隊列中的線程來講在setState(c),且c爲0的時候,同步隊列中的線程是沒有競爭鎖的,由於線程被park了尚未喚醒。可是此時對於新進入的線程是有機會獲取到鎖的。
下面代碼是進行線程的喚醒:
Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true;
由於在setState(c)釋放了鎖以後,是沒有線程競爭的,因此head是當前的head節點,先檢查當前的Node是否合法,若是合法則unpark it。開始鎖的獲取。就回到了上面的for循環執行獲取鎖邏輯:
至此鎖的釋放就結束了,能夠看到ReentrantLock是一個不斷的循環的狀態模型,裏面有不少東西值得咱們學習和思考。
ReentrantLock具備公平和非公平兩種模式,也各有優缺點:
公平鎖是嚴格的以FIFO的方式進行鎖的競爭,可是非公平鎖是無序的鎖競爭,剛釋放鎖的線程很大程度上能比較快的獲取到鎖,隊列中的線程只能等待,因此非公平鎖可能會有「飢餓」的問題。可是重複的鎖獲取能減少線程之間的切換,而公平鎖則是嚴格的線程切換,這樣對操做系統的影響是比較大的,因此非公平鎖的吞吐量是大於公平鎖的,這也是爲何JDK將非公平鎖做爲默認的實現。
關於併發和Lock還有不少的點仍是比較模糊,我也會繼續學習,繼續總結,若是文章中有什麼問題,還請各位看客及時指出,共同窗習。