AbstractQueuedSynchronizer是基於一個FIFO雙向鏈隊列 ==CLH隊列==,用於構建鎖或者同步裝置的類,也稱爲Java同步器,ReentrantLock的公平鎖與非公平鎖就是由該同步器構成,鏈隊列結構圖以下。node
你能夠理解爲銀行ATM機取錢,一我的先去取,獲取到了鎖,在這個時間內其餘線程處於阻塞狀態,只有等他取完錢了,他走了,釋放了鎖,排在它後面的人才能夠獲取到釋放的鎖並進行取錢。多線程
該同步器利用一個int值表示狀態,實現方式是==使用內部類繼承該同步器的方式==實現它的tryRelease、tryAcquire等方法管理狀態,管理狀態使用如下三個方法:ide
節點包含的狀態有:測試
節點其餘信息:ui
Node prev |
前驅節點 |
---|---|
Node next |
後繼節點 |
Node nextWaiter | 存儲condition隊列中的後繼節點 |
Thread thread |
入隊列時的當前線程 |
鎖在一個時間點只能被一個線程鎖佔有,AQS實現的ReentrantLock,又分爲公平鎖和非公平鎖this
保障了多線程下各線程獲取鎖的順序,先到的線程優先獲取鎖spa
加鎖時不考慮排隊等待問題,直接嘗試獲取鎖,獲取不到自動到隊尾等待線程
鎖在一個時間點能夠被多個線程同時獲取,AQS實現的CountDownLatch、ReadWriteLockcode
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}複製代碼
根據指定狀態獲取,能獲取到 執行compareAndSetState方法設置新狀態cdn
public final void acquire(int arg) {
//tryAcquire成功的話 acquire結束;
if (!tryAcquire(arg) &&
//AcquireQueued方法進行阻塞等待,直到獲取鎖爲止
//addWaiter把當前線程添加到隊列尾部
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//tryAcquire失敗而且acquiredQueued成功的話把當前線程中斷
selfInterrupt();
}複製代碼
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取狀態
int c = getState();
//若是該鎖未被任何線程佔有,該鎖能被當前線程獲取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//若被獲取到 查看是否被當前線程
else if (current == getExclusiveOwnerThread()) {
//是當前線程的話再次獲取,計數+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}複製代碼
在tryAcquire方法中使用了同步器提供的對state操做的方法,利用CAS原理保證只有一個線程可以對狀態進行成功修改,而沒有成功修改的線程將進入隊列排隊。
AcquireQueued方法進行阻塞等待,直到獲取鎖爲止
//node爲null,排他方式阻塞等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//p爲當前節點的前一個節點
final Node p = node.predecessor();
//若是p爲頭結點而且獲取成功就把當前線節點設置爲頭結點
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//若是線程須要被阻塞 則interrputr爲true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}複製代碼
判斷線程是否須要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//線程須要運行
if (ws == Node.SIGNAL)
return true;
//ws>0 處於CANCEL狀態的線程
if (ws > 0) {
//把這些線程從隊列中清除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//把等待的設置爲運行狀態
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}複製代碼
// park方法讓其餘線程處於等待狀態
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}複製代碼
調用release釋放一個鎖
public void unlock() {
//釋放一個鎖
sync.release(1);
}複製代碼
釋放鎖
public final boolean release(int arg) {
//調用tryRelease嘗試釋放一個鎖
if (tryRelease(arg)) {
Node h = head;
//釋放成功後
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}複製代碼
嘗試釋放當前鎖
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}複製代碼
鎖釋放後喚醒線程,一同競爭CPU資源
private void unparkSuccessor(Node node) {
//獲取當前節點狀態
int ws = node.waitStatus;
//把狀態設置爲等待獲取鎖狀態
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//若是沒有下一個節點或者下一個節點狀態爲CANCEL,則把它們清除
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//不然調用LockSupport中unpark方法,喚醒後一個節點
if (s != null)
LockSupport.unpark(s.thread);
}複製代碼
調用UNSAFE的本地方法unpark喚醒線程
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}複製代碼
AQS實現ReentrantLock公平鎖與非公平鎖最大的區別在下面這段代碼:
源碼以下:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}複製代碼
==判斷"當前線程"是否是在CLH隊列的隊首,來實現公平性==。
public class MyAQSLock implements Lock {
private final Sync sync;
public MyAQSLock() {
sync = new Sync();
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
/**
* 把lock、unlock實現使用AQS構建爲內部類
*/
private class Sync extends AbstractQueuedSynchronizer{
Condition newCondition(){
return new ConditionObject();
}
@Override
protected boolean tryAcquire(int arg) {
//第一個線程進來拿到鎖
int state = getState();
//用於重入鎖判斷
Thread current = Thread.currentThread();
if(state==0){
if(compareAndSetState(0,arg)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
//重入鎖判斷 當前線程和獨佔鎖線程相同,則再次獲取
else if(current==getExclusiveOwnerThread()){
int next = state+arg;
if(next<0){
throw new RuntimeException();
}
setState(next);
return true;
}
return false;
}
/**
* 可重入釋放鎖
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
if(Thread.currentThread()!=getExclusiveOwnerThread()){
throw new RuntimeException();
}
int state = getState()-arg;
if(state==0){
setExclusiveOwnerThread(null);
setState(0);
return true;
}
setState(0);
return false;
}
}
}複製代碼
public class TestAQSLock2 {
MyAQSLock myLock = new MyAQSLock();
private int value;
private int value2;
public int a(){
myLock.lock();
try {
b();
return value++;
}finally {
myLock.unlock();
}
}
public void b(){
myLock.lock();
try {
System.out.println(++value2);
}finally {
myLock.unlock();
}
}
public static void main(String[] args) {
TestAQSLock2 myLock = new TestAQSLock2();
for(int i=0;i<50;i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":" + myLock.a());
}).start();
}
}
}複製代碼