Java線程在運行的生命週期中可能處於以下6種不一樣的狀態,在給定的一個時刻,線程只能處於其中的一個狀態。html
線程狀態 | 說明 |
---|---|
NEW | 初始狀態,線程剛被建立,可是並未啓動(還未調用start方法)。 |
RUNNABLE | 運行狀態,JAVA線程將操做系統中的就緒(READY)和運行(RUNNING)兩種狀態籠統地稱爲「運行中」。 |
BLOCKED | 阻塞狀態,表示線程阻塞於鎖。 |
WAITING | 等待狀態,表示該線程無限期等待另外一個線程執行一個特別的動做。 |
TIMED_WAITING | 超時等待狀態,不一樣於WAITING的是,它能夠在指定時間自動返回。 |
TERMINATED | 終止狀態,表示當前狀態已經執行完畢。 |
線程在自身的生命週期中,並非固定地處於某個狀態,而是隨着代碼的執行在不一樣的狀態之間進行切換。java
<img src="https://img-blog.csdnimg.cn/20200929220833538.png" style="zoom:80%;" />node
在【併發編程基礎】線程基礎(經常使用方法、狀態)一文中,主要學習了wait()、join()和sleep()等方法,在【併發編程】深刻理解synchronized原理一文中,主要探討了synchronized原理。下面就進行park()/unpark()、wait()/notify()/notifyAll()的學習。c++
wait( ),notify( ),notifyAll( )都是Object基礎類中的方法,因此在任何 Java 對象上均可以使用。web
public class Object { // 致使當前線程等待,直到另外一個線程調用此對象的notify()方法或notifyAll()方法。 public final void wait() throws InterruptedException { wait(0); } // 致使當前線程等待,直到另外一個線程調用此對象的notify()方法或notifyAll()方法,或者已通過了指定的時間。 public final native void wait(long timeout) throws InterruptedException; // 喚醒正在此對象監視器上等待的單個線程。 public final native void notify(); // 喚醒等待此對象監視器的全部線程。 public final native void notifyAll(); }
打開objectMonitor.cpp,查看wait方法:shell
... // create a node to be put into the queue // Critically, after we reset() the event but prior to park(), we must check // for a pending interrupt. ObjectWaiter node(Self); // 將當前線程封裝成ObjectWatier node.TState = ObjectWaiter::TS_WAIT ; // 狀態改成等待狀態 Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag // Enter the waiting queue, which is a circular doubly linked list in this case // but it could be a priority queue or any data structure. // _WaitSetLock protects the wait queue. Normally the wait queue is accessed only // by the the owner of the monitor *except* in the case where park() // returns because of a timeout of interrupt. Contention is exceptionally rare // so we use a simple spin-lock instead of a heavier-weight blocking lock. Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;// 自旋操做 AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ; // 添加到_WaitSet節點中 ...
查看AddWaiter()方法:編程
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet ; // 經過雙向鏈表的方式,將ObjectWaiter對象添加到_WaitSet列表中 ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }
查看notify方法源碼:多線程
void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT (Empty-Notify) ;// _WaitSet=NULL代表沒有等待狀態的線程,直接返回。 return ; } DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; ObjectWaiter * iterator = DequeueWaiter() ;// 獲取一個ObjectWaiter對象 if (iterator != NULL) { ... ObjectWaiter * List = _EntryList ; if (List != NULL) { assert (List->_prev == NULL, "invariant") ; assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (List != iterator, "invariant") ; } // 根據不一樣狀態採起不一樣策略,將從_WaitSet列表中移出來的ObjectWaiter對象加入到_EntryList列表中。 if (Policy == 0) { // prepend to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) {...} else // append to EntryList if (Policy == 2) {...} else // prepend to cxq if (Policy == 3) { // append to cxq ... } else { ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; // 被喚醒的線程又變成run狀態。 ev->unpark() ; } }
查看notifyAll方法源碼:併發
void ObjectMonitor::notifyAll(TRAPS) { CHECK_OWNER(); ObjectWaiter* iterator; if (_WaitSet == NULL) { TEVENT (Empty-NotifyAll) ;// _WaitSet=NULL代表沒有等待狀態的線程,直接返回。 return ; } DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; int Tally = 0 ; Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ; for (;;) { iterator = DequeueWaiter () ;// 循環獲取因此ObjectWaiter對象 ... ObjectWaiter * List = _EntryList ; if (List != NULL) { assert (List->_prev == NULL, "invariant") ; assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (List != iterator, "invariant") ; } // 根據不一樣狀態採起不一樣策略,將從_WaitSet列表中移出來的ObjectWaiter對象加入到_EntryList列表中。 if (Policy == 0) { // prepend to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) { // append to EntryList ... } else if (Policy == 2) { // prepend to cxq ... } else if (Policy == 3) { // append to cxq ... } else { ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ;// 被喚醒的線程又變成run狀態。 OrderAccess::fence() ; ev->unpark() ; } ...
可見,wait()與notify()/notifyAll()的實現都跟Monitor有很大關聯。app
wait方法的使用對應上圖的第3步,也就是說,調用wait()
、notify()
/notifyAll()
方法的對象必須已經獲取到鎖。
如何確保調用對象獲取到鎖呢?使用sychronized關鍵字唄!因此說這些方法調用也必須發生在sychronized修飾的同步代碼塊內。
(1)什麼是等待/通知機制
等待/通知機制是多個線程間的一種協做機制。談到線程咱們常常想到的是線程間的競爭(race),好比去競爭鎖。但這並非故事的所有,線程間也有協做機制。就比如咱們在公司中與同事關係,可能存在在晉升時的競爭,但更多時候是一塊兒合做以完成某些任務。
wait/notify 就是線程間的一種協做機制。
當一個線程調用wait()/wait(long)方法後,進入WAITING狀態或者TIMED_WAITING狀態(阻塞),並釋放鎖與CPU資源。只有其餘獲取到鎖的線程執行完他們的指定代碼事後,再經過notify()方法將其喚醒。 若是須要,也可使用 notifyAll()
來喚醒全部的阻塞線程。
(2)等待/通知使用方法
等待/通知機制就是用於解決線程間通訊的問題的,使用到的3個方法的含義以下:
注意:
哪怕只通知了一個等待的線程,被通知線程也不能當即恢復執行,由於它當初中斷的地方是在同步塊內,而此刻它已經不持有鎖,因此它須要再次嘗試去獲取鎖(極可能面臨其它線程的競爭),成功後才能在當初調用 wait 方法以後的地方恢復執行。
總結以下:
- 若是能獲取鎖,線程就從 WAITING/TIMED_WAITING 狀態轉換爲RUNNABLE 狀態;
- 不然,從 Wait Set 區出來,又進入 Entry Set區,線程就從 WAITING 狀態又變成 BLOCKED 狀態。
(3)調用wait和notify方法須要注意的細節
下面就經過一個案例來進一步瞭解等待/通知機制:
public class WaitAndNotify { static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) throws Exception { Thread waitThread = new Thread(new Wait(), "WaitThread"); waitThread.start(); TimeUnit.SECONDS.sleep(1); Thread notifyThread = new Thread(new Notify(), "NotifyThread"); notifyThread.start(); } static class Wait implements Runnable { @Override public void run() { // 加鎖,擁有lock的Monitor synchronized (lock) { // 當條件不知足時,繼續wait,同時釋放了lock的鎖 while (flag) { try { System.out.println(Thread.currentThread().getName() + " flag is true. wait @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.wait(); } catch (InterruptedException e) { } } // 當條件知足時,完成工做 System.out.println(Thread.currentThread().getName() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); } } } static class Notify implements Runnable { @Override public void run() { // 加鎖,擁有lock的Monitor synchronized (lock) { // 得到lock的鎖,而後進行通知 System.out.println(Thread.currentThread().getName() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.notifyAll(); flag = false; SleepUtils.second(5); } // 再次加鎖 synchronized (lock) { System.out.println(Thread.currentThread().getName() + " hold lock. again. sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); SleepUtils.second(5); } } } } class SleepUtils { public static final void second(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } }
輸出結果可能以下:
WaitThread flag is true. wait @ 00:38:53 NotifyThread hold lock. notify @ 00:38:54 NotifyThread hold lock. again. sleep @ 00:38:59 WaitThread flag is false. running @ 00:39:04
也可能以下:
WaitThread flag is true. wait @ 00:38:53 NotifyThread hold lock. notify @ 00:38:54 WaitThread flag is false. running @ 00:39:04 NotifyThread hold lock. again. sleep @ 00:38:59
之因此出現這類狀況,在於調用notify()或notifyAll()方法調用後,waitThread是否成功獲取到鎖。競爭成功,則進入RUNNABLE狀態;若是競爭失敗,waitThread會從新進入到Entry Set區再從新去競爭鎖。也就是說,從wait()方法返回的前提是得到了調用對象的鎖。
從上述細節中能夠看到,等待/通知機制依託於同步機制,其目的就是確保等待線程從wait()方法返回時可以感知到通知線程對變量作出的修改。
下圖描述了上述示例的過程:
從上面案例中,能夠提煉出等待/通知的經典範式——生產者/消費者模式。該範式主要分爲兩部分,分別針對生產者(通知方)和消費者(等待方)。
消費者遵循以下原則:
(1)獲取對象的鎖。
(2)若是條件不知足,那麼調用對象的wait()方法,被通知後仍要檢查條件。
(3)條件知足則執行對應的邏輯。
對應的僞代碼以下。
synchronized (對象) { while (條件) { 對象.wait(); } 對應的處理邏輯 }
生產者遵循以下原則:
(1)得到對象的鎖。
(2)改變條件。
(3)通知全部等待在對象上的線程。對應的僞代碼以下。
對應的僞代碼以下。
synchronized (對象) { 改變的條件 對象.notifyAll();//或者 對象.notify() }
代碼實例:
首先建了一個簡單的 Product
類,用來表示生產和消費的產品。
public class Product { private String name; public Product(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
建立生產者類:
public class Producer implements Runnable { private Queue<Product> queue; private int maxCapacity; public Producer(Queue<Product> queue, int maxCapacity) { this.queue = queue; this.maxCapacity = maxCapacity; } @Override public void run() { synchronized (queue) { while (queue.size() == maxCapacity) { try { System.out.println("生產者" + Thread.currentThread().getName() + "Queue 已滿,WAITING"); wait(); System.out.println("生產者" + Thread.currentThread().getName() + "退出等待"); } catch (InterruptedException e) { e.printStackTrace(); } } if (queue.size() == 0) { //隊列裏的產品從無到有,須要通知在等待的消費者 queue.notifyAll(); } Integer i = new Random().nextInt(50); queue.offer(new Product("產品" + i.toString())); System.out.println("生產者" + Thread.currentThread().getName() + "生產了產品" + i.toString()); } } }
建立消費者類:
public class Consumer implements Runnable { private Queue<Product> queue; private int maxCapacity; public Consumer(Queue queue, int maxCapacity) { this.queue = queue; this.maxCapacity = maxCapacity; } @Override public void run() { synchronized (queue) { while (queue.isEmpty()) { try { System.out.println("消費者" + Thread.currentThread().getName() + "Queue已空,WAITING"); wait(); System.out.println("消費者" + Thread.currentThread().getName() + "退出等待"); } catch (InterruptedException e) { e.printStackTrace(); } } if (queue.size() == maxCapacity) { queue.notifyAll(); } Product product = queue.poll(); System.out.println("消費者" + Thread.currentThread().getName() + "消費了" + product.getName()); } } }
開啓多線程:
public class TreadTest { public static void main(String[] args) { Queue<Product> queue = new ArrayDeque<>(); for (int i = 0; i < 10; i++) { new Thread(new Producer((Queue<Product>) queue, 10)).start(); new Thread(new Consumer((Queue) queue, 10)).start(); } } }
測試結果:
生產者Thread-0生產了產品35 消費者Thread-1消費了產品35 生產者Thread-2生產了產品43 消費者Thread-3消費了產品43 消費者Thread-5Queue已空,WAITING 生產者Thread-6生產了產品17 生產者Thread-8生產了產品39 消費者Thread-7消費了產品17 生產者Thread-10生產了產品17 生產者Thread-12生產了產品3 消費者Thread-13消費了產品39 生產者Thread-14生產了產品10 消費者Thread-17消費了產品17 生產者Thread-16生產了產品8 消費者Thread-19消費了產品3 生產者Thread-4生產了產品29 消費者Thread-9消費了產品10 消費者Thread-11消費了產品8 消費者Thread-15消費了產品29 生產者Thread-18生產了產品33
LockSupport類是Java6(JSR166-JUC)引入的一個類,用來建立鎖和其餘同步工具類的基本線程阻塞原語。使用LockSupport類中的park()和unpark()方法也能夠實現線程的阻塞與喚醒。Park有停車的意思,假設線程爲車輛,那麼park方法表明着停車,而unpark方法則是指車輛啓動離開。
public static void park() { UNSAFE.park(false, 0L); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
歸根到底仍是調用了UNSAFE類中的函數:
public native void unpark(Object var1); public native void park(boolean var1, long var2);
與 wait/notify 相比,park/unpark 方法更貼近操做系統層面的阻塞與喚醒線程,並不須要獲取對象的監視器。
park/unpark 原理可參考LockSupport中的park與unpark原理一文。
須要明白的是,每一個java線程都有一個Parker對象,主要三部分組成 _counter、 _cond和 _mutex。Parker類是這樣定義的:
class Parker : public os::PlatformParker { private: //表示許可 volatile int _counter ; ... public: Parker() : PlatformParker() { //初始化_counter _counter = 0 ; ... public: void park(bool isAbsolute, jlong time); void unpark(); ... } class PlatformParker : public CHeapObj<mtInternal> { protected: pthread_mutex_t _mutex [1] ; pthread_cond_t _cond [1] ; ... }
Parker類裏的_counter字段,就是用來記錄所謂的「許可」的。當調用park時,這個變量置爲了0;當調用unpark時,這個變量置爲1。
park和unpark的靈活之處在於,unpark函數能夠先於park調用。好比線程B調用unpark函數,給線程A發了一個「許可」,那麼當線程A調用park時,它發現已經有「許可」了,那麼它會立刻再繼續運行。
先調用park再調用upark時:
1.先調用park
(1)當前線程調用 Unsafe.park() 方法,檢查_counter狀況(爲0),得到 _mutex 互斥鎖。
(2)mutex對象有個等待隊列 _cond,線程進入等待隊列中阻塞。
(4)設置 _counter = 0。
2.再調用upark
(1)調用 Unsafe.unpark方法,設置 _counter 爲 1
(2)喚醒 _cond 條件變量中的 阻塞線程,線程恢復運行。
(3)設置 _counter 爲 0
先調用upark再調用park時:
(1)調用 Unsafe.unpark方法,設置 _counter 爲 1。
(2)當前線程調用 Unsafe.park() 方法。
(3)檢查 _counter ,本狀況爲 1,這時線程無需阻塞,繼續運行。
(4)設置 _counter 爲 0。
特別注意的是,LockSupport是不可重入的,若是一個線程連續2次調用LockSupport.park(),那麼該線程必定會一直阻塞下去。
public static void main(String[] args) throws Exception { Thread thread = Thread.currentThread(); LockSupport.unpark(thread); System.out.println("線程處於運行狀態"); LockSupport.park(); System.out.println("線程處於阻塞狀態"); LockSupport.park(); System.out.println("線程處於阻塞狀態"); LockSupport.unpark(thread); System.out.println("線程處於運行狀態???"); }
運行結果以下:
線程處於運行狀態 線程處於阻塞狀態
可見,在第二次調用park後,線程沒法再獲取許可出現了死鎖。