AQS:JAVA經典之鎖實現算法(一)

序言

AQS能夠說是JAVA源碼中必讀源碼之一。同時它也是JAVA大廠面試的高頻知識點之一。認識並瞭解它,JAVA初中升高級工程師必備知識點之一。 AQS是AbstractQueuedSynchronizer的簡稱,它也是JUC包下衆多非原生鎖實現的核心。node

一:AQS基礎概況

AQS是基於CLH隊列算法改進實現的鎖機制。大致邏輯是AQS內部有一個鏈型隊列,隊列結點類是AQS的一個內部類Node,造成一個相似以下Sync Queue(記住這個名詞)面試

Sync Queue

能夠看出,一個Node除了先後結點的索引外,還維護了一個Thread對象,一個int的waitStatus。 Thread對象就是處於競爭隊列中的線程對象自己。 waitStatus表示當前競爭結點的狀態,這裏暫且忽略掉。算法

處於隊首的,即Head所指向的結點,即爲獲取到鎖的結點。釋放鎖即爲出隊,後續結點則成爲隊首,即獲取到鎖bash

Tips:這裏幫你們理解一個事情,每個ReentrantLock實例都有且只有一個AQS實例,一個AQS實例維護一個Sync Queue。因此說,當咱們的業務代碼中的多個線程對同一個ReentrantLock實例進行鎖競爭操做時,其實際就是對同一個Sync Queue的隊列進行入隊、出隊操做。函數

二:AQS調用入口----ReentrantLock

咱們在用ReentrantLock時,代碼一般以下:ui

ReentrantLock lock = new ReentrantLock();
Runnable runnable = new Runnable() {
  public void run() {
    lock.lock();
    Sys.out(Thread.currentThread().name() + "搶到鎖");
    lock.unlock();
  }
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t2.start();
t1.start();
複製代碼

可見線程t1,t2競爭lock這個鎖。 lock.lock()搶鎖 lock.unlock()釋放鎖 來看入口函數lockthis

public void lock() {
  sync.lock();
}
複製代碼

syncReentrantLock內的一個繼承了AQS的抽象類spa

abstract static class Sync extends AbstractQueuedSynchronizer {
}
複製代碼

抽象類的具體實現是另兩個內部類NonFairSync FairSync,分別表明非公平鎖、公平鎖。線程

咱們系統來看下繼承圖 code

Sync父子圖

ReentrantLock中的sync實例,默認是在構造函數中初始化的,

public ReentrantLock() {
  sync = new NonfairSync();
}
複製代碼

那咱們就看默認的NonFairSync的實現邏輯。

NonFairSync

當咱們調用ReentrantLock.lock()時,直接調用到的是NonFairSync.lock()

/**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
複製代碼

compareAndSetState(0, 1)經過CAS(CAS是啥,這裏不講,參考樂觀鎖。具體Google吧)方式,設置AQS的int state字段爲1。AQS內部就是經過這個state是否爲0來判斷當前鎖是否已經被線程獲取到。 返回true,則說明獲取鎖成功,設置當前鎖的獨佔線程setExclusiveOwnerThreaThread.currentThread()); 不然,acquire(1)嘗試獲d(取鎖。這個方法會自旋、阻塞,一直到獲取鎖成功爲止。這裏,傳進去的參數1,就參考樂觀鎖的版本字段,同時,這裏,它還記錄了可重入鎖重複獲取到鎖的次數。只有釋放一樣次數才能最終釋放鎖。

具體看AQS的acquire()的邏輯

AbstractQueuedSynchronizer.acquire()

public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
複製代碼

要注意,這個方法是忽略線程中斷的。 先看tryAcquire(arg),這個方法是AQS留給子實現類的口子,具體實現看NonFairSync,它的實現裏直接調用了Sync.nonfairTryAcquire()

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
複製代碼

能夠看到方法內,先是嘗試獲取state = 0時的初始鎖,若是失敗,判斷當前鎖是不是被當前線程獲取,是的話,將acquire時傳入的參數累加到state字段上。在這裏,這個字段就是用來記錄重複獲取鎖的次數。 獲取失敗則返回false

回到acquire 在獲取失敗,返回false後,纔會繼續調用 &&右邊的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。 先看addWaiter

private Node addWaiter(Node mode) {
1:        Node node = new Node(Thread.currentThread(), mode);
2:        // Try the fast path of enq; backup to full enq on failure
3:        Node pred = tail;
4:        if (pred != null) {
5:           node.prev = pred;
6:           if (compareAndSetTail(pred, node)) {
7:                pred.next = node;
8:                return node;
9:            }
10:        }
11:       enq(node);
12:        return node;
13:    }
複製代碼

利用傳進來的Node.EXCLUSIVE表示的排斥鎖參數以及當前線程實例初始化新Node,第4-10行代碼是在Sync Queue隊列內有競爭線程時進入。爲空時會走到enq(node),這裏是在競爭爲空時將競爭線程入隊的操做。 而後返回當前競爭的線程node 此時Sync Queue如圖

Sync Queue.png
咱們假設node_1是已經獲取到鎖的結點,node_2即爲咱們當前操做的結點。

返回acquire接着看 addWaiter返回的node直接做爲參數給了acquireQueued,這個方法就是主要的node競爭鎖方法。

final boolean acquireQueued(final Node node, int arg) {
        // 獲取鎖成功失敗標記
        boolean failed = true;
        try {
            // 當前競爭線程的中斷標記
            boolean interrupted = false;
            // 自旋競爭鎖,競爭不到鎖的話,線程又沒有中斷
            // 則一直在這兒循環
            for (;;) {
                // 獲取當前線程的前驅結點
                final Node p = node.predecessor();
                // 若是前驅結點是頭結點,則嘗試去tryAcquire
                // 這個邏輯咱們以前看過,當前線程未獲取到鎖的狀況下
                // 在AQS的state字段不爲0時,則返回false
                if (p == head && tryAcquire(arg)) {
                    // 進入到這裏,說明要不就是當前線程在重複獲取
                    // 要不就是前邊的結點釋放鎖,state 歸0,這裏獲取到
                    setHead(node);
                    p.next = null; // help GC
                    // 標識競爭鎖成功
                    failed = false;
                    // 這個方法不響應線程中斷,可是會返回線程在競爭鎖過程當中
                    // 中斷標記返回
                    return interrupted;
                }
                // 若獲取鎖失敗,則到這裏。這裏的邏輯主要在If判斷
                // 的兩個方法中,用來將當前線程掛起的,具體邏輯
                // 看下面
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

park就是停下的意思。因此這個方法從名字上也比較好理解,就是 掛起線程而且檢查線程的中斷狀態。這裏要注意,LockSupport.part(this)方法是會在線程中斷時自動喚醒的

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
複製代碼

這個方法傳入了當前競爭結點及其前驅結點

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驅結點的等待狀態。這裏只須要記住,咱們這裏考慮的是
        // 非共享鎖、非公平鎖的AQS。因此,只須要確保當前競爭
        // 結點的前驅結點狀態爲SIGNAL就好。剩下的狀態,
        // 與咱們此時研究的狀況而言沒有用
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 若是前驅結點的status爲0,則將其改成SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 複製代碼

shouldParkAfterFailedAcquire能夠看出來,在確保前驅結點status爲SIGNAL時,就能夠放心的去unsafe.park()了。之因此要爲SIGNAL,是由於這個狀態含義爲:當前結點OVER時要喚醒後繼結點。

因此不難推出,咱們的結點如今就park在那了。等他前驅結點釋放鎖,或者本身interrupt來喚醒,但由於這個方法是無視中斷的,因此即便interrupt了,只是設置了一個標記位,但仍然在循環中。


這裏假設前驅結點獲取鎖後釋放,則當前結點在parkAndCheckInterrupt()方法中被喚醒,然後再次循環for(;;),此次會在第一個if中就進入,當前結點獲取到鎖,而後重置Head指向的結點等,返回當前線程的中斷標記。

返回acquire

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼

if中的selfInterrupt()方法只是去從新設置當前線程的中斷標記位。這是由於獲取線程中斷狀態的方法,在返回狀態字段的同時,也會重置字段,因此須要標記後從新設置相應的值。

下面咱們看下AQS釋放鎖的接口方法

ReentrantLock.unlock

public void unlock() {
        sync.release(1);
    }
複製代碼

追進去

ReentrantLock.release

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

tryRelease是在NonFairLock中的實現的,若是是釋放成功,則在Head存在而且狀態不爲0(其實能夠理解爲值爲SIGNAL時)去喚醒Head的後繼結點。

NonFailLock.tryRelease

下面看下NonFairLocktryRelease方法的實現

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
複製代碼

能夠看出,這個tryRelease其實就是去判斷下是否是當前線程擁有鎖,是的話,判斷下當前的釋放鎖是否徹底釋放,由於鎖能夠重複獲取,徹底釋放的話,就設置state爲0,表明AQS的鎖已經被釋放了。

相關文章
相關標籤/搜索