上一篇分析AQS的內部結構,其中有介紹AQS是什麼,以及它的內部結構的組成,那麼今天就來分析下前面說的內部結構在AQS中的具體做用(主要在具體實現中體現)。html
上篇有說到AQS是抽象類,而它的設計是基於模板方法模式的,也就是說:使用者須要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步組件的實現中,並調用其提供的模板方法。其中須要子類重寫的方法與描述以下表:java
方法名稱 | 描述 |
protected boolean tryAcquire(int arg) | 嘗試以獨佔模式獲取。 此方法應查詢對象的狀態是否容許以獨佔模式獲取它,若是是,則獲取它。node 實現該方法須要查詢當前狀態並判斷同步狀態是否預期,而後進行CAS設置同步狀態。ide |
protected boolean tryRelease(int arg) | 嘗試釋放獨佔式的同步狀態。函數 等待獲取同步狀態的線程將有機會獲取同步狀態。測試 |
protected int tryAcquireShared(int arg) | 嘗試以共享模式獲取。 此方法應查詢對象的狀態是否容許在共享模式下獲取它,若是是,則獲取。ui 實現該方法須要查詢當前狀態並判斷同步狀態是否預期,而後進行CAS設置同步狀態。this |
protected boolean tryReleaseShared(int arg) | 嘗試釋放共享式的同步狀態。spa |
protected boolean isHeldExclusively() | 表示當前同步器是否在獨佔模式下被線程佔用。 |
在重寫上面這些方法時,可能須要下面這三個方法(注意其中state是使用volatile關鍵字修飾的)線程
方法名 | 描述 |
protected final int getState() | 獲取當前的同步狀態 |
protected final void setState(int newState) | 設置當前同步狀態 |
protected final boolean compareAndSetState (int expect, int update) |
使用CAS設置當前狀態,該方法能保證狀態設置的原子性 |
其實前面這些都不須要關心,由於這些通常都是在自定義同步組件中實現。自定義同步組件除了重寫第一個表格那些方法外,AQS還爲其提供了一些公共方法(或者說模板方法),這些纔是關鍵,也是重中之重。下面我先簡單列出以及其方法描述,後面一一分析:
方法名稱 | 描述 |
public final void acquire(int arg) | 獨佔式獲取同步狀態,忽略中斷。 若是當前線程獲取同步狀態成功,則由該方法返回;不然將會進入同步隊列等待( 即上篇說的Node節點隊列)。 該方法將會調用重寫的tryAcquire(int args)方法。 |
public final void acquireInterruptibly(int arg) |
與acquire(int args)方法同樣,可是該方法響應中斷(從方法名就大概知道意思了吧。) 當前線程未獲取到同步狀態而進入同步隊列中,若是當前線程被中斷,則該方法會拋出InterruptedException異常 |
public final boolean release(int arg) | 獨佔式的釋放同步狀態, 該方法會在釋放同步狀態後將同步隊列中第一個節點包含的線程喚醒。 該方法會調用tryRelease(int args)方法 |
public final void acquireShared(int arg) | 共享式獲取同步狀態,忽略中斷。 若是當前線程獲取同步狀態成功,則由該方法返回;不然將會進入同步隊列等待 (即上篇說的Node節點隊列)。 與獨佔式獲取的主要區別是在同一時刻能夠有多個線程獲取到同步狀態。 該方法將會調用重寫的tryAcquireShare(int args)方法。 |
public final void acquireSharedInterruptibly(int arg) | 與acquireInterruptibly方法相同 |
public final boolean releaseShared(int arg) |
共享式的釋放同步狀態 該方法會調用tryReleaseShared(int args)方法 |
根據上面提供的方法,同步器主要提供兩種模式:獨佔式和共享式。顧名思義,獨佔式表示同一時刻只有一個線程纔會獲取到同步狀態,而其餘線程都得等待;而共享式就容許同一時刻能夠多個線程獲取到同步狀態。至於示例的話,你們能夠查看源碼類上註釋的Mutx類,表示一個自定義的獨佔鎖。下面我仍是直接貼出示例代碼。
class Mutex implements Lock, java.io.Serializable { // 內部類,自定義同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否處於佔用狀態 protected boolean isHeldExclusively() { return getState() == 1; } // 當狀態爲0的時候獲取鎖 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { // 將當前線程設置爲獨佔線程 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 釋放鎖,將狀態設置爲0 protected boolean tryRelease(int releases) { assert releases == 1; // 斷言 if (getState() == 0) throw new IllegalMonitorStateException(); // 將線程或狀態 重置爲初始值 setExclusiveOwnerThread(null); setState(0); return true; } // 返回一個Condition,每一個condition都包含了一個condition隊列 Condition newCondition() { return new ConditionObject(); } } // 僅須要將操做代理到Sync上便可 private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
看了下自定義的獨佔鎖Metux(上面代碼來自源碼),寫個案例測試下它究竟是否是獨佔鎖(你們應該知道怎麼測試吧)。
public class MutexTest { private Lock lock ; private MutexTest(Lock lock) { this.lock = lock; } public void runTask() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " 執行任務中..."); Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " 任務執行完成。"); } catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public static void main(String[] args) { Lock lock = new Mutex(); final MutexTest test = new MutexTest(lock); for (int i = 0; i < 5; i ++) { new Thread(new Runnable() { @Override public void run() { test.runTask(); } }, "線程" + i).start(); } } }
運行該案例從打印結果中能夠看出,同一時刻只有一個線程在執行(這就是獨佔鎖的特性)。
線程0 執行任務中...
線程0 任務執行完成。
線程2 執行任務中...
線程2 任務執行完成。
線程1 執行任務中...
線程1 任務執行完成。
線程3 執行任務中...
線程3 任務執行完成。
線程4 執行任務中...
線程4 任務執行完成。
關於獲取和釋放下面只分析acquire函數和release函數,由於其餘都與這個函數相似。
/** * 獨佔式獲取同步狀態,忽略中斷。 */ public final void acquire(int arg) { /** * 1 調用子類的tryAcquiref(arg)方法,若是獲取成功則直接返回,不然以獨佔模式建立節點加入等待隊列 */ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire函數主要功能都放在這三個方法中:
/** * 2 根據給定模式爲當前線程建立並排隊節點。 */ private Node addWaiter(Node mode) { // 2.1 根據跟定的模式和當前線程建立節點。(在這就用的上Node了) Node node = new Node(Thread.currentThread(), mode); // 2.2 嘗試下快速通道:判斷tail節點是否爲空,若是不爲空就直接添加到尾節點後面。 Node pred = tail; if (pred != null) { node.prev = pred; // 2.2.1 進入到這個方法說明線程並無獲取鎖,因此須要CAS保證原子性。 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 2.3 若是是第一個入隊的節點或者compareAndSetTail設置失敗,那麼就進入enq()方法 enq(node); return node; } /** * 將節點插入隊列,必要時進行初始化。 */ private Node enq(final Node node) { // 自旋,直至設置添加尾節點成功。 for (;;) { Node t = tail; if (t == null) { // 2.3.1 尾節點爲空,則須要初始化隊列(同理採起CAS保證原子性) if (compareAndSetHead(new Node())) tail = head; } else { // 2.3.2 尾節點不爲空,則將節點設置成尾節點(同理採起CAS保證原子性) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
上述邏輯主要包括:使用當前線程建立節點,而後將當前節點添加到同步隊列中。其中設置節點都是利用CAS設置,保證原子性。
具體流程:
a 先行嘗試在隊尾添加(若是尾節點不爲空)(另外這一步很重要,若是尾節點存在就能夠以最短路徑O(1)的效果來完成線程入隊,是最大化減小開銷的一種方式):
b 若是是第一個入隊的節點或者compareAndSetTail設置失敗:
若是尾節點爲空,則須要初始化隊列(同理採起CAS保證原子性),繼續自旋判斷;
重複上面a步驟將節點嘗試添加至尾節點後,直接添加成功。
/** * 3 對於已經在隊列中的線程,以獨佔不間斷模式獲取。 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 一樣採起自旋直至條件知足 for (;;) { // 3.1 獲取當前節點的前驅節點 final Node p = node.predecessor(); // 3.2 判斷前驅節點是否爲頭節點,並此時是否能夠獲取到同步狀態 if (p == head && tryAcquire(arg)) { // 3.2.1 若上面條件知足,則將當前節點設置爲頭節點。 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
上述邏輯主要包括:
若是知足上述條件,那麼表明可以佔有鎖,根據節點對鎖佔有的含義,設置頭結點爲當前節點。
若是沒有輪到當前節點運行,那麼將當前線程從線程調度器上摘下,也就是進入等待狀態。也就是調用shouldParkAfterFailedAcquire和parkAndCheckInterrupt函數
下面看下它是怎麼將不知足節點摘下來進入等待狀態的。
/** * 檢查並更新獲取失敗的節點的狀態。 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 狀態處於SIGNAL狀態(-1),表示後繼節點隨時能夠upark */ return true; if (ws > 0) { /* * ws > 0表示處於CANCELLED狀態,則須要跳過找到node節點前面不處於取消狀態的節點。 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 此時ws爲PROPAGATE -3 或者是0 表示無狀態,(爲CONDITION -2時,表示此節點在condition queue中) * 比較並設置前驅結點的狀態爲SIGNAL */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } /** * 此時還不肯定Node的前置節點是否處於SIGNAL狀態 * 因此不支持park操做 */ return false; } /** * 進行park操做而且返回該線程是否被中斷 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
上述主要邏輯包括:
那麼acquire函數分析到這就結束了,估計看了一遍仍是不太清晰流程那麼就多看幾遍。下面也對這個流程進行總結下:
/** * 以獨佔模式釋放 */ public final boolean release(int arg) { // tryRelease由子類實現 if (tryRelease(arg)) { // 獲取頭結點 Node h = head; // 頭結點不爲空而且頭結點狀態不爲0 if (h != null && h.waitStatus != 0) // 釋放頭結點的後繼結點 unparkSuccessor(h); return true; } return false; } /** * 喚醒後繼節點 */ private void unparkSuccessor(Node node) { // 獲取節點狀態 int ws = node.waitStatus; // 若是節點狀態小於,則將其設置爲初始狀態。 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 若是節點狀態是取消或節點爲空,則從尾部向後移動以找到實際未取消的繼任者。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
上述邏輯主要包括:
tryRelease可以保證原子化的將狀態設置回去,固然須要使用compareAndSet來保證。若是釋放狀態成功過以後,將會進入後繼節點的喚醒過程。
經過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。
另外送你們一碗心靈雞湯:)
我從不相信什麼懶洋洋的自由,我向往的自由是經過勤奮和努力實現更廣闊的人生,那樣的自由纔是珍貴的、有價值的。我相信一萬小時定律,我歷來不相信天上掉餡餅的靈感和坐等的成就。作一個自由又自律的人,靠勢必實現的決心認真地活着。