系統要實現某個全局功能一定要須要各個子模塊之間的協調和配合,就像一個團隊要完成某項任務的時候須要團隊各個成員之間密切配合同樣。而對於系統中的各個子線程來講,若是要完成一個系統功能,一樣須要各個線程的配合,這樣就少不了線程之間的通訊與協做。常見的線程之間通訊方式有以下幾種:html
一、wait和notify/notifyAlljava
二、await和signal/signalAll多線程
三、sleep/yield/joinapp
四、CyclicBarrier 柵欄ide
五、CountDownLatch 閉鎖工具
六、Semaphore 信號量oop
1、wait和notify/notifyAllpost
在使用以前先明確 :ui
wait和notify是Object的方法,任何一個對象都具備該方法。在使用的時候,首先須要設置一個全局鎖對象,經過對該鎖的釋放和持有來控制該線程的運行和等待。所以在調用wait和notify的時候,該線程必需要已經持有該鎖,而後纔可調用,不然將會拋出IllegalMonitorStateException異常。
肯定要讓哪一個線程等待?讓哪一個線程等待就在哪一個線程中調用鎖對象的wait方法。調用wait等待的是當前線程,而不是被調用線程,並非theread.wait()就可讓thread等待,而是讓當前線程(調用wait方法的線程,不是調用者)進行等待。儘可能不要把線程對象當作全局鎖使用,以避免混淆等待線程。
看一下使用方法:(代碼中省略了main方法,對sleep()和println()方法進行了封裝)this
package thread.blogs.cooperation; import scala.Console; import java.util.concurrent.atomic.AtomicInteger; /** * Created by PerkinsZhu on 2017/8/21 10:25. */ public class TestWaitAndNotify { public static void main(String[] args) { TestWaitAndNotify test = new TestWaitAndNotify(); test.testWait(); } Object obj = new Object();//建立一個全局變量,用來協調各個線程 ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>();//設置一個線程wait和notify的觸發條件 class MyRunner implements Runnable { @Override public void run() { num.set(new AtomicInteger(0)); while (true) { Console.println(Thread.currentThread().getName()); if (num.get().getAndIncrement() == 1) { synchronized (obj) {//若是要想調用wait方法,則必須持有該對象。不然將會拋出IllegalMonitorStateException try { Console.println(Thread.currentThread().getName() + "掛起等待"); obj.wait();//同一個線程能夠wait屢次,多個線程也可使用同一個obj調用wait Console.println(Thread.currentThread().getName() + "喚醒!!!"); } catch (InterruptedException e) { e.printStackTrace(); } } } sleep(1000); } } } private void testWait() { MyRunner runner = new MyRunner(); new Thread(runner).start(); new Thread(runner).start(); AtomicInteger num03 = new AtomicInteger(0); Thread th03 = new Thread(new Runnable() { @Override public void run() { while (true) { synchronized (obj) {//調用notify/notifyAll和wait同樣,一樣須要持有該對象 if (num03.getAndIncrement() == 5) { obj.notify();//喚醒最早一個掛在obj上面的線程.每次只喚醒一個。這裏是按照等待的前後順序進行喚醒 } } sleep(1000); } } }); th03.start(); } private void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
運行結果以下:
Thread-1
Thread-0
Thread-1
Thread-0
Thread-1掛起等待
Thread-0掛起等待
Thread-1喚醒!!!
Thread-1
Thread-1
Thread-1
從執行結果只中能夠看出,在執行兩次輸出以後,兩個線程被分別掛起等待。過一會以後線程1被成功喚醒。這裏之因此喚醒的是Thread-1是由於Thread-1是第一個先掛起的,因此在notify()方法在喚起wait線程的時候也是公平的,依照wait的掛起順序來執行喚醒。
在使用wait的時候,同一個obj能夠被多個線程調用obj.wait(),也能夠被同一個線程執行屢次obj.wait();
例如,修改try catch代碼代碼塊
Console.println(Thread.currentThread().getName() + "掛起等待"); obj.wait();//執行屢次wait操做 obj.wait(); obj.wait(); Console.println(Thread.currentThread().getName() + "喚醒!!!");
而後只啓動一個線程
new Thread(runner,"thread--01").start(); // new Thread(runner,"thread--02").start();
執行結果以下:
thread--01 thread--01 thread--01掛起等待
線程一直停滯在此處,沒法繼續執行,這是由於線程調用了三此wait,而若是要想成功喚醒線程,則一樣須要調用三次notify或者調用一次notifyAll()。這裏就再也不列出代碼。
wait方法有兩個重載方法:
public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException ;
兩個方法都是wait指定時間以後,若是依舊沒有被其它線程喚醒或者被中斷則會自動中止wait。其中第二個方法指定了時間的單位。
public final void wait(long timeout, int nanos) throws InterruptedException Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed. This method is similar to the wait method of one argument, but it allows finer control over the amount of time to wait for a notification before giving up. The amount of real time, measured in nanoseconds, is given by: 1000000*timeout+nanos In all other respects, this method does the same thing as the method wait(long) of one argument. In particular, wait(0, 0) means the same thing as wait(0). The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until either of the following two conditions has occurred: Another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The timeout period, specified by timeout milliseconds plus nanos nanoseconds arguments, has elapsed. The thread then waits until it can re-obtain ownership of the monitor and resumes execution. As in the one argument version, interrupts and spurious wakeups are possible, and this method should always be used in a loop: synchronized (obj) { while (<condition does not hold>) obj.wait(timeout, nanos); ... // Perform action appropriate to condition } This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.
注意這裏的synchronized的目的不是加鎖控制線程的串行,而是爲了持有鎖來調用wait和notify對象。
在理解這線程調用obj.wait()的時候能夠理解爲"掛在obj對象上的線程",而對於線程調用obj.notify()能夠理解爲"喚起最後一個掛在obj上面的那個線程",而對於線程調用obj.notifyAll(),則能夠理解爲"喚起全部掛在obj對象上的線程"。obj對象在這裏起的做用就是一個信息載體中介。各個線程經過這個中介進行通行協做,控制線程之間的暫停和執行。
2、await和signal/signalAll
await和signal是Condition的兩個方法,其做用和wait和notify同樣,目的都是讓線程掛起等待,不一樣的是,這兩種方法是屬於Condition的兩個方法,而Condition對象是由ReentrantLock調用newCondition()方法獲得的。Condition對象就至關於前面所說的中介,在線程中調用contiton.await()和condition.signal()能夠分別使線程等待和喚醒。
如須要了解tryLock的使用能夠看這裏:多線程(五) java的線程鎖
使用示例:
package thread.blogs.cooperation; import scala.Console; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Created by PerkinsZhu on 2017/8/21 11:59. */ public class TestCondition { public static void main(String[] args) { TestCondition test = new TestCondition(); test.testWait(); } ReentrantLock lock = new ReentrantLock(); ThreadLocal<AtomicInteger> num = new ThreadLocal<AtomicInteger>(); Condition condition = lock.newCondition(); private void testWait() { new Thread(new Runnable() { @Override public void run() { num.set(new AtomicInteger(1)); while (true) { if (num.get().getAndIncrement() == 5) { Console.println("signal---!!!"); try { lock.lock(); condition.signal(); } finally { lock.unlock(); } } Console.println("thread ---- 01"); sleep(1000); } } }).start(); new Thread(new Runnable() { @Override public void run() { num.set(new AtomicInteger(1)); while (true) { if (num.get().getAndIncrement() == 2) { try { //lock.tryLock(); //lock.tryLock(5000, TimeUnit.MILLISECONDS); lock.lock();//這裏一樣要加鎖,不然會拋出IllegalMonitorStateException異常。注意的是這裏不要使用synchronized進行加鎖,而是使用lock condition.await();//注意這裏不要調用wait!!! } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } Console.println("thread ---- 02"); sleep(1000); } } }).start(); } private void sleep(int time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
在使用Condition的時候和Synchronized沒有太大的區別,只是調用的方法變爲await和signal。須要注意的是這裏加鎖再也不使用synchronized()進行加鎖,而是使用lock和unlock進行加鎖。
執行結果以下:
thread ---- 01 thread ---- 02 thread ---- 01 thread ---- 01 thread ---- 01 signal---!!! thread ---- 01 thread ---- 02 thread ---- 02 thread ---- 01
3、sleep/yield/join
對於sleep()方法應該很熟悉了,讓當前線程睡眠一段時間。期間不會釋聽任何持有的鎖。
public static native void sleep(long millis) throws InterruptedException;
1 /** 2 * Causes the currently executing thread to sleep (temporarily cease 3 * execution) for the specified number of milliseconds, subject to 4 * the precision and accuracy of system timers and schedulers. The thread 5 * does not lose ownership of any monitors. 6 * 7 * @param millis 8 * the length of time to sleep in milliseconds 9 * 10 * @throws IllegalArgumentException 11 * if the value of {@code millis} is negative 12 * 13 * @throws InterruptedException 14 * if any thread has interrupted the current thread. The 15 * <i>interrupted status</i> of the current thread is 16 * cleared when this exception is thrown. 17 */
對於yield()方法可能使用的狀況少一下。其做用主要是讓當前線程從運行狀態轉變爲就緒狀態,由線程調度從新選擇就緒狀態的線程分配CPU資源。至於最終會選取哪一個線程分配CPU資源就由調度策略來決定了,有可能仍是該線程,有可能換爲其它線程。
1 /** 2 * A hint to the scheduler that the current thread is willing to yield 3 * its current use of a processor. The scheduler is free to ignore this 4 * hint. 5 * 6 * <p> Yield is a heuristic attempt to improve relative progression 7 * between threads that would otherwise over-utilise a CPU. Its use 8 * should be combined with detailed profiling and benchmarking to 9 * ensure that it actually has the desired effect. 10 * 11 * <p> It is rarely appropriate to use this method. It may be useful 12 * for debugging or testing purposes, where it may help to reproduce 13 * bugs due to race conditions. It may also be useful when designing 14 * concurrency control constructs such as the ones in the 15 * {@link java.util.concurrent.locks} package. 16 */ 17 public static native void yield();
對於join方法,做用是暫停當前線程,等待被調用線程指向結束以後再繼續執行。
1 /** 2 * Waits for this thread to die. 3 * 4 * <p> An invocation of this method behaves in exactly the same 5 * way as the invocation 6 * 7 * <blockquote> 8 * {@linkplain #join(long) join}{@code (0)} 9 * </blockquote> 10 * 11 * @throws InterruptedException 12 * if any thread has interrupted the current thread. The 13 * <i>interrupted status</i> of the current thread is 14 * cleared when this exception is thrown. 15 */ 16 public final void join() throws InterruptedException;
使用join的時候須要注意:
一、調用join的時候,當前線程不會釋放掉鎖,若是調用線程也須要該鎖則就會致使死鎖!
二、join方法不會啓動調用線程,因此,在調用join以前,該調用線程必須已經start啓動,不然不會達到想要的效果。
join的底層實際是就是使用了一個自旋等待機制,判斷調用線程是否死亡,若是沒有則一直讓當前線程wait。能夠看一下底層實現源碼:
1 public final synchronized void join(long millis) throws InterruptedException { 2 long base = System.currentTimeMillis(); 3 long now = 0; 4 if (millis < 0) { 5 throw new IllegalArgumentException("timeout value is negative"); 6 } 7 if (millis == 0) { 8 while (isAlive()) {//若是調用者依舊沒有結束,讓當前線程進行等待 9 wait(0);//注意這裏的wait是等待的當前線程,而不是調用者線程 10 } 11 } else { 12 while (isAlive()) { 13 long delay = millis - now; 14 if (delay <= 0) { 15 break; 16 } 17 wait(delay);//指定等待的時間 18 now = System.currentTimeMillis() - base; 19 } 20 } 21 }
4、CyclicBarrier柵欄
CyclicBarrier字面理解爲線程屏障,當指定數量的線程執行到指定位置的時候,才能觸發後續動做的進行。其最終目的是讓全部線程同時開始後續的工做。
例如:三個員工來公司開會,因爲三人住的地方與公司距離不一樣,因此到會議室的時間也不一樣。而會議開始必須等待三者都到達會議室以後才能進行。
代碼以下:
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.CyclicBarrier; 6 7 /** 8 * Created by PerkinsZhu on 2017/8/30 10:32. 9 */ 10 public class TestCyclicBarrier { 11 public static void main(String[] args) { 12 testCyclicBarrier(); 13 } 14 15 private static void testCyclicBarrier() { 16 /** 17 * 注意這裏等待的是三個線程。這就至關於一個線程計數器,當指定個數的線程執行 barrier.await();方法以後,纔會執行後續的代碼,不然每一個線程都會一直進行等待。 18 * 若是把3修改成4,則將永遠等待下去,不會開始會議。 19 * 若是把3修改成2,則小張到達以後就會提早開始會議,不會繼續等待小王。 20 */ 21 CyclicBarrier barrier = new CyclicBarrier(3); 22 23 Thread 小李 = new Thread(new MyRunner(barrier, "小李", 2000)); 24 小李.start(); 25 Thread 小張 = new Thread(new MyRunner(barrier, "小張", 4000)); 26 小張.start(); 27 Thread 小王 = new Thread(new MyRunner(barrier, "小王", 5000)); 28 小王.start(); 29 } 30 31 static class MyRunner implements Runnable { 32 CyclicBarrier barrier; 33 String name; 34 int time; 35 36 public MyRunner(CyclicBarrier barrier, String name, int time) { 37 this.barrier = barrier; 38 this.name = name; 39 this.time = time; 40 } 41 42 @Override 43 public void run() { 44 Console.println(name + " 開始出發去公司。"); 45 sleep(time); 46 Console.println(name + " 終於到會議室!!!"); 47 try { 48 barrier.await(); 49 } catch (Exception e) { 50 e.printStackTrace(); 51 } 52 startMeeting(name); 53 } 54 } 55 56 private static void startMeeting(String name) { 57 Console.println(name + "說:人齊了。會議開始!!"); 58 } 59 60 private static void sleep(int time) { 61 try { 62 Thread.sleep(time); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 } 67 }
運行結果:
1 小李 開始出發去公司。 2 小王 開始出發去公司。 3 小張 開始出發去公司。 4 小李 終於到會議室!!! 5 小張 終於到會議室!!! 6 小王 終於到會議室!!! 7 小王說:人齊了。會議開始!! 8 小李說:人齊了。會議開始!! 9 小張說:人齊了。會議開始!!
在使用CyclicBarrier的時候,提供了一個重載的構造器。
public CyclicBarrier(int parties, Runnable barrierAction) {}
barrierAction會在觸發。
例如修改上面的代碼21行爲:一組線程中的最後一個線程到達以後(但在釋放全部線程以前)
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() { @Override public void run() { Console.println("======"); } });
運行結果:
小張 開始出發去公司。 小李 開始出發去公司。 小王 開始出發去公司。 小李 終於到會議室!!! 小張 終於到會議室!!! 小王 終於到會議室!!! ====== 小王說:人齊了。會議開始!! 小李說:人齊了。會議開始!! 小張說:人齊了。會議開始!!
5、CountDownLatch閉鎖
與CycliBarrier不一樣的是CountDownLatch是某一個線程等待其餘線程執行到某一位置以後,該線程(調用countDownLatch.await();等待的線程)纔會繼續後續工做。而CycliBarrier是各個線程執行到某位置以後,而後全部線程一齊開始後續的工做。相同的是二者都屬於線程計數器。
使用示例以下: boss等待全部員工來開會,當全部人員都到齊以後,boss宣佈開始會議!!!
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.CountDownLatch; 6 7 /** 8 * Created by PerkinsZhu on 2017/8/30 10:32. 9 */ 10 public class TestCyclicBarrier { 11 public static void main(String[] args) { 12 testCyclicBarrier(); 13 } 14 15 private static void testCyclicBarrier() { 16 17 CountDownLatch countDownLatch = new CountDownLatch(3);//注意這裏的參數指定了等待的線程數量 18 19 new Thread(new MyRunner(countDownLatch, "小李", 2000)).start(); 20 new Thread(new MyRunner(countDownLatch, "小張", 4000)).start(); 21 new Thread(new MyRunner(countDownLatch, "小王", 5000)).start(); 22 23 try { 24 Console.println("等待員工到來開會。。。。。。。"); 25 countDownLatch.await();//注意這裏是await。主線程將會一直等待在這裏,當全部線程都執行 countDownLatch.countDown();以後當前線程纔會繼續執行 26 startMeeting("Boss"); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 32 static class MyRunner implements Runnable { 33 CountDownLatch countDownLatch; 34 String name; 35 int time; 36 37 public MyRunner(CountDownLatch countDownLatch, String name, int time) { 38 this.countDownLatch = countDownLatch; 39 this.name = name; 40 this.time = time; 41 } 42 43 @Override 44 public void run() { 45 Console.println(name + " 開始出發去公司。"); 46 sleep(time); 47 Console.println(name + " 終於到會議室!!!"); 48 countDownLatch.countDown();
Console.println(name + " 準備好了!!"); 49 } 50 } 51 52 private static void startMeeting(String name) { 53 Console.println(name + "說:人齊了。會議開始!!"); 54 } 55 56 private static void sleep(int time) { 57 try { 58 Thread.sleep(time); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 } 63 }
執行結果以下:
等待員工到來開會。。。。。。。
小王 開始出發去公司。
小張 開始出發去公司。
小李 開始出發去公司。
小李 終於到會議室!!!
小李 準備好了!!
小張 終於到會議室!!!
小張 準備好了!!
小王 終於到會議室!!!
小王 準備好了!!
Boss說:人齊了。會議開始!!
注意區分是某一個線程等待其餘線程仍是全部線程在達到某一條件以後一塊兒執行!!!
六、Semaphore 信號量
Semaphore在線程協做方面主要用於控制同時訪問臨界區資源的線程個數。信號量是屬於操做系統層面的概念,jdk提供了操做接口。
使用示例以下:
1 package thread.blogs.cooperation; 2 3 import scala.Console; 4 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Semaphore; 8 9 /** 10 * Created by PerkinsZhu on 2017/8/30 11:43. 11 */ 12 public class TestSemaphore { 13 public static void main(String[] args) { 14 testSemaphore(); 15 } 16 17 private static void testSemaphore() { 18 Semaphore semaphore = new Semaphore(2, true);//指定同時訪問臨界區資源的線程數量。第二個參數指定以公平方式訪問臨界區資源 19 ExecutorService excutorService = Executors.newFixedThreadPool(10); 20 for (int i = 0; i < 6; i++) {//啓動10個線程請求資源 21 excutorService.execute(new MyRunner(semaphore)); 22 sleep(0);//逐個啓動線程 23 } 24 excutorService.shutdown(); 25 } 26 27 static class MyRunner implements Runnable { 28 Semaphore semaphore; 29 30 public MyRunner(Semaphore semaphore) { 31 this.semaphore = semaphore; 32 } 33 34 @Override 35 public void run() { 36 String name = Thread.currentThread().getName(); 37 try { 38 Console.println(name + " ------請求資源!!"); 39 //semaphore.acquire(2);//設置請求資源的數量。必須有足夠數量的資源纔可進去臨界區。不過釋放的時候也要一塊兒釋放,請求幾個就要調用幾回release() 40 semaphore.acquire();//請求獲取資源,若是有空閒資源則會當即獲取,進入臨界區,不然將會等待,一直等待到獲取到臨界區資源 41 Console.println(name + " ======獲取資源!!"); 42 sleep(1000); 43 //semaphore.release(); 44 semaphore.release();//釋放資源 45 Console.println(name + " ******釋放資源!!"); 46 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 } 52 53 private static void sleep(int time) { 54 try { 55 Thread.sleep(time); 56 } catch (InterruptedException e) { 57 e.printStackTrace(); 58 } 59 } 60 }
執行結果以下:
pool-1-thread-1 ------請求資源!! pool-1-thread-2 ------請求資源!! pool-1-thread-6 ------請求資源!! pool-1-thread-5 ------請求資源!! pool-1-thread-3 ------請求資源!! pool-1-thread-4 ------請求資源!! pool-1-thread-2 ======獲取資源!! pool-1-thread-1 ======獲取資源!! pool-1-thread-1 ******釋放資源!! pool-1-thread-6 ======獲取資源!! pool-1-thread-5 ======獲取資源!! pool-1-thread-2 ******釋放資源!! pool-1-thread-6 ******釋放資源!! pool-1-thread-4 ======獲取資源!! pool-1-thread-3 ======獲取資源!! pool-1-thread-5 ******釋放資源!! pool-1-thread-4 ******釋放資源!! pool-1-thread-3 ******釋放資源!!
根據結果能夠看出只有當有線程釋放資源以後,纔會有新的線程獲取到資源。即控制了同一時間訪問臨界區資源的線程數量。當Semaphore(1)設置爲1的時候,此時能夠當作鎖來使用。多線程(五) java的線程鎖
線程之間的通訊和協做方式大概就以上六種,要熟悉每種工具的使用場景和方法特性,經過靈活的組合各個工具來靈活控制各個線程的工做。在決定使用哪一種工具以前必需要明確本身的目的是什麼,要實現什麼樣的機制,這樣才能肯定選擇哪一種工具協調各個線程。
=========================================
原文連接:多線程(六)線程間的通訊和協做轉載請註明出處!
=========================================
---end