多線程之ReentrantLock篇(五)

昨天有說事後面講ReentrantLock,今天咱們這篇幅就全局的講解下,咱們在Lock出來前,解決併發問題沒得選只能用Synchronized。java

一.ReentrantLock PK synchronized

      (1)synchronized是獨佔鎖,加鎖和解鎖的過程自動進行,易於操做,但不夠靈活。ReentrantLock也是獨佔鎖,加鎖和解鎖的過程須要手動進行,不易操做,但很是靈活。node

      (2)synchronized可重入,由於加鎖和解鎖自動進行,沒必要擔憂最後是否釋放鎖;ReentrantLock也可重入,但加鎖和解鎖須要手動進行,且次數需同樣,不然其餘線程沒法得到鎖。多線程

      (3)synchronized不可響應中斷,一個線程獲取不到鎖就一直等着;ReentrantLock能夠相應中斷。併發

ReentrantLock好像比synchronized關鍵字沒好太多,咱們再去看看synchronized所沒有的,一個最主要的就是ReentrantLock還能夠實現公平鎖機制。什麼叫公平鎖呢?也就是在鎖上等待時間最長的線程將得到鎖的使用權。通俗的理解就是誰排隊時間最長誰先執行獲取鎖。app

 Lock接口的一些方法:ui

 

 

  1. lock():是最經常使用的獲取鎖的方法,若鎖被其餘線程獲取,則等待(阻塞)。 
  2.  lockInterruptibly():獲取鎖,若是鎖可用則線程繼續執行;若是鎖不可用則線程進入阻塞狀態,此時能夠在其它線程執行時調用這個線程的interrupt方法打斷它的阻塞狀態。
  3. tryLock():嘗試非阻塞地獲取鎖,當即返回。獲取成功返回true;獲取失敗返回false,但不會阻塞。 (這個方法比synchronized好)
  4. tryLock(long time, TimeUnit unit):阻塞嘗試鎖。參數表明時長,在指定時長內嘗試鎖。
  5. unlock():若是沒有獲取鎖標記就放鎖,會拋出異常。

  二.  Lock實現類介紹

        1.ReentrantLock(重入鎖)

public class ReentrantLockDemo {

    private static int count=0;

    //重入鎖(如何實現的?)
    static Lock lock=new ReentrantLock();

    public static void inc(){
        lock.lock(); //得到鎖(互斥鎖) ThreadA 得到了鎖
        try {
            Thread.sleep(1);
            count++;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();//釋放鎖 ThreadA釋放鎖  state=1-1=0
        }
    }

    public static void main(String[] args) throws InterruptedException {


        for (int i = 0; i < 1000; i++) {

            new Thread(()-> ReentrantLockDemo.inc()).start();
        }
        Thread.sleep(4000);
        System.out.println("result:"+count);
    }
}

  2.ReentrantReadWriteLock(重入讀寫鎖)this

          讀多寫少的狀況下,讀和讀不互斥,讀和寫互斥,寫和寫互斥spa

public class ReentrantReadWriteLockDemo {
    static Map<String,Object> cacheMap=new HashMap<>();
    static ReentrantReadWriteLock rwl=new ReentrantReadWriteLock();
    static Lock read=rwl.readLock();
    static Lock write=rwl.writeLock();

    public static Object get(String key){
        read.lock(); //讀鎖 ThreadA 阻塞
        try{
            return cacheMap.get(key);
        }finally {
            read.unlock(); //釋放讀鎖
        }
    }
    public static Object write(String key,Object value){
        write.lock(); //Other Thread 得到了寫鎖
        try{
            return cacheMap.put(key,value);
        }finally {
            write.unlock();
        }
    }
}

 三.思考鎖的實現

      關於鎖咱們講了不少,也寫了不少案例,下面咱們就底層是怎麼實現鎖的機制來進行一個猜測設計而後帶着咱們的猜測去看大佬們的源碼是否是和咱們的猜測同樣:       線程

        1.首先鎖的互斥的原理是多個線程訪問同一個共享資源只有一個能進去訪問,咱們這裏要分析鎖的互斥特性是怎麼實現的:要實現互斥性首先咱們要有一個共享變量,而後在設計時用一個狀態來標記共享資源的狀態(例如0,1)設計

        2.沒有搶佔到鎖的線程怎麼玩,沒有搶到鎖的線程就要阻塞等待,想到等待就很容易想起前面篇幅講的wait(等待、喚醒),可是這裏不是用wait由於wait/notify不能喚醒指定的線程,因此咱們想到了另外一個方案,LockSupport.park()

        3.等待中的線程是怎麼存儲的,這裏面想到的是雙向鏈表

        4.公平和非公平(可否插隊)

        5.鎖的重入的特性(識別是不是同一個線程)重入次數能夠用數字累加

下面咱們就lock.lock(); 是怎麼實現的進行深刻分析下:l

   咱們在多線程訪問lock.lock()方法時若是獲取lock權限的線程就能夠向下執行,沒有獲取權限的線程就會阻塞,這個方向是大方向

 

 

 下面咱們就lock.lock()方法裏面作了什麼事情,首先看到他調用了sync.lock();

 

 咱們看下類的關係圖,其中ReentantLock是Lock的一個實現咱們從下面關係圖片中能夠看出ReentrantLockK中定義了一個sync

 

 咱們能夠看到Sync是一個靜態的抽像內部類,他繼承了AbstractQueuedSynchronizer

我 們回退到sync.lock();方法,他實現了兩種鎖,一種是共平鎖一種是非公平鎖,類關係圖以下

 

 在sync.lock()中默認是非公平鎖,那麼咱們在sync.lock()中進入NonfairSync方法中,首先他進來第一件事是搶佔資源,在這裏的判斷compareAndSetState保證了多線程下的原子性,這裏的compareAndSetState判斷是採用了樂觀鎖機制來進行加鎖,在不少源碼中都有用到CAS操做,其中expect是預期值,update是更改值,這個操做是直接跟內存交互,這樣作的好處是保證只有一個線程能進入,進入後操做setExclusiveOwnerThread(Thread.currentThread());保存當前線程

 

 咱們進入他的判斷方法共享資源compareAndSetState中看下他是怎麼修改預期值的,stateOffset是當前state屬性成員在內存中的偏移量,他會經過內存中的偏移量去拿到內存中的值 和咱們的預期值對比,若是相等就修改,這裏面設計的好處是直接跟內存交互,不讓咱們java代碼操做,能夠在java層面解決多線程問題

 

 

 

 上面圖片是線程搶佔成功的邏輯,其它線程搶佔失敗就走下面acquire(1)的邏輯了,這個acquire邏輯是由AQS來實現的;

  • ! tryAcquire(arg)
  • addWaiter 將未得到鎖的線程加入到隊列
  • acquireQueued(); 去搶佔鎖或者阻塞. 

 

咱們先看下tryAcquire(arg)的實現,咱們選擇它NonfairSync實現

 

這下面的邏輯是繼續去搶佔鎖的邏輯,

 final boolean nonfairTryAcquire(int acquires) {
//獲取當前線程 final Thread current = Thread.currentThread();
//判斷其狀態 int c = getState();
//條件成立表示無鎖, if (c == 0) {
//無鎖的操做必定要變成CAS操做,由於修改自己存在原子性問題 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
// 這裏面是判斷重入的,判斷當前線程和咱們有鎖的線程是否相等 else if (current == getExclusiveOwnerThread()) {
//若是相等就加一個次數 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded");
// 由於當前是有鎖狀態,因此不用再用CAS操做 setState(nextc); return true; } return false; }

  條件! tryAcquire(arg)不成立就會進入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))判斷中來,將未得到鎖的線程加入到隊列;addWaiter是作一個鏈表而後加入acquireQueued中進行循環的判斷;Node.EXCLUSIVE表示節點互斥的一個特性;咱們進入addWaiter方法

private Node addWaiter(Node mode) {
//進來第一件事是先構造一個節點,這個節點會先把當前線程和mode(表示獨佔)傳進來,若是有多個線程沒有搶到鎖那就有多個線程進入這個方法,也就表明了有多個Node節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure
//這裏會拿到一個tail節點,tail表示尾部節點,通常鏈表都會有一個頭節點Head和尾節點Tail,這一步的頭尾節點尚未初始化,仍是空指向 Node pred = tail;
if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//第一次進來尾節點必定是空的,因此第一次進來是走enq方法 enq(node); return node; }

  咱們進入enq(node)方法

private Node enq(final Node node) {
//經過自旋的方式進行FOR循環 for (;;) {
//獲得一個尾節點,此時尾節點仍是空 Node t = tail; if (t == null) { // Must initialize
//初始化一個空的Node節點,這個compareAndSetHead只有在空的狀況下才會替換,CAS保證只有一個線程能替換成功 if (compareAndSetHead(new Node()))
//將頭和尾都指向這個剛剛初始化的空節點,到這一步的時序圖如圖一;這一步完成後初始化就完成了,而後進入下一次循環t就不爲空了走else邏輯 tail = head; } else {
//node表示當前進來的線程,咱們假設是B線程進來了,此時由於t不爲空了,因此當前線程的prev指向空的Node節點 node.prev = t; if (compareAndSetTail(t, node)) {
//操做尾部節點t.next表示上一個節點的指向指向當前節點,這樣一個雙向鏈表就造成了,在多個for循環後的時序圖就如圖二 t.next = node; return t; } } } }

  

圖一

 圖二

 

addWaiter(Node.EXCLUSIVE), arg)代碼執行完成後,他會把參數返回添加到acquireQueued裏面去,咱們進入acquireQueued,這裏面必定會作的一件事就是阻塞列表中的線程

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
//又是自旋 for (;;) {
// 假設這裏面的node是咱們線程B的話,他的predecessor()方法能夠點進去看下,會發現是當前線程的prev,由上面時序圖會發現其實就是Head節點 final Node p = node.predecessor();
//若是頭節點是head節點就會去搶佔一次鎖,成功就得到鎖,失敗走下面 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; }
//是否要掛起一個線程,咱們進入shouldParkAfterFailedAcquire方法 if (shouldParkAfterFailedAcquire(p, node) &&
//parkAndCheckInterrupt是掛起(阻塞) parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  阻塞狀態是沒有必要去搶佔鎖的,下面就是經過判斷是否是偏鎖狀態來決定要不要去釋放鎖,若是是偏鎖就釋放鎖

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//等待狀態,若是線程出現異常會出來偏鎖狀態 int ws = pred.waitStatus;
//SIGNAL是喚醒狀態成立就能夠放心掛起(-1) if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true;
//偏鎖狀態ws會大於o if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
//將取消狀態的移除節點 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */
//替換節點狀態改爲SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

  

 

 

 前面的掛起完成後表明lock.lock()方法執行完成了,接下來咱們就講下lock.unlock()釋放鎖的過程,這時候釋放鎖是線程A來釋放鎖,咱們來看lock.unlock()的ReentrantLock實現

 

public final boolean release(int arg) {
//進入tryRelease方法 if (tryRelease(arg)) {
Node h = head; if (h != null && h.waitStatus != 0)
//重置信息完成後會經過下面方法進行喚醒阻塞線程 unparkSuccessor(h); return true; } return false; }

  

protected final boolean tryRelease(int releases) {
//將state恢復原有值 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true;
//若是恰好c==0就釋放線程並把線程清空,如圖三 setExclusiveOwnerThread(null); } setState(c); return free; }

  

 

 圖三

咱們進入unparkSuccessor方法中

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
//若是成立 if (ws < 0)
//先恢復成初始狀態 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
//獲取下一個節點 Node s = node.next;
//若是下個節點爲空,則除去無效節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
if (s != null)
// 喚醒下一個節點,喚醒後的線程又要搶佔鎖又會進入前面的acquireQueued方法進行自旋,搶佔失敗的線程又要掛起
//喚醒完成後喚醒的線會去執行代碼程序
LockSupport.unpark(s.thread);
}

  

相關文章
相關標籤/搜索