Java併發學習之ReentrantLock的工做原理及使用姿式

Lock,ReentrantLock的工做原理及使用方式

jdk提供synchronized實現線程同步,但有些場景下並不靈活,如多個同步方法,每次只能有一個線程訪問;而Lock則能夠很是靈活的在代碼中實現同步機制java

I. Lock的使用

在以前學習阻塞隊列中,較多地方使用 ReadWriteLock, Condition,接下來在探究實現原理以前,先研究下鎖的使用node

Lock 接口的定義安全

public interface Lock {

     // 獲取鎖,若當前lock被其餘線程獲取;則此線程阻塞等待lock被釋放
     // 若是採用Lock,必須主動去釋放鎖,而且在發生異常時,不會自動釋放鎖
    void lock();

    // 獲取鎖,若當前鎖不可用(被其餘線程獲取);
    // 則阻塞線程,等待獲取鎖,則這個線程可以響應中斷,即中斷線程的等待狀態
    void lockInterruptibly() throws InterruptedException;

    // 來嘗試獲取鎖,若是獲取成功,則返回true;
    // 若是獲取失敗(即鎖已被其餘線程獲取),則返回false
    // 也就是說,這個方法不管如何都會當即返回
    boolean tryLock();

    // 在拿不到鎖時會等待必定的時間
    // 等待過程當中,能夠被中斷
    // 超過期間,依然獲取不到,則返回false;不然返回true
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    // 釋放鎖
    void unlock();

    // 返回一個綁定該lock的Condtion對象
    // 在Condition#await()以前,鎖會被該線程持有
    // Condition#await() 會自動釋放鎖,在wait返回以後,會自動獲取鎖
    Condition newCondition();
}

1. ReentrantLock

可重入鎖。jdk中ReentrantLock是惟一實現了Lock接口的類多線程

可重入的意思是一個線程擁有鎖以後,能夠再次獲取鎖,併發

基本使用

最基本的使用場景,就是利用lock和unlock來實現線程同步框架

以輪班爲實例進行說明,要求一我的下班以後,另外一我的才能上班,即不能兩我的同時上班,具體實現能夠以下ide

public class LockDemo {

    private Lock lock = new ReentrantLock();

    private void workOn() {
        System.out.println(Thread.currentThread().getName() + ":上班!");
    }

    private void workOff() {
        System.out.println(Thread.currentThread().getName() + ":下班");
    }


    public void work() {
        try {
            lock.lock();
            workOn();
            System.out.println(Thread.currentThread().getName() 
              + "工做中!!!!");
            Thread.sleep(100);
            workOff();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockDemo lockDemo = new LockDemo();

        int i = 0;
        List<Thread> list = new ArrayList<>(30);
        do {
            Thread a = new Thread(new Runnable() {
                @Override
                public void run() {
                    lockDemo.work();
                }
            }, "小A_" + i);

            Thread b = new Thread(new Runnable() {
                @Override
                public void run() {
                    lockDemo.work();
                }
            }, "小B_" + i);


          list.add(a);
          list.add(b);
        } while (i++ < 10);
        list.parallelStream().forEach(Thread::start);

        Thread.sleep(3000);
        System.out.println("main over!");
    }
}

上面的示例,主要給出了lock()unlock() 的配套使用,當一個線程在上班遷嘗試獲取鎖,若是獲取到,則只有在下班以後纔會釋放鎖,保證在其上班的過程當中,不會有線程也跑來上崗搶飯碗,輸出以下源碼分析

小A_3:上班!
小A_3工做中!!!!
小A_3:下班
小A_1:上班!
小A_1工做中!!!!
小A_1:下班
// .... 省略部分
小B_7:上班!
小B_7工做中!!!!
小B_7:下班
小B_5:上班!
小B_5工做中!!!!
小B_5:下班
main over!

從基本的使用中,肯定lock的使用姿式通常以下:學習

  1. 建立鎖對象 Lock lock = new ReentrantLock()
  2. 在但願保證線程同步的代碼以前顯示調用 lock.lock() 嘗試獲取鎖,若被其餘線程佔用,則阻塞
  3. 執行完以後,必定得手動釋放鎖,不然會形成死鎖 lock.unlock(); 通常來說,把釋放鎖的邏輯,放在須要線程同步的代碼包裝外的finally塊中

即常見的使用姿式應該是測試

try {
  lock.lock();
  // .....
} finally {
  lock.unlock();
}

一個疑問,若沒被加鎖,僅只執行lock.unlock()是否會有問題?

測試以下

@Test
public void testLock() {
    Lock lock = new ReentrantLock();
    // lock.lock();
    // lock.unlock();
    lock.unlock();
    System.out.println("123");
}

執行以後,發現拋了異常(去掉上面的註釋,即加鎖一次,釋放鎖兩次也會拋下面異常)

exception

另外一個疑問,代碼塊中屢次上鎖,釋放鎖只一次,是否會有問題?

將前面的TestDemo方法稍稍改動一下

public void work() {
    try {
        lock.lock();
        workOn();
        lock.lock();
        System.out.println(Thread.currentThread().getName() 
          + "工做中!!!!");
        Thread.sleep(100);
        workOff();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

再次執行,發現其餘線程都沒法再次獲取到鎖了,運行的gif圖以下

gif

所以能夠得出結論:

  1. lock() 和 unlock() 配套使用,不要出現一個比另外一個用得多的狀況
  2. 不要出現lock(),lock()連續調用的狀況,即二者之間沒有釋放鎖unlock()的顯示調用

2. Condtion 及 Lock的配合使用

在JDK的阻塞隊列中,不少地方就利用了Condition和Lock來實現出隊入隊的併發安全性,以 ArrayBlockingQueue爲例

內部定義了鎖,非空條件,非滿條件

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    // ... 省略
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

出隊,入隊的實現以下(屏蔽一些與鎖無關邏輯)

// 入隊邏輯
public void put(E e) throws InterruptedException {
    // ...
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) 
        // 若是隊列已滿,則執行Condtion notFull的等待方法
        // 本線程會釋放鎖,等待其餘線程出隊以後,執行 notFull.singal()方法
            notFull.await();
        enqueue(e);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

private void enqueue(E x) {
    // 入隊,notEmpty 條件執行,喚醒被 notEmpty.await() 阻塞的出隊線程
    notEmpty.signal();
}


// 出隊
public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
      while (count == 0)
          // 隊列爲空,線程執行notEmpty.wait(),阻塞並釋放鎖
          // 等待其餘入隊線程執行  notEmpty.signal(); 後被喚醒
          notEmpty.await();
      return dequeue();
  } finally {
      lock.unlock();
  }
}

private E dequeue() {
  // ...
  // 出隊,notFull 條件執行,喚醒被 notFull.await() 阻塞的入隊線程
  notFull.signal();
}

下面看下Condition的定義

public interface Condition {

    // 使當前線程處於等待狀態,釋放與Condtion綁定的lock鎖
    // 直到 singal()方法被調用後,被喚醒(若中斷,就game over了)
    // 喚醒後,該線程會再次獲取與條件綁定的 lock鎖
    void await() throws InterruptedException;

    // 相比較await()而言,不響應中斷
    void awaitUninterruptibly();

    // 在wait()的返回條件基礎上增長了超時響應,返回值表示當前剩餘的時間
    // < 0 ,則表示超時
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 同上,只是時間參數不一樣而已
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    // 同上,只是時間參數不一樣而已
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 表示條件達成,喚醒一個被條件阻塞的線程
    void signal();

    // 喚醒全部被條件阻塞的線程。
    void signalAll();
}

經過上面的註釋,也就是說Condtion通常是與Lock配套使用,應用在多線程協同工做的場景中;即一個線程的執行,指望另外一個線程執行完畢以後才完成

針對這種方式,咱們寫個測試類,來實現累加,要求以下:

  1. 線程1實現 start-middle的累加;線程2實現middle-end的累加
  2. 線程3實現線程1和線程2計算結果的相加

上面這種狀況下,線程3的執行,要求線程1和線程2都執行完畢

說明:下面實現只是爲了演示Condition和Lock的使用,上面這種場景有更好的選擇,如Thread.join()或者利用Fork/Join都更加優雅

public class LockCountDemo {

    private int start = 10;
    private int middle = 90;
    private int end = 200;

    private volatile int tmpAns1 = 0;
    private volatile int tmpAns2 = 0;

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private AtomicInteger count = new AtomicInteger(0);


    private int add(int i, int j) {
        try {
            lock.lock();
            int sum = 0;
            for (int tmp = i; tmp < j; tmp++) {
                sum += tmp;
            }
            return sum;
        } finally {
            atomic();
            lock.unlock();
        }
    }


    private int sum() throws InterruptedException {
        try {
            lock.lock();
            condition.await();
            return tmpAns1 + tmpAns2;
        } finally {
            lock.unlock();
        }
    }

    private void atomic() {
        if (2 == count.addAndGet(1)) {
            condition.signal();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        LockCountDemo demo = new LockCountDemo();
        Thread thread1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + 
                " : 開始執行");
            demo.tmpAns1 = demo.add(demo.start, demo.middle);
            System.out.println(Thread.currentThread().getName() +
                    " : calculate ans: " + demo.tmpAns1);
        }, "count1");

        Thread thread2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() +
                " : 開始執行");
            demo.tmpAns2 = demo.add(demo.middle, demo.end + 1);
            System.out.println(Thread.currentThread().getName() +
                    " : calculate ans: " + demo.tmpAns2);
        }, "count2");

        Thread thread3 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() +
                    " : 開始執行");
                int ans = demo.sum();
                System.out.println("the total result: " + ans);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "sum");


        thread3.start();
        thread1.start();
        thread2.start();

        Thread.sleep(3000);
        System.out.println("over");
    }
}

輸出以下

sum : 開始執行
count2 : 開始執行
count1 : 開始執行
count1 : calculate ans: 3960
the total result: 20055
count2 : calculate ans: 16095
over

小結Condition的使用:

  • Condition與Lock配套使用,經過 Lock#newConditin() 進行實例化
  • Condition#await() 會釋放lock,線程阻塞;直到線程中斷or Condition#singal()被執行,喚醒阻塞線程,並從新獲取lock
  • 經典case能夠參考jdk的阻塞隊列實現(ArrayBlockingQueue, LinkedBlockingQueue)

II. ReentrantLock實現原理

1. AbstractQueuedSynchronizer (簡稱AQS)

AQS是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題

AQS使用一個FIFO的隊列表示排隊等待鎖的線程,隊列頭節點稱做「哨兵節點」或者「啞節點」,它不與任何線程關聯。其餘的節點與等待線程關聯,每一個節點維護一個等待狀態waitStatus

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    //取值爲 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一
    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    // Link to next node waiting on condition, 
    // or the special value SHARED
    volatile Thread thread;

    Node nextWaiter;
}

artch

2. lock()實現

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

源碼分析:

  • 首先用一個CAS操做,判斷state是不是0(表示當前鎖未被佔用),
    • 若是是0則把它置爲1,而且設置當前線程爲該鎖的獨佔線程,表示獲取鎖成功
  • 當多個線程同時嘗試佔用同一個鎖時,CAS操做只能保證一個線程操做成功,其餘被阻塞

acquire的邏輯以下:

tyrAcquire: 嘗試獲取鎖,非阻塞當即返回

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 沒有線程佔用鎖
        if (compareAndSetState(0, acquires)) {
          // 佔用鎖成功,設置爲獨佔
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        // 本線程以前已經獲取到鎖
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新重入次數
        setState(nextc);
        return true;
    }
    return false;
}

非公平鎖tryAcquire的流程是:

  • 檢查state字段,若爲0,表示鎖未被佔用,那麼嘗試佔用
  • 若不爲0,檢查當前鎖是否被本身佔用,若被本身佔用,則更新state字段,表示重入鎖的次數。
  • 若是以上兩點都沒有成功,則獲取鎖失敗,返回false

addWaiter: 入隊

private Node addWaiter(Node mode) {
    // 建立一個節點,並將其掛在鏈表的最後
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    // 尾節點爲空,說明隊列還未初始化,須要初始化head節點併入隊新節點
    enq(node);
    return node;
}

acquireQueued掛起

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // /標記線程是否被中斷過
        boolean interrupted = false;
        for (;;) {
            //獲取前驅節點
            final Node p = node.predecessor();
            // 若該節點爲有效的隊列頭(head指向的Node內部實際爲空)
            // 嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 獲取成功,將當前節點設置爲head節點
                p.next = null; // help GC
                failed = false;
                //返回是否被中斷過
                return interrupted;
            }
            // 判斷獲取失敗後是否能夠掛起,若能夠則掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                 // 線程若被中斷,設置interrupted爲true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

線程掛起的邏輯

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //前驅節點的狀態
    if (ws == Node.SIGNAL) // 前驅節點狀態爲signal,返回true
        return true;
    if (ws > 0) { 
    // 從隊尾向前尋找第一個狀態不爲CANCELLED的節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 將前驅節點的狀態設置爲SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

線程入隊後可以掛起的前提是,它的前驅節點的狀態爲SIGNAL,它的含義是「Hi,前面的兄弟,若是你獲取鎖而且出隊後,記得把我喚醒!」。

shouldParkAfterFailedAcquire會先判斷當前節點的前驅狀態是否符合要求:

  • 若符合則返回true,而後調用parkAndCheckInterrupt,將本身掛起
  • 若是不符合,再看前驅節點是否>0(CANCELLED)
  • 如果那麼向前遍歷直到找到第一個符合要求的前驅
  • 若不是則將前驅節點的狀態設置爲SIGNAL

小結下lock()流程

  1. CAS判斷state是否爲0來表示鎖是否被佔用;若未被佔用,則獨佔鎖
  2. 不然,嘗試獲取鎖 acquire()
  3. 若嘗試獲取鎖成功(鎖就是被當前線程佔用的,重入計數+1便可;或者鎖正好被釋放)
  4. 獲取鎖失敗,則須要建立一個Node節點(包含了線程信息)入隊
  5. 掛起線程 acquireQueued, 掛起以前,會先嚐試獲取鎖,值有確認失敗以後,則掛起鎖,並設置前置Node的狀態爲SIGNAL(以保障在釋放鎖的時候,能夠保證喚醒Node的後驅節點線程)

3. unlock的實現

嘗試釋放鎖,成功,須要清楚各類狀態(計數,釋放獨佔鎖)

此外還須要額外判斷隊列下個節點是否須要喚醒,而後決定喚醒被掛起的線程;

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 嘗試釋放鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)
        // 查看頭結點的狀態是否爲SIGNAL,若是是則喚醒頭結點的下個節點關聯的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // 計算釋放後state值
    if (Thread.currentThread() != getExclusiveOwnerThread())
        // 若是不是當前線程佔用鎖,那麼拋出異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 鎖被重入次數爲0,表示釋放成功,清空獨佔線程
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

小結

  1. 建立鎖對象 Lock lock = new ReentrantLock()
  2. 在但願保證線程同步的代碼以前顯示調用 lock.lock() 嘗試獲取鎖,若被其餘線程佔用,則阻塞
  3. 執行完以後,必定得手動釋放鎖,不然會形成死鎖 lock.unlock(); 通常來說,把釋放鎖的邏輯,放在須要線程同步的代碼包裝外的finally塊中
  4. lock() 和 unlock() 配套使用,不要出現一個比另外一個用得多的狀況
  5. 不要出現lock(),lock()連續調用的狀況,即二者之間沒有釋放鎖unlock()的顯示調用
  6. Condition與Lock配套使用,經過 Lock#newConditin() 進行實例化
  7. Condition#await() 會釋放lock,線程阻塞;直到線程中斷or Condition#singal()被執行,喚醒阻塞線程,並從新獲取lock
  8. ReentrantLock#lock的流程圖大體以下

lock

參考

ReentrantLock實現原理詳解

掃描關注,java分享

QrCode

相關文章
相關標籤/搜索