JAVA LOCK代碼淺析

JAVALOCK整體來講關鍵要素主要包括3點:
1.unsafe.compareAndSwapXXX(Objecto,longoffset,intexpected,intx)
2.unsafe.park()和unsafe.unpark()
3.單向鏈表結構或者說存儲線程的數據結構java

第1點
主要爲了保證鎖的原子性,至關於一個鎖是否正在被使用的標記,而且比較和設置這個標記的操做是原子的(硬件提供的swap和test_and_set指令,單CPU下同一指令的多個指令週期不可中斷,SMP中經過鎖總線支持上訴兩個指令的原子性),這基本等於軟件級別所能達到的最高級別隔離。node

第2點
主要將未獲得鎖的線程禁用(park)和喚醒(unpark),也是直接native實現(這幾個native方法的實現代碼在hotspotsrcsharevmprimsunsafe.cpp文件中,可是關鍵代碼park的最終實現是和操做系統相關的,好比windows下實現是在os_windows.cpp中,有興趣的同窗能夠下載jdk源碼查看)。喚醒一個被park()線程主要手段包括如下幾種
1.其餘線程調用以被park()線程爲參數的unpark(Threadthread).
2.其餘線程中斷被park()線程,如waiters.peek().interrupt();waiters爲存儲線程對象的隊列.
3.不知緣由的返回。編程

park()方法返回並不會報告究竟是上訴哪一種返回,因此返回好最好檢查下線程狀態,如windows

[java]
LockSupport.park(); //禁用當前線程
if(Thread.interrupted){
//doSomething
}[/java]
AbstractQueuedSynchronizer(AQS)對於這點實現得至關巧妙,以下所示
[java]
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}

//parkAndCheckInterrupt()會返回park住的線程在被unpark後的線程狀態,若是線程中斷,跳出循環。數據結構

if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}

//只有線程被interrupt後纔會走到這裏多線程

cancelAcquire(node);
throw new InterruptedException();
}

//在park()住的線程被unpark()後,第一時間返回當前線程是否被打斷併發

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
[/java]

第3點對於一個Synchronizer的實現很是重要,存儲等待線程,而且unlock時喚醒等待線程,這中間有不少工做須要作,喚醒策略,等待線程意外終結處理,公平非公平,可重入不可重入等。ide

以上簡單說明了下JAVALOCKS關鍵要素,如今咱們來看下java.util.concurrent.locks大體結構ui

203503400.jpg上圖中,LOCK的實現類其實都是構建在AbstractQueuedSynchronizer上,爲什麼圖中沒有用UML線表示呢,這是每一個Lock實現類都持有本身內部類Sync的實例,而這個Sync就是繼承AbstractQueuedSynchronizer(AQS)。爲什麼要實現不一樣的Sync呢?這和每種Lock用途相關。另外還有AQS的State機制。this

基於AQS構建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch,ReetrantReadWriteLock,FutureTask等,這些Synchronizer實際上最基本的東西就是原子狀態的獲取和釋放,只是條件不同而已。

ReentrantLock須要記錄當前線程獲取原子狀態的次數,若是次數爲零,那麼就說明這個線程放棄了鎖(也有可能其餘線程佔據着鎖從而須要等待),若是次數大於1,也就是得到了重進入的效果,而其餘線程只能被park住,直到這個線程重進入鎖次數變成0而釋放原子狀態。如下爲ReetranLock的FairSync的tryAcquire實現代碼解析。
[java]
//公平獲取鎖

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();

//若是當前重進入數爲0,說明有機會取得鎖

if (c == 0) {

//若是是第一個等待者,而且設置重進入數成功,那麼當前線程得到鎖

if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}

//若是當前線程自己就持有鎖,那麼疊加劇進入數,而且繼續得到鎖

else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

//以上條件都不知足,那麼線程進入等待隊列。

return false;
}
[/java]

Semaphore則是要記錄當前還有多少次許可可使用,到0,就須要等待,也就實現併發量的控制,Semaphore一開始設置許可數爲1,實際上就是一把互斥鎖。如下爲Semaphore的FairSync實現

[java]
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
for (;;) {
Thread first = getFirstQueuedThread();

//若是當前等待隊列的第一個線程不是當前線程,那麼就返回-1表示當前線程須要等待

if (first != null && first != current)
return -1;

//若是當前隊列沒有等待者,或者當前線程就是等待隊列第一個等待者,那麼先取得semaphore還有幾個許可證,而且減去當前線程須要的許可證獲得剩下的值

int available = getState();
int remaining = available – acquires;

//若是remining<0,那麼反饋給AQS當前線程須要等待,若是remaining>0,而且設置availble成功設置成剩餘數,那麼返回剩餘值(>0),也就告知AQS當前線程拿到許可,能夠繼續執行。

if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
[/java]

CountDownLatch閉鎖則要保持其狀態,在這個狀態到達終止態以前,全部線程都會被park住,閉鎖能夠設定初始值,這個值的含義就是這個閉鎖須要被countDown()幾回,由於每次CountDown是sync.releaseShared(1),而一開始初始值爲10的話,那麼這個閉鎖須要被countDown()十次,纔可以將這個初始值減到0,從而釋放原子狀態,讓等待的全部線程經過。

[java]

//await時候執行,只查看當前須要countDown數量減爲0了,若是爲0,說明能夠繼續執行,不然須要park住,等待countDown次數足夠,而且unpark全部等待線程

public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}

//countDown時候執行,若是當前countDown數量爲0,說明沒有線程await,直接返回false而不須要喚醒park住線程,若是不爲0,獲得剩下須要countDown的數量而且compareAndSet,最終返回剩下的countDown數量是否爲0,供AQS斷定是否釋放全部await線程。

public boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
[/java]

FutureTask須要記錄任務的執行狀態,當調用其實例的get方法時,內部類Sync會去調用AQS的acquireSharedInterruptibly()方法,而這個方法會反向調用Sync實現的tryAcquireShared()方法,即讓具體實現類決定是否讓當前線程繼續仍是park,而FutureTask的tryAcquireShared方法所作的惟一事情就是檢查狀態,若是是RUNNING狀態那麼讓當前線程park。而跑任務的線程會在任務結束時調用FutureTask實例的set方法(與等待線程持相同的實例),設定執行結果,而且經過unpark喚醒正在等待的線程,返回結果。

[java]

//get時待用,只檢查當前任務是否完成或者被Cancel,若是未完成而且沒有被cancel,那麼告訴AQS當前線程須要進入等待隊列而且park住

protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}

//斷定任務是否完成或者被Cancel

boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}

//get時調用,對於CANCEL與其餘異常進行拋錯

V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0,nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

//任務的執行線程執行完畢調用(set(Vv))

void innerSet(V v) {
for (;;) {
int s = getState();
//若是線程任務已經執行完畢,那麼直接返回(多線程執行任務?)
if (s == RAN)
return;
//若是被CANCEL了,那麼釋放等待線程,而且會拋錯
if (s == CANCELLED) {
releaseShared(0);
return;
}

//若是成功設定任務狀態爲已完成,那麼設定結果,unpark等待線程(調用get()方法而阻塞的線程),以及後續清理工做(通常由FutrueTask的子類實現)

if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
[/java]

以上4個AQS的使用是比較典型,然而有個問題就是這些狀態存在哪裏呢?而且是能夠計數的。從以上4個example,咱們能夠很快獲得答案,AQS提供給了子類一個intstate屬性。而且暴露給子類getState()和setState()兩個方法(protected)。這樣就爲上述狀態解決了存儲問題,RetrantLock能夠將這個state用於存儲當前線程的重進入次數,Semaphore能夠用這個state存儲許可數,CountDownLatch則能夠存儲須要被countDown的次數,而Future則能夠存儲當前任務的執行狀態(RUNING,RAN,CANCELL)。其餘的Synchronizer存儲他們的一些狀態。

AQS留給實現者的方法主要有5個方法,其中tryAcquire,tryRelease和isHeldExclusively三個方法爲須要獨佔形式獲取的synchronizer實現的,好比線程獨佔ReetranLock的Sync,而tryAcquireShared和tryReleasedShared爲須要共享形式獲取的synchronizer實現。

ReentrantLock內部Sync類實現的是tryAcquire,tryRelease,isHeldExclusively三個方法(由於獲取鎖的公平性問題,tryAcquire由繼承該Sync類的內部類FairSync和NonfairSync實現)Semaphore內部類Sync則實現了tryAcquireShared和tryReleasedShared(與CountDownLatch類似,由於公平性問題,tryAcquireShared由其內部類FairSync和NonfairSync實現)。CountDownLatch內部類Sync實現了tryAcquireShared和tryReleasedShared。FutureTask內部類Sync也實現了tryAcquireShared和tryReleasedShared。

其實使用過一些JAVAsynchronizer的以後,而後結合代碼,可以很快理解其究竟是如何作到各自的特性的,在把握了基本特性,即獲取原子狀態和釋放原子狀態,其實咱們本身也能夠構造synchronizer。以下是一個LOCKAPI的一個例子,實現了一個先入先出的互斥鎖。

[java]
public class FIFOMutex {
private AtomicBoolean locked=new AtomicBoolean(false);
private Queue<Thread> waiters=new ConcurrentLinkedQueue<Thread>();
public void lock(){
boolean wasInterrupted=false;
Thread current=Thread.currentThread();
waiters.add(current);

//若是waiters的第一個等待者不爲當前線程,或者當前locked的狀態爲被佔用(true)

//那麼park住當前線程

while(waiters.peek()!=current||!locked.compareAndSet(false, true)){
LockSupport.park();

//當線程被unpark時,第一時間檢查當前線程是否被interrupted

if(Thread.interrupted()){
wasInterrupted=true;
}
}

//獲得鎖後,從等待隊列移除當前線程,若是,而且若是當前線程已經被interrupted,

//那麼再interrupt一下以便供外部響應。

waiters.remove();
if(wasInterrupted){
current.interrupt();
}
}

//unlock邏輯相對簡單,設定當前鎖爲空閒狀態,而且將等待隊列中

//的第一個等待線程喚醒

public void unlock(){
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
[/java]

總結,JAVAlock機制對於整個javaconcurrent包的成員意義重大,瞭解這個機制對於使用java併發類有着不少的幫助,文章中可能存在着各類錯誤,請各位多多諒解而且可以提出來,謝謝。

文章參考:JDK1.6sourcejava併發編程實踐JDK1.6API文檔

相關文章
相關標籤/搜索