Java 裏如何實現線程間通訊?

總結:html

1.等待通知機制git

    兩個線程經過對同一對象調用等待 wait() 和通知 notify() 方法來進行通信。github

2.join() 方法數據庫

3.volatile 共享內存編程

4.CountDownLatch 併發工具數據結構

5.CyclicBarrier 併發工具多線程

6.線程響應中斷:thread.interrupt(),thread.isInterrupted()併發

7.線程池 awaitTermination() 方法,isTerminated()方法異步

8.管道通訊 ?PipedWriter,PipedReaderide

9.Future

https://github.com/crossoverJie/Java-Interview/blob/master/MD/concurrent/thread-communication.md

 

一:notify()/notifyAll()/sleep()/wait()的區別 

notify():隨機喚醒一個等待該對象同步鎖的線程,進入就緒隊列等待CPU的調度;這裏的喚醒是由JVM肯定喚醒哪一個線程,並且不是按優先級決定。

notifyAll():喚醒全部的等待該對象同步鎖的線程,進入就緒隊列等待CPU調度;注意喚醒的是notify以前wait的線程,對於notify以後的wait線程是沒有效果的。

wait():調用時須要先得到該Object的鎖,調用後,會把當前的鎖釋放掉同時阻塞住;但能夠經過調用該Object的notify()或者notifyAll()來從新得到鎖。

sleep():在指定的時間內讓正在執行的線程暫停執行,但不會釋放鎖。

區別主要體如今這幾個方面?

  1.  咱們經過對這些方法分析,sleep()方法屬於Thread類,而wait()/notify()/notifyAll()屬於Object基礎類,也就是說每一個對象都有wait()/notify()/notifyAll()的功能。

  2. sleep()不會釋放鎖,而wait()會釋放鎖。

  3. sleep()必須捕獲異常,而wait()/notify()/notifyAll()不須要捕獲異常。

  4. sleep()能夠在任何地方使用,而wait()/notify()/notifyAll()只能在同步控制方法或者同步控制塊裏面使用。

二:如何實現多線程之間的通信和協做 

利用同步和互斥來解決多線程之間的通信和協做;能夠說資源互斥、協調競爭是要解決的因,而同步是競爭協調的果。

  1. 經過synchronized/notify/notifyAll來實現線程之間的通訊。

  2. 利用了Java5中提供的Lock/Condition來實現線程之間的相互通訊。

  3. 使用信號量,如:CyclicBarrier/Semaphore/Countdownbatch。Phaser

怎麼解決多線程計算的結果統計?

能夠用join()以及Future/FutureTask或CompletableFuture來解決。join()的功能是使異步執行的線程變成同步執行;使用join()後,直到這個線程退出,程序纔會往下執行。

 

1.Thread B等Thread A執行完再執行, 在Thread B中threadA.join() 方法

 

2.個線程按照指定方式有序交叉運行 能夠利用 object.wait() 和 object.notify() 兩個方法來實現。

 

3.四個線程 A B C D,其中 D 要等到 A B C 全執行完畢後才執行,並且 A B C 是同步運行的

利用 CountdownLatch 來實現這類通訊方式。其實簡單點來講,CountDownLatch 就是一個倒計數器。它的基本用法是:

  1. 建立一個計數器,設置初始值,CountdownLatch countDownLatch = new CountDownLatch(2);
  2. 在 等待線程 裏調用 countDownLatch.await() 方法,進入等待狀態,直到計數值變成 0;
  3. 在 其餘線程 裏,調用 countDownLatch.countDown() 方法,該方法會將計數值減少 1;
  4. 當 其餘線程 的 countDown() 方法把計數值變成 0 時,等待線程 裏的 countDownLatch.await()當即退出,繼續執行下面的代碼。

CountDownLatch 適用於一個線程去等待多個線程的狀況。

 

4.三個運動員各自準備,等到三我的都準備好後,再一塊兒跑

上面的 CountDownLatch 能夠用來倒計數,但當計數完畢,只有一個線程的 await() 會獲得響應,沒法讓多個線程同時觸發。

爲了實現線程間互相等待這種需求,咱們能夠利用 CyclicBarrier 數據結構,它的基本用法是:

  1. 先建立一個公共 CyclicBarrier 對象,設置 同時等待 的線程數,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  2. 這些線程同時開始本身作準備,自身準備完畢後,須要等待別人準備完畢,這時調用 cyclicBarrier.await(); 便可開始等待別人;
  3. 當指定的 同時等待 的線程數都調用了 cyclicBarrier.await();時,意味着這些線程都準備完畢好,而後這些線程才 同時繼續執行

 

5.子線程完成某件任務後,把獲得的結果回傳給主線程

在 Java 裏,有一個類是配合 Callable 使用的:FutureTask,不過注意,它獲取結果的 get 方法會阻塞主線程。

主線程調用 futureTask.get() 方法時阻塞主線程;而後 Callable 內部開始執行,並返回運算結果;此時 futureTask.get() 獲得結果,主線程恢復運行。

這裏咱們能夠學到,經過 FutureTask 和 Callable 能夠直接在主線程得到子線程的運算結果,只不過須要阻塞主線程。固然,若是不但願阻塞主線程,能夠考慮利用 ExecutorService,把 FutureTask 放到線程池去管理執行。

 

6.CompletableFuture還有更高級的用法

https://my.oschina.net/u/3705388/blog/1602862

 

 

CountDownLatch:倒數N個數,經過調用countDown()倒數。當到0時,countDownLatch.await();線程被喚醒。只阻塞countDownLatch.await();這一個線程,用於等待調用countDown()倒數線程執行到位後再執行。

CyclicBarrier:可循環屏障。設置N個數,調用cyclicBarrier.await();的線程也至關於對N減一併阻塞,當第N個線程調用cyclicBarrier.await()後,全部調用cyclicBarrier.await()阻塞的線程被喚醒繼續執行。在建立CyclicBarrier的時候能夠傳入一個Runnable,用於全部線程到達屏障後,便當即執行的線程,即此線程會與N個調用cyclicBarrier.await()的線程同時執行。另外CyclicBarrier可重置(調用reset()),重複使用。

Semaphore:信號量併發工具類,其提供了aquire()和release()方法來進行併發控制。設置一個N,aquire調用時根據併發數判斷是否阻塞,至關於發送令牌,一共有N個令牌,aquire獲取令牌,拿到令牌執行,沒拿到阻塞,執行完歸還令牌。通常用於資源限流,限量的工做場景。

 

Phaser:它把多個線程協做執行的任務劃分爲多個階段,編程時須要明確各個階段的任務,每一個階段均可以有任意個參與者,線程均可以隨時註冊並參與到某個階段。

構造

Phaser建立後,初始階段編號爲0,構造函數中指定初始參與個數。

註冊:Registration

Phaser支持經過register()和bulkRegister(int parties)方法來動態調整註冊任務的數量。

Arrival

每一個Phaser實例都會維護一個phase number,初始值爲0。每當全部註冊的任務都到達Phaser時,phase number累加,並在超過Integer.MAX_VALUE後清零。arrive()和arriveAndDeregister()方法用於記錄到達;其中arrive(),某個參與者完成任務後調用;arriveAndDeregister(),任務完成,取消本身的註冊。arriveAndAwaitAdvance(),本身完成等待其餘參與者完成,進入阻塞,直到Phaser成功進入下個階段。

Phaser提供了比CountDownLatch、CyclicBarrier更增強大、靈活的功能,從某種程度上來講,Phaser能夠替換他們,例子見網址:

https://www.cnblogs.com/chenssy/p/4989515.html

 

 

在併發編程中常常會使用到一些併發工具類,來對線程的併發量、執行流程、資源依賴等進行控制。這裏咱們主要探討三個常用的併發工具類:CountDownLatch,CyclicBarrier和Semaphore。

一:CountDownLatch 

從CountDownLatch的字面意思就能夠體現出其設計模型,countdown在英語裏具備倒計時的(倒數)意思,Latch就是門閂的意思。CountDownLatch的構造函數接受一個int值做爲計數器的初始值N,當程序調用countDown()的時候,N便會減1(體現出了倒數的意義),當N值減爲0的時候,阻塞在await()的線程便會喚醒,繼續執行。這裏經過一個例子來講明其應用場景。

假設咱們主線程須要建立5個工做線程來分別執行5個任務,主線程須要等待5個任務所有完成後纔會進行後續操做,那麼咱們就能夠聲明N=5的CountDownLatch,來進行控制。

代碼以下:

public class CountDownLatchDemo {

    private static final CountDownLatch countDownLatch = new CountDownLatch(5);

    public static void main(String[] args) throws InterruptedException {

        //循環建立5個工做線程

        for( int ix = 0; ix != 5; ix++ ){

            new Thread(new Runnable() {

                public void run() {

                    try{

                        System.out.println( Thread.currentThread().getName() + " start" );

                        Thread.sleep(1000);

                        countDownLatch.countDown();

                        System.out.println( Thread.currentThread().getName() + " stop" );

                    } catch ( InterruptedException ex ){

                    }

                }

            }, "Task-Thread-" + ix ).start();

 

            Thread.sleep(500);

        }

        //主線程等待全部任務完成

        countDownLatch.await();

        System.out.println("All task has completed.");

    }

}

運行結果:

Task-Thread-0 start

Task-Thread-1 start

Task-Thread-0 stop

Task-Thread-2 start

Task-Thread-1 stop

Task-Thread-3 start

Task-Thread-2 stop

Task-Thread-4 start

Task-Thread-3 stop

Task-Thread-4 stop

All task has completed.

在主線程建立了5個工做線程後,就會阻塞在countDownLatch.await(),等待5個工做線程所有完成任務後返回。任務的執行順序可能會不一樣,可是任務完成的Log必定會在最後顯示。CountDownLatch經過計數器值的控制,實現了容許一個或多個線程等待其餘線程完成操做的併發控制。

二:CyclicBarrier 

CyclicBarrier就字面意思是可循環的屏障,其體現了兩個特色,可循環和屏障。調用CyclicBarrier的await()方法即是在運行線程中插入了屏障,當線程運行到這個屏障時,便會阻塞在await()方法中,直到等待全部線程運行到屏障後,纔會返回。CyclicBarrier的構造函數一樣接受一個int類型的參數,表示屏障攔截線程的數目。另外一個特色循環即是體現處出了CyclicBarrier與CountDownLatch不一樣之處了,CyclicBarrier能夠經過reset()方法,將N值重置,循環使用,而CountDownLatch的計數器是不能重置的。此外,CyclicBarrier還提供了一個更高級的用法,容許咱們設置一個全部線程到達屏障後,便當即執行的Runnable類型的barrierAction(注意:barrierAction不會等待await()方法的返回才執行,是當即執行!)機會,這裏咱們經過如下代碼來測試一下CyclicBarrier的特性。

代碼以下:

public class CyclicBarrierDemo {

    private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new MyBarrierAction());

    private final static AtomicInteger atcIx = new AtomicInteger(1);

    public static void main(String[] args) {

        for (int ix = 0; ix != 10; ix++){

            new Thread(new Runnable() {

                public void run() {

                    try{

                        System.out.println(Thread.currentThread().getName() + " start");

                        Thread.sleep(atcIx.getAndIncrement() * 1000 );

                        cyclicBarrier.await();

                        System.out.println( Thread.currentThread().getName() + " stop" );

                    } catch ( Exception ex){

                    }

                }

            }, "Thread-" + ix).start();

        }

    }

 

    private static class MyBarrierAction implements Runnable {

        @Override

        public void run() {

            System.out.println("MyBarrierAction is call.");

        }

    }

}

運行結果:

Thread-0 start

Thread-1 start

Thread-2 start

Thread-3 start

Thread-4 start

MyBarrierAction is call.

Thread-4 stop

Thread-0 stop

Thread-1 stop

Thread-2 stop

Thread-3 stop

根據運行結果,咱們能夠看到一下幾點:

  1. 首先在線程沒有調用夠N次cyclicBarrier.await()時,全部線程都會阻塞在cyclicBarrier.await()上,也就是說必須N個線程同時到達屏障,纔會全部線程越過屏障繼續執行。

  2. 驗證了BarrierAction的執行時機是全部阻塞線程都到達屏障以後,而且BarrierAction執行後,全部線程纔會從await()方法返回,繼續執行。

三:Semaphore 

Semaphore信號量併發工具類,其提供了aquire()和release()方法來進行併發控制。Semaphore通常用於資源限流,限量的工做場景,例如數據庫鏈接控制。假設數據庫的最大負載在10個鏈接,而如今有100個客戶端想進行數據查詢,顯然咱們不能讓100個客戶端同時鏈接上來,找出數據庫服務的崩潰。那麼咱們能夠建立10張令牌,想要鏈接數據庫的客戶端,都必須先嚐試獲取令牌(Semaphore.aquire()),當客戶端獲取到令牌後即可以進行數據庫鏈接,並在完成數據查詢後歸還令牌(Semaphore.release()),這樣就能保證同時鏈接數據庫的客戶端不超過10個,由於只有10張令牌,這裏給出該場景的模擬代碼。

代碼以下:

public class SemaphoreDemo {

    private static final Semaphore semaphoreToken = new Semaphore(10);

    public static void main(String[] args) {

        for (int ix = 0; ix != 100; ix++) {

            new Thread(new Runnable() {

                public void run() {

                    try {

                        semaphoreToken.acquire();

                        System.out.println("select * from xxx");

                        semaphoreToken.release();

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                }

            }).start();

        }

    }

}

也許有同窗會問,aquire()函數獲取許可證的順序和調用的前後順序有關係嗎,也就是說該例子中客戶端是不是排隊獲取令牌的?答案不是,由於Semaphore默認是非公平的,固然其構造函數提供了設置爲公平信號量的參數。

 

 

多線程之間須要等待協調,才能完成某種工做,問怎麼設計這種協調方案?如:子線程循環10次,接着主線程循環100,接着又回到子線程循環10次,接着再回到主線程又循環100,如此循環50次。

public class Question {

    public static void main(String[] args) throws InterruptedException {

        final Object object = new Object();

        new Thread(new Runnable() {

 

            public void run() {

                for (int i = 0; i < 50; i++) {

                    synchronized (object) {

                        for (int j = 0; j < 10; j++) {

                            System.out.println("SubThread:" + (j + 1));

                        }

                        object.notify();

                        try {

                            object.wait();

                        } catch (InterruptedException e) {

                            e.printStackTrace();

                        }

                    }

                }

            }

        }).start();

 

        for (int i = 0; i < 50; i++) {

            synchronized (object) {

                //主線程讓出鎖,等待子線程喚醒

                object.wait();

                for (int j = 0; j < 100; j++) {

                    System.out.println("MainThread:" + (j + 1));

                }

                object.notify();

            }

        }

    }

}

 

 

https://mp.weixin.qq.com/s?__biz=MzIzMzgxOTQ5NA==&mid=2247483697&idx=1&sn=29715bb12acb9b123284c60009fd5e99&chksm=e8fe9d38df89142e5d7e58c6288b76cc9c62581b5bce5ff6220ad5f47e364ccf197d124f57cb&scene=21#wechat_redirect

相關文章
相關標籤/搜索