前段時間花了大量時間去研讀JUC中同步器AbstractQueuedSynchronizer
的源碼實現,再結合好久以前看過的一篇關於Object
提供的等待和喚醒機制的JVM實現,發現二者有很多的關聯,因而決定從新研讀一下Object
中提供的阻塞和喚醒方法。本文閱讀JDK類庫源碼使用的JDK版本是JDK11,由於本文內容可能不適合於其餘版本。java
java.lang.Object
做爲全部非基本類型的基類,也就是說全部java.lang.Object
的子類都具有阻塞和喚醒的功能。下面詳細分析Object
提供的阻塞和喚醒API。算法
等待-wait()
方法提供了阻塞的功能,分超時和永久阻塞的版本,實際上,底層只提供了一個JNI方法:api
// 這個是底層提供的JNI方法,帶超時的阻塞等待,響應中斷,其餘兩個只是變體 public final native void wait(long timeoutMillis) throws InterruptedException; // 變體方法1,永久阻塞,響應中斷 public final void wait() throws InterruptedException { wait(0L); } // 變體方法2,帶超時的阻塞,超時時間分兩段:毫秒和納秒,實際上納秒大於0直接毫秒加1(這麼暴力...),響應中斷 public final void wait(long timeoutMillis, int nanos) throws InterruptedException { if (timeoutMillis < 0) { throw new IllegalArgumentException("timeoutMillis value is negative"); } if (nanos < 0 || nanos > 999999) { throw new IllegalArgumentException("nanosecond timeout value out of range"); } if (nanos > 0) { timeoutMillis++; } wait(timeoutMillis); }
也就是隻有一個wait(long timeoutMillis)
方法是JNI接口,其餘兩個方法至關於:數據結構
wait()
等價於wait(0L)
。wait(long timeoutMillis, int nanos)
在參數合法的狀況下等價於wait(timeoutMillis + 1L)
。因爲wait(long timeoutMillis, int nanos)
是參數最完整的方法,它的API註釋特別長,這裏直接翻譯和摘取它註釋中的核心要素:併發
wait()
方法以後,當前線程會把自身放到當前對象的等待集合(wait-set),而後釋放全部在此對象上的同步聲明(then to relinquish any nd all synchronization claims on this object),謹記只有當前對象上的同步聲明會被釋放,當前線程在其餘對象上的同步鎖只有在調用其wait()
方法以後纔會釋放。notify()
或者中斷)就會從等待集合(wait-set)中移除而且從新容許被線程調度器調度。一般狀況下,這個被喚醒的線程會與其餘線程競爭對象上的同步權(鎖),一旦線程從新控制了對象(regained control of the object),它對對象的全部同步聲明都恢復到之前的狀態,即恢復到調用wait()
方法時(筆者認爲,其實準確來講,是調用wait()
方法前)的狀態。wait()
以前,或者調用過wait()
方法以後處於阻塞等待狀態,一旦線程調用了Thread#interrupt()
,線程就會中斷而且拋出InterruptedException
異常,線程的中斷狀態會被清除。InterruptedException
異常會延遲到在第4點提到"它對對象的全部同步聲明都恢復到之前的狀態"的時候拋出。值得注意的還有:dom
一個線程必須成爲此對象的監視器鎖的擁有者才能正常調用wait()
系列方法,也就是wait()
系列方法必須在同步代碼塊(synchronized
代碼塊)中調用,不然會拋出IllegalMonitorStateException
異常,這一點是初學者或者不瞭解wait()
的機制的開發者常常會犯的問題。數據結構和算法
上面的五點描述能夠寫個簡單的同步代碼塊僞代碼時序總結一下:ide
final Object lock = new Object(); synchronized(lock){ 一、線程進入同步代碼塊,意味着獲取對象監視器鎖成功 while(!condition){ lock.wait(); 2.線程調用wait()進行阻塞等待 break; } 3.線程從wait()的阻塞等待中被喚醒,恢復到第1步以後的同步狀態 4.繼續執行後面的代碼,直到離開同步代碼塊 }
notify()
方法的方法簽名以下:源碼分析
@HotSpotIntrinsicCandidate public final native void notify();
下面按照慣例翻譯一下其API註釋:ui
wait()
方法才能阻塞在對象監視器上。notify()
方法的線程)釋放對象上的鎖。被喚醒的線程會與其餘線程競爭在對象上進行同步(換言之只有得到對象的同步控制權才能繼續執行),在成爲下一個鎖定此對象的線程時,被喚醒的線程沒有可靠的特權或劣勢。IllegalMonitorStateException
異常。notifyAll()
方法的方法簽名以下:
@HotSpotIntrinsicCandidate public final native void notifyAll();
1.喚醒全部阻塞等待在此對象監視器上的線程,一個線程經過調用wait()
方法才能阻塞在對象監視器上。
其餘註釋的描述和notify()
方法相似。
咱們常常看到的資料中提到synchronized
關鍵字的用法:
Class
對象。對於同步代碼塊而言,synchronized
關鍵字抽象到字節碼層面就是同步代碼塊中的字節碼執行在monitorenter
和monitorexit
指令之間:
synchronized(xxxx){ ...coding block } ↓↓↓↓↓↓↓↓↓↓ monitorenter; ...coding block - bytecode monitorexit;
JVM須要保證每個monitorenter都有一個monitorexit與之相對應。任何對象都有一個monitor(其實是ObjectMonitor
)與之相關聯,當且一個monitor被持有以後,它將處於鎖定狀態。線程執行到monitorenter指令時,將會嘗試獲取對象所對應的monitor全部權,即嘗試獲取對象的鎖。
對於同步(靜態)方法而言,synchronized
方法則會被翻譯成普通的方法調用和返回指令,如:invokevirtual
等等,在JVM字節碼層面並無任何特別的指令來實現被synchronized
修飾的方法,而是在Class
文件的方法表中將該方法的access_flags
字段中的synchronized
標誌位置1,表示該方法是同步方法並使用調用該方法的對象或該方法所屬的Class
在JVM的內部對象表示Klass
作爲鎖對象。
其實從開發者角度簡單理解,這兩種方式只是在獲取鎖的時機有所不一樣。
下面重複闡述幾個第一眼看起來不合理倒是事實的問題(其實前文已經說起過):
synchronized
方法或者代碼塊,至關於獲取監視器鎖成功,若是此時成功調用wait()
系列方法,那麼它會當即釋放監視器鎖,而且添加到等待集合(Wait Set)中進行阻塞等待。synchronized
方法或者代碼塊以後,它能夠調用notify(All)
方法喚醒等待集合中正在阻塞的線程,可是這個喚醒操做並非調用notify(All)
方法後當即生效,而是在該線程退出synchronized
方法或者代碼塊以後才生效。wait()
方法阻塞過程當中被喚醒的線程會競爭監視器目標對象的控制權,一旦從新控制了對象,那麼線程的同步狀態就會恢復到步入synchronized
方法或者代碼塊時候的狀態(也就是成功獲取到對象監視器鎖時候的狀態),這個時候線程纔可以繼續執行。爲了驗證這三點,能夠寫個簡單的Demo:
public class Lock { @Getter private final Object lock = new Object(); } public class WaitMain { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); public static void main(String[] args) throws Exception { final Lock lock = new Lock(); new Thread(new WaitRunnable(lock), "WaitThread-1").start(); new Thread(new WaitRunnable(lock), "WaitThread-2").start(); Thread.sleep(50); new Thread(new NotifyRunnable(lock), "NotifyThread").start(); Thread.sleep(Integer.MAX_VALUE); } @RequiredArgsConstructor private static class WaitRunnable implements Runnable { private final Lock lock; @Override public void run() { synchronized (lock) { System.out.println(String.format("[%s]-線程[%s]獲取鎖成功,準備執行wait方法", F.format(LocalDateTime.now()), Thread.currentThread().getName())); while (true) { try { lock.wait(); } catch (InterruptedException e) { //ignore } System.out.println(String.format("[%s]-線程[%s]從wait中喚醒,準備exit", F.format(LocalDateTime.now()), Thread.currentThread().getName())); try { Thread.sleep(500); } catch (InterruptedException e) { //ignore } break; } } } } @RequiredArgsConstructor private static class NotifyRunnable implements Runnable { private final Lock lock; @Override public void run() { synchronized (lock) { System.out.println(String.format("[%s]-線程[%s]獲取鎖成功,準備執行notifyAll方法", F.format(LocalDateTime.now()), Thread.currentThread().getName())); lock.notifyAll(); System.out.println(String.format("[%s]-線程[%s]先休眠3000ms", F.format(LocalDateTime.now()), Thread.currentThread().getName())); try { Thread.sleep(3000); } catch (InterruptedException e) { //ignore } System.out.println(String.format("[%s]-線程[%s]準備exit", F.format(LocalDateTime.now()), Thread.currentThread().getName())); } } } }
某個時刻的執行結果以下:
[2019-04-27 23:28:17.617]-線程[WaitThread-1]獲取鎖成功,準備執行wait方法 [2019-04-27 23:28:17.631]-線程[WaitThread-2]獲取鎖成功,準備執行wait方法 [2019-04-27 23:28:17.657]-線程[NotifyThread]獲取鎖成功,準備執行notifyAll方法 <-------- 這一步執行完說明WaitThread已經釋放了鎖 [2019-04-27 23:28:17.657]-線程[NotifyThread]先休眠3000ms [2019-04-27 23:28:20.658]-線程[NotifyThread]準備exit <------- 這一步後NotifyThread離開同步代碼塊 [2019-04-27 23:28:20.658]-線程[WaitThread-1]從wait中喚醒,準備exit <------- 這一步WaitThread-1解除阻塞 [2019-04-27 23:28:21.160]-線程[WaitThread-2]從wait中喚醒,準備exit <------- 這一步WaitThread-2解除阻塞,注意發生時間在WaitThread-1解除阻塞500ms以後,符合咱們前面提到的第3點
若是結合wait()
和notify()
能夠簡單總結出一個同步代碼塊的僞代碼以下:
final Object lock = new Object(); // 等待 synchronized(lock){ 一、線程進入同步代碼塊,意味着獲取對象監視器鎖成功 while(!condition){ lock.wait(); 2.線程調用wait()進行阻塞等待 break; } 3.線程從wait()的阻塞等待中被喚醒,嘗試恢復第1步以後的同步狀態,並不會立刻生效,直到notify被調用而且調用notify方法的線程已經釋放鎖,同時當前線程須要競爭成功 4.繼續執行後面的代碼,直到離開同步代碼塊 } // 喚醒 synchronized(lock){ 一、線程進入同步代碼塊,意味着獲取對象監視器鎖成功 lock.notify(); 2.喚醒其中一個在對象監視器上等待的線程 3.準備推出同步代碼塊釋放鎖,只有釋放鎖以後第2步纔會生效 }
結合前面分析過的知識點以及參考資料中的文章,從新畫一個圖理解一下對象監視器以及相應阻塞和喚醒API的工做示意過程:
ObjectMonitor
中的_EntryList屬性):存放等待鎖而且處於阻塞狀態的線程。ObjectMonitor
中的_WaitSet屬性):存放處於等待阻塞狀態的線程。ObjectMonitor
中的_owner屬性):指向得到對象監視器的線程,在同一個時刻只能有一個線程被The Owner持有,通俗來看,它就是監視器的控制權。經過Object
提供的阻塞和喚醒機制舉幾個簡單的使用例子。
假設有如下場景:廁所的只有一個卡位,廁所維修工修廁所的時候,任何人不能上廁所。當廁所維修工修完廁所的時候,上廁所的人須要"獲得廁所的控制權"才能上廁所。
// 廁所類 public class Toilet { // 廁所的鎖 private final Object lock = new Object(); private boolean available; public Object getLock() { return lock; } public void setAvailable(boolean available) { this.available = available; } public boolean getAvailable() { return available; } } // 廁所維修工 @RequiredArgsConstructor public class ToiletRepairer implements Runnable { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private final Toilet toilet; @Override public void run() { synchronized (toilet.getLock()) { System.out.println(String.format("[%s]-廁所維修員獲得了廁所的鎖,維修廁所要用5000ms...", LocalDateTime.now().format(F))); try { Thread.sleep(5000); } catch (Exception e) { // ignore } toilet.setAvailable(true); toilet.getLock().notifyAll(); System.out.println(String.format("[%s]-廁所維修員維修完畢...", LocalDateTime.now().format(F))); } } } //上廁所的任務 @RequiredArgsConstructor public class ToiletTask implements Runnable { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); private final Toilet toilet; private final String name; private final Random random; @Override public void run() { synchronized (toilet.getLock()) { System.out.println(String.format("[%s]-%s獲得了廁所的鎖...", LocalDateTime.now().format(F), name)); while (!toilet.getAvailable()) { try { toilet.getLock().wait(); } catch (InterruptedException e) { //ignore } int time = random.nextInt(3) + 1; try { // 模擬上廁所用時 TimeUnit.SECONDS.sleep(time); } catch (InterruptedException e) { //ignore } System.out.println(String.format("[%s]-%s上廁所用了%s秒...", LocalDateTime.now().format(F), name, time)); } } } } // 場景入口 public class Main { public static void main(String[] args) throws Exception { Toilet toilet = new Toilet(); Random random = new Random(); Thread toiletRepairer = new Thread(new ToiletRepairer(toilet), "ToiletRepairer"); Thread thread1 = new Thread(new ToiletTask(toilet, "張三", random), "thread-1"); Thread thread2 = new Thread(new ToiletTask(toilet, "李四", random), "thread-2"); Thread thread3 = new Thread(new ToiletTask(toilet, "王五", random), "thread-3"); thread1.start(); thread2.start(); thread3.start(); Thread.sleep(50); toiletRepairer.start(); Thread.sleep(Integer.MAX_VALUE); } }
某次執行的結果以下:
[2019-04-29 01:07:25.914]-張三獲得了廁所的鎖... [2019-04-29 01:07:25.931]-李四獲得了廁所的鎖... [2019-04-29 01:07:25.931]-王五獲得了廁所的鎖... [2019-04-29 01:07:25.951]-廁所維修員獲得了廁所的鎖,維修廁所要用5000ms... [2019-04-29 01:07:30.951]-廁所維修員維修完畢... [2019-04-29 01:07:32.952]-張三上廁所用了2秒... [2019-04-29 01:07:35.952]-王五上廁所用了3秒... [2019-04-29 01:07:37.953]-李四上廁所用了2秒...
實現一個簡單固定容量的阻塞隊列,接口以下:
public interface BlockingQueue<T> { void put(T value) throws InterruptedException; T take() throws InterruptedException; }
其中put(T value)
會阻塞直到隊列中有可用的容量,而take()
方法會阻塞直到有元素投放到隊列中。實現以下:
public class DefaultBlockingQueue<T> implements BlockingQueue<T> { private Object[] elements; private final Object notEmpty = new Object(); private final Object notFull = new Object(); private int count; private int takeIndex; private int putIndex; public DefaultBlockingQueue(int capacity) { this.elements = new Object[capacity]; } @Override public void put(T value) throws InterruptedException { synchronized (notFull) { while (count == elements.length) { notFull.wait(); } } final Object[] items = this.elements; items[putIndex] = value; if (++putIndex == items.length) { putIndex = 0; } count++; synchronized (notEmpty) { notEmpty.notify(); } } @SuppressWarnings("unchecked") @Override public T take() throws InterruptedException { synchronized (notEmpty) { while (count == 0) { notEmpty.wait(); } } final Object[] items = this.elements; T value = (T) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) { takeIndex = 0; } count--; synchronized (notFull) { notFull.notify(); } return value; } }
場景入口類:
public class Main { public static void main(String[] args) throws Exception { BlockingQueue<String> queue = new DefaultBlockingQueue<>(5); Runnable r = () -> { while (true) { try { String take = queue.take(); System.out.println(String.format("線程%s消費消息-%s", Thread.currentThread().getName(), take)); } catch (Exception e) { e.printStackTrace(); } } }; new Thread(r, "thread-1").start(); new Thread(r, "thread-2").start(); IntStream.range(0, 10).forEach(i -> { try { queue.put(String.valueOf(i)); } catch (InterruptedException e) { //ignore } }); Thread.sleep(Integer.MAX_VALUE); } }
某次執行結果以下:
線程thread-1消費消息-0 線程thread-2消費消息-1 線程thread-1消費消息-2 線程thread-2消費消息-3 線程thread-1消費消息-4 線程thread-2消費消息-5 線程thread-1消費消息-6 線程thread-2消費消息-7 線程thread-1消費消息-8 線程thread-2消費消息-9
上面這個例子就是簡單的單生產者-多消費者的模型。
這裏實現一個極度簡陋的固定容量的線程池,功能是:初始化固定數量的活躍線程,阻塞直到有可用的線程用於提交任務。它只有一個接口方法,接口定義以下:
public interface ThreadPool { void execute(Runnable runnable); }
具體實現以下:
public class DefaultThreadPool implements ThreadPool { private final int capacity; private List<Worker> initWorkers; private Deque<Worker> availableWorkers; private Deque<Worker> busyWorkers; private final Object nextLock = new Object(); public DefaultThreadPool(int capacity) { this.capacity = capacity; init(capacity); } private void init(int capacity) { initWorkers = new ArrayList<>(capacity); availableWorkers = new LinkedList<>(); busyWorkers = new LinkedList<>(); for (int i = 0; i < capacity; i++) { Worker worker = new Worker(); worker.setName("Worker-" + (i + 1)); worker.setDaemon(true); initWorkers.add(worker); } for (Worker w : initWorkers) { w.start(); availableWorkers.add(w); } } @Override public void execute(Runnable runnable) { if (null == runnable) { return; } synchronized (nextLock) { while (availableWorkers.size() < 1) { try { nextLock.wait(500); } catch (InterruptedException e) { //ignore } } Worker worker = availableWorkers.removeFirst(); busyWorkers.add(worker); worker.run(runnable); nextLock.notifyAll(); } } private void makeAvailable(Worker worker) { synchronized (nextLock) { availableWorkers.add(worker); busyWorkers.remove(worker); nextLock.notifyAll(); } } private class Worker extends Thread { private final Object lock = new Object(); private Runnable runnable; private AtomicBoolean run = new AtomicBoolean(true); private void run(Runnable runnable) { synchronized (lock) { if (null != this.runnable) { throw new IllegalStateException("Already running a Runnable!"); } this.runnable = runnable; lock.notifyAll(); } } @Override public void run() { boolean ran = false; while (run.get()) { try { synchronized (lock) { while (runnable == null && run.get()) { lock.wait(500); } if (runnable != null) { ran = true; runnable.run(); } } } catch (Exception e) { e.printStackTrace(); } finally { synchronized (lock) { runnable = null; } if (ran) { ran = false; makeAvailable(this); } } } } } }
場景類入口:
public class Main { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); public static void main(String[] args) throws Exception{ ThreadPool threadPool = new DefaultThreadPool(2); threadPool.execute(() -> { try { System.out.println(String.format("[%s]-任務一開始執行持續3秒...", LocalDateTime.now().format(F))); Thread.sleep(3000); System.out.println(String.format("[%s]-任務一執行結束...", LocalDateTime.now().format(F))); }catch (Exception e){ //ignore } }); threadPool.execute(() -> { try { System.out.println(String.format("[%s]-任務二開始執行持續4秒...", LocalDateTime.now().format(F))); Thread.sleep(4000); System.out.println(String.format("[%s]-任務二執行結束...", LocalDateTime.now().format(F))); }catch (Exception e){ //ignore } }); threadPool.execute(() -> { try { System.out.println(String.format("[%s]-任務三開始執行持續5秒...", LocalDateTime.now().format(F))); Thread.sleep(5000); System.out.println(String.format("[%s]-任務三執行結束...", LocalDateTime.now().format(F))); }catch (Exception e){ //ignore } }); Thread.sleep(Integer.MAX_VALUE); } }
某次執行結果以下:
[2019-04-29 02:07:25.465]-任務二開始執行持續4秒... [2019-04-29 02:07:25.465]-任務一開始執行持續3秒... [2019-04-29 02:07:28.486]-任務一執行結束... [2019-04-29 02:07:28.486]-任務三開始執行持續5秒... [2019-04-29 02:07:29.486]-任務二執行結束... [2019-04-29 02:07:33.487]-任務三執行結束...
鑑於筆者C語言學得很差,這裏就沒法深刻分析JVM源碼的實現,只能結合一些現有的資料和本身的理解從新梳理一下Object
提供的阻塞和喚醒機制這些知識點。結合以前看過JUC同步器的源碼,一時醒悟過來,JUC同步器只是在數據結構和算法層面使用Java語言對原來JVM中C語言的阻塞和喚醒機制即Object
提供的那幾個JNI方法進行了一次實現而已。
最後,Object
提供的阻塞等待喚醒機制是JVM實現的(若是特別熟悉C語言能夠經過JVM源碼研究其實現,對於大部分開發者來講是黑箱),除非是特別熟練或者是JDK版本過低還沒有引入JUC包,通常狀況下不該該優先選擇Object
,而應該考慮專門爲併發設計的JUC包中的類庫。
參考資料:
Github Page:http://www.throwable.club/2019/04/30/java-object-wait-notify/
Coding Page:http://throwable.coding.me/2019/04/30/java-object-wait-notify/
(本文完 c-7-d e-a-20190430)