【併發編程】park與unpark、notify與notifyAll

1 線程狀態簡述

Java線程在運行的生命週期中可能處於以下6種不一樣的狀態,在給定的一個時刻,線程只能處於其中的一個狀態。html

線程狀態 說明
NEW 初始狀態,線程剛被建立,可是並未啓動(還未調用start方法)。
RUNNABLE 運行狀態,JAVA線程將操做系統中的就緒(READY)和運行(RUNNING)兩種狀態籠統地稱爲「運行中」。
BLOCKED 阻塞狀態,表示線程阻塞於鎖。
WAITING 等待狀態,表示該線程無限期等待另外一個線程執行一個特別的動做。
TIMED_WAITING 超時等待狀態,不一樣於WAITING的是,它能夠在指定時間自動返回。
TERMINATED 終止狀態,表示當前狀態已經執行完畢。

線程在自身的生命週期中,並非固定地處於某個狀態,而是隨着代碼的執行在不一樣的狀態之間進行切換。java

<img src="https://img-blog.csdnimg.cn/20200929220833538.png" style="zoom:80%;" />node

【併發編程基礎】線程基礎(經常使用方法、狀態)一文中,主要學習了wait()、join()和sleep()等方法,在【併發編程】深刻理解synchronized原理一文中,主要探討了synchronized原理。下面就進行park()/unpark()、wait()/notify()/notifyAll()的學習。c++

2 wait和notify/notifyAll

2.1 源碼簡析

wait( ),notify( ),notifyAll( )都是Object基礎類中的方法,因此在任何 Java 對象上均可以使用。web

public class Object {
    // 致使當前線程等待,直到另外一個線程調用此對象的notify()方法或notifyAll()方法。
    public final void wait() throws InterruptedException {
        wait(0);
    }

    // 致使當前線程等待,直到另外一個線程調用此對象的notify()方法或notifyAll()方法,或者已通過了指定的時間。
    public final native void wait(long timeout) throws InterruptedException;

    //  喚醒正在此對象監視器上等待的單個線程。
    public final native void notify();

    //  喚醒等待此對象監視器的全部線程。
    public final native void notifyAll();
}

打開objectMonitor.cpp,查看wait方法:shell

...
   // create a node to be put into the queue
   // Critically, after we reset() the event but prior to park(), we must check
   // for a pending interrupt.
   ObjectWaiter node(Self);                   // 將當前線程封裝成ObjectWatier
   node.TState = ObjectWaiter::TS_WAIT ; // 狀態改成等待狀態
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.

   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;// 自旋操做
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;                 // 添加到_WaitSet節點中
    ...

查看AddWaiter()方法:編程

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ; // 經過雙向鏈表的方式,將ObjectWaiter對象添加到_WaitSet列表中
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

查看notify方法源碼:多線程

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;// _WaitSet=NULL代表沒有等待狀態的線程,直接返回。
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;// 獲取一個ObjectWaiter對象
  if (iterator != NULL) {
      ...
     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }
     // 根據不一樣狀態採起不一樣策略,將從_WaitSet列表中移出來的ObjectWaiter對象加入到_EntryList列表中。
     if (Policy == 0) {       // prepend to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else
     if (Policy == 1) {...} else     // append to EntryList
     if (Policy == 2) {...} else     // prepend to cxq
     if (Policy == 3) {                 // append to cxq
         ...     
     } else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;
        OrderAccess::fence() ;         // 被喚醒的線程又變成run狀態。
        ev->unpark() ;
     }
}

查看notifyAll方法源碼:併發

void ObjectMonitor::notifyAll(TRAPS) {
  CHECK_OWNER();
  ObjectWaiter* iterator;
  if (_WaitSet == NULL) {
      TEVENT (Empty-NotifyAll) ;// _WaitSet=NULL代表沒有等待狀態的線程,直接返回。
      return ;
  }
  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;
  int Tally = 0 ;
  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notifyall") ;

  for (;;) {
     iterator = DequeueWaiter () ;// 循環獲取因此ObjectWaiter對象
       ...
     ObjectWaiter * List = _EntryList ;
     if (List != NULL) {
        assert (List->_prev == NULL, "invariant") ;
        assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (List != iterator, "invariant") ;
     }
      // 根據不一樣狀態採起不一樣策略,將從_WaitSet列表中移出來的ObjectWaiter對象加入到_EntryList列表中。    
     if (Policy == 0) {       // prepend to EntryList
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
             List->_prev = iterator ;
             iterator->_next = List ;
             iterator->_prev = NULL ;
             _EntryList = iterator ;
        }
     } else
     if (Policy == 1) {      // append to EntryList
        ...
     } else
     if (Policy == 2) {      // prepend to cxq
        ...
     } else
     if (Policy == 3) {      // append to cxq
        ...
     } else {
        ParkEvent * ev = iterator->_event ;
        iterator->TState = ObjectWaiter::TS_RUN ;// 被喚醒的線程又變成run狀態。
        OrderAccess::fence() ;
        ev->unpark() ;
     }

      ...

可見,wait()與notify()/notifyAll()的實現都跟Monitor有很大關聯。app

  • 當多線程訪問一段同步代碼塊時,這些都線程會被被封裝成一個個ObjectWatier對象,並被放入 _EntryList列表中,也就是被放到 Entry Set(入口區) 中等待獲取鎖。
  • 若是該線程獲取到了鎖(acquire),線程就會成爲當前鎖的 Owner。
  • 獲取到鎖的線程可也以經過調用 wait 方法將鎖釋放(release),而後該線程對象會被放入_WaitSet列表中,進入Wait Set (等待區)進行等待(阻塞BLOCKED)。
  • 當獲取到鎖的對象調用notify/notifyAll方法喚醒等待區被阻塞的線程時,線程從新競爭鎖。若是競爭鎖成功,那麼線程就進入RUNNABLE狀態;若是競爭鎖失敗,這些線程會從新進入到Entry Set區再從新去競爭鎖。

wait方法的使用對應上圖的第3步,也就是說,調用wait()notify()/notifyAll()方法的對象必須已經獲取到鎖

如何確保調用對象獲取到鎖呢?使用sychronized關鍵字唄!因此說這些方法調用也必須發生在sychronized修飾的同步代碼塊內

2.2 等待/通知機制

(1)什麼是等待/通知機制

等待/通知機制是多個線程間的一種協做機制。談到線程咱們常常想到的是線程間的競爭(race),好比去競爭鎖。但這並非故事的所有,線程間也有協做機制。就比如咱們在公司中與同事關係,可能存在在晉升時的競爭,但更多時候是一塊兒合做以完成某些任務。

wait/notify 就是線程間的一種協做機制。

當一個線程調用wait()/wait(long)方法後,進入WAITING狀態或者TIMED_WAITING狀態(阻塞),並釋放鎖與CPU資源。只有其餘獲取到鎖的線程執行完他們的指定代碼事後,再經過notify()方法將其喚醒。 若是須要,也可使用 notifyAll()來喚醒全部的阻塞線程。

(2)等待/通知使用方法

等待/通知機制就是用於解決線程間通訊的問題的,使用到的3個方法的含義以下:

  1. wait:線程再也不活動,再也不參與調度,釋放它對鎖的擁有權。它還要等着別的線程執行一個特別的動做,也便是「通知(notify)」在這個對象上等待的線程從WAITING狀態中釋放出來,從新進入到調度隊列(ready queue)中。
  2. notify:喚醒一個等待當前對象的鎖的線程。喚醒在此對象監視器上等待的單個線程。
  3. notifyAll:喚醒在此對象監視器上等待的全部線程。

注意:

哪怕只通知了一個等待的線程,被通知線程也不能當即恢復執行,由於它當初中斷的地方是在同步塊內,而此刻它已經不持有鎖,因此它須要再次嘗試去獲取鎖(極可能面臨其它線程的競爭),成功後才能在當初調用 wait 方法以後的地方恢復執行。

總結以下:

  • 若是能獲取鎖,線程就從 WAITING/TIMED_WAITING 狀態轉換爲RUNNABLE 狀態;
  • 不然,從 Wait Set 區出來,又進入 Entry Set區,線程就從 WAITING 狀態又變成 BLOCKED 狀態。

(3)調用wait和notify方法須要注意的細節

  • wait方法與notify方法必需要由同一個鎖對象調用。由於對應的鎖對象能夠經過notify喚醒使用同一個鎖對象調用的wait方法後的線程。
  • wait方法與notify方法是屬於Object類的方法的。由於鎖對象能夠是任意對象,而任意對象的所屬類都是繼承了Object類的。
  • wait方法與notify方法必需要在同步代碼塊或者是同步函數中使用。由於必需要經過鎖對象調用這2個方法。

下面就經過一個案例來進一步瞭解等待/通知機制:

public class WaitAndNotify {

    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        @Override
        public void run() {
            // 加鎖,擁有lock的Monitor
            synchronized (lock) {
                // 當條件不知足時,繼續wait,同時釋放了lock的鎖
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " flag is true. wait @ " +
                                new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {

                    }
                }
                // 當條件知足時,完成工做
                System.out.println(Thread.currentThread().getName() + " flag is false. running @ " +
                        new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        @Override
        public void run() {
            // 加鎖,擁有lock的Monitor
            synchronized (lock) {
                // 得到lock的鎖,而後進行通知
                System.out.println(Thread.currentThread().getName() + " hold lock. notify @ " +
                        new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
            // 再次加鎖
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + " hold lock. again. sleep @ " +
                        new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}

class SleepUtils {
    public static final void second(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {

        }
    }
}

輸出結果可能以下:

WaitThread flag is true. wait @ 00:38:53
NotifyThread hold lock. notify @ 00:38:54
NotifyThread hold lock. again. sleep @ 00:38:59
WaitThread flag is false. running @ 00:39:04

也可能以下:

WaitThread flag is true. wait @ 00:38:53
NotifyThread hold lock. notify @ 00:38:54
WaitThread flag is false. running @ 00:39:04
NotifyThread hold lock. again. sleep @ 00:38:59

之因此出現這類狀況,在於調用notify()或notifyAll()方法調用後,waitThread是否成功獲取到鎖。競爭成功,則進入RUNNABLE狀態;若是競爭失敗,waitThread會從新進入到Entry Set區再從新去競爭鎖。也就是說,從wait()方法返回的前提是得到了調用對象的鎖

從上述細節中能夠看到,等待/通知機制依託於同步機制,其目的就是確保等待線程從wait()方法返回時可以感知到通知線程對變量作出的修改。

下圖描述了上述示例的過程:

2.3 生產者/消費者模式

從上面案例中,能夠提煉出等待/通知的經典範式——生產者/消費者模式。該範式主要分爲兩部分,分別針對生產者(通知方)和消費者(等待方)。

消費者遵循以下原則:

(1)獲取對象的鎖。

(2)若是條件不知足,那麼調用對象的wait()方法,被通知後仍要檢查條件。

(3)條件知足則執行對應的邏輯。

對應的僞代碼以下。

synchronized (對象) {
     while (條件) {
         對象.wait();
     }
     對應的處理邏輯
 }

生產者遵循以下原則:

(1)得到對象的鎖。

(2)改變條件。

(3)通知全部等待在對象上的線程。對應的僞代碼以下。

對應的僞代碼以下。

synchronized (對象) {
    改變的條件
    對象.notifyAll();//或者 對象.notify()
 }

代碼實例

首先建了一個簡單的 Product 類,用來表示生產和消費的產品。

public class Product {
    private String name;

    public Product(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

建立生產者類:

public class Producer implements Runnable {
    private Queue<Product> queue;
    private int maxCapacity;

    public Producer(Queue<Product> queue, int maxCapacity) {
        this.queue = queue;
        this.maxCapacity = maxCapacity;
    }

    @Override
    public void run() {
        synchronized (queue) {
            while (queue.size() == maxCapacity) {
                try {
                    System.out.println("生產者" + Thread.currentThread().getName() + "Queue 已滿,WAITING");
                    wait();
                    System.out.println("生產者" + Thread.currentThread().getName() + "退出等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (queue.size() == 0) { //隊列裏的產品從無到有,須要通知在等待的消費者
                queue.notifyAll();
            }
            Integer i = new Random().nextInt(50);
            queue.offer(new Product("產品" + i.toString()));
            System.out.println("生產者" + Thread.currentThread().getName() + "生產了產品" + i.toString());
        }
    }
}

建立消費者類:

public class Consumer implements Runnable {

    private Queue<Product> queue;
    private int maxCapacity;

    public Consumer(Queue queue, int maxCapacity) {
        this.queue = queue;
        this.maxCapacity = maxCapacity;
    }

    @Override
    public void run() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    System.out.println("消費者" + Thread.currentThread().getName() + "Queue已空,WAITING");
                    wait();
                    System.out.println("消費者" + Thread.currentThread().getName() + "退出等待");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (queue.size() == maxCapacity) {
                queue.notifyAll();
            }

            Product product = queue.poll();
            System.out.println("消費者" + Thread.currentThread().getName() + "消費了" + product.getName());
        }
    }
}

開啓多線程:

public class TreadTest {
    public static void main(String[] args) {
        Queue<Product> queue = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            new Thread(new Producer((Queue<Product>) queue, 10)).start();
            new Thread(new Consumer((Queue) queue, 10)).start();
        }
    }
}

測試結果:

生產者Thread-0生產了產品35
消費者Thread-1消費了產品35
生產者Thread-2生產了產品43
消費者Thread-3消費了產品43
消費者Thread-5Queue已空,WAITING
生產者Thread-6生產了產品17
生產者Thread-8生產了產品39
消費者Thread-7消費了產品17
生產者Thread-10生產了產品17
生產者Thread-12生產了產品3
消費者Thread-13消費了產品39
生產者Thread-14生產了產品10
消費者Thread-17消費了產品17
生產者Thread-16生產了產品8
消費者Thread-19消費了產品3
生產者Thread-4生產了產品29
消費者Thread-9消費了產品10
消費者Thread-11消費了產品8
消費者Thread-15消費了產品29
生產者Thread-18生產了產品33

3 park與unpark

LockSupport類是Java6(JSR166-JUC)引入的一個類,用來建立鎖和其餘同步工具類的基本線程阻塞原語。使用LockSupport類中的park()和unpark()方法也能夠實現線程的阻塞與喚醒。Park有停車的意思,假設線程爲車輛,那麼park方法表明着停車,而unpark方法則是指車輛啓動離開。

public static void park() {
        UNSAFE.park(false, 0L);
    }

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

歸根到底仍是調用了UNSAFE類中的函數:

public native void unpark(Object var1);

    public native void park(boolean var1, long var2);

與 wait/notify 相比,park/unpark 方法更貼近操做系統層面的阻塞與喚醒線程,並不須要獲取對象的監視器

park/unpark 原理可參考LockSupport中的park與unpark原理一文。

須要明白的是,每一個java線程都有一個Parker對象,主要三部分組成 _counter、 _cond和 _mutex。Parker類是這樣定義的:

class Parker : public os::PlatformParker {
private:
  //表示許可
  volatile int _counter ;
  ...
public:
  Parker() : PlatformParker() {
    //初始化_counter
    _counter       = 0 ; 
...
public:
  void park(bool isAbsolute, jlong time);
  void unpark();
  ...
}
class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [1] ;
    ...
}

Parker類裏的_counter字段,就是用來記錄所謂的「許可」的。當調用park時,這個變量置爲了0;當調用unpark時,這個變量置爲1

park和unpark的靈活之處在於,unpark函數能夠先於park調用。好比線程B調用unpark函數,給線程A發了一個「許可」,那麼當線程A調用park時,它發現已經有「許可」了,那麼它會立刻再繼續運行。

先調用park再調用upark時

1.先調用park

(1)當前線程調用 Unsafe.park() 方法,檢查_counter狀況(爲0),得到 _mutex 互斥鎖。

(2)mutex對象有個等待隊列 _cond,線程進入等待隊列中阻塞。

(4)設置 _counter = 0。

2.再調用upark

(1)調用 Unsafe.unpark方法,設置 _counter 爲 1

(2)喚醒 _cond 條件變量中的 阻塞線程,線程恢復運行。

(3)設置 _counter 爲 0

先調用upark再調用park時

(1)調用 Unsafe.unpark方法,設置 _counter 爲 1。

(2)當前線程調用 Unsafe.park() 方法。

(3)檢查 _counter ,本狀況爲 1,這時線程無需阻塞,繼續運行。

(4)設置 _counter 爲 0。

特別注意的是,LockSupport是不可重入的,若是一個線程連續2次調用LockSupport.park(),那麼該線程必定會一直阻塞下去。

public static void main(String[] args) throws Exception {
        Thread thread = Thread.currentThread();

        LockSupport.unpark(thread);
        System.out.println("線程處於運行狀態");
        LockSupport.park();
        System.out.println("線程處於阻塞狀態");
        LockSupport.park();
        System.out.println("線程處於阻塞狀態");
        LockSupport.unpark(thread);
        System.out.println("線程處於運行狀態???");
    }

運行結果以下:

線程處於運行狀態
線程處於阻塞狀態

可見,在第二次調用park後,線程沒法再獲取許可出現了死鎖。

參考資料

JAVA併發編程的藝術

Java精通併發-透過openjdk源碼分析wait與notify方法的本地實現

LockSupport中的park與unpark原理

Java的LockSupport.park()實現分析

相關文章
相關標籤/搜索