1、入隊操做
-
當一個線程獲取鎖失敗以後會被轉換爲Node節點,而後會使用enq方法,將該節點插入到AQS的阻塞隊列,下面看一下這個方法如何實現
private Node enq(final Node node) {
for(;;) {
Node t = tail;
if( t == null) {
if(compareAndSetHead(new Node())) {
tail = head;
}
}else {
node.prev = t;
if(compareAndSetTail(t,node)) {
t.next = node;
}
}
}
}
-
解析:進入到第一次循環,tail和head此時都是null,如圖default,而後讓t指向tail,即null,如圖I,此時都是null,而後隨便產生一個實例,也就是II所示,讓head指向它,而後執行語句,讓tail也指向它;
-
接下來進行第二次循環,讓t也指向這個實例,如圖III,而後進入到else語句,先讓node的前向指向哨兵,如圖IV;而後,把tail設置爲node,在把哨兵的next指向tail,如圖VI,也就結束了。
2、AQS-條件變量的支持
-
notify和wait方法是配合synchronized內置鎖實現線程同步的,條件變量的signal和await方法也是用來配合鎖(使用AQS實現的鎖),實現線程間同步的基礎設施
-
它們的不一樣在於,synchronized同時只能與一個共享變量的notify或者wait方法實現同步,而AQS的一個鎖能夠對應多個條件變量,咱們先看下面的例子
package com.ruigege.LockSourceAnalysis6;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLock {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
System.out.println("begin wait");
condition.await();
System.out.println("end wait");
}catch(Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
lock.lock();
try {
System.out.println("begin signal");
condition.signal();
System.out.println("end signal");
}catch(Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
-
其實這裏的Lock對象就至關於synchronized加上了共享變量,調用lock.lock()就至關於進入了synchronized塊(獲取了共享變量的內置鎖),調用lock.unlock()方法就至關於推出了synchronized塊,調用條件變量的signal方法就至關於調用共享變量的notify方法,signalAll()至關於notifyAll()方法
-
在上面的代碼中,lock.newCondition()的做用其實就是new了一個在AQS內部聲明的ConditionObject對象,ConditionObject是AQS的內部類,能夠方法AQS內部的變量(例如狀態變量state)和方法,在每一個條件變量內部都維護了一個條件隊列,用來存放調用條件變量的await()方法時被阻塞的線程,注意這個條件隊列和AQS隊列不是一回事。
-
下面即是await方法源碼,當線程調用條件變量的await()方法時(必須先調用鎖的lock方法獲取鎖),在內部會構造一個類型爲Node.CONDITION的node節點,而後將該節點插入到條件隊列末尾,以後當前線程會釋放獲取得鎖(也就是會操做鎖對應的state變量的值),並被阻塞掛起,這時候若是有其餘線程調用lock.lock()嘗試獲取鎖,就會有一個線程獲取到鎖,若是獲取到的鎖的線程調用了條件變量的await方法,則該線程會被放入到條件變量的阻塞隊列中,而後釋放獲取到的鎖,在await方法處阻塞。
public final void await() throws InterruptedException {
if(Thread.interrupted()) {
throw new InterruptedException();
}
Node node = addContionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while(!isOnSyncQueue(node)) {
LockSupport.park(this);
if((interruprMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
}
3、源碼:
-
所在包:com.ruigege.ConcurrentListSouceCodeAnalysis5
-
https://github.com/ruigege66/ConcurrentJava
-
-
-
歡迎關注微信公衆號:傅里葉變換,我的帳號,僅用於技術交流