Java Thread系列(四)線程通訊

Java Thread系列(四)線程通訊

1、傳統通訊

public static void main(String[] args) {
    //volatile實現兩個線程間數據可見性
    private volatile static List list = new ArrayList();

    Thread t1 = new Thread(new Runnable() { // (1)
        public void run() {
            try {
                for(int i = 0; i <10; i++){
                    list.add(i);
                    System.out.println(Thread.currentThread().getName() + "線程添加第" + (i + 1) + "個元素..");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "t1");

    Thread t2 = new Thread(new Runnable() { // (2)
        public void run() {
            while(true){
                if(list.size() == 5){
                    //do something
                    throw new RuntimeException(Thread.currentThread().getName() + 
                        "線程接到通知 size = " + list.size() + " 線程中止..");
                }
            }
        }
    }, "t2");

    t1.start();
    t2.start();
}
  1. t1 線程不斷將生產的數據放入 list 集合中java

  2. t2 線程開啓 while 循環監聽 t1 線程,雖然能夠實現 list.size()==5 時實時通知 t2 線程,但太浪費性能,考慮用 await/notify 提升性能,程序執行結果以下:git

t1線程添加第1個元素..
t1線程添加第2個元素..
t1線程添加第3個元素..
t1線程添加第4個元素..
t1線程添加第5個元素..
Exception in thread "t2" java.lang.RuntimeException: t2線程接到通知 size = 5 線程中止..
    at com.github.binarylei.thread._2_1conn.ListAdvice1$2.run(ListAdvice1.java:35)
    at java.lang.Thread.run(Thread.java:745)
t1線程添加第6個元素..
t1線程添加第7個元素..
t1線程添加第8個元素..
t1線程添加第9個元素..
t1線程添加第10個元素..

2、wait/notify 實現通訊

/**
 * 使用wait/notify方法實現線程單挑通訊(注意這兩個方法是Object類的方法)
 *   1. wait和notity必須配合synchronized關鍵字使用
 *   2. wait方法(關閉線程)釋放鎖,notify(喚醒線程)方法不釋放鎖
 * 缺點:通知不實時,使用CountDownLatch實現實時通知
 */
public static void main(String[] args) {
    private volatile static List list = new ArrayList();
    final Object lock = new Object();

    Thread t1 = new Thread(new Runnable() { // (1)
        public void run() {
            try {
                synchronized (lock) {
                    System.out.println("t1啓動..");
                    for(int i = 0; i <10; i++){
                        list.add(i);
                        System.out.println(Thread.currentThread().getName() + "線程添加第" + (i + 1) + "個元素..");
                        Thread.sleep(500);
                        if(list.size() == 5){
                            System.out.println("已經發出通知..");
                            lock.notify();
                        }
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "t1");

    Thread t2 = new Thread(new Runnable() { // (2)
        public void run() {
            synchronized (lock) {
                System.out.println("t2啓動..");
                if(list.size() != 5){
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //do something
                throw new RuntimeException(Thread.currentThread().getName() + 
                    "線程接到通知 size = " + list.size() + " 線程中止..");
            }
        }
    }, "t2");
}
  1. t1 線程當 list.size()==5lock.notify() 喚醒 t2 線程,注意 wait/notify 必須配合 synchronized 使用github

  2. t2 線程調用 lock.wait() 後處於一直阻塞狀態,直到 t1 線程調用 lock.notify() 喚醒該線程,假若沒有線程喚醒 t2 線程,那麼 t2 線程就一直處於阻塞狀態。本例中若 t1 線程先啓動,那麼 t2 線程調用 lock.wait() 就永遠阻塞沒法執行。程序執行結果以下:。安全

t2啓動..
t1啓動..
t1線程添加第1個元素..
t1線程添加第2個元素..
t1線程添加第3個元素..
t1線程添加第4個元素..
t1線程添加第5個元素..
已經發出通知..
t1線程添加第6個元素..
t1線程添加第7個元素..
t1線程添加第8個元素..
t1線程添加第9個元素..
t1線程添加第10個元素..
Exception in thread "t2" java.lang.RuntimeException: t2線程接到通知 size = 10 線程中止..
    at com.github.binarylei.thread._2_1conn.ListAdd2$2.run(ListAdd2.java:51)
    at java.lang.Thread.run(Thread.java:745)
  1. 因爲 t1 線程 lock.notify() 後不會釋放鎖,t2 線程雖然被喚醒但不能獲取鎖,因此通知就不那麼實時,只有等 t1 線程執行完成釋放鎖後 t2 線程才能得到鎖執行相應操做,解決方案:使用 CountDownLatch

3、CountDownLatch 實現實時通訊

public static void main(String[] args) {
    private volatile static List list = new ArrayList();
    final CountDownLatch countDownLatch = new CountDownLatch(1); // (1)

    Thread t1 = new Thread(new Runnable() {
        public void run() {
            try {
                System.out.println("t1啓動..");
                for(int i = 0; i <10; i++){
                    list.add(i);
                    System.out.println(Thread.currentThread().getName() + "線程添加第" + (i + 1) + "個元素..");
                    Thread.sleep(500);
                    if(list.size() == 5){
                        System.out.println("已經發出通知..");
                        countDownLatch.countDown(); // (2)
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }, "t1");

    Thread t2 = new Thread(new Runnable() {
        public void run() {
            System.out.println("t2啓動..");
            if(list.size() != 5){
                try {
                    countDownLatch.await(); // (3)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //do something
            throw new RuntimeException(Thread.currentThread().getName() + 
                "線程接到通知 size = " + list.size() + " 線程中止..");
        }
    }, "t2");

    t1.start();
    t2.start();
}
  1. CountDownLatch 同步工具類,容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行,參數 1 表示須要等待的線程數量,具體來講就是參數爲幾就必須調用幾回 countDownLatch.countDown()多線程

  2. countDownLatch.countDown() 喚醒線程併發

  3. countDownLatch.await() 阻塞線程,程序執行結果以下:ide

t1啓動..
t1線程添加第1個元素..
t2啓動..
t1線程添加第2個元素..
t1線程添加第3個元素..
t1線程添加第4個元素..
t1線程添加第5個元素..
已經發出通知..
Exception in thread "t2" java.lang.RuntimeException: t2線程接到通知 size = 5 線程中止..
t1線程添加第6個元素..
    at com.github.binarylei.thread._2_1conn.ListAdd3$2.run(ListAdd3.java:47)
    at java.lang.Thread.run(Thread.java:745)
t1線程添加第7個元素..
t1線程添加第8個元素..
t1線程添加第9個元素..
t1線程添加第10個元素..

4、ThreadLocal

ThreadLocal 是線程局部變量,是一種多線程間併發訪問變量的無鎖解決方案。高併發

ThreadLocal 和 synchronized 比較?工具

  1. 與 synchronized 等加鎖的方式不一樣,ThreadLocal 徹底不提供鎖,而使用以空間換時間的手段,爲每一個線程提供變量的獨立副本,以保障線程安全。性能

  2. 從性能上說,ThreadLocal 不具備絕對的優點,在併發不是很高的時候,加鎖的性能會更好,但做爲一套無鎖的解決方案,在高併發量或者競爭激烈的場景,使用 ThreadLocal 能夠在必定程度上減小鎖競爭。

public static void main(String[] args) throws InterruptedException {
    final ThreadLocal<String> th = new ThreadLocal<String>();

    Thread t1 = new Thread(new Runnable() {
        public void run() {
            th.set("張三");
            System.out.println(th.get()); // => "張三"
        }
    }, "t1");
    
    Thread t2 = new Thread(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
                th.set("李四");
                System.out.println(th.get()); // => "李四"
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "t2");
    
    t1.start(); //t1:張三
    t2.start(); //t2:李四
}

5、自定義同步類窗口-Queue

Java 提供了一些同步類容器,它們是 線程安全 的,如 Vector、HashTable 等。這些同步類容器是由 Collections.synchronizedMap 等工廠方法去建立實現的,底層使用 synchronized 關鍵字,每次只有一個線程訪問容器。下面實現一個本身的同步類窗口。

import java.util.LinkedList;

public class MyQueue {   
    private LinkedList list = new LinkedList();
    private int max = 5;
    private int min = 1;
    private Object lock = new Object();

    public void put(Object obj) {  // (1)
        synchronized (lock) {
            while (list.size() == max) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    ;
                }
            }
            list.add(obj);
            lock.notify();
            System.out.println("put元素:" + obj);
        }
    }

    public Object take() {  // (2)
        Object obj;
        synchronized (lock) {
            while (list.size() == min) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    ;
                }
            }
            obj = list.removeFirst();
            lock.notify();
            System.out.println("take元素:" + obj);
        }
        return obj;
    }
}

測試

public static void main(String[] args) {
    final MyQueue myQueue = new MyQueue();
    myQueue.put("a");
    myQueue.put("b");
    myQueue.put("c");
    myQueue.put("d");
    myQueue.put("e");

    new Thread(new Runnable() {
        @Override
        public void run() {
            myQueue.put("f");
            myQueue.put("g");
            myQueue.put("h");
            myQueue.put("i");
        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {
            myQueue.take();
            myQueue.take();
            myQueue.take();
        }
    }).start();
}

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索