package com.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Created by cxx on 2018/1/16. * * 獨佔鎖示例 */ public class Mutex implements Lock { //靜態內部類,自定義同步器 private static class Sync extends AbstractQueuedSynchronizer{ //是否處於佔用狀態 protected boolean isHeldExclusively(){ return getState() == 1; } //當狀態爲0的時候,獲取鎖 public boolean tryAcquire(int acquires){ if (compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //釋放鎖,將狀態設置爲0 public boolean tryRelease(int releases){ if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } //返回一個condition,每一個condition包含了一個condition隊列 Condition newCondition(){ return new ConditionObject(); } } /*** * 將操做代理到Sync上便可 */ private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } public boolean isLocked(){ return sync.isHeldExclusively(); } public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } }
如代碼所示,獨佔鎖實現了在同一時刻只能用一個線程獲取到鎖,而其餘獲取鎖的線程只能處於同步等待隊列中等待,只有獲取鎖的線程釋放了鎖,後繼的線程纔可以獲取鎖。java
#鎖、同步器、使用者node
如上的mutex鎖同樣,具體的實現代理到sync,mutex只須要向用戶定義交互方式便可。安全
獨佔鎖的獲取與釋放(包括了對同步隊列的操做)ide
acquire(int arg):獨佔式獲取同步狀態,若是當前線程獲取同步狀態成功,則由該方法返回,不然,將會進入同步隊列等待,該方法將會調用可重寫的tryAcquire(int arg)方法;工具
release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態以後,將同步隊列中第一個節點包含的線程喚醒;ui
共享鎖的獲取與釋放(包括了對同步隊列的操做)this
獨佔鎖的獲取與釋放的具體實現(沒有對同步隊列的操做,功能單一)線程
共享鎖的獲取與釋放(沒有對同步隊列的操做)設計
操做隊列同步器的狀態代理
在基於AQS構建的同步器中,只能在一個時刻發生阻塞,從而下降上下文切換的開銷,提升了吞吐量。同時在設計AQS時充分考慮了可伸縮行,所以J.U.C中全部基於AQS構建的同步器都可以得到這個優點。
AQS的主要使用方式是繼承,子類經過繼承同步器並實現它的抽象方法來管理同步狀態。
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操做,固然AQS能夠確保對state的操做是安全的。
AQS經過內置的FIFO同步隊列來完成資源獲取線程的排隊工做,若是當前線程獲取同步狀態失敗時,AQS則會將當前線程以及等待狀態等信息構形成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程
當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),其定義以下:
static final class Node { /** 共享 */ static final Node SHARED = new Node(); /** 獨佔 */ static final Node EXCLUSIVE = null; /** * 由於超時或者中斷,節點會被設置爲取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變爲其餘狀態; */ static final int CANCELLED = 1; /** * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行 */ static final int SIGNAL = -1; /** * 節點在等待隊列中,節點線程等待在Condition上,當其餘線程對Condition調用了signal()後,改節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步狀態獲取將會無條件地傳播下去 */ static final int PROPAGATE = -3; /** 等待狀態 */ volatile int waitStatus; /** 前驅節點 */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 獲取同步狀態的線程 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
第一,頭結點是成功獲取到同步狀態的節點,而頭結點的線程釋放了同步狀態以後,將會喚醒其後繼節點,後繼節點的線程被喚醒後須要檢查本身的前驅節點是不是頭結點。
第二,維護同步隊列的FIFO原則。
在線程獲取同步狀態時若是獲取失敗,則加入CLH同步隊列,經過經過自旋的方式不斷獲取同步狀態,可是在自旋的過程當中則須要判斷當前線程是否須要阻塞,其主要方法在acquireQueued():
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
經過這段代碼咱們能夠看到,在獲取同步狀態失敗後,線程並非立馬進行阻塞,須要檢查該線程的狀態,檢查狀態的方法爲 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞,代碼以下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驅節點 int ws = pred.waitStatus; //狀態爲signal,表示當前線程處於等待狀態,直接放回true if (ws == Node.SIGNAL) return true; //前驅節點狀態 > 0 ,則爲Cancelled,代表該節點已經超時或者被中斷了,須要從同步隊列中取消 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //前驅節點狀態爲Condition、propagate else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這段代碼主要檢查當前線程是否須要被阻塞,具體規則以下:
若是當前線程的前驅節點狀態爲SINNAL,則代表當前線程須要被阻塞,調用unpark()方法喚醒,直接返回true,當前線程阻塞
若是當前線程的前驅節點狀態爲CANCELLED(ws > 0),則代表該線程的前驅節點已經等待超時或者被中斷了,則須要從CLH隊列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,返回false
若是前驅節點非SINNAL,非CANCELLED,則經過CAS的方式將其前驅節點設置爲SINNAL,返回false
若是 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,則調用parkAndCheckInterrupt()方法阻塞當前線程:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt() 方法主要是把當前線程掛起,從而阻塞住線程的調用棧,同時返回當前線程的中斷狀態。其內部則是調用LockSupport工具類的park()方法來阻塞該方法。
當線程釋放同步狀態後,則須要喚醒該線程的後繼節點:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒後繼節點 unparkSuccessor(h); return true; } return false; }
調用unparkSuccessor(Node node)喚醒後繼節點:
private void unparkSuccessor(Node node) { //當前節點狀態 int ws = node.waitStatus; //當前狀態 < 0 則設置爲 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //當前節點的後繼節點 Node s = node.next; //後繼節點爲null或者其狀態 > 0 (超時或者被中斷了) if (s == null || s.waitStatus > 0) { s = null; //從tail節點來找可用節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒後繼節點 if (s != null) LockSupport.unpark(s.thread); }
可能會存在當前線程的後繼節點爲null,超時、被中斷的狀況,若是遇到這種狀況了,則須要跳過該節點,可是爲什麼是從tail尾節點開始,而不是從node.next開始呢?緣由在於node.next仍然可能會存在null或者取消了,因此採用tail回溯辦法找第一個可用的線程。最後調用LockSupport的unpark(Thread thread)方法喚醒該線程。