ReentrantLock是如何基於AQS實現的

←←←←←←←←←←←← 快!點關注node

ReentrantLock是一個可重入的互斥鎖,基於AQS實現,它具備與使用 synchronized 方法和語句相同的一些基本行爲和語義,但功能更強大。程序員

lock和unlock

ReentrantLock 中進行同步操做都是從lock方法開始。lock獲取鎖,進行一系列的業務操做,結束後使用unlock釋放鎖。sql

private final ReentrantLock lock = new ReentrantLock();
public void sync(){
    lock.lock();
    try {
        // ... method body
    }
    finally {
        lock.unlock()
    }
}

lock

ReentrantLock 中lock的實現是經過調用AQS的AbstractQueuedSynchronizer#acquire方法實現。多線程

public final void acquire(int arg) {
    //嘗試獲取鎖
    if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
}

根據以前介紹的模板方法模式,對於鎖的獲取tryAcquire是在ReentrantLock中實現的。而非公平鎖中的實際實現方法爲nonfairTryAcquire。架構

ReentrantLock#nonfairTryAcquire併發

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
 
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

在獲取鎖的邏輯中首先是嘗試以cas方式獲取鎖,若是獲取失敗則表示鎖已經被線程持有。分佈式

再判斷持有該鎖的線程是否爲當前線程,若是是當前線程就將state的值加1,在釋放鎖是也須要釋放屢次。這就是可重入鎖的實現。高併發

若是持有鎖的線程並不是當前線程則此次加鎖失敗,返回false。加鎖失敗後將調用AbstractQueuedSynchronizer#acquireQueued(addWaiter(Node.EXCLUSIVE), arg)性能

首先會調用addWaiter方法將該線程入隊。學習

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

mode是指以何種模式的節點入隊,這裏傳入的是Node.EXCLUSIVE(獨佔鎖)。首先將當前線程包裝爲node節點。而後判斷等待隊列的尾節點是否爲空,若是不爲空則經過cas的方式將當前節點接在隊尾。若是tail爲空則執行enq方法。

AbstractQueuedSynchronizer#enq

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) {
            // Must initialize
            if (compareAndSetHead(new Node()))
                            tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq方法經過for(;;)無限循環的方式將node節點設置到等待隊列的隊尾(隊列爲空時head和tail都指向當前節點)。

綜上可知addWaiter方法的做用是將競爭鎖失敗的節點放到等待隊列的隊尾。

等待隊列中的節點也並非什麼都不作,這些節點也會不斷的嘗試獲取鎖,邏輯在acquireQueued中實現。

AbstractQueuedSynchronizer#acquireQueued

final Boolean acquireQueued(final Node node, int arg) {
    Boolean failed = true;
    try {
        Boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            interrupted = true;
        }
    }
    finally {
        if (failed)
                    cancelAcquire(node);
    }
}

能夠看到該方法也是使用for(;;)無限循環的方式來嘗試獲取鎖。首先判斷當前節點是否爲頭結點的下一個節點,若是是則再次調用tryAcquire嘗試獲取鎖。固然這個過程並非必定不停進行的,這樣的話多線程競爭下cpu切換也極耗費資源。

shouldParkAfterFailedAcquire會判斷是否對當前節點進行阻塞,阻塞以後只有當unpark後節點纔會繼續假如爭奪鎖的行列。

private static Boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
            return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        }
        while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

判斷一個節點是否須要被阻塞是經過該節點的前繼節點的狀態判斷的。

若是前繼節點狀態爲singal,則表示前繼節點還在等待,當前節點須要繼續被阻塞。返回true。

若是前繼節點大於0,則表示前繼節點爲取消狀態。取消狀態的節點不參與鎖的競爭,直接跳過。返回false。

若是前繼節點時其餘狀態(0,PROPAGATE),不進行阻塞,表示當前節點須要重試嘗試獲取鎖。返回false。

shouldParkAfterFailedAcquire方法若是返回true,表示須要將當前節點阻塞,阻塞方法爲parkAndCheckInterrupt。

AbstractQueuedSynchronizer#parkAndCheckInterrupt

private final Boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

阻塞是經過LockSupport進行阻塞,被阻塞的節點不參與鎖的競爭(不在進行循環獲取鎖),只能被unpark後才繼續競爭鎖。

而被阻塞的節點要被釋放則依賴於unlock方法。

unlock

ReentrantLock 中unlock的實現是經過調用AQS的AbstractQueuedSynchronizer#release方法實現。

public final Boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
                        unparkSuccessor(h);
        return true;
    }
    return false;
}

release調用tryRelease方法,tryRelease是在ReentrantLock中實現。

ReentrantLock#tryRelease

protected final Boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
    Boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

tryRelease方法邏輯很簡單,首先減去releases(通常爲1)表示釋放一個鎖,若是釋放後state=0表示釋放鎖成功,後續等待的節點能夠獲取該鎖了。若是state!=0則表示該鎖爲重入鎖,須要屢次釋放。

當釋放鎖成功後(state=0),會對頭結點的後繼節點進行unpark。

AbstractQueuedSynchronizer#unparkSuccessor

int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
    }

unparkSuccessor見名知意適用於接觸後面節點的阻塞狀態。整個方法的邏輯就是找到傳入節點的後繼節點,將其喚醒(排除掉狀態爲cancel即waitStatus > 0的節點)。

公平鎖和非公平鎖

ReentrantLock的構造方法接受一個可選的公平參數。當設置爲 true 時,在多個線程的競爭時,傾向於將鎖分配給等待時間最長的線程。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

在多個鎖競爭統一資源的環境下,AQS維護了一個等待隊列,未能獲取到鎖的線程都會被掛到該隊列中。若是使用公平鎖則會從隊列的頭結點開始獲取該資源。

而根據代碼在公平鎖和非公平鎖的實現的差異僅僅在於公平鎖多了一個檢測的方法。

公平鎖

protected final boolean tryAcquire(int acquires) {
    //...
    if (c == 0) {
        if (!hasQueuedPredecessors() //!hasQueuedPredecessors()即是比非公平鎖多出來的操做
        && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //...
    return false;
}

hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

方法邏輯很簡單,就是若是等待隊列還有節點而且排在首位的不是當前線程所處的節點返回true表示還有等待更長時間的節點。須要等這部分節點獲取資源後才能獲取。

讀者福利:

分享免費學習資料

針對於Java程序員,我這邊準備免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)

爲何某些人會一直比你優秀,是由於他自己就很優秀還一直在持續努力變得更優秀,而你是否是還在知足於現狀心裏在竊喜!但願讀到這的您能點個小贊和關注下我,之後還會更新技術乾貨,謝謝您的支持!

資料領取方式:加入Java技術交流羣963944895點擊加入羣聊,私信管理員便可免費領取

相關文章
相關標籤/搜索