CountDownLatch 閉鎖、FutureTask、Semaphore信號量、Barrier柵欄

  同步工具類能夠是任何一個對象。阻塞隊列能夠做爲同步工具類,其餘類型的同步工具類還包括信號量(Semaphore)、柵欄(Barrier)、以及閉鎖(Latch)。java

  全部的同步工具類都包含一些特定的結構化屬性:它們封裝了一些狀態,這些狀態將決定執行同步工具類的線程是繼續執行仍是等待,此外還提供了一些方法對狀態進行操做,以及另外一些方法用於高效地等待同步工具類進入到預期狀態。算法

1.閉鎖

  閉鎖是一種同步工具類,能夠延遲線程進度直到其到達終止狀態。閉鎖的做用至關於一扇門:在閉鎖到達結束狀態以前,這扇門一直是關閉的,而且沒有任何線程能經過,當到達結束狀態時容許全部的線程經過。當閉鎖到達結束狀態後,將不會再改變狀態,所以這扇門將永遠打開。閉鎖能夠用來確保某些活動直到其餘活動都完成才繼續執行。安全

1.1 CountDownLatch

  CountDownLatch是一種靈活的閉鎖實現,它可使一個或多個線程等待一組線程。閉鎖狀態包括一個計數器,該計數器被初始化爲一個正數,表示須要等待的事件數量。countDown遞減計數器,表示一個事件已經發生,而await方法等待計數器達到零,這表示全部須要等待的事件都已經發生。若是計數器的值非零,那麼await會一直阻塞直到計數器爲零,或者等待中的線程中斷,或者等待超時。併發

查看源碼發現:咱們傳進去的參數至關於內部Sync的狀態,每次調用countDown的時候將狀態值減一,狀態值爲0表示結束狀態(await會解除阻塞)ide

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    public void countDown() {
        sync.releaseShared(1);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 

查看sync的源碼:函數

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }
    ...
}

 

例如:實現一個統計多個線程併發執行任務的用時功能:工具

  當線程執行run中代碼的時候會阻塞到startLatch.await(); 直到主線程調用startLatch.countDown(); 將計數器減一。這時全部線程開始執行任務。測試

  當線程執行完的時候endLatch.countDown();將結束必鎖的計數器減一,此時主線程阻塞在endLatch.await();,直到5個線程都執行完主線程也解除阻塞。ui

package cn.qlq.thread.tone;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Administrator
 *
 */
public class Demo4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class);

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch endLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            Thread.sleep(1 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startLatch.await();// 起始閉鎖的計數器阻塞等到計數器減到零(標記第一個線程開始執行)
                        Thread.sleep(1 * 1000);
                        endLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        // 實現計時
        long startTime = System.nanoTime();
        startLatch.countDown();// 將起始閉鎖的計數器減一
        endLatch.await();// 結束閉鎖阻塞直到計數器爲零
        long endTime = System.nanoTime();
        LOGGER.error("結束,用時{}", endTime - startTime);
    }
}

 

1.2 FutureTask

  FutureTask也能夠用作閉鎖。(Futuretask實現了Future的語義,表示一種抽象的可生成計算結果的計算)。FutureTask的計算是經過Callable實現的,至關於一種可生產運算結果的Runnable,而且能夠處於如下三種狀態:等待運行、正在運行和運行完成。執行完成表示計算的全部可能結束方式,包括正常結束、異常取消和運行完成。當FutureTask進入完成狀態後,它會永遠中止在這個狀態。this

  Future.get取決於任務的狀態。若是任務已經完成,那麼get會當即返回結果;不然get將阻塞直到任務進入完成狀態,而後返回結果或者拋出異常。FutureTask將計算結果從執行計算的線程傳遞到獲取這個結果的線程,而FutureTask的規範確保了這種傳遞過程能實現結果的安全發佈。

package threadTest;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 實現callable接口,實現Callable接口
 * 
 *
 */
public class MyCallable implements Callable<String> {

    /**
     * 實現call方法,接口中拋出異常。由於子類不能夠比父類幹更多的壞事,因此子類能夠不拋出異常
     */
    @Override
    public String call() {
        System.out.println(Thread.currentThread().getName() + "   執行callable的call方法");
        return "result";
    }

    public static void main(String[] args) {
        test1();
    }

    /**
     * 單個線程
     */
    public static void test1() {
        // 1.建立固定大小的線程池
        ExecutorService es = Executors.newFixedThreadPool(1);
        // 2.提交線程任務,用Future接口接受返回的實現類
        Future<String> future = es.submit(new MyCallable());
        // 3.關閉線程池
        es.shutdown();
        // 4.調用future.get()獲取callable執行完成的返回結果
        String result;
        try {
            result = future.get();
            System.out.println(Thread.currentThread().getName() + "\t" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

結果:

pool-1-thread-1 執行callable的call方法
main result

 

查看源碼:

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
...}

 

 

2.Semaphore 信號量

  計數信號量(counting Semaphore)用來控制同時訪問某個資源的數量,或者同時執行某個操做的數量。計數信號量還能夠實現某種資源池,或者對容器實施邊界。

  信號量是1個的Semaphore意味着只能被1個線程佔用,能夠用來設計同步(至關於互斥鎖)。信號量大於1的Semaphore能夠用來設計控制併發數,或者設計有界容器。

  Semaphore中管理着一組虛擬的許可(permit),許可的初始數量可由構造函數指定。在執行操做時首先得到許可(只要還有剩餘的許可),並在使用後釋放。若是沒有許可,那麼acquire將會一直阻塞直到有許可(或者直到中斷或者操做超時)。release方法將返回一個許可給信號量。計算信號量的一種簡化形式是二值信號量,即初始值爲1的Semaphore。二值信號量能夠用做互斥體(mutex),並具有不可重入的加鎖語義:誰擁有了這個惟一的許可誰就擁有了互斥鎖。

例如:例如信號量構造一個有界阻塞容器:

  信號量的計數值初始化爲容器的最大值。add操做在向底層容器添加一個元素以前,首先要獲取一個許可。若是add沒有添加任何元素,那麼會馬上釋放信號量。一樣,remove操做釋放一個許可,使更多的元素能加到容器中。

class BoundedHashSet<T> {
    private Set<T> set;
    private Semaphore semaphore;

    public BoundedHashSet(int bound) {
        set = Collections.synchronizedSet(new HashSet());
        semaphore = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        semaphore.acquire();// 嘗試獲取信號量
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {// 若是添加失敗就釋放信號量,添加成功就佔用一個信號量
                semaphore.release();
            }
        }
    }

    public boolean remove(T o) throws InterruptedException {
        boolean remove = set.remove(o);
        if (remove)// 若是刪除成功以後就釋放一個信號量
            semaphore.release();
        return remove;
    }
}

 

測試代碼:

        BoundedHashSet<String> boundedHashSet = new BoundedHashSet<String>(3);
        System.out.println(boundedHashSet.add("1"));
        System.out.println(boundedHashSet.add("2"));
        System.out.println(boundedHashSet.add("2"));
        System.out.println(boundedHashSet.add("3"));
        System.out.println(boundedHashSet.add("4"));// 將會一直阻塞到這裏
        System.out.println("=========");

結果:(JVM不會關閉)

 

注意:

1.Semaphore能夠指定公平鎖仍是非公平鎖,默認是非公平鎖

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

 

2.acquire方法和release方法是能夠有參數的,表示獲取/返還的信號量個數

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 

 3. Barrier柵欄

   柵欄(Barrier)相似於閉鎖(一種同步工具,能夠延遲線程直到其達到其終止狀態),它能阻塞一組線程直到某個事件發生。柵欄與閉鎖的區別在於全部線程必須同時到達柵欄位置,才能繼續執行。閉鎖等於等待事件,而柵欄用於等待其餘線程。柵欄能夠用於實現一些協議,例如幾個家庭成員決定在某個地方集合:"全部人6:00到達目的地,而後討論下一步的事情"。

3.1  CyclicBarrier柵欄(循環屏障)

  CyclicBarrier可使必定數量的參與方反覆地在柵欄位置聚集,它在並行迭代算法中很是有用:這種算法一般將一個問題劃分紅一系列相互獨立的子問題。當線程到達柵欄位置時將調用await方法,這個方法將阻塞到全部線程到達柵欄位置。若是全部線程都到達柵欄,那麼柵欄將打開全部線程被釋放,而柵欄將被重置以便下次使用。若是對await的調用超時,或者await阻塞的線程被中斷,那麼柵欄就被認爲是打破了,全部阻塞的await調用都將終止並拋出BrokenBarrierException。若是成功的經過柵欄,那麼await將爲每一個線程返回一個惟一的到達索引號,咱們能夠用這些索引號"選舉"產生一個領導線程,並在下一次迭代中由該領導線程執行一些特殊的工做。CyclicBarrier還可使你將一個柵欄操做傳遞給構造函數,這是一個Runnable,當成功的經過柵欄時會(在一個子線程)執行它,但在阻塞過程被釋放以前是不能執行的。

  CyclicBarrier的構造方法能夠傳入參與的數量(也就是被柵欄攔截的線程的數量),也能夠傳入一個Runnable對象。

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

 

例如:

package cn.qlq.thread.tone;

import java.util.concurrent.CyclicBarrier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Administrator
 *
 */
public class Demo2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);

    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        for (int i = 0; i < 4; i++) {
            Thread.sleep(2 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                }
            }).start();
        }
    }
}

結果:

18:08:00 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:04 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2

  00的時候0線程到達柵欄進入阻塞,02的時候1線程到達柵欄,因爲柵欄的參與者是2因此此時至關於全部線程到達柵欄,柵欄放開,而後柵欄被重置。

  04的時候2線程到達柵欄進入阻塞,06的時候3線程到達柵欄,因爲柵欄的參與者是2因此此時至關於全部參與者線程到達柵欄,而後柵欄放開。

 

咱們將柵欄的參與者改成5查看結果:

final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

結果:4個線程會阻塞到await方法處,並且JVM不會關閉,由於柵欄的參與者不夠5個因此被一直阻塞。

 3.2  Exchanger

  Exchanger至關於一個兩方(Two-party)柵欄,各方在柵欄位置上交換數據。當兩方執行不對稱的操做時,Exchanger很是有用。例如:一個線程向緩衝區寫東西,另外一個線程從緩衝區讀數據Exchanger至關於參與者只有兩個的CyclicBarrier。

  兩個線程會阻塞在exchanger.exchange方法上,泛型能夠指定其交換的數據類型。

例如:兩個線程交換本身的線程名稱

package cn.qlq.thread.tone;

import java.util.concurrent.Exchanger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Administrator
 *
 */
public class Demo3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);

    public static void main(String[] args) throws InterruptedException {
        final Exchanger<String> exchanger = new Exchanger<String>();// 泛型指定交換的數據
        for (int i = 0; i < 4; i++) {
            Thread.sleep(2 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                    try {
                        String exchange = exchanger.exchange(Thread.currentThread().getName());
                        LOGGER.error("threadName -> {},exchange->{}", Thread.currentThread().getName(), exchange);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                }
            }).start();
        }
    }
}

結果:

18:28:33 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-018:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-1,exchange->Thread-018:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-0,exchange->Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-018:28:37 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-218:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-3,exchange->Thread-218:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-2,exchange->Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2

相關文章
相關標籤/搜索