Java中的併發工具類

1、等待多線程完成的CountDownLatch

需求場景:當咱們須要解析一個Excel裏多個sheet的數據,此時能夠考慮使用多線程,每一個線程解析一個sheet裏的數據,等到sheet都解析完以後,程序須要提示解析完成。
固然咱們可使用join方法,join用於讓當前線程等待join線程執行結束。在JDK1.5以後的併發包中提供的CountDownLatch也能夠實現join的功能。java

CountDownLatch容許一個或多個線程等待其餘線程完成操做。
CountDownLatch的構造函數接收一個int類型的參數做爲計數器,若是你想等待N個點完成,就傳入N。當咱們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變爲零。因爲countDown方法能夠用在任何地方,因此這裏說的N個點,能夠是N個線程,也能夠是1個線程裏的N個執行步驟。用在多個線程時,只須要把這個CountDownLatch的引用傳遞到線程裏便可。數據庫

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println(1);
//                c.countDown();
//                System.out.println(2);
//                c.countDown();
//
//            }
//        }).start();
//        c.await();
//        System.out.println(3);
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(2);
                c.countDown();
            }
        }).start();

        c.await();
        System.out.println(3);
    }
}

上面執行結果多是2 1 finished,也多是1 2 finished,總之一、2在finished以前輸出。編程

2、同步屏障CyclicBarrier

CyclicBarrier要作的事是讓一組線程到達一個屏障(也能夠叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障纔會開門,全部被屏障攔截的線程纔會繼續運行。多線程

public class CyclicBarrierTest {
    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

由於主線程和子線程的調度是由CPU決定的,兩個線程都有可能先執行,因此輸出1 2,有多是輸出2 1。併發

若是把new CyclicBarrier(2)修改爲new CyclicBarrier(3),則主線程和子線程會永遠等待,由於沒有第三個線程執行await()方法,既沒有第三個線程到達屏障,因此兩個線程都不會繼續執行。ide

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrieAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。函數

public class CyclicBarrierTest1 {
    static CyclicBarrier c=new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println("initialize....");
        }
    });

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(1);
            }
        }).start();
        try {
            c.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(2);
    }
}

上面會首先輸出initialize....工具

應用場景:
CyclicBarrier能夠用於多線程計算數據,最後合併計算結果的場景。例如,用一個Excel保存了用戶全部銀行流水,每一個Sheet保存一個帳戶近一年的每筆銀行流水,如今須要統計用戶的日均銀行流水,先用多線程處理每一個sheet裏的銀行流水,都執行完以後,獲得每一個sheet的日均銀行流水,最後,再用barrierAction根據這些線程的計算結果,計算出整個Excel的日均銀行流水。ui

public class CyclicBarrierTest2 implements Runnable{
    private CyclicBarrier c = new CyclicBarrier(4, this);

    private Executor executor = Executors.newFixedThreadPool(4);

    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private void count() {
        for (int i=0;i<4;i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //模擬計算當前sheet的流水數據。
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    try {
                        //銀流計算完成,插入一個屏障
                        c.await();
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    @Override
    public void run() {
        int result=0;
        //全部線程到達屏障後,彙總每一個sheet計算出的結果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result+=sheet.getValue();
        }
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        CyclicBarrierTest2 barrierTest2=new CyclicBarrierTest2();
        barrierTest2.count();
    }
}

控制併發線程數的Semaphore

Semaphore用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理地使用公共資源。
應用場景:
Semaphore能夠用於作流量控制,特別是公共資源有限的應用場景,好比數據庫鏈接。假若有一個需求要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發地讀取,可是若是讀到內存中,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這是咱們必須控制只有10個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,就可使用Semaphore來作流量控制。this

public class SemaphoreTest {
    private static final int THREAD_COUNT=30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args){
        for (int i=0;i<THREAD_COUNT;i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

在代碼中,雖然有30個線程在執行,可是隻容許10個併發執行。Semaphore的構造方法Semaphore(int permits)接受一個整數的數字,表示可用的許可證數量。Semaphore(10)表示容許10個線程獲取許可證,也就是最大併發數10.Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完以後調用release()方法歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。

線程間交換數據的Exchanger

Exchanger是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程能夠交換彼此的數據。這兩個線程經過exchanger方法交換數據,若是第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。

應用場景:
Exchanger可用與校對工做,好比咱們須要將紙質銀行流水經過人工的方式錄入成電子銀行流水,爲了不錯誤,採用AB崗兩人進行錄入,錄入到Excel以後,系統須要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致。

public class ExchangerTest {
    private static final Exchanger<String> exchanger=new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                String a = "銀行流水A";
                try {
                    a=exchanger.exchange(a);
                    System.out.println("交換後,a="+a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                String b = "銀行流水B";
                try {
                    b=exchanger.exchange(b);
                    System.out.println("交換後,b="+b);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        threadPool.shutdown();
    }
}

上面輸出:

交換後,a=銀行流水B
交換後,b=銀行流水A

若是兩個線程有一個沒有執行exchange()方法,則會一直等待,若是擔憂有特殊狀況發生,避免一直等待,能夠用exchange(V x, long timeout, TimeUnit unit)設置最大等待時長。

參考:《Java併發編程的藝術》-方騰飛

相關文章
相關標籤/搜索