【Java併發工具類】Semaphore

前言

1965年,荷蘭計算機科學家Dijkstra提出的信號量機制成爲一種高效的進程同步機制。這以後的15年,信號量一直都是併發編程領域的終結者。1980年,管程被提出,成爲繼信號量以後的在併發編程領域的第二個選擇。目前幾乎全部的語言都支持信號量機制,Java也不例外。Java中提供了Semaphore併發工具類來支持信號量機制。下面咱們就來了解Java實現的信號量機制。
首先介紹信號量模型,而後介紹如何使用,最後使用信號量來實現一個限流器。html

信號量模型

信號量模型圖(圖來自參考[1]):java

image-20200215222026469

信號量模型總結爲:一個計數器、一個等待隊列和三個對外調用的方法。
計數器和等待隊列時對外透明的,全部咱們只能經過三個對外方法來訪問計數器和等待隊列。
init():設置計數器的初始值。
down():計數器的值減一。若是此時計數器的值小於0,則當前線程插入等待隊列並阻塞,不然當前線程能夠繼續執行。
up():計數器的值加一。若是此時計數器的值小於或者等於0,則喚醒等待隊列中的一個線程,並將其從等待隊列中移除。數據庫

這三個方法都是原子性的,由實現信號量模型的方法保證。在Java SDK中,信號量模型是由java.util.concurrent.Semaphore實現。編程

信號量模型代碼化大體相似以下:安全

class Semaphore{
    int count; // 計數器
    Queue queue; // 等待隊列
    
    // 初始化操做
    Semaphore(int c){
        this.count=c;
    }
    
    void down(){
        this.count--; // 計數器值減一
        if(this.count < 0){
            // 將當前線程插入等待隊列
            // 阻塞當前線程
        }
    }
    
    void up(){
        this.count++; // 計數器值加一
        if(this.count <= 0) {
            // 移除等待隊列中的某個線程T
            // 喚醒線程T
        }
    }
}

在信號量模型中,down()up()這兩個操做也被成爲P操做(荷蘭語proberen,測試)和V操做(荷荷蘭語verhogen,增長)。在我學的操做系統教材中(C語言實現),P操做對應wait(),V操做對應singal()。雖然叫法不一樣,可是語義都是相同的。在Java SDK併發包中,down()up()分別對應於Semaphore中的acquire()release()併發

如何使用信號量

信號量有時也被稱爲紅綠燈,咱們想一想紅綠燈時怎麼控制交通的,就知道該如何使用信號量。車輛路過十字路時,須要先檢查是否爲綠燈,若是是則通行,不然就等待。想一想和加鎖機制有點類似,都是同樣的操做,先檢查是否符合條件(「嘗試獲取」),符合(「獲取到」)則線程繼續運行,不然阻塞線程。app

下面使用累加器的例子來講明如何使用信號量。編程語言

count+=1操做是個臨界區,只容許一個線程執行,即要保證互斥。因而咱們在進入臨界區以前,使用down()即Java中的acquire(),在退出以後使用up()即Java中的release()。ide

static int count;
//初始化信號量
static final Semaphore s = new Semaphore(1); // 構造函數參數爲1,表示只容許一個線程進行臨界區。可實現一個互斥鎖的功能。
//用信號量保證互斥    
static void addOne() {
    s.acquire(); // 獲取一個許可(可看做加鎖機制中加鎖)
    try {
        count+=1;
    } finally {
        s.release(); // 歸還許可(可看作加鎖機制中解鎖)
    }
}

完整代碼以下:函數

package com.sakura.concrrent;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
    static int count;
    static final Semaphore s = new Semaphore(1);
    static void addOne() throws InterruptedException {
        //只會有一個線程將信號量中的計數器減爲1,而另一個線程只能將信號量中計數器減爲-1,致使被阻塞
        s.acquire();  
        try {
            count +=1;
            System.out.println("Now thread is " + Thread.currentThread() + "   and count is " + count);
        }finally {
            //進入臨界區的線程在執行完臨界區代碼後將信號量中計數器的值加1而後,此時信號量中計數器的值爲0,則從阻塞隊列中喚醒被阻塞的進程
            s.release();   
        }
    }

    public static void main(String[] args) {
        // 建立兩個線程運行
        MyThread thread1 = new MyThread();
        MyThread thread2 = new MyThread();

        thread1.start();
        thread2.start();
        System.out.println("main thread");

    }
}
class MyThread extends Thread{
    @Override
    public void run() {
        super.run();
        for(int i=0; i<10; i++) {                   
            try {
                SemaphoreTest.addOne();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

運行結果:

若是Semaphore的構造函數參數(許可數量,內置計數器的值)修改一下:

static final Semaphore s = new Semaphore(2);

計數器值的爲2,那麼就容許有兩個線程進入臨界區,咱們的count值就會出現問題

快速實現一個限流器

當設置信號量的計數器爲1時,可實現一個簡單的互斥鎖功能。可是,咱們前面剛介紹過Java SDK中的Lock,Semaphore的用途顯然不會與Lock一致,否則就重複造輪子了。Semaphore最重要的一個功能即是:能夠容許多個線程訪問一個臨界區。(上述例子咱們就設置了計數器的值爲2,可發現thread1和thread2均可進入臨界區。)

咱們會在什麼地方碰見這種需求呢?
各類池化資源,例如鏈接池、對象池、線程池等等。例如,數據庫鏈接池,在同一時刻,必定是容許多個線程同時使用鏈接池,固然,每一個鏈接在被釋放以前,是不容許其餘線程使用的。

咱們設計以下能夠容許N個線程使用的對象池,咱們將信號量的計數器值設爲N,就可讓N個線程同時進行臨界區,多餘的就會被阻塞。(代碼來自參考[1])

class ObjPool<T, R> {
    final List<T> pool;    //使用List保存實例對象
    // 用信號量實現限流器
    final Semaphore sem;
    
    // 構造函數
    ObjPool(int size, T t){
        pool = new Vector<T>(){}; 
        for(int i=0; i<size; i++){
            pool.add(t);
        }
        sem = new Semaphore(size);
    }
    
    // 獲取對象池的對象,調用 func
    R exec(Function<T,R> func) {
        T t = null;
        sem.acquire();    //容許N個進程同時進入臨界區
        try {
            //咱們須要注意,由於多個進行能夠進入臨界區,因此Vector的remove方法是線程安全的
            t = pool.remove(0);    
            return func.apply(t);    //獲取對象池匯中的一個對象後,調用func函數
        } finally {
            pool.add(t);    //離開臨界區以前,將以前獲取的對象放回到池中
            sem.release();    //使得計數器加1,若是信號量中計數器小於等於0,那麼說明有線程在等待,此時就會自動喚醒等待線程
        }
    }
}
// 建立對象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);

// 經過對象池獲取 t,以後執行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

小結

記得學習操做系統時,信號量類型分爲了好幾種整型信號量、記錄型信號量、AND信號量以及「信號量集」(具體瞭解可戳參考[2])。我認爲Java SDK中Semaphore應該是記錄型信號量的實現。不禁想起,編程語言是對OS層面操做的一種抽象描述。這句話須要品須要細細品。

參考: [1] 極客時間專欄王寶令《Java併發編程實戰》 [2] 靜水深流.操做系統之信號量機制總結.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html

相關文章
相關標籤/搜索