Java多線程Semaphore和CountDownLatch

我的博客項目地址前端

但願各位幫忙點個star,給我加個小星星✨java


最近忙着畢設,要作前端,因此看更多的是React的知識,小後端🐶仍是要繼續學習總結,否則就要沒(tou)時(lan)間繼續寫了。git

Semaphore

中文含義是信號量,它是synchronized的升級版。github

synchronized 關鍵字,表明這個方法加鎖,至關於無論哪個線程(例如線程A),運行到這個方法時,都要檢查有沒有其它線程B(或者C、 D等)正在用這個方法(或者該類的其餘同步方法),有的話要等正在使用synchronized方法的線程B(或者C 、D)運行完這個方法後再運行此線程A,沒有的話,鎖定調用者,而後直接運行。它包括兩種用法:synchronized 方法和 synchronized 塊。(度娘解釋)後端

Semaphore主要的做用是控制線程的併發數,若是單純的使用synchronized是不能實現的。併發

簡單看

Semaphore.java
/** * A counting semaphore. Conceptually, a semaphore maintains a set of * permits. Each {@link #acquire} blocks if necessary until a permit is * available, and then takes it. Each {@link #release} adds a permit, * potentially releasing a blocking acquirer. * However, no actual permit objects are used; the {@code Semaphore} just * keeps a count of the number available and acts accordingly. * 簡單的來說,信號量維持了一組許可證(permits),每次調用的時候,須要獲取permit才能進行進行操做。 每一個線程調用semaphore.acquire()獲取一個許可證後,就會減小許可證數量。 當沒有許可證的時候,線程會阻塞,直到以前得到許可證的線程操做完釋放才能進行。 複製代碼

Semaphore結構

/** * Creates a {@code Semaphore} with the given number of * permits and nonfair fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /** * Creates a {@code Semaphore} with the given number of * permits and the given fairness setting. * * @param permits the initial number of permits available. * This value may be negative, in which case releases * must occur before any acquires will be granted. * @param fair {@code true} if this semaphore will guarantee * first-in first-out granting of permits under contention, * else {@code false} */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
複製代碼

兩個構造方法: 參數permit是許可的意思,表明在同一時間內,最多容許多少個線程同時執行acquire()和release()之間的代碼,Semaphore發放許可的操做是減法操做。socket

參數fair,表示內部使用的是FairSync(公平鎖)或者NonfairSync(非公平鎖),表示每次線程獲取鎖的機會是不是公平的,具體的能夠看底層實現,繼承自AbstractQueuedSynchronizer,經過state判斷是不是加鎖狀態,state 爲0,表示鎖未被獲取,不爲0,表示已被獲取。ide

如何用

第一種,同步的執行一個任務(與Synchronized類似)

//多個線程裏,保持一份信號量
private Semaphore semaphore = new Semaphore(1);

//線程中調用
public void testMethod() {
        try {
            //獲取鎖
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() +
                    "begin Time" + System.currentTimeMillis());
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() +
                    "end Time" + System.currentTimeMillis());

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //釋放鎖
            semaphore.release();
        }
    }
複製代碼

這種比較簡單,就不深刻展現了,要看的是在多個線程下如何控制併發數量。工具

第二種,控制線程併發數量

首先是執行類學習

public class TestService {
    //許可證數量爲2
    private Semaphore semaphore = new Semaphore(2);

    public void testMethod() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() +
                    "begin Time" + System.currentTimeMillis());
            //停頓5秒
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() +
                    "end Time" + System.currentTimeMillis());

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}
複製代碼

建立一個線程池,分配任務:

public static void main(String[] args) throws IOException {
        //建立線程池,本身新建一個ThreadFactory,定義線程名字
        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {

            @Override
            public Thread newThread(@NotNull Runnable r) {
                return new Thread(r, "當前線程哈希值是:" + r.hashCode());
            }
        });

        TestService testService = new TestService();
        //分派任務
        for (int i = 0; i < 10; i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    testService.testMethod();
                }
            });
            poolExecutor.submit(thread);
        }
         //關閉線程池,等待池中的線程任務執行完畢
        poolExecutor.shutdown();
    }
複製代碼

執行結果:

Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket'
當前線程哈希值是:55909012begin Time1521281771135
當前線程哈希值是:922151033begin Time1521281771135
當前線程哈希值是:55909012end Time1521281776136
當前線程哈希值是:922151033end Time1521281776136
當前線程哈希值是:1915058446begin Time1521281776136
當前線程哈希值是:1387228415begin Time1521281776136
當前線程哈希值是:1915058446end Time1521281781140
當前線程哈希值是:1387228415end Time1521281781140
當前線程哈希值是:748658608begin Time1521281781140
當前線程哈希值是:167185492begin Time1521281781140
當前線程哈希值是:167185492end Time1521281786145
當前線程哈希值是:748658608end Time1521281786145
當前線程哈希值是:1937348256begin Time1521281786145
當前線程哈希值是:1358444045begin Time1521281786145
當前線程哈希值是:1937348256end Time1521281791148
當前線程哈希值是:1358444045end Time1521281791148
當前線程哈希值是:331844619begin Time1521281791148
當前線程哈希值是:64830413begin Time1521281791148
Disconnected from the target VM, address: '127.0.0.1:60937', transport: 'socket'
當前線程哈希值是:331844619end Time1521281796152
當前線程哈希值是:64830413end Time1521281796152

Process finished with exit code 0
複製代碼

能夠看到,在設定的線程睡眠5秒內,只有兩個線程同時執行acquire()和release()之間的邏輯,經過Semaphore控制了線程的併發數量。

其它方法

  • acquire(int) : 一次獲取多個許可
  • acquireUninterruptibly() : 使等待進入acquire()方法,不容許被終止
  • tryAcquire() : 嘗試地得到一個許可,若是獲取不到就返回false,一般與if判斷使用,具備無阻塞的特色
  • tryAcquire(long timeout, TimeUnit unit) : 多少時間內獲取不到許可就放棄

還有不少方法,諸如availablePermits()/drainPermits()/hasQueuedThreads()/getQueueLength()等,感興趣的話請怒開IDE查看具體實現吧。

CountDownLatch

CountDownLatch也是一個工具類,可使線程同步的處理上更加靈活,CountDownLatch也是減法操做

簡單介紹

/* A synchronization aid that allows one or more threads to wait until * a set of operations being performed in other threads completes. 一個同步輔助類,容許一個或多個線程等待,直到在其餘線程中執行的一組操做完成。 使用效果是,給定一個計數,當使用這個CountDownLatch類的線程判斷計數不爲0的時候,線程處於wait狀態,若是爲0,就繼續進行。 複製代碼

舉個🌰: 來了一輛小汽車,要等滿5我的纔開車,來了1個,不開;再來1個,仍是不開,最後5我的到齊了,開車開車。

類結構&構造方法

//這貨也是繼承AbstractQueuedSynchronizer
    private final Sync sync;

    /** * Constructs a {@code CountDownLatch} initialized with the given count. * * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
複製代碼

參數count表示要等待的數量

方法示範

執行類,等待5秒後執行countDown

public class TestService {

    private Semaphore semaphore = new Semaphore(1);

    private CountDownLatch countDownLatch;


    public TestService(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void testMethod() {
        try {
            semaphore.acquire();
            System.out.println("當前線程是: " + Thread.currentThread().getName() +
                    " 時間是: " + System.currentTimeMillis());
            //等待5秒
            Thread.sleep(1000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            countDownLatch.countDown();
            semaphore.release();
        }
    }
}
複製代碼

運行類

public static void main(String[] args) throws IOException, InterruptedException {

        ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool(new ThreadFactory() {

            @Override
            public Thread newThread(@NotNull Runnable r) {
                return new Thread(r, "當前線程哈希值是:" + r.hashCode());
            }
        });

        //設定十個限制
        CountDownLatch countDownLatch = new CountDownLatch(10);
        TestService testService = new TestService(countDownLatch);

        for (int i = 0; i < 10; i++){
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    testService.testMethod();
                }
            });
            thread.setName("" + i);
            poolExecutor.submit(thread);
        }
        //關閉線程池,等待池中的線程任務執行完畢
        poolExecutor.shutdown();
        System.out.println("poolExecutor分發任務結束: " + System.currentTimeMillis());
        countDownLatch.await();
        System.out.println("CountDown方法結束: " + System.currentTimeMillis());
    }
複製代碼

執行日誌:

當前線程是: 當前線程哈希值是:922151033 時間是: 1521287832750
poolExecutor分發任務結束: 1521287832752
當前線程是: 當前線程哈希值是:55909012 時間是: 1521287833755
當前線程是: 當前線程哈希值是:1387228415 時間是: 1521287834759
當前線程是: 當前線程哈希值是:748658608 時間是: 1521287835763
當前線程是: 當前線程哈希值是:167185492 時間是: 1521287836765
當前線程是: 當前線程哈希值是:1937348256 時間是: 1521287837769
當前線程是: 當前線程哈希值是:1358444045 時間是: 1521287838770
當前線程是: 當前線程哈希值是:331844619 時間是: 1521287839774
當前線程是: 當前線程哈希值是:64830413 時間是: 1521287840776
當前線程是: 當前線程哈希值是:653687670 時間是: 1521287841779
CountDown方法結束: 1521287842784

複製代碼

能夠看到,在線程池控制1個併發線程,poolExecutor提交任務以後打印日誌,可是countDownLatch.await()方法以後的代碼,由於count沒有減到0,不能執行。

在TestService方法中,每隔一秒執行countDownLatch.countDown()方法,最後十個線程跑完,count減到0,countDownLatch.await()方法以後的代碼才能夠執行。

方法

  • await() : 等待
  • countDown() : 計數減一
  • await(long timeout, TimeUnit unit) : 在限定時間內進行等待,超過期間返回false
  • getCount() : 獲取計數count

小結

Semaphore做爲信號量,用來控制線程的併發數量,CountDownLatch用來控制線程執行任務的時機也挺不錯的。它們兩個的理解和使用都比較簡單,好了,又填了一個坑,下次繼續挖坑和填坑hhh

相關文章
相關標籤/搜索