java併發系列-AQS.ReentrantLock源碼分析

1、背景

java jdk提供了一系列解決併發衝突的鎖和工具,ReentrantLock爲可重入獨佔鎖。 要從哪裏開始,由於這個其實要講起了不少。java

1.1 先來個簡單的使用例子來入門吧。

public class Locktest {
    
    /** 
     * 測試Lock的使用。在方法中使用Lock,能夠避免使用Synchronized關鍵字。 
     */  
    public static class LockTest {  
  
        Lock lock = new ReentrantLock();// 鎖  
        double value = 0d; // 值  
        int addtimes = 0;  
  
        /** 
         * 增長value的值,該方法的操做分爲2步,並且相互依賴,必須實如今一個事務中 
         * 因此該方法必須同步,之前的作法是在方法聲明中使用Synchronized關鍵字。 
         * @throws InterruptedException 
         */  
        public void addValue(double v) throws InterruptedException {  
            lock.lock();// 取得鎖  
            System.out.println("LockTest to addValue: " + v + "   "  
                    + System.currentTimeMillis());  
             
            this.value += v;  
            Thread.sleep(1000);
            this.addtimes++;  
            lock.unlock();// 釋放鎖  
        }  
  
        public Double getValue() {  
            return this.value;  
        }
         
    } 
   
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        final LockTest lockTest = new LockTest();
        Runnable run = new Runnable(){

            @Override
            public void run() {
                try {
                    lockTest.addValue(1);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
            }
            
        };
        for(int i=0;i<100;i++){
            new Thread(run).start();
        }

爲了保證value 和addtimes 的操做在addValue函數中是原子操做,且最後的值是正確的,加了一把ReentrantLock鎖。node

那麼接下來咱們來分析下ReentrantLock是如何實現的?數據結構

看源碼分析前建議能夠先從最後總結開始,從宏觀上有一個大體認識。架構

2、ReentrantLock 源碼分析

ReetrantLock在jdkjava.util.concurrent.locks包下,實現接口Lock併發

2.1 使用過程

a Lock lock = new ReentrantLock();// 鎖  
b lock.lock(); //獲取鎖 2
/**
* 業務邏輯,保證只有一個線程同時執行
**/
c lock.unlock() //釋放鎖  3

2.2 ReentrantLock的內部結構

ReentrantLock

2.3 AQS、Sync、FairSync、NonfairSync 類圖關係

如圖所示: 由三個內部類Sync、FairSync、NonfairSync,關係以下,都是基於AbstractQueuedSynchronizer實現,後面簡稱AQS,因此能夠知道,jdk鎖的實現AQS是關鍵app

 Sync extends AbstractQueuedSynchronizer

2.3 初始化鎖實例(默認是非公平鎖)

/**
    * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
    */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

能夠指定 new ReentrantLock(true); 爲公平鎖ide

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

2.4 lock.lock() 獲取鎖流程

public void lock() {
        sync.lock();
    }

2.4.1 FairSync ,公平鎖lock

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1); //直接調用獲取鎖方法acquire,按照正常的程序拿鎖,進入隊列
        }
...
}

2.4.2 NonfairSync,非公平鎖lock

非公平鎖會先直接去搶佔,而後在acquire函數

static final class NonfairSync extends Sync {
       private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
        if (compareAndSetState(0, 1)) //先嚐試插隊直接去拿鎖,更改state狀態爲1,若是成功則把Owner線程設置爲當前線程,則表示成功得到鎖
            setExclusiveOwnerThread(Thread.currentThread());
        else //插隊失敗則按照公平鎖方式同樣,排隊獲取
            acquire(1);
    }
    //嘗試獲取鎖後面再講
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
 

2.4.2.1 compareAndSetState

爲AQS一方法,底層調用CAS,將state公共變量更改成1。工具

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

2.4.2.1.2 AQS架構

AQS是JUC重要的同步器,全部的鎖基於整個同步器實現協調,這裏簡單的介紹下,有興趣後面再重點分析 主要由如下幾個重要部分組成源碼分析

  • Node 節點
  • head 頭節點
  • tail 尾節點
  • state 當前鎖的狀態
  • acquire(int arg) 獲取鎖
  • acquireQueued(final Node node, int arg) 獲取鎖隊列
  • addWaiter(Node mode) 加入等待隊列
  • release(int) 釋放鎖
  • unparkSuccessor(Node) 喚醒繼任節點
  • ConditionObject 條件對象,功能相似wait/notify

AQS架構

state是關鍵,volatile int state;用volatile修飾。當爲0時表示鎖是空閒,能夠獲取鎖,當大於0時表示得到鎖。 獨佔鎖時大於0表示鎖的重入次數,共享鎖時,state共當前共享線程個數。

node鏈表

node是一個雙向鏈表,有Node、prev、next、head、tail組成,該鏈表被稱之CHL隊列(FIFO) 如上圖

2.4.3 acquire()內部實現

acquire流程通過如下步驟:

  1. tryAquire 先嚐試快速獲取鎖
  2. addWaiter 加入隊列放置隊尾
  3. acquireQueue 從隊列中獲取鎖,一樣也會先嚐試tryAcquire
  4. selfInterrupt() 若是被中斷,則中斷
public final void acquire(int arg) {
   // 1.先嚐試 tryAcquire 獲取鎖,具體實現後面再詳細講解,
   // 2.再addWaiter 加入隊尾等待,acquireQueued放入同步隊列
         if (!tryAcquire(arg) &&
             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
             selfInterrupt();
    }

公平鎖與最大區別在tryAcquire,如下分析兩則tryAcquire源碼

2.4.3.1 FairSync.tryAcquire()

公平鎖嘗試獲取鎖實現(OwnerThread爲以得到鎖的線程)

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//當前前程
            int c = getState();//獲取當前鎖狀態
            if (c == 0) {//當鎖空閒時
            判斷前置節點爲空,則調用cas將state設置成1,當前線程設置成OwnerThread,獲取鎖成功,true返回
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }//Ownerthread爲當前線程時,+1,如下爲重入鎖的邏輯
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);//設置state值,+1
                return true;//返回true獲取鎖
            }
            return false;
        }

2.4.3.2 NonfairSync.tryAcquire()

非公平鎖tryAcquire實現

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前線程 int c = getState();//get到state鎖狀態 if (c == 0) {//鎖空閒,能夠獲取鎖 //經過CAS將state狀態更改爲 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

 

2.4.3.3 addWaiter 實現

//建立與當前線程隊列的節點和給定鎖模式(獨佔、共享)
//新節點node從隊尾加入,設置成功則把新節點設置成尾節點tail,並將原tail.next 指向node
/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node // 返回新的節點
     */
    private Node addWaiter(Node mode) {
    // new 一個新節點,設置當前線程和獨佔模式exclusive
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;//tail節點賦值給pred,用於後面交換
        if (pred != null) {//若是原尾節點存在
            node.prev = pred; //將新節點的上一個指針指向原尾節點
            if (compareAndSetTail(pred, node)) {//新節點node經過CAS設置成新tail節點
                pred.next = node;//原tail節點的下一個指針指向新的尾節點tail
                return node;//返回新節點,即也是新尾節點
            }
        }
        enq(node);//假如原尾節點爲空或者compareAndSetTail失敗再次enq放入尾節點
        return node;
    }
//空隊列,首先必須初始化,插入隊列尾部,返回當前節點上一個節點
/**
    * Inserts node into queue, initializing if necessary. See picture above.
    * @param node the node to insert
    * @return node's predecessor
    */
   private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           if (t == null) { // Must initialize
               if (compareAndSetHead(new Node()))
                   tail = head;
           } else {
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }

2.4.3.4 acquireQueued 實現

獲取鎖的關鍵

//
/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true; //設置標誌位,若是爲true 則會被中斷
        try {
            boolean interrupted = false;
            for (;;) {//自旋
                //當前節點node已經經過addWaiter設置爲tail了,定義p爲tail上一個節點
                final Node p = node.predecessor();
               //若是p爲head節點,則纔有資格嘗試調用tryAcquire獲取鎖
                if (p == head && tryAcquire(arg)) {
               //獲取鎖成功則當前節點設置成head,setHead中已將node.prev = null;指向前置節點設置成null了,再也不指向原head
                    setHead(node);
               //將原head節點next指向null,這個時候,原head將是一個孤立的node,有利於gc回收
                    p.next = null; // help GC
                    failed = false;//獲取成功標誌
                    return interrupted;
                }
              //一、獲取鎖失敗後,只有被unpark喚醒的waitStatus狀態爲Node.SIGNAL才能夠被阻塞;二、阻塞當前線程,返回中斷狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) //阻塞當前線程,返回中斷狀態,爲true,則返回
                    interrupted = true;//若是阻塞線程被中斷則設置true,下次for循環進來被return interrupted;
            }
        } finally {
            if (failed)//若是失敗則取消該節點獲取鎖
                cancelAcquire(node);
        }
    }

2.4.3.5 shouldParkAfterFailedAcquire

// CANCELLED = 1
// SIGNAL = -1
// CONDITION = -2
// NORMAL = 0
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前一個節點的狀態(注意:不是當前節點)
    int ws = pred.waitStatus;
    if (ws < 0)
        // waitStatus<0,也就是前面的節點尚未得到到鎖,那麼返回true,表示當前節點(線程)就應該park()了。
        return true;
    if (ws > 0) {
        // waitStatus>0,也就是前一個節點被CANCELLED了,那麼就將前一個節點去掉,遞歸此操做直到全部前一個節點的waitStatus<=0,進行4
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // waitStatus=0,修改前一個節點狀態位爲SINGAL,表示後面有節點等待你處理,須要根據它的等待狀態來決定是否該park()
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // ws<0才須要park(),ws>=0都返回false,表示線程不該該park()
    return false;
}

2.5 lock.unlock() 解鎖流程

2.5.1 release

public final boolean release(int arg) {
          //嘗試釋放鎖,設置AQS state狀態,若是爲0則返回true,若是解鎖成功則喚醒head的下一個節點,讓其得到鎖
         if (tryRelease(arg)) {
             Node h = head;//head 賦給h,中間變量用於後面交換
             //存在頭節點,waitStatus 爲1 -1 -2 -3,喚醒下一個節點
             if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//喚醒下一個節點
             return true; 
        }
         return false;
     }

2.5.2 tryRelease

protected final boolean tryRelease(int releases) {
        int c = getState() - releases;//當前狀態state,獨佔表示重入次數-1
        //當前線程不是獨佔OwnerThread,則拋出異常,由於lock和unlock是一對,必須保證釋放鎖的線程爲當前得到鎖的線程
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {//若是等於0表示解鎖成功,OwnerThread設置null 若是是重入鎖要屢次解鎖,直到0
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);//設置AQS state狀態,若是是重入鎖要多長解鎖
        return free;
    }

2.5.2 unparkSuccessor

若是一個存在,喚醒節點的next

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; //head節點等待狀態,
        // 此時node是須要釋放鎖的頭節點
        // 清空頭節點的waitStatus,也就是不須要鎖了,這裏修改爲功失敗無所謂
        if (ws < 0)//設置0代表已經得到鎖
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
  //若是不存在下一個節點或者線程已中斷或已取消
// 從頭節點的下一個節點開始尋找繼任節點,當且僅當繼任結點的waitStatus<=0纔是有效繼任節點,不然將這些waitStatus>0(也就是CANCELLED的節點)從AQS隊列中剔除
        if (s == null || s.waitStatus > 0) {
            s = null;
          //從隊尾開始往前任找,直到node.next,過濾掉中斷的結點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) //下一個節點存在則直接喚醒
            LockSupport.unpark(s.thread);
    }

2.5.3 AQS waitStatus狀態位說明

  • CANCELLED:由於超時或者中斷,結點會被設置爲取消狀態,被取消狀態的結點不該該去競爭鎖,只能保持取消狀態不變,不能轉換爲其餘狀態。處於這種狀態的結點會被踢出隊列,被GC回收;
  • SIGNAL:表示這個結點的繼任結點被阻塞了,到時須要通知它;
  • CONDITION:表示這個結點在條件隊列中,由於等待某個條件而被阻塞;
  • PROPAGATE:使用在共享模式頭結點有可能牌處於這種狀態,表示鎖的下一次獲取能夠無條件傳播;
  • 0:None of the above,新結點會處於這種狀態。
static final int CANCELLED =  1;  
static final int SIGNAL    = -1;  
static final int CONDITION = -2;  
static final int PROPAGATE = -3;

3、基於CAS本身實現一個簡單的獨佔鎖

LockSupport爲阻塞線程提供基礎的功能,它由一對park和unpark組成,park會阻塞當前線程(獲取許可,線程默認許可被佔用了),unpark「喚醒」等待線程(釋放許可);至關於信號量,park拿到才能夠運行。 簡而言之,是用mutex和condition保護了一個_counter的變量,當park時,這個變量置爲了0,當unpark時,這個變量置爲1。

LockSupport.park();  中止 
System.out.println("======");

樂觀的獨佔鎖(相似ReentrantLock) SimpleExclusiveLock .java

package com.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

/**
* 簡單樂觀獨佔鎖
*/ 
public class SimpleExclusiveLock { 
 
    /**
     * 獨佔鎖標記 true 鎖不可用 false 鎖可用
     */ 
    private AtomicBoolean state = new AtomicBoolean(false); 
    List<Thread>          queue = new ArrayList<Thread>();//阻塞隊列 
 
    public boolean lock() { 
        if (!state.get()&&state.compareAndSet(false, true)) {//取鎖成功不會阻塞,程序會繼續執行 
            return true; // 利用CAS   
        } else { 
            System.out.println("queue.add and park "+Thread.currentThread());
            queue.add(Thread.currentThread());//加入阻塞隊列 
            LockSupport.park();//阻塞線程 
            System.out.println("park after "+Thread.currentThread());
            return false; 
        } 
    } 
 
    public boolean unLock() { 
        if (state.get()) {
            System.out.println("queue.remove and unpark "+Thread.currentThread());
            queue.remove(Thread.currentThread());//從隊列裏移除 
            if (state.compareAndSet(true, false)) {// 利用CAS 
                if(!queue.isEmpty()){
                    System.out.println("unpark "+queue.get(0).getName());
                    LockSupport.unpark((Thread) queue.get(0));//喚醒第一個等待線程 
                    System.out.println("unpark after "+queue.get(0).getName());
                } 
                return true; 
            } 
            return false; 
        } else { 
            return false; 
        } 
    } 
    
    
}

SimpleExclusiveLockTest .java

使用

package com.concurrent;

public class SimpleExclusiveLockTest {
    public static SimpleExclusiveLock lock = new SimpleExclusiveLock(); // 獨佔鎖 
    public static volatile int            i    = 0;                            // 保證可見性 
 
    public class RunnableTask implements Runnable { 
 
        @Override 
        public void run() { 
            while (true) { 
                try { 
                    lock.lock();//加鎖 
                    i += 1; 
                    System.out.println("thread name:"+ Thread.currentThread().getName() +" i="+ i); 
                    try { 
                        Thread.currentThread().sleep(1000); 
                    } catch (InterruptedException e) { 
                        e.printStackTrace(); 
                    } 
                } finally { 
                    lock.unLock();//釋放鎖 
                   
                } 
            } 
        } 
    } 
 
    public void runTask() { 
        for (int i = 0; i < 100; i++) { 
            new Thread(new RunnableTask(),"thread"+ i).start(); 
        } 
        
        
    } 
 
    public static void main(String[] args) { 
        SimpleExclusiveLockTest test = new SimpleExclusiveLockTest(); 
        test.runTask(); 
 
    } 
}

4、總結

JUC(Java Util Concurrency)僅用簡單的park, unpark和CAS指令就實現了各類高級同步數據結構,並且效率很高,使人驚歎。

如下我從宏觀角度描述獲取鎖和解鎖流程

鎖的狀態是由AQS.state控制,加鎖和解鎖都會感知和變動此變量,當爲0時表示鎖是空閒,能夠獲取鎖,當大於0時表示得到鎖。 獨佔鎖時大於0表示鎖的重入次數,共享鎖時,state共當前共享線程個數。

4.1 公平鎖與非公平鎖區別在兩點

  1. 非公平鎖在lock 時首先先去搶佔
  2. 而後都會進去acquire流程,在此流程中區別就在tryAcquire中

4.2 acquire總流程

acquire流程通過如下步驟:

  1. tryAquire 先嚐試快速獲取鎖
  2. addWaiter 加入隊列放置隊尾
  3. acquireQueue 從隊列中獲取鎖,一樣也會先嚐試tryAcquire
  4. selfInterrupt() 若是被中斷,則中斷

acquire總流程

4.1.1 FairSync acquire流程

FairSync acquire流程

4.1.2 Nonfair acquire流程

與公平鎖acquire惟一區別在tryAcquire流程中,不用要求前置節點是head節點,則表示tail能夠直接去搶佔鎖,若是搶佔失敗後面的流程與公平一致。

4.2 release 解鎖流程

解鎖流程比較簡單,解鎖節點確定是head,由於head持有鎖

  1. 先將state減1,若是結果是0,返回true執行第二步,這裏可能存在重入鎖,因此依然大於0.
  2. 經過unparkSuccessor 喚醒下一個節點

release 解鎖流程

相關文章
相關標籤/搜索