多線程學習筆記三之ReentrantLock與AQS實現分析

簡介

  ReentrantLock是基於同步器AbstractQueuedSynchronizer(AQS)實現的獨佔式重入鎖,支持公平鎖、非公平鎖(默認是非公平鎖)、申請鎖可響應中斷以及限時獲取鎖等高級功能,分析ReentrantLock就離不開同步器AQS,關係圖以下:node

  在AQS中實現瞭如何獲取鎖和釋放鎖的模板方法,重入鎖ReentrantLock實現時經過內部類繼承Sync同步器AbstractQueuedSynchronizer。並調用同步器提供的模板方法,而這些模板方法將會調用ReentrantLock重寫的方法,這是典型的模板方法設計模式。AQS實現同步器功能離不開三大基礎組件:設計模式

  • 對共享資源同步狀態進行原子性管理 ---> 利用CAS對同步狀態進行更新
  • 線程的阻塞與喚醒 ---> 調用native方法
  • 等待隊列的管理 ---> 維護FIFO隊列

AQS同步狀態

  AQS中使用了一個int型的volatile變量來表示同步狀態,線程在嘗試獲取鎖的時候,就回去比較同步器同步狀態state是否爲0,爲0,那麼線程就拿到了鎖並改變同步狀態;不爲0,說明有其餘線程拿到了鎖。AQS中提供瞭如下三個方法來訪問或修改同步狀態:數據結構

//AQS成員變量,同步狀態
    private volatile int state;

    //獲取當前同步狀態
    protected final int getState() {
        return state;
    }

    //設置當前同步狀態
    protected final void setState(int newState) {
        state = newState;
    }

    //使用CAS設置當前狀態,該方法可以保證狀態設置的原子性
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS同步隊列

  當有多個線程競爭獲取鎖時,只有一個線程能獲取到鎖,那麼這些沒有獲取到鎖的線程就須要等待,等到線程把鎖釋放了再喚醒等待線程去獲取鎖,爲了實現等待-喚醒機制,AQS提供了基於CLH隊列(Craig, Landin,Hagersten)實現的等待隊列,是一個先入先出的雙向隊列。同步隊列是一個非阻塞的 FIFO 隊列。也就是說往裏面插入或移除一個節點的時候,在併發條件下不會阻塞,而是經過自旋鎖和CAS保證節點插入和移除的原子性。
併發

  AQS中的內部類Node是構建同步隊列和等待隊列(後面介紹Condition再介紹)的基礎節點類,Node類部分源碼以下:函數

static final class Node {
        //等待狀態
        volatile int waitStatus;

        //前驅結點
        volatile Node prev;
    
        //後繼節點
        volatile Node next;
        
        //等待獲取鎖的線程
        volatile Thread thread;
        
        //condition隊列的後繼節點
        Node nextWaiter;      
    }

關於節點Node的waitStatus,它反映的是節點中線程的等待狀態,有以下取值:ui

  • CANCELLED,值爲1,由於超時或中斷,該線程已經被取消
  • SIGNAL,值爲-1,線程的後繼線程正/已被阻塞,當該線程release或cancel時要從新這個後繼線程(unpark)
  • CONDITION,值爲-2,代表該線程被處於條件隊列,就是由於調用了Condition.await而被阻塞
  • PROPAGATE,值爲-3,表示當前場景下後續的acquireShared可以得以執行
  • 等待狀態的初始值爲0,表示當前節點在sync隊列中,等待着獲取鎖。

ReentrantLock數據結構

  從關係圖能夠看出,ReentrantLock實現了Lock接口,內部類Sync是AQS的子類,Sync有兩個子類FairSync(公平鎖)和NonFairSync(非公平鎖)。ReentrantLock只有一個成員變量sync,經過構造函數初始化,能夠看到經過默認的構造函數構造的ReentrantLock是非公平鎖。this

private final Sync sync;

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

公平鎖的獲取

  ReentrantLock獲取鎖方法以下:線程

public void lock() {
        sync.lock();
    }

公平鎖調用的是FairSync的lock方法:設計

final void lock() {
        acquire(1);
    }

acquire方法是AQS實現的方法,介紹一下參數的1的意思:AQS規定同步狀態state,想要得到鎖就去改變同步狀態,就是把同步狀態加1。acquire方法:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

獲取鎖的過程:

  1. 嘗試獲取鎖。
  2. 嘗試獲取失敗,將當前線程構成Node加入Sync隊列。
  3. 再次嘗試獲取,若獲取失敗線程進入等待態,等待喚醒。

tryAcquire(arg)

  公平鎖嘗試獲取,在FairSync裏實現,獲取同步狀態成功返回true,不然返回false

protected final boolean tryAcquire(int acquires) {
        //獲取當前線程
        final Thread current = Thread.currentThread();
        //獲取同步狀態
        int c = getState();
        //同步狀態爲0,沒有其餘線程佔據鎖
        if (c == 0) {
            //檢測同步隊列沒有其餘線程等待(確保公平性),若是沒有獲取鎖就以CAS方式嘗試改變同步狀態
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                //設置鎖的擁有者爲當前線程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //同步狀態不爲0,檢測是不是當前線程擁有鎖
        else if (current == getExclusiveOwnerThread()) {
            //當前線程擁有鎖,直接更新同步狀態,重入鎖
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • hasQueuedPredecessors()
      hasQueuedPredecessors是AQS中的方法,檢測同步隊列有沒有等待獲取鎖的線程,保證公平性。
public final boolean hasQueuedPredecessors() {
        //同步隊列尾節點
        Node t = tail;
        //同步隊列頭節點 
        Node h = head;
        Node s;
        //h!=t 頭節點和尾節點不一樣,說明同步隊列不爲空
        //同步隊列不爲空,檢測下一個等待獲取鎖的線程(h.next.thread)是否是當前線程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  • compareAndSetState(int expect, int update)
      compareAndSetState()在AQS中實現。compareAndSwapInt() 是sun.misc.Unsafe類中的一個native方法,若是當前狀態值等於預期值,則以原子方式將同步狀態設置爲給定的更新值。
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
  • setExclusiveOwnerThread(Thread thread) & getExclusiveOwnerThread()
      setExclusiveOwnerThread和getExclusiveOwnerThread都是AQS父類AbstractOwnableSynchronizer的方法,setExclusiveOwnerThread用於設置線程t爲當前擁有獨佔鎖的線程。getExclusiveOwnerThread用於得到當前佔據獨佔鎖的線程
protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }

addWaiter(Node mode)

  addWaiter在AQS中實現,以當前線程構成節點加入到同步隊列末尾,並返回這個節點Node。

private Node addWaiter(Node mode) {
        //以當前線程和給定模式構成節點Node
        Node node = new Node(Thread.currentThread(), mode);
        // 同步隊列不爲空,以CAS方式把當前線程加入到隊列末尾
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //隊列爲空,創建同步隊列,再把當前線程加入同步隊列
        enq(node);
        return node;
    }
  • compareAndSetTail(Node expect, Node update)
      compareAndSetTail是AQS中的方法,調用本地native方法,若是同步隊列隊尾是expect節點,就把update節點添加到隊列末尾,這是一個原子操做。
private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
  • enq(final Node node)
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued(final Node node, int arg)

  若是當前線程的節點的前驅結點,就去嘗試獲取同步狀態,若是不是或者獲取失敗根據waitStatus對同步隊列進行清理:把waitStatus爲CANCELLED從同步隊列清除,修改錯誤的waitStatus,而後把線程堵塞,返回當前線程是否被中斷。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //當前節點的前驅結點
                final Node p = node.predecessor();
                //前驅結點是head頭節點,嘗試獲取同步狀態
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • shouldParkAfterFailedAcquire(Node pred, Node node)
      前驅結點不是head頭節點或嘗試獲取同步狀態失敗之後,並非立刻把當前線程線程堵塞,還要檢測同步隊列前驅結點的狀態,檢查規則以下:
  1. 若是前驅節點狀態爲SIGNAL,代表當前節點須要被堵塞,此時則返回true。
  2. 若是前驅節點狀態爲CANCELLED(ws>0),說明前繼節點已經被取消,則從後往前找到一個有效(非CANCELLED狀態)的節點,並返回false;以後無限循環直到步驟1返回true,線程阻塞。
  3. 若是前驅節點狀態爲非SIGNAL、非CANCELLED,則CAS設置前驅節點的狀態爲SIGNAL,並返回false;以後無限循環直到步驟1返回true,線程阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)            
            return true;
        if (ws > 0) {            
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {            
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • parkAndCheckInterrupt()
      把當前線程堵塞並檢查是否有中斷。
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

鎖的釋放

  ReentrantLock公平鎖與非公平鎖的釋放機制是同樣的,釋放鎖方法以下:

public void unlock() {
        sync.release(1);
    }

unlock方法調用的release方法是在AQS中實現的,這裏的1相似於acquire(1),適用於用來設置同步狀態的,釋放鎖時會把同步狀態減1。release方法會先調用tryRelease來嘗試釋放當前線程鎖持有的鎖。成功的話,則喚醒後繼等待線程,並返回true。不然,直接返回false

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease(int releases)

  tryRelease嘗試獲取鎖,當同步狀態爲0時清空佔據鎖的線程,返回true;若是同步狀態不爲0返回false,由於ReentrantLock是重入鎖,只有完全釋放tryRelease纔會返回true。

protected final boolean tryRelease(int releases) {
        // c是本次釋放鎖以後的同步狀態
        int c = getState() - releases;
        //當前線程不是鎖的擁有者,拋出IllegalMonitorStateException異常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //若是「鎖」已經被當前線程完全釋放,則設置「鎖」的持有者爲null,即鎖是可獲取狀態。
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

unparkSuccessor(Node node)

  當前線程釋放鎖成功的話,會喚醒當前線程的後繼線程。從aquireQueued方法能夠看出,一旦頭結點的後繼結點被喚醒,那麼後繼結點就嘗試去獲取鎖,若是獲取成功就將頭結點設置爲自身,並將前一個頭節點清空。

private void unparkSuccessor(Node node) {
        // 獲取當前線程(要釋放鎖)的等待狀態
        int ws = node.waitStatus;
        if (ws < 0)
            //設置爲初始狀態
            compareAndSetWaitStatus(node, ws, 0);

        //同步隊列頭節點的下一個等待節點
        Node s = node.next;
        //等待節點無效,從同步節點尾部開始遍歷找到有效的等待節點
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //喚醒等待節點的線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

非公平鎖的獲取

  NonfairSync類中lock()實現,首先嚐試用CAS更改同步狀態,若是成功,把當前線程設置爲獨佔鎖的擁有者;而後調用acquire(1)方法。

final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

acquire方法除了tryAcquire是由AQS的子類實現的,其餘方法都是在AQS類實現的,tryAcquire的實現機制不一樣體現了公平鎖與非公平鎖的不一樣。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

ReentrantLock中的NonfairSync的tryAcquire方法,調用了nonfairTryAcquire方法

protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

nonfairTryAcquire(int acquires)

  非公平鎖的嘗試獲取鎖時,若是同步狀態爲0,即沒有其餘線程獲取到鎖,當前線程直接以CAS方式改變同步狀態,不會去同步隊列找是否有其餘線程早於當前線程等在同步隊列中,效率較高。

final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //同步狀態爲0,嘗試以CAS方式改變同步狀態
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //重入鎖
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

總結

  本文介紹了ReentrantLock基於AQS同步器實現的公平鎖和非公平鎖的獲取和釋放,基於CAS改變同步狀態是得到獨佔鎖的基礎,爲了不多個線程同時對進行競爭,在AQS中維護了FIFO的同步隊列,當獨佔鎖釋放時,AQS同步器調度同步隊列隊首等待節點的線程去獲取鎖,有效避免了海量競爭獨佔鎖形成資源的浪費,是一個很是巧妙的方法。

相關文章
相關標籤/搜索