Java中的線程協做之Condition

1、Condition接口

一、Condition接口的經常使用方法介紹

 1 /**
 2  * 已經獲取到鎖的線程調用該方法會進入等待狀態,知道其餘持有鎖的線程通知(signal)等待隊列中的線程或者被中斷退出等待隊列;
 3  * 若是該線程已經從該方法中返回,表名線程已經獲取到了Condition對象對應的鎖
 4  */
 5 public final void await() throws InterruptedException {...}
 6 /**
 7  * 仍是進入等待狀態的方法,只是該方法對中斷不敏感:當前調用該方法的線程只有被通知(signal)才能從等待隊列中退出
 8  */
 9 public final void awaitUninterruptibly() {...}
10 /**
11  * 當前線程進入等待狀態,被通知、中斷或者超時以後被喚醒。返回值就是表示剩餘的時間,即
12  * 若是在nanosTimeout納秒以前被喚醒,返回值就是實際耗時;若是返回值是0或者負數,就認爲是超時了
13  */
14 public final long awaitNanos(long nanosTimeout) {...}
15 /**
16  * 調用該方法的線程會進入等待狀態直到被通知、中斷或者到達某個超時時間。
17  * 意味着沒有到達指定的某個時間被通知,就會返回true;若是到達指定時間,返回false
18  */
19 public final boolean awaitUntil(Date deadline){}
20 /**
21  * 當前持有Condition對象對應鎖的線程,調用該方法以後會喚醒一個等待在Condition上的線程
22  */
23 public final void signal() {}
24 /**
25  * 當前持有Condition對象對應鎖的線程,調用該方法以後會喚醒等待在Condition上的全部線程
26  */
27 public final void signalAll() {}

  Condition的使用模板:Condition的獲取必須經過Lock的newCondition方法,表示Condition對象與該鎖關聯,通常講Condition對象做爲成員變量,調用上面的await方法以後當前線程纔會釋放鎖並在等待隊列中進行等待;當其餘的線程(在沒有中斷的狀況下)調用該condition對象的signal方法的時候就會通知等待隊列中的等待線程從await方法返回(返回以前已經獲取鎖)。java

 1 Lock lock = new ReentrantLock();
 2 Condition  con = lock.newCondition();
 3 public void conWait() {
 4     lock.lock();
 5     try {
 6         con.await();
 7     } catch(InterruptedException e) {
 8         ...
 9     }finally {
10         lock.unlock();
11     }
12 }
13 
14 public void conSignal() {
15     lock.lock();
16     try {
17         con.signal();
18     } catch(InterruptedException e) {
19         ...
20     }finally {
21         lock.unlock();
22     }
23 }

二、Condition的實現分析

a)源碼流程分析

  咱們經過跟蹤源碼能夠看出來,首先建立鎖對象(new ReentrantLock()),而後根據鎖對象關聯響應的Condition對象,而後經過Condition對象中維護的等待隊列實現等待(await)通知(signal)機制。node

 1 public Condition newCondition() { //ReentrantLock類中的方法
 2     return sync.newCondition();
 3 }
 4 //ConditionObject類實現Condition接口,除此室外ConditionObject也是AQS的一個內部類,Condition的操做須要與鎖關聯起來
 5 final ConditionObject newCondition() {
 6     return new ConditionObject();
 7 }
 8 //AQS的內部類ConditionObject,其中維護了一個等待隊列,經過該隊列實現等待通知機制
 9 public class ConditionObject{ 
10     /**
11      * 返回等待隊列中的線程集合
12      * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
13      *         returns {@code false}
14      */
15     protected final Collection<Thread> getWaitingThreads() {
16         if (!isHeldExclusively())
17             throw new IllegalMonitorStateException();
18         ArrayList<Thread> list = new ArrayList<Thread>();
19         for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
20             if (w.waitStatus == Node.CONDITION) {
21                 Thread t = w.thread;
22                 if (t != null)
23                     list.add(t);
24             }
25         }
26         return list;
27     }
28 }

b)具體實現

  上面說到了Condition是經過等待隊列來實現等待通知功能的,那麼就分析等待隊列和等待通知機制的實現ide

①等待隊列實現

  等待隊列是一個FIFO的隊列,其中每一個結點都包含一個處於Condition對象上等待的線程引用(當一個獲取到鎖的線程調用await方法,就會釋放鎖資源,被包裝成一個Node而後添加到等待隊列中進入等待狀態;這裏面的Node結點仍是和AQS中的實現機理同樣,Node是AQS中的靜態內部類)。ui

  ConditionObject類中有下面兩個屬性,分別表明一個Condition對應的等待隊列的首節點和尾結點。當前線程調用await方法以後就會被構形成一個Node結點而後加入到等待隊列的尾部。this

1 /** Condition等待隊列頭結點 */
2 private transient Node firstWaiter;
3 /** Condition等待隊列尾結點 */
4 private transient Node lastWaiter;

   下面是等待隊列的基本結構,Condition對象中有首尾結點的引用。新增長的結點須要將原有的尾結點的下一節點指向它,而後更新lastWaiter便可。spa

  上面的狀況是一個Condition對象對應一個等待隊列和一個同步隊列(上面新添加的Node3就是從同步隊列中移除而後添加過來的),在同步器組件實現中,會擁有一個同步隊列和多個等待隊列。線程

②等待操做的實現

  持有鎖的線程調Condition的await方法以後會釋放鎖,而後進入等待狀態。既然是持有鎖的線程,那麼該線程應該位於同步隊列的首節點位置,其調用await方法以後就會從同步隊列首節點移到等待隊列的尾結點等待。具體將其移到等待隊列是addConditionWaiter方法實現。下面是await方法和addConditionWaiter方法的實現分析。code

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //將當前線程加入等待隊列
    int savedState = fullyRelease(node); //釋放當前線程持有的鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
    Node t = lastWaiter;
    /**
     * waitStatus值表示線程正在等待條件(本來結點在等待隊列中,結點線程等待在Condition上,當其餘線程對
     * Condition調用了signal()方法以後)該結點會從等待隊列中轉移到同步隊列中,進行同步狀態的獲取 
     * static final int CONDITION = -2;
    */
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION); //構形成Condition的等待隊列中的對應的結點
    //增長的結點須要將原有的尾結點的下一節點指向它,而後更新lastWaiter
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

③通知操做的實現

   通知操做的實現機制就是將當前等待隊列中的首節點中的線程喚醒,將其加入同步隊列中。對象

1 public final void signal() {
2     if (!isHeldExclusively()) //檢查當前線程是否獲取鎖
3         throw new IllegalMonitorStateException();
4     Node first = firstWaiter;
5     if (first != null)
6         doSignal(first);
7 }

   喚醒線程使其進入同步隊列以後,咱們再來看await方法中那些沒有執行的代碼。blog

 1 public final void await() throws InterruptedException {
 2     if (Thread.interrupted())
 3         throw new InterruptedException();
 4     Node node = addConditionWaiter(); //將當前線程加入等待隊列
 5     int savedState = fullyRelease(node); //釋放當前線程持有的鎖
 6     int interruptMode = 0;
 7     //根據下面的源碼能夠看出,當前線程若是掉用await方法以後會進入等待隊列,那麼在退出等待隊列以前會一直執行這個循環
 8     while (!isOnSyncQueue(node)) { 
 9         LockSupport.park(this); //喚醒節點中的線程
10         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
11             break;
12     }
13     //經過acquireQueued源碼能夠發現,獲取鎖的流程和ReentrantLock這種獨佔式獲取同步狀態的流程基本一致
14     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
15         interruptMode = REINTERRUPT;
16     if (node.nextWaiter != null) // clean up if cancelled
17         unlinkCancelledWaiters();
18     if (interruptMode != 0)
19         reportInterruptAfterWait(interruptMode);
20 }
21 final boolean isOnSyncQueue(Node node) {
22     if (node.waitStatus == Node.CONDITION || node.prev == null) //判斷當前隊列是否在等待隊列中
23         return false;
24     if (node.next != null) // If has successor, it must be on queue
25         return true;
26     return findNodeFromTail(node);
27 }
28 //競爭鎖資源的同步隊列
29 final boolean acquireQueued(final Node node, int arg) {
30     boolean failed = true;
31     try {
32         boolean interrupted = false;
33         for (;;) {
34             final Node p = node.predecessor(); //獲得當前結點的前驅結點
35             if (p == head && tryAcquire(arg)) { //當前結點的前驅結點爲頭結點,而且嘗試獲取鎖成功
36                 setHead(node); //將當前獲取到鎖的結點設置爲頭結點
37                 p.next = null; // help GC
38                 failed = false;
39                 return interrupted;
40             }
41              //若是獲取同步狀態失敗,應該自旋等待繼續獲取而且校驗本身的中斷標誌位信息
42             if (shouldParkAfterFailedAcquire(p, node) &&
43                 parkAndCheckInterrupt())
44                 interrupted = true;
45         }
46     } finally {
47         if (failed)
48             cancelAcquire(node);
49     }
50 }

  從上面的代碼中咱們能夠看出,當調用await方法的線程在沒有回到同步隊列以前,都會一直在while (!isOnSyncQueue(node)){...}循環中,只有被喚醒退出等待隊列進入同步隊列纔會從循環中退出;以後調用acquireQueued()開始自旋等待鎖的獲取,這個自旋的過程和前面介紹的AQS中獨佔式鎖的獲取流程同樣;最後,若是線程從這個自旋的過程退出了,就表明當前線程再次獲取了鎖資源,最後也從await方法返回。因此,一個線程調用await方法以後,只有最終獲取到鎖纔會從該方法返回。而對於signalAll而言就是對等待隊列中的每一個線程通知(signal)一次,這樣就能夠將等待隊列中的全部線程移到同步隊列中進行鎖資源的獲取。

2、Condition接口使用

一、Condition接口搭配ReentrantLock實現生產者消費者模式

 1 package cn.source.condition;
 2 
 3 import java.util.LinkedList;
 4 import java.util.concurrent.TimeUnit;
 5 import java.util.concurrent.locks.Condition;
 6 import java.util.concurrent.locks.Lock;
 7 import java.util.concurrent.locks.ReentrantLock;
 8 
 9 public class ConditionProducerAndConsumer<E> {
10     
11     private LinkedList<E> list = new LinkedList<E>();
12     private static final int MAX_NUM = 10; //容器的最大數量
13     private int count = 0; //容器中實際數量
14     
15     private Lock lock = new ReentrantLock();
16     private Condition producer = lock.newCondition();
17     private Condition consumer = lock.newCondition();
18     
19     private int getCount() {
20         return count;
21     }
22     
23     private void put(E e) {
24         lock.lock(); //首先須要獲取鎖
25         try {
26             //這裏是判斷容器是否已滿,注意須要使用while:若是使用if的話可能致使全部的消費線程都處於等待狀態
27             while(list.size() == MAX_NUM) { 
28                 System.out.println(Thread.currentThread().getName() + "正在等待中");
29                 producer.await(); //生產者線程進入等待狀態
30             }
31             //添加元素
32             list.add(e);
33             count ++;
34             consumer.signalAll();//將消費者線程喚醒
35         } catch (InterruptedException e1) {
36             e1.printStackTrace();
37         } finally {
38             lock.unlock();
39         }
40     }
41     
42     private E get() {
43         E e = null;
44         lock.lock();
45         try {
46             while(list.size() == 0) {
47                 System.out.println(Thread.currentThread().getName() + "正在等待");
48                 consumer.await(); //消費者線程進入等待狀態
49             }
50             e = list.removeFirst();
51             count --;
52             producer.signalAll(); //消費元素以後,將生產者線程喚醒
53         } catch (InterruptedException e1) {
54             e1.printStackTrace();
55         } finally {
56             lock.unlock();
57         }
58         return e;
59     }
60     
61     public static void main(String[] args) {
62         SyncProducerAndConsumer<String> syncProducerAndConsumer = new SyncProducerAndConsumer<>();
63         for (int i = 0; i < 10; i++) { //開啓10個線程
64             new Thread(new Runnable() {
65                 @Override
66                 public void run() {
67                     for (int j = 0; j < 5; j++) { //每一個線程從容器中獲取5次數據
68                         System.out.println(syncProducerAndConsumer.get());
69                     }
70                 }
71                 
72             }, "消費者線程" + i).start();;
73         }
74         //休眠2秒,全部的消費者線程都已經啓動而且處於等待狀態
75         try {
76             TimeUnit.SECONDS.sleep(2);
77         } catch (InterruptedException e) {
78             e.printStackTrace();
79         }
80         
81         for (int i = 0; i < 2; i++) { //開啓兩個生產者線程
82             new Thread(new Runnable() {
83                 @Override
84                 public void run() { 
85                     for (int j = 0; j < 25; j++) { //每一個生產者線程想容器中添加25個數據,當容器中數據到達10個的時候生產者線程會阻塞
86                         syncProducerAndConsumer.put("add value " + j);
87                     }
88                 }
89             }, "生產者線程"+i).start();
90         }
91     }
92 
93 }

二、synchronized組合wait/notify實現生產者消費者模式

 1 package cn.source.condition;
 2 
 3 import java.util.LinkedList;
 4 import java.util.concurrent.TimeUnit;
 5 
 6 public class SyncProducerAndConsumer<E> {
 7 
 8     private LinkedList<E> list = new LinkedList<E>();
 9     private static final int MAX_NUM = 10; //容器的最大數量
10     private int count = 0; //容器中實際數量
11     
12     public synchronized int getCount() {
13         return count;
14     }
15     
16     public synchronized void put(E e) { 
17         while(list.size() == MAX_NUM) { //這裏是判斷容器是否已滿,注意須要使用while:若是使用if的話可能致使全部的消費線程都處於等待狀態
18             try {
19                 this.wait(); //容器滿了以後,生產者線程進入等待狀態
20             } catch (InterruptedException e1) {
21                 e1.printStackTrace();
22             }
23         }
24         //容器未滿,生產者線程就想容器中添加數據
25         list.add(e);
26         count ++;
27         this.notifyAll(); //此時容器中已經存在數據,喚醒等待的消費者線程
28     }
29     
30     public synchronized E get() {
31         E e = null;
32         while(list.size() == 0) { //判斷容器是否爲空,若是爲空就進入等待狀態,這裏也使用while
33             try {
34                 this.wait();
35             } catch (InterruptedException e1) {
36                 e1.printStackTrace();
37             }
38         }
39         e = list.removeFirst();
40         count --;
41         this.notifyAll();
42         return e;
43     }
44     
45     public static void main(String[] args) {
46         SyncProducerAndConsumer<String> syncProducerAndConsumer = new SyncProducerAndConsumer<>();
47         for (int i = 0; i < 10; i++) { //開啓10個線程
48             new Thread(new Runnable() {
49                 @Override
50                 public void run() {
51                     for (int j = 0; j < 5; j++) { //每一個線程從容器中獲取5次數據
52                         System.out.println(syncProducerAndConsumer.get());
53                     }
54                 }
55                 
56             }, "消費者線程" + i).start();;
57         }
58         //休眠2秒,全部的消費者線程都已經啓動而且處於等待狀態
59         try {
60             TimeUnit.SECONDS.sleep(2);
61         } catch (InterruptedException e) {
62             e.printStackTrace();
63         }
64         
65         for (int i = 0; i < 2; i++) { //開啓兩個生產者線程
66             new Thread(new Runnable() {
67                 @Override
68                 public void run() { 
69                     for (int j = 0; j < 25; j++) { //每一個生產者線程想容器中添加25個數據,當容器中數據到達10個的時候生產者線程會阻塞
70                         syncProducerAndConsumer.put("add value " + j);
71                     }
72                 }
73             }, "生產者線程"+i).start();
74         }
75     }
76     
77     
78 }

三、Object中的等待喚醒機制和Condition的等待通知機制對比

相關文章
相關標籤/搜索