AQS(AbstractQueuedSynchronizer)

1、概述

AQS是AbstractQueuedSynchronizer(抽象隊列同步器)的縮寫。它是多線程訪問共享資源的框架,ReentrantLock、CountDownLatch、Semaphore等都是基於它來實現的。node

image
從圖中能夠看到,有兩個關鍵的組成部分,一個是state(共享資源,也能夠理解爲資源佔用計數器),另外一個是FIFO隊列,用來保存須要得到共享資源的縣城,其head節點始終指向當前真在佔用共享資源的線程。多線程

進入等待隊列的線程會被封裝成一個Node。其主要成員以下:框架

class Node {
    //在同步隊列中等待的線程等待超時或被中斷,須要從同步隊列中取消該Node的結點,其結點的waitStatus爲CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。
    static final int CANCELLED =  1;
    //值爲-1,被標識爲該等待喚醒狀態的後繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該後繼結點的線程執行。說白了,就是處於喚醒狀態,只要前繼結點釋放鎖,就會通知標識爲SIGNAL狀態的後繼結點的線程執行。
    static final int SIGNAL    = -1;
    //與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
    static final int CONDITION = -2;
    //與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。
    static final int PROPAGATE = -3;
    //節點的等待狀態,默認0狀態:值爲0,表明初始化狀態。
    volatile int waitStatus;
    //前驅結點
    volatile Node prev;
    //後驅節點
    volatile Node next;
    //目標線程
    volatile Thread thread;
    //獲取前驅結點
    final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }
}

對於共享資源state的修改,除了提供普通的getter以外,還提供了一個原子操做compareAndSetState()。函數

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。ui

AQS提供了幾個重要的方法,參數都是state的值:this

  • tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,若是釋放後容許喚醒後續等待結點返回true,不然返回false。

自定義同步器主要實現這些方法便可,其餘的工做AQS自己已經實現好了。線程

以ReentrantLock爲例,state初始化爲0,表示資源/鎖未被佔用。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是==可重入==的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。code

2、AQS工做原理

今天主要分析獨佔式下的acquire-release。blog

acquire(int)

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }

函數步驟以下:

  1. tryAcquire(arg)。獲取共享資源。
  2. addWaiter(Node.EXCLUSIVE)。將須要獲取共享資源的線程放入等待隊列的尾部,並標記爲獨佔模式。
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。進入等待隊列裏再次嘗試獲取共享資源,一直獲取到資源後才返回。若是在整個等待過程當中被中斷過,則返回true,不然返回false。
  4. 若是若是直接獲取失敗或者在等待隊列裏被中斷過則執行selfInterrupt(),調用Thread.currentThread().interrupt()來中斷線程。

下面逐個方法看。

tryAcquire(int)

1     protected boolean tryAcquire(int arg) {
2         throw new UnsupportedOperationException();
3     }

該方法的實現僅僅是拋出一個異常。然而該方法正是須要自定義同步器重寫的方法,包括對state的操做。

addWaiter(Node)

因爲直接獲取資源失敗,該方法是將線程放到等待隊列尾部。

private Node addWaiter(Node mode) {
    //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    
    //嘗試快速方式直接放到隊尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    //上一步失敗則經過enq入隊。
    enq(node);
    return node;
}

其中compareAndSetTail(pred, node)是以原子的方式進行尾節點和當前線程節點的交換。

enq(Node)

該方法是在快速加入尾節點失敗以後執行,目的也是將node加入隊尾。

1 private Node enq(final Node node) {
 2     //"自旋",直到成功加入隊尾
 3     for (;;) {
 4         Node t = tail;
 5         if (t == null) { // 隊列爲空,建立一個空的標誌結點做爲head結點,並將tail也指向它。
 6             if (compareAndSetHead(new Node()))//原子設置頭節點
 7                 tail = head;
 8         } else {//正常流程,放入隊尾
 9             node.prev = t;
10             if (compareAndSetTail(t, node)) {
11                 t.next = node;
12                 return t;
13             }
14         }
15     }
16 }

acquireQueued(Node, int)

經過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部「休息」,直到其餘線程完全釋放資源後喚醒本身,再去獲取共享資源。

1 final boolean acquireQueued(final Node node, int arg) {
 2     boolean failed = true;//標記是否成功拿到資源
 3     try {
 4         boolean interrupted = false;//標記等待過程當中是否被中斷過
 5         
 6         //又是一個「自旋」!
 7         for (;;) {
 8             final Node p = node.predecessor();//拿到前驅
 9             //若是前驅是head,則能夠去獲取資源
10             if (p == head && tryAcquire(arg)) {
11                 setHead(node);//拿到資源後,將head指向該結點。因此head所指的標杆結點,就是當前獲取到資源的那個結點或null。
12                 p.next = null; // setHead中node.prev已置爲null,此處再將原來的head.next置爲null,就是爲了方便GC回收之前的head結點。也就意味着以前拿完資源的結點出隊了!
13                 failed = false;
14                 return interrupted;//返回等待過程當中是否被中斷過
15             }
16             
17             //判斷是否能夠休息,若是能夠,就進入waiting狀態,若是等待過程當中被中斷過,就將interrupted標記爲true
18             if (shouldParkAfterFailedAcquire(p, node) &&
19                 parkAndCheckInterrupt())
20                 interrupted = true;
21         }
22     } finally {//自旋過程當中超時或者被中斷則從隊列移除該節點
23         if (failed)
24             cancelAcquire(node);
25     }
26 }

shouldParkAfterFailedAcquire(Node, Node)

此方法主要用於檢查狀態,看看本身是否能夠進入waiting狀態。

1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     int ws = pred.waitStatus;//拿到前驅的等待狀態
 3     if (ws == Node.SIGNAL)
 4         //若是已經告訴前驅資源釋放後通知本身一下,那就能夠安心休息了
 5         return true;
 6     if (ws > 0) {
 7         /*
 8          * 若是前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的後邊。
 9          * 注意:那些放棄的結點稍後就會被回收
10          */
11         do {
12             node.prev = pred = pred.prev;
13         } while (pred.waitStatus > 0);
14         pred.next = node;
15     } else {
16          //若是前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它資源釋放後通知本身一下。
17         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
18     }
19     return false;
20 }

parkAndCheckInterrupt()

休眠而且檢查中斷。

1 private final boolean parkAndCheckInterrupt() {
2     LockSupport.park(this);//調用本地方法park()使線程進入waiting狀態
3     return Thread.interrupted();//返回當前線程是否被中斷。
4 }

cancelAcquire(Node)

從隊列移除節點

private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;//跳過已經被取消的節點一直往前找,直到找到一個有效的節點,讓node的前驅結點指向該節點
        Node predNext = pred.next;//獲取剛纔找到的前驅結點的後置節點
        node.waitStatus = Node.CANCELLED;
        if (node == tail && compareAndSetTail(node, pred)) {//若是當前node就是尾節點,就以原子方式把剛纔找到的前驅結點設置爲新的尾節點
            compareAndSetNext(pred, predNext, null);//以原子的方式將上面設置爲新的尾節點的後置節點置爲null
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {//前驅結點不是頭節點並且成功設置了"信號狀態"的以後,就把它的後置節點指向即將要取消的node節點的後置節點
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);//不然喚醒下一個須要獲取鎖的節點
            }

            node.next = node;
        }
    }

當前節點是尾節點:
image
當前節點既不是尾節點也不是頭節點:
image

release(int)

釋放資源,若是完全釋放了(即state=0),它會喚醒等待隊列裏的其餘線程來獲取資源。

1 public final boolean release(int arg) {
2     if (tryRelease(arg)) {
3         Node h = head;//找到頭結點,即當前持有資源的線程對應的節點
4         if (h != null && h.waitStatus != 0)
5             unparkSuccessor(h);//喚醒等待隊列裏的下一個線程
6         return true;
7     }
8     return false;
9 }

tryRelease(int)

此方法嘗試去釋放指定量的資源。須要自定義同步器本身實現。

1 protected boolean tryRelease(int arg) {
2     throw new UnsupportedOperationException();
3 }

其實就是將state減去arg,若是已經完全釋放資源(state=0),要返回true,不然返回false。

unparkSuccessor(Node)

此方法用於喚醒等待隊列中下一個線程。

1 private void unparkSuccessor(Node node) {
 3     int ws = node.waitStatus;
 4     if (ws < 0)//置0當前線程所在的結點狀態。
 5         compareAndSetWaitStatus(node, ws, 0);
 6 
 7     Node s = node.next;//找到下一個須要喚醒的結點s
 8     if (s == null || s.waitStatus > 0) {//若是爲空或已取消
 9         s = null;
10         for (Node t = tail; t != null && t != node; t = t.prev)//從隊列尾向前找,直到找到下一個距離node最近的有效節點
11             if (t.waitStatus <= 0)//從這裏能夠看出,<=0的結點,都是還有效的結點。
12                 s = t;
13     }
14     if (s != null)
15         LockSupport.unpark(s.thread);//喚醒
16 }
相關文章
相關標籤/搜索