Java 線程通訊之 wait/notify 機制

前言

Java 線程通訊是將多個獨立的線程個體進行關聯處理,使得線程與線程之間能進行相互通訊。好比線程 A 修改了對象的值,而後通知給線程 B,使線程 B 可以知道線程 A 修改的值,這就是線程通訊。java

<!-- more -->多線程

wait/notify 機制

一個線程調用 Object 的 wait() 方法,使其線程被阻塞;另外一線程調用 Object 的 notify()/notifyAll() 方法,wait() 阻塞的線程繼續執行。dom

wai/notify 方法學習

方法 說明
wait() 當前線程被阻塞,線程進入 WAITING 狀態
wait(long) 設置線程阻塞時長,線程會進入 TIMED_WAITING 狀態。若是設置時間內(毫秒)沒有通知,則超時返回
wait(long, int) 納秒級別的線程阻塞時長設置
notify() 通知同一個對象上已執行 wait() 方法且得到對象鎖的等待線程
notifyAll() 通知同一對象上全部等待的線程

實現 wait/notify 機制的條件:測試

  • 調用 wait 線程和 notify 線程必須擁有相同對象鎖。
  • wait() 方法和 notify()/notifyAll() 方法必須在 Synchronized 方法或代碼塊中。

因爲 wait/notify 方法是定義在java.lang.Object中,因此在任何 Java 對象上均可以使用。線程

wait 方法

在執行 wait() 方法前,當前線程必須已得到對象鎖。調用它時會阻塞當前線程,進入等待狀態,在當前 wait() 處暫停線程。同時,wait() 方法執行後,會當即釋放得到的對象鎖。3d

下面經過案例來查看 wait() 釋放鎖。代理

首先查看不使用 wait() 方法時的代碼執行狀況:code

package top.ytao.demo.thread.waitnotify;

/**
 * Created by YangTao
 */
public class WaitTest {
    
    static Object object = new Object();
    
    public static void main(String[] args) {

        new Thread(() -> {
            synchronized (object){
                System.out.println("開始線程 A");
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束線程 A");
            }
        }, "線程 A").start();


        new Thread(() -> {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (object){
                System.out.println("開始線程 B");

                System.out.println("結束線程 B");
            }
        }, "線程 B").start();

    }

}

建立 A、B 兩個線程,。首先在 B 線程建立後 sleep ,保證 B 線程的打印後於 A 線程執行。在 A 線程中,獲取到對象鎖後,sleep 一段時間,且時間大於 B 線程的 sleep 時間。對象

執行結果爲:

從上圖結果中,能夠看到,B 線程必定等 A 線程執行完 synchronize 代碼塊釋放對象鎖後 A 線程再獲取對象鎖進入 synchronize 代碼塊中。在這過程當中,Thread.sleep() 方法也不會釋放鎖。

當前在 A 線程 synchronize 代碼塊中執行 wait() 方法後,就會主動釋放對象鎖,A 線程代碼以下:

new Thread(() -> {
    synchronized (object){
        System.out.println("開始線程 A");
        try {
            // 調用 object 對象的 wait 方法
            object.wait();
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("結束線程 A");
    }
}, "線程 A").start();

執行結果:

同時 A 線程一直處於阻塞狀態,不會打印結束線程 A

wait(long) 方法是設置超時時間,當等待時間大於設置的超時時間後,會繼續往 wait(long) 方法後的代碼執行。

new Thread(() -> {
    synchronized (object){
        System.out.println("開始線程 A");
        try {
            object.wait(1000);
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("結束線程 A");
    }
}, "線程 A").start();

執行結果

同理,wait(long, int) 方法與 wait(long) 一樣,只是多個納秒級別的時間設置。

notify 方法

一樣,在執行 notify() 方法前,當前線程也必須已得到線程鎖。調用 notify() 方法後,會通知一個執行了 wait() 方法的阻塞等待線程,使該等待線程從新獲取到對象鎖,而後繼續執行 wait() 後面的代碼。可是,與 wait() 方法不一樣,執行 notify() 後,不會當即釋放對象鎖,而須要執行完 synchronize 的代碼塊或方法纔會釋放鎖,因此接收通知的線程也不會當即得到鎖,也須要等待執行 notify() 方法的線程釋放鎖後再獲取鎖。

notify()

下面是 notify() 方法的使用,實現一個完整的 wait/notify 的例子,同時驗證發出通知後,執行 notify() 方法的線程是否當即釋放鎖,執行 wait() 方法的線程是否當即獲取鎖。

package top.ytao.demo.thread.waitnotify;

/**
 * Created by YangTao
 */
public class WaitNotifyTest {

    static Object object = new Object();

    public static void main(String[] args) {
        System.out.println();

        new Thread(() -> {
            synchronized (object){
                System.out.println("開始線程 A");
                try {
                    object.wait();
                    System.out.println("A 線程從新獲取到鎖,繼續進行");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束線程 A");
            }
        }, "線程 A").start();


        new Thread(() -> {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (object){
                System.out.println("開始線程 B");
                object.notify();
                System.out.println("線程 B 通知完線程 A");
                try {
                    // 試驗執行完 notify() 方法後,A 線程是否能當即獲取到鎖
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束線程 B");
            }
        }, "線程 B").start();

    }

}

以上 A 線程執行 wait() 方法,B 線程執行 notify() 方法,執行結果爲:

執行結果中能夠看到,B 線程執行 notify() 方法後,即便 sleep 了,A 線程也沒有獲取到鎖,可知,notify() 方法並無釋放鎖。

notify() 是通知到等待中的線程,可是調用一次 notify() 方法,只能通知到一個執行 wait() 方法的等待線程。若是有多個等待狀態的線程,則需屢次調用 notify() 方法,通知到線程順序則根據執行 wait() 方法的前後順序進行通知。

下面建立有兩個執行 wait() 方法的線程的代碼:

package top.ytao.demo.thread.waitnotify;

/**
 * Created by YangTao
 */
public class MultiWaitNotifyTest {

    static Object object = new Object();

    public static void main(String[] args) {
        System.out.println();

        new Thread(() -> {
            synchronized (object){
                System.out.println("開始線程 A");
                try {
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束線程 A");
            }
        }, "線程 A").start();


        new Thread(() -> {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (object){
                System.out.println("開始線程 B");
                try {
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("結束線程 B");
            }
        }, "線程 B").start();


        new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (object){
                System.out.println("開始通知線程 C");
                object.notify();
                object.notify();
                System.out.println("結束通知線程 C");
            }
        }, "線程 C").start();

    }

}

先 A 線程執行 wait() 方法,而後 B 線程執行 wait() 方法,最後 C 線程調用兩次 notify() 方法,執行結果:

notifyAll()

通知多個等待狀態的線程,經過屢次調用 notify() 方法實現的方案,在實際應用過程當中,實現過程不太友好,若是是想通知全部等待狀態的線程,可以使用 notifyAll() 方法,就能喚醒全部線程。

實現方式,只需將上面 C 線程的屢次調用 notify() 方法部分改成調用一次 notifyAll() 方法便可。

new Thread(() -> {
    try {
        Thread.sleep(3000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    synchronized (object){
        System.out.println("開始通知線程 C");
        object.notifyAll();
        System.out.println("結束通知線程 C");
    }
}, "線程 C").start();

執行結果:

根據不一樣 JVM 的實現,notifyAll() 的喚醒順序會有所不一樣,當前測試環境中,以倒序順序喚醒線程。

實現生產者消費者模式

生產消費者模式就是一個線程生產數據進行存儲,另外一線程進行數據提取消費。下面就以兩個線程來模擬,生產者生成一個 UUID 存放到 List 對象中,消費者讀取 List 對象中的數據,讀取完成後進行清除。

實現代碼以下:

package top.ytao.demo.thread.waitnotify;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Created by YangTao
 */
public class WaitNotifyModelTest {

    // 存儲生產者產生的數據
    static List<String> list = new ArrayList<>();

    public static void main(String[] args) {

        new Thread(() -> {
            while (true){
                synchronized (list){
                    // 判斷 list 中是否有數據,若是有數據的話,就進入等待狀態,等數據消費完
                    if (list.size() != 0){
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // list 中沒有數據時,產生數據添加到 list 中
                    list.add(UUID.randomUUID().toString());
                    list.notify();
                    System.out.println(Thread.currentThread().getName() + list);
                }
            }
        }, "生產者線程 A ").start();


        new Thread(() -> {
            while (true){
                synchronized (list){
                    // 若是 list 中沒有數據,則進入等待狀態,等收到有數據通知後再繼續運行
                    if (list.size() == 0){
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // 有數據時,讀取數據
                    System.out.println(Thread.currentThread().getName() + list);
                    list.notify();
                    // 讀取完畢,將當前這條 UUID 數據進行清除
                    list.clear();
                }
            }
        }, "消費者線程 B ").start();

    }

}

運行結果:

生產者線程運行時,若是已存在未消費的數據,則當前線程進入等待狀態,收到通知後,代表數據已消費完,再繼續向 list 中添加數據。

消費者線程運行時,若是不存在未消費的數據,則當前線程進入等待狀態,收到通知後,代表 List 中已有新數據被添加,繼續執行代碼消費數據並清除。

不論是生產者仍是消費者,基於對象鎖,一次只能一個線程能獲取到,若是生產者獲取到鎖就校驗是否須要生成數據,若是消費者獲取到鎖就校驗是否有數據可消費。

一個簡單的生產者消費者模式就以完成。

總結

等待/通知機制是實現 Java 線程間通訊的一種方式,將多線程中,各個獨立運行的線程經過相互通訊來更高效的協做完成工做,更大效率利用 CPU 處理程序。這也是學習或研究 Java 線程的必學知識點。

推薦閱讀

《Java 線程基礎,從這篇開始》

《你必須會的 JDK 動態代理和 CGLIB 動態代理》

《Dubbo 系列》

相關文章
相關標籤/搜索