死磕java concurrent包系列(六)基於AQS解析信號量Semaphore

Semaphore

以前分析AQS的時候,內部有兩種模式,獨佔模式和共享模式,前面的ReentrantLock都是使用獨佔模式,而Semaphore一樣做爲一個基於AQS實現的併發組件,它是基於共享模式實現的,咱們先看看它的使用場景前端

Semaphore共享鎖的基本使用

假設有20我的去銀行櫃面辦理業務,銀行只有3個櫃面,同時只能辦理三我的,若是基於這種有限的、咱們須要控制資源的狀況,使用Semaphore比較方便:node

public class SemaphoreTest {
  //排隊總人數
  private static final int COUNT =20;
  //只有三個櫃檯
  private static final Semaphore AVALIABLECOUNT = new Semaphore(3);

  public static void main(String[] args) {
    //建立一個線程池
    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(COUNT);
    BasicThreadFactory.Builder builder = new BasicThreadFactory.Builder().namingPattern("線程池");
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(COUNT, COUNT, 30L, TimeUnit.SECONDS, workQueue,
        builder.build());
    for (int i = 0; i < COUNT; i++) {
      final int count = i;
      //排隊的人都須要被服務,因此全部的人直接提交線程池處理
      threadPoolExecutor.execute(() -> {
        try {
          //使用acquire獲取共享鎖
          AVALIABLECOUNT.acquire();
          System.out.println(Thread.currentThread().getName());
          System.out.println("服務號"+count+"正在服務");
          Thread.sleep(1000);
        }catch (Exception e){
          System.out.println(e.getMessage());
        }
        finally {
          //獲取完了以後釋放資源
          AVALIABLECOUNT.release();
        }
      });
    }
    threadPoolExecutor.shutdown();
  }
}
複製代碼

輸出以下:咱們執行代碼,能夠發現每隔1秒幾乎同一時間出現3條線程訪,以下圖 設計模式

QQ20181212-142037.gif

Semaphore內部原理解析

Semaphore的內部結構

在深刻分析Semaphore的內部原理前先看看一張類圖結構 安全

image.png
這個結構和ReentrantLock基本上徹底一致,Semaphore內部一樣存在繼承自AQS的內部類Sync以及繼承自Sync的公平鎖(FairSync)和非公平鎖(NofairSync),從這點也足以說明Semaphore的內部實現原理也是基於AQS併發組件的。 在以前的文章中,咱們提到過,AQS是基礎組件,只負責核心併發操做,如加入或維護同步隊列,控制同步狀態等,而具體的加鎖和解鎖操做交由子類完成,所以子類Semaphore共享鎖的獲取與釋放須要本身實現,這兩個方法分別是獲取鎖的tryAcquireShared(int arg)方法和釋放鎖的tryReleaseShared(int arg)方法,這點從Semaphore的內部結構徹底能夠看出來。 咱們在調用Semaphore的方法時,其內部則是經過間接調用其內部類或AQS執行的。下面咱們就從Semaphore的源碼入手分析共享鎖實現原理,這裏先從非公平鎖入手。

非公平鎖的共享鎖

一樣的,咱們先看看構造方法:bash

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
複製代碼

咱們經過默認構造函數建立時,誕生的就是非公平鎖,接下來咱們看一下構造方法的入參permits的傳遞:併發

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
          super(permits);
    }
   //調用父類Sync的nonfairTryAcquireShared
   protected int tryAcquireShared(int acquires) {
       return nonfairTryAcquireShared(acquires);
   }
}

複製代碼

在Sync中:函數

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        //直接將該值設置爲AQS中的state的值
        Sync(int permits) {
            setState(permits);
        }
複製代碼

因此Semaphore的入參permit直接傳入設置到AQS中的state中。 接下來咱們看看acquire()方法,咱們先通俗的解釋一下它的執行過程: 當一個線程請求到來時,state值表明的許可數,那麼請求線程將會得到同步狀態即對共享資源的訪問權,並更新state的值(通常是對state值減1),但若是請求線程過多,state值表明的許可數已減爲0,則請求線程將沒法獲取同步狀態,線程將被加入到同步隊列並阻塞,直到其餘線程釋放同步狀態(通常是對state值加1)纔可能獲取對共享資源的訪問權。 調用Semaphore的acquire()方法後將會調用到AQS的acquireSharedInterruptibly():oop

//Semaphore的acquire()
    public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //判斷是否被中斷
        if (Thread.interrupted())
            throw new InterruptedException();
        //若是tryAcquireShared(arg)不小於0,則說明當前還有permit可被使用
        if (tryAcquireShared(arg) < 0)
            //若是許可被用完了,沒有剩餘許可 則加入同步隊列等待
            doAcquireSharedInterruptibly(arg);
    }
複製代碼

在acquireSharedInterruptibly()方法內部先進行了線程中斷的判斷,那麼先嚐試調用tryAcquireShared(arg)方法獲取同步狀態,若是此時許可獲取成功,那麼方法執行結束,若是獲取失敗,則說明沒有剩餘許可了,那麼調用doAcquireSharedInterruptibly(arg);方法加入同步隊列等待。 這裏的tryAcquireShared(arg)是個模板方法設計模式,AQS內部沒有提供具體實現,由子類實現,也就是有Semaphore內部本身實現,該方法在Semaphore內部非公平鎖的實現以下post

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                //remaining < 0說明許可已經供不該求了,這個時候進來的線程須要被阻塞
                //不然CAS操做更新avaliable的值,它表示剩餘的許可數
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
複製代碼

nonfairTryAcquireShared(int acquires)方法內部,先獲取state的值,並執行減法操做,獲得remaining值,它能夠理解爲剩餘的許可數,若是remaining<0,說明請求的許可數過大,此時直接返回一個負數的remaining;若是remaining大於0,說明還有剩餘的許可數,則能夠訪問共享資源,後續將被加入同步隊列(經過doAcquireSharedInterruptibly(arg))。 注意Semaphore的acquire()可能存在併發操做,所以nonfairTryAcquireShared()方法體內部採用死循環+無鎖(CAS)併發的操做保證對state值修改的安全性。 例如:假設permit值爲5,有多個線程併發accquire獲取許可,線程1運行時獲得的remainin是5-1=4,線程2運行時,獲得的remaining一樣是5-1=4,可是執行compareAndSetState時,線程2 更快一點,執行CAS操做:判斷state如今是否爲5,若是爲5,則CAS更新爲4. 這個時候線程1也執行CAS操做,判斷state如今是否爲5,發現不爲5,因此CAS失敗,這時候須要這個死循環去重試。ui

若是remaining大於0,說明還有剩餘的許可數,則能夠訪問共享資源,後續將被加入同步隊列,接下來看入隊的操做,這一部分與ReentrantLock差很少:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //使用SHARED類型建立共享模式的Node
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //獲取前序節點
                final Node p = node.predecessor();
                //若是前序節點是頭節點,說明本身的Node在隊列最前端,此時可能共享資源隨時被釋放
                //因此須要再次嘗試獲取共享資源
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //若是獲取共享資源成功
                    if (r >= 0) {
                        //已經獲取資源後,node已經沒有意義,因此清理head節點並傳播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //若是不是頭節點
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

在方法中,因爲當前線程沒有獲取同步狀態,所以建立一個共享模式類型(Node.SHARED)的結點並經過addWaiter(Node.SHARED)加入同步隊列,加入完成後,當前線程進入自旋狀態,首先判斷前驅結點是否爲head,若是是,那麼嘗試獲取同步狀態並返回r值,若是r大於0,則說明獲取同步狀態成功,將當前線程設置爲head並傳播,傳播指的是,通知後續結點繼續獲取同步狀態,到此return結束,獲取到同步狀態的線程將會執行原定的任務。

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);//設置爲頭結點
        /* 
         * 嘗試去喚醒隊列中的下一個節點,若是知足以下條件: 
         * 還有剩餘許可(propagate > 0), 
         * 或者h.waitStatus爲PROPAGATE(被上一個操做設置) 
         * 而且 
         *   下一個節點處於共享模式或者爲null。 
         * 
         * 這兩項檢查中的保守主義可能會致使沒必要要的喚醒,但只有在有
         * 有在多個線程爭取得到/釋放同步狀態時纔會發生,因此大多
         * 數狀況下會立馬得到須要的信號
         */  
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            //喚醒後繼節點,由於是共享模式,因此容許多個線程同時獲取同步狀態
                doReleaseShared();
        }
    }

複製代碼

但若是前驅結點不爲head或前驅結點爲head並嘗試獲取同步狀態失敗(與),那麼調用shouldParkAfterFailedAcquire(p, node)方法判斷前驅結點的waitStatus值是否爲SIGNAL並調整同步隊列中的node結點狀態,若是返回true,那麼執行parkAndCheckInterrupt()方法,將當前線程掛起。 shouldParkAfterFailedAcquire方法與ReentrantLock中的一模一樣:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取當前結點的等待狀態
        int ws = pred.waitStatus;
        //若是爲等待喚醒(SIGNAL)狀態則返回true
        if (ws == Node.SIGNAL)
            return true;
        //若是ws>0 則說明是結束狀態,
        //遍歷前驅結點直到找到沒有結束狀態的結點
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //若是ws小於0又不是SIGNAL狀態,說明是node是首次加入的線程
            //則將其前驅節點設置爲SIGNAL狀態。下次執行shouldParkAfterFailedAcquire方法時就
            //知足ws == Node.SIGNAL條件了
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

複製代碼

這個方法是AQS中的,若是不懂的話,能夠參考以前在ReentrantLock中也分析過:juejin.im/post/5c021b… 中自旋的部分。 到此,加入同步隊列的整個過程完成。

總結

在AQS中存在一個volatile變量state,當咱們建立Semaphore對象傳入許可數值時,最終會賦值給state,state的數值表明可同時操做共享數據的線程數量,每當一個線程請求(如調用Semaphored的acquire()方法)獲取同步狀態成功,state的值將會減小1,直到state爲0時,表示已沒有可用的許可數,也就是對共享數據進行操做的線程數已達到最大值,其餘後來線程將被阻塞,此時AQS內部會將線程封裝成共享模式的Node結點,加入同步隊列中等待並開啓自旋操做。只有當持有對共享數據訪問權限的線程執行完成任務並釋放同步狀態後,同步隊列中的對於的結點線程纔有可能獲取同步狀態並被喚醒執行同步操做,注意在同步隊列中獲取到同步狀態的結點將被設置成head並清空相關線程數據(畢竟線程已在執行也就沒有必要保存信息了),AQS經過這種方式便實現共享鎖,用圖表示以下:

1544670486895.jpg

##非公平鎖的釋放鎖 接下來看一下釋放鎖:

public void release() {
       sync.releaseShared(1);
}

//調用到AQS中的releaseShared(int arg) 
public final boolean releaseShared(int arg) {
       //調用子類Semaphore實現的tryReleaseShared方法嘗試釋放同步狀態
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
  }
複製代碼

顯然Semaphore間接調用了AQS中的releaseShared(int arg)方法,經過tryReleaseShared(arg)方法嘗試釋放同步狀態,若是釋放成功,那麼將調用doReleaseShared()喚醒同步隊列中後繼結點的線程,tryReleaseShared(int releases)方法以下:

//在Semaphore的內部類Sync中實現的
protected final boolean tryReleaseShared(int releases) {
       for (;;) {
              //獲取當前state
             int current = getState();
             //釋放狀態state增長releases
             int next = current + releases;
             if (next < current) // overflow
                 throw new Error("Maximum permit count exceeded");
              //經過CAS更新state的值
             if (compareAndSetState(current, next))
                 return true;
         }
        }
複製代碼

邏輯很簡單,釋放同步狀態,更新state的值,一樣的,經過for死循環和CAS操做來保證線程安全問題,由於可能存在多個線程同時釋放同步狀態的場景。釋放成功後經過doReleaseShared()方法喚醒後繼結點。

private void doReleaseShared() {
    /* 
     * 若是頭節點的後繼節點須要喚醒,那麼執行喚醒  
     * 動做;若是不須要,將頭結點的等待狀態設置爲PROPAGATE保證   
     * 喚醒傳遞。另外,爲了防止過程當中有新節點進入(隊列),這裏必  
     * 需作循環,因此,和其餘unparkSuccessor方法使用方式不同  
     * 的是,若是(頭結點)等待狀態設置失敗,從新檢測。 
     */  
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            // 獲取頭節點對應的線程的狀態
            int ws = h.waitStatus;
            // 若是頭節點對應的線程是SIGNAL狀態,則意味着頭
            //結點的後繼結點所對應的線程須要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 修改頭結點對應的線程狀態設置爲0。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒頭結點h的後繼結點所對應的線程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 若是頭結點發生變化,則繼續循環。不然,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}


//喚醒傳入結點的後繼結點對應的線程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
       //拿到後繼結點
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          //喚醒該線程
          LockSupport.unpark(s.thread);
    }

複製代碼

顯然doReleaseShared()方法中經過調用unparkSuccessor(h)方法喚醒head的後繼結點對應的線程。這個方法在以前獲取資源時也會被調用:

if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }

複製代碼

兩種狀況下都是爲喚醒後繼節點,由於是共享模式,因此容許多個線程同時獲取同步狀態。釋放操做的過程仍是相對簡單些的,即嘗試更新state值,更新成功調用doReleaseShared()方法喚醒後繼結點對應的線程。

公平鎖的共享鎖

公平鎖的中的共享模式實現除了在獲取同步狀態時與非公平鎖不一樣外,其餘基本同樣:

static final class FairSync extends Sync {
        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //這裏是重點,先判斷隊列中是否有結點再執行
                //同步狀態獲取。
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

相比之下,對於非公平鎖:
    final int nonfairTryAcquireShared(int acquires) {
         //使用死循環
         for (;;) {
             //每當有線程獲取共享資源時,就直接嘗試CAS操做
             int available = getState();
             int remaining = available - acquires;
             //判斷信號量是否已小於0或者CAS執行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }

複製代碼

從代碼中能夠看出,與非公平鎖tryAcquireShared(int acquires)方法實現的惟一不一樣是,在嘗試獲取同步狀態前,先調用了hasQueuedPredecessors()方法判斷同步隊列中是否存在結點,若是存在則返回-1,即將線程加入同步隊列等待,後續經過Node結構保證喚醒的順序。從而保證先到來的線程請求必定會先執行,也就是所謂的公平鎖。其餘操做,與前面分析的非公平鎖同樣。

總結

AQS做爲核心併發組件,它經過state值來控制對共享資源訪問的線程數,內部的Node有獨佔模式(EXCLUSIVE)和共享模式(SHARED):

  • 對於ReenTrantLock:state默認爲0,每次加鎖後state更新爲1,更新爲1以後若是還有線程嘗試獲取鎖,則加入同步隊列等待;每當線程釋放鎖時,再更新爲0並喚醒隊列中的線程
  • 對於Semaphore:State默認爲許可數,每當線程請求同步狀態成功,state值將會減1,若是超過限制數量的線程將被封裝共享模式的Node結點加入同步隊列封裝成獨佔模式(EXCLUSIVE)等待,直到其餘執行線程釋放同步狀態,纔有機會得到執行權,而每一個線程執行完成任務釋放同步狀態後,state值將會增長1,這就是共享鎖的基本實現模型。

AQS是採用模板方法的設計模式構建的,它做爲基礎組件,封裝的是核心併發操做,可是實現上分爲兩種模式,即共享模式(如Semaphore)與獨佔模式(如ReetrantLock,這兩個模式的本質區別在於多個線程能不能共享一把鎖),而這兩種模式的加鎖與解鎖實現方式是不同的,但AQS只關注內部公共方法實現並不關心外部不一樣模式的實現,因此提供了模板方法給子類使用:也就是說實現獨佔鎖,如ReentrantLock須要本身實現tryAcquire()方法和tryRelease()方法,而實現共享模式的Semaphore,則須要實現tryAcquireShared()方法和tryReleaseShared()方法,這樣作的好處是顯而易見的,不管是共享模式仍是獨佔模式,其基礎的實現都是同一套組件(AQS),只不過是加鎖解鎖的邏輯不一樣罷了,更重要的是若是咱們須要自定義鎖的話,也變得很是簡單,只須要選擇不一樣的模式實現不一樣的加鎖和解鎖的模板方法便可。 不論是ReentrantLock仍是Semaphore,公平鎖與非公平鎖的不一樣之處在於公平鎖會在線程請求同步狀態前,判斷同步隊列是否存在Node,若是存在就將請求線程封裝成Node結點加入同步隊列,從而保證每一個線程獲取同步狀態都是先到先得的順序執行的。非公平鎖則是經過競爭的方式獲取,無論同步隊列是否存在Node結點,只有經過競爭獲取就能夠獲取線程執行權。

相關文章
相關標籤/搜索