線程通訊

線程通訊的目的是爲了可以讓線程之間相互發送信號。另外,線程通訊還可以使得線程等待其它線程的信號,好比,線程B能夠等待線程A的信號,這個信號能夠是線程A已經處理完成的信號。html

①同步java

這裏講的同步是指多個線程經過synchronized關鍵字這種方式來實現線程間的通訊。多線程

參考示例:分佈式

public class MyObject {

    synchronized public void methodA() {
        //do something....
    }

    synchronized public void methodB() {
        //do some other thing
    }
}

public class ThreadA extends Thread {

    private MyObject object;
//省略構造方法
    @Override
    public void run() {
        super.run();
        object.methodA();
    }
}

public class ThreadB extends Thread {

    private MyObject object;
//省略構造方法
    @Override
    public void run() {
        super.run();
        object.methodB();
    }
}

public class Run {
    public static void main(String[] args) {
        MyObject object = new MyObject();

        //線程A與線程B 持有的是同一個對象:object
        ThreadA a = new ThreadA(object);
        ThreadB b = new ThreadB(object);
        a.start();
        b.start();
    }
}
複製代碼

因爲線程A和線程B持有同一個MyObject類的對象object,儘管這兩個線程須要調用不一樣的方法,可是它們是同步執行的,好比:線程B須要等待線程A執行完了methodA()方法以後,它才能執行methodB()方法。這樣,線程A和線程B就實現了 通訊。ide

這種方式,本質上就是「共享內存」式的通訊。多個線程須要訪問同一個共享變量,誰拿到了鎖(得到了訪問權限),誰就能夠執行。post

 

②while輪詢的方式測試

代碼以下:ui

複製代碼
 1 import java.util.ArrayList;
 2 import java.util.List;
 3 
 4 public class MyList {
 5 
 6     private List<String> list = new ArrayList<String>();
 7     public void add() {
 8         list.add("elements");
 9     }
10     public int size() {
11         return list.size();
12     }
13 }
14 
15 import mylist.MyList;
16 
17 public class ThreadA extends Thread {
18 
19     private MyList list;
20 
21     public ThreadA(MyList list) {
22         super();
23         this.list = list;
24     }
25 
26     @Override
27     public void run() {
28         try {
29             for (int i = 0; i < 10; i++) {
30                 list.add();
31                 System.out.println("添加了" + (i + 1) + "個元素");
32                 Thread.sleep(1000);
33             }
34         } catch (InterruptedException e) {
35             e.printStackTrace();
36         }
37     }
38 }
39 
40 import mylist.MyList;
41 
42 public class ThreadB extends Thread {
43 
44     private MyList list;
45 
46     public ThreadB(MyList list) {
47         super();
48         this.list = list;
49     }
50 
51     @Override
52     public void run() {
53         try {
54             while (true) {
55                 if (list.size() == 5) {
56                     System.out.println("==5, 線程b準備退出了");
57                     throw new InterruptedException();
58                 }
59             }
60         } catch (InterruptedException e) {
61             e.printStackTrace();
62         }
63     }
64 }
65 
66 import mylist.MyList;
67 import extthread.ThreadA;
68 import extthread.ThreadB;
69 
70 public class Test {
71 
72     public static void main(String[] args) {
73         MyList service = new MyList();
74 
75         ThreadA a = new ThreadA(service);
76         a.setName("A");
77         a.start();
78 
79         ThreadB b = new ThreadB(service);
80         b.setName("B");
81         b.start();
82     }
83 }
複製代碼

在這種方式下,線程A不斷地改變條件,線程ThreadB不停地經過while語句檢測這個條件(list.size()==5)是否成立 ,從而實現了線程間的通訊。可是這種方式會浪費CPU資源。之因此說它浪費資源,是由於JVM調度器將CPU交給線程B執行時,它沒作啥「有用」的工做,只是在不斷地測試 某個條件是否成立。就相似於現實生活中,某我的一直看着手機屏幕是否有電話來了,而不是: 在幹別的事情,當有電話來時,響鈴通知TA電話來了。關於線程的輪詢的影響,可參考:JAVA多線程之當一個線程在執行死循環時會影響另一個線程嗎?this

這種方式還存在另一個問題:url

輪詢的條件的可見性問題,關於內存可見性問題,可參考:JAVA多線程之volatile 與 synchronized 的比較中的第一點「一,volatile關鍵字的可見性

線程都是先把變量讀取到本地線程棧空間,而後再去再去修改的本地變量。所以,若是線程B每次都在取本地的 條件變量,那麼儘管另一個線程已經改變了輪詢的條件,它也察覺不到,這樣也會形成死循環。

 

③wait/notify機制

代碼以下:

複製代碼
 1 import java.util.ArrayList;
 2 import java.util.List;
 3 
 4 public class MyList {
 5 
 6     private static List<String> list = new ArrayList<String>();
 7 
 8     public static void add() {
 9         list.add("anyString");
10     }
11 
12     public static int size() {
13         return list.size();
14     }
15 }
16 
17 
18 public class ThreadA extends Thread {
19 
20     private Object lock;
21 
22     public ThreadA(Object lock) {
23         super();
24         this.lock = lock;
25     }
26 
27     @Override
28     public void run() {
29         try {
30             synchronized (lock) {
31                 if (MyList.size() != 5) {
32                     System.out.println("wait begin "
33                             + System.currentTimeMillis());
34                     lock.wait();
35                     System.out.println("wait end  "
36                             + System.currentTimeMillis());
37                 }
38             }
39         } catch (InterruptedException e) {
40             e.printStackTrace();
41         }
42     }
43 }
44 
45 
46 public class ThreadB extends Thread {
47     private Object lock;
48 
49     public ThreadB(Object lock) {
50         super();
51         this.lock = lock;
52     }
53 
54     @Override
55     public void run() {
56         try {
57             synchronized (lock) {
58                 for (int i = 0; i < 10; i++) {
59                     MyList.add();
60                     if (MyList.size() == 5) {
61                         lock.notify();
62                         System.out.println("已經發出了通知");
63                     }
64                     System.out.println("添加了" + (i + 1) + "個元素!");
65                     Thread.sleep(1000);
66                 }
67             }
68         } catch (InterruptedException e) {
69             e.printStackTrace();
70         }
71     }
72 }
73 
74 public class Run {
75 
76     public static void main(String[] args) {
77 
78         try {
79             Object lock = new Object();
80 
81             ThreadA a = new ThreadA(lock);
82             a.start();
83 
84             Thread.sleep(50);
85 
86             ThreadB b = new ThreadB(lock);
87             b.start();
88         } catch (InterruptedException e) {
89             e.printStackTrace();
90         }
91     }
92 }
複製代碼

線程A要等待某個條件知足時(list.size()==5),才執行操做。線程B則向list中添加元素,改變list 的size。

A,B之間如何通訊的呢?也就是說,線程A如何知道 list.size() 已經爲5了呢?

這裏用到了Object類的 wait() 和 notify() 方法。

當條件未知足時(list.size() !=5),線程A調用wait() 放棄CPU,並進入阻塞狀態。---不像②while輪詢那樣佔用CPU

當條件知足時,線程B調用 notify()通知 線程A,所謂通知線程A,就是喚醒線程A,並讓它進入可運行狀態。

這種方式的一個好處就是CPU的利用率提升了。

可是也有一些缺點:好比,線程B先執行,一會兒添加了5個元素並調用了notify()發送了通知,而此時線程A還執行;當線程A執行並調用wait()時,那它永遠就不可能被喚醒了。由於,線程B已經發了通知了,之後再也不發通知了。這說明:通知過早,會打亂程序的執行邏輯。

 

④管道通訊就是使用java.io.PipedInputStream 和 java.io.PipedOutputStream進行通訊

具體就不介紹了。分佈式系統中說的兩種通訊機制:共享內存機制和消息通訊機制。感受前面的①中的synchronized關鍵字和②中的while輪詢 「屬於」 共享內存機制,因爲是輪詢的條件使用了volatile關鍵字修飾時,這就表示它們經過判斷這個「共享的條件變量「是否改變了,來實現進程間的交流。

而管道通訊,更像消息傳遞機制,也就是說:經過管道,將一個線程中的消息發送給另外一個。

 

 

「編寫兩個線程,一個線程打印1~25,另外一個線程打印字母A~Z,打印順序爲12A34B56C……5152Z,要求使用線程間的通訊。」

1. 第一種解法,包含多種小的不一樣實現方式,但一個共同點就是靠一個共享變量來作控制;

a. 利用最基本的synchronizednotifywait

  1. public class MethodOne {  
  2.     private final ThreadToGo threadToGo = new ThreadToGo();  
  3.     public Runnable newThreadOne() {  
  4.         final String[] inputArr = Helper.buildNoArr(52);  
  5.         return new Runnable() {  
  6.             private String[] arr = inputArr;  
  7.             public void run() {  
  8.                 try {  
  9.                     for (int i = 0; i < arr.length; i=i+2) {  
  10.                         synchronized (threadToGo) {  
  11.                             while (threadToGo.value == 2)  
  12.                                 threadToGo.wait();  
  13.                             Helper.print(arr[i], arr[i + 1]);  
  14.                             threadToGo.value = 2;  
  15.                             threadToGo.notify();  
  16.                         }  
  17.                     }  
  18.                 } catch (InterruptedException e) {  
  19.                     System.out.println("Oops...");  
  20.                 }  
  21.             }  
  22.         };  
  23.     }  
  24.     public Runnable newThreadTwo() {  
  25.         final String[] inputArr = Helper.buildCharArr(26);  
  26.         return new Runnable() {  
  27.             private String[] arr = inputArr;  
  28.             public void run() {  
  29.                 try {  
  30.                     for (int i = 0; i < arr.length; i++) {  
  31.                         synchronized (threadToGo) {  
  32.                             while (threadToGo.value == 1)  
  33.                                 threadToGo.wait();  
  34.                             Helper.print(arr[i]);  
  35.                             threadToGo.value = 1;  
  36.                             threadToGo.notify();  
  37.                         }  
  38.                     }  
  39.                 } catch (InterruptedException e) {  
  40.                     System.out.println("Oops...");  
  41.                 }  
  42.             }  
  43.         };  
  44.     }  
  45.     class ThreadToGo {  
  46.         int value = 1;  
  47.     }  
  48.     public static void main(String args[]) throws InterruptedException {  
  49.         MethodOne one = new MethodOne();  
  50.         Helper.instance.run(one.newThreadOne());  
  51.         Helper.instance.run(one.newThreadTwo());  
  52.         Helper.instance.shutdown();  
  53.     }  
  54. }  
相關文章
相關標籤/搜索