Java併發之同步器設計

Java併發以內存模型瞭解到多進程(線程)讀取共享資源的時候存在競爭條件java

共享資源

計算機中經過設計同步器來協調進程(線程)之間執行順序。同步器做用就像登機安檢人員同樣能夠協調旅客按順序經過。node

在Java中,同步器能夠理解爲一個對象,它根據自身狀態協調線程的執行順序。好比鎖(Lock),信號量(Semaphore),屏障(CyclicBarrier),阻塞隊列(Blocking Queue)。編程

這些同步器在功能設計上有所不一樣,可是內部實現上有共通的地方。segmentfault

同步器

同步器的設計通常包含幾個方面:狀態變量設計(同步器內部狀態)訪問條件設定狀態更新等待方式通知策略多線程

同步器構造

訪問條件是控制線程是否能執行(訪問共享對象)的條件,它每每與狀態變量緊密相關。而通知策略線程釋放鎖定狀態後通知其它等待線程的方式,通常有如下幾種狀況併發

  1. 通知全部等待的線程。
  2. 通知1個隨機的N個等待線程。
  3. 通知1個特定的N個等待線程

看下面例子,經過方式的同步器學習

public class Lock{
  // 狀態變量 isLocked
  private boolean isLocked = false; 
  public synchronized void lock() throws InterruptedException{
    // 訪問條件 當isLocked=false 時得到訪問權限不然等待
    while(isLocked){
      // 阻塞等待
      wait();
    }
    //狀態更新 線程得到訪問權限
    isLocked = true;
  }
  
  public synchronized void unlock(){
    //狀態更新 線程釋放訪問權限
    isLocked = false;
    // 通知策略 object.notify | object.notifyAll
    notify(); 
  }
}

咱們用計數信號量控制同時執行操做活動數。這裏模擬一個鏈接池。fetch

public class PoolSemaphore {
      // 狀態變量 actives 計數器
    private int actives = 0;
    private int max;
    public PoolSemaphore(int max) {
        this.max = max;
    }
    public synchronized void acquire() throws InterruptedException {
        //訪問條件 激活數小於最大限制時,得到訪問權限不然等待
        while (this.actives == max) wait();
        //狀態更新 線程得到訪問權限
        this.actives++;
        // 通知策略 object.notify | object.notifyAll
        this.notify();
    }
    public synchronized void release() throws InterruptedException {
        //訪問條件 激活數不爲0時,得到訪問權限不然等待
        while (this.actives == 0) wait();
         //狀態更新 線程得到訪問權限
        this.actives--;
        // 通知策略 object.notify | object.notifyAll
        this.notify();
    }
}

原子指令

同步器設計裏面,最重要的操做邏輯是「若是知足條件,以更新狀態變量來標誌線程得到釋放訪問權限」,該操做應具有原子性ui

好比test-and-set 計算機原子指令,意思是進行條件判斷知足則設置新值this

function Lock(boolean *lock) { 
    while (test_and_set(lock) == 1); 
}

另外還有不少原子指令 fetch-and-add compare-and-swap,注意這些指令需硬件支持纔有效。

同步操做中,利用計算機原子指令,能夠避開鎖,提高效率。java中沒有 test-and-set 的支持,不過 java.util.concurrent.atomic 給咱們提供了不少原子類API,裏面支持了 getAndSetcompareAndSet 操做。

看下面例子,主要在區別是等待方式不同,上面是經過wait()阻塞等待,下面是無阻塞循環

public class Lock{
  // 狀態變量 isLocked
  private AtomicBoolean isLocked = new AtomicBoolean(false);
  public void lock() throws InterruptedException{
    // 等待方式 變爲自旋等待
    while(!isLocked.compareAndSet(false, true));
    //狀態更新 線程得到訪問權限
    isLocked.set(true);
  }
  
  public synchronized void unlock(){
    //狀態更新 線程釋放訪問權限
    isLocked.set(false);
  }
}

關於阻塞擴展說明

阻塞意味着須要將進程線程狀態進行轉存,以便還原後恢復執行。這種操做是昂貴繁重,而線程基於進程之上相對比較輕量。線程的阻塞在不一樣編程平臺實現方式也有所不一樣,像Java是基於JVM運行,因此它由JVM完成實現。

在《Java Concurrency in Practice》中,做者提到

競爭性同步可能須要OS活動,這增長了成本。當爭用鎖時,未獲取鎖的線程必須阻塞。 JVM能夠經過 旋轉等待(反覆嘗試獲取鎖直到成功)來實現阻塞,也能夠經過操做 系統掛起阻塞的線程來實現阻塞。哪一種效率更高取決於上下文切換開銷與鎖定可用以前的時間之間的關係。對於短暫的等待,最好使用自旋等待;對於長時間的等待,最好使用暫停。一些JVM基於對過去等待時間的分析數據來自適應地在這二者之間進行選擇,可是大多數JVM只是掛起線程等待鎖定。

從上面能夠看出JVM實現阻塞兩種方式

  • 旋轉等待(spin-waiting),簡單理解是不暫停執行,以循環的方式等待,適合短期場景。
  • 經過操做系統掛起線程。

JVM中經過 -XX: +UseSpinning 開啓旋轉等待, -XX: PreBlockSpi =10指定最大旋轉次數。

useSpinning.png

AQS

AQS是AbstractQueuedSynchronizer簡稱。本節對AQS只作簡單闡述,並不全面。

java.util.concurrent包中的 ReentrantLock,CountDownLatch,Semaphore,CyclicBarrier等都是基因而AQS同步器實現。

狀態變量 是用 int state 來表示,狀態的獲取與更新經過如下API操做。

int getState()
    void setState(int newState)
    boolean compareAndSetState(int expect, int update)

該狀態值在不一樣API中有不一樣表示意義。好比ReentrantLock中表示持有鎖的線程獲取鎖的次數Semaphore表示剩餘許可數。

關於等待方式通知策略的設計

AQS經過維護一個FIFO同步隊列(Sync queue)來進行同步管理。當多線程爭用共享資源時被阻塞入隊。而線程阻塞與喚醒是經過 LockSupport.park/unpark API實現。

它定義了兩種資源共享方式。

  • Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)
  • Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)

每一個節點包含waitStatus(節點狀態),prev(前繼),next(後繼),thread(入隊時線程),nextWaiter(condition隊列的後繼節點)

線程等待隊列

waitStatus 有如下取值。

  • CANCELLED(1) 表示線程已取消。當發生超時或中斷,節點狀態變爲取消,以後狀態再也不改變。
  • SIGNAL(-1) 表示後繼節點等待前繼的喚醒。後繼節點入隊時,會將前繼狀態更新爲SIGNAL。
  • CONDITION(-2) 表示線程在Condition queue 裏面等待。當其餘線程調用了Condition.signal()方法後,CONDITION狀態的節點將從 Condition queue 轉移到 Sync queue,等待獲取鎖。
  • PROPAGATE(-3) 在共享模式下,當前節點釋放後,確保有效通知後繼節點。
  • (0) 節點加入隊列時的默認狀態。

AQS 幾個關鍵 API

  • tryAcquire(int) 獨佔方式下,嘗試去獲取資源。成功返回true,不然false。
  • tryRelease(int) 獨佔方式下,嘗試釋放資源,成功返回true,不然false。
  • tryAcquireShared(int) 共享方式下,嘗試獲取資源。返回負數爲失敗,零和正數爲成功並表示剩餘資源。
  • tryReleaseShared(int) 共享方式下,嘗試釋放資源,若是釋放後容許喚醒後續等待節點返回true,不然false。
  • isHeldExclusively() 判斷線程是否正在獨佔資源。

acquire(int arg)

public final void acquire(int arg) {
        if (
          // 嘗試直接去獲取資源,若是成功則直接返回
          !tryAcquire(arg)
            &&
            //線程阻塞在同步隊列等待獲取資源。等待過程當中被中斷,則返回true,不然false
            acquireQueued(
              // 標記該線程爲獨佔方式,並加入同步隊列尾部。
              addWaiter(Node.EXCLUSIVE), arg) 
           )
            selfInterrupt();
    }

release(int arg)

public final boolean release(int arg) {
          // 嘗試釋放資源
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
              // 喚醒下一個線程(後繼節點)
              unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        ....
              Node s = node.next; // 找到後繼節點
        if (s == null || s.waitStatus > 0) {//無後繼或節點已取消
            s = null;
           // 找到有效的等待節點 
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 喚醒線程
    }

總結

本文記錄併發編程中同步器設計的一些共性特徵。並簡單介紹了Java中的AQS。

歡迎你們留言交流,一塊兒學習分享!!!

相關文章
相關標籤/搜索