java高併發系列 - 第13天:JUC中的Condition對象

本文目標:java

  1. synchronized中實現線程等待和喚醒
  2. Condition簡介及經常使用方法介紹及相關示例
  3. 使用Condition實現生產者消費者
  4. 使用Condition實現同步阻塞隊列

Object對象中的wait(),notify()方法,用於線程等待和喚醒等待中的線程,你們應該比較熟悉,想再次瞭解的朋友能夠移步到線程的基本操做微信

synchronized中等待和喚醒線程示例

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo1 {
    static Object lock = new Object();

    public static class T1 extends Thread {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "準備獲取鎖!");
            synchronized (lock) {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + "獲取鎖成功!");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "釋放鎖成功!");
        }
    }

    public static class T2 extends Thread {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "準備獲取鎖!");
            synchronized (lock) {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + "獲取鎖成功!");
                lock.notify();
                System.out.println(System.currentTimeMillis() + "," + this.getName() + " notify!");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(System.currentTimeMillis() + "," + this.getName() + "準備釋放鎖!");
            }
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "釋放鎖成功!");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
        TimeUnit.SECONDS.sleep(5);
        T2 t2 = new T2();
        t2.setName("t2");
        t2.start();
    }
}

輸出:併發

1:1563530109234,t1準備獲取鎖!
2:1563530109234,t1獲取鎖成功!
3:1563530114236,t2準備獲取鎖!
4:1563530114236,t2獲取鎖成功!
5:1563530114236,t2 notify!
6:1563530119237,t2準備釋放鎖!
7:1563530119237,t2釋放鎖成功!
8:1563530119237,t1釋放鎖成功!

代碼結合輸出的結果咱們分析一下:ide

  1. 線程t1先獲取鎖,而後調用了wait()方法將線程置爲等待狀態,而後會釋放lock的鎖
  2. 主線程等待5秒以後,啓動線程t2,t2獲取到了鎖,結果中一、3行時間相差5秒左右
  3. t2調用lock.notify()方法,準備將等待在lock上的線程t1喚醒,notify()方法以後又休眠了5秒,看一下輸出的五、8可知,notify()方法以後,t1並不能當即被喚醒,須要等到t2將synchronized塊執行完畢,釋放鎖以後,t1才被喚醒
  4. wait()方法和notify()方法必須放在同步塊內調用(synchronized塊內),不然會報錯

Condition使用簡介

在瞭解Condition以前,須要先了解一下重入鎖ReentrantLock,能夠移步到:JUC中的ReentranLock高併發

任何一個java對象都自然繼承於Object類,在線程間實現通訊的每每會應用到Object的幾個方法,好比wait()、wait(long timeout)、wait(long timeout, int nanos)與notify()、notifyAll()幾個方法實現等待/通知機制,一樣的, 在java Lock體系下依然會有一樣的方法實現等待/通知機制。this

從總體上來看Object的wait和notify/notify是與對象監視器配合完成線程間的等待/通知機制,而Condition與Lock配合完成等待通知機制,前者是java底層級別的,後者是語言級別的,具備更高的可控制性和擴展性。二者除了在使用方式上不一樣外,在功能特性上仍是有不少的不一樣:線程

  1. Condition可以支持不響應中斷,而經過使用Object方式不支持
  2. Condition可以支持多個等待隊列(new 多個Condition對象),而Object方式只能支持一個
  3. Condition可以支持超時時間的設置,而Object不支持

Condition由ReentrantLock對象建立,而且能夠同時建立多個,Condition接口在使用前必須先調用ReentrantLock的lock()方法得到鎖,以後調用Condition接口的await()將釋放鎖,而且在該Condition上等待,直到有其餘線程調用Condition的signal()方法喚醒線程,使用方式和wait()、notify()相似。3d

示例代碼:code

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo2 {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static class T1 extends Thread {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "準備獲取鎖!");
            lock.lock();
            try {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + "獲取鎖成功!");
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "釋放鎖成功!");
        }
    }

    public static class T2 extends Thread {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "準備獲取鎖!");
            lock.lock();
            try {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + "獲取鎖成功!");
                condition.signal();
                System.out.println(System.currentTimeMillis() + "," + this.getName() + " signal!");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(System.currentTimeMillis() + "," + this.getName() + "準備釋放鎖!");
            } finally {
                lock.unlock();
            }
            System.out.println(System.currentTimeMillis() + "," + this.getName() + "釋放鎖成功!");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
        TimeUnit.SECONDS.sleep(5);
        T2 t2 = new T2();
        t2.setName("t2");
        t2.start();
    }
}

輸出:對象

1563532185827,t1準備獲取鎖!
1563532185827,t1獲取鎖成功!
1563532190829,t2準備獲取鎖!
1563532190829,t2獲取鎖成功!
1563532190829,t2 signal!
1563532195829,t2準備釋放鎖!
1563532195829,t2釋放鎖成功!
1563532195829,t1釋放鎖成功!

輸出的結果和使用synchronized關鍵字的實例相似。

Condition.await()方法和Object.wait()方法相似,當使用Condition.await()方法時,須要先獲取Condition對象關聯的ReentrantLock的鎖,在Condition.await()方法被調用時,當前線程會釋放這個鎖,而且當前線程會進行等待(處於阻塞狀態)。在signal()方法被調用後,系統會從Condition對象的等待隊列中喚醒一個線程,一旦線程被喚醒,被喚醒的線程會嘗試從新獲取鎖,一旦獲取成功,就能夠繼續執行了。所以,在signal被調用後,通常須要釋放相關的鎖,讓給其餘被喚醒的線程,讓他能夠繼續執行。

Condition經常使用方法

Condition接口提供的經常使用方法有:

和Object中wait相似的方法

  1. void await() throws InterruptedException:當前線程進入等待狀態,若是其餘線程調用condition的signal或者signalAll方法而且當前線程獲取Lock從await方法返回,若是在等待狀態中被中斷會拋出被中斷異常;
  2. long awaitNanos(long nanosTimeout):當前線程進入等待狀態直到被通知,中斷或者超時
  3. boolean await(long time, TimeUnit unit) throws InterruptedException:同第二種,支持自定義時間單位,false:表示方法超時以後自動返回的,true:表示等待還未超時時,await方法就返回了(超時以前,被其餘線程喚醒了)
  4. boolean awaitUntil(Date deadline) throws InterruptedException:當前線程進入等待狀態直到被通知,中斷或者到了某個時間
  5. void awaitUninterruptibly();:當前線程進入等待狀態,不會響應線程中斷操做,只能經過喚醒的方式讓線程繼續

和Object的notify/notifyAll相似的方法

  1. void signal():喚醒一個等待在condition上的線程,將該線程從等待隊列中轉移到同步隊列中,若是在同步隊列中可以競爭到Lock則能夠從等待方法中返回。
  2. void signalAll():與1的區別在於可以喚醒全部等待在condition上的線程

Condition.await()過程當中被打斷

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo4 {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                System.out.println("中斷標誌:" + this.isInterrupted());
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
        TimeUnit.SECONDS.sleep(2);
        //給t1線程發送中斷信號
        System.out.println("一、t1中斷標誌:" + t1.isInterrupted());
        t1.interrupt();
        System.out.println("二、t1中斷標誌:" + t1.isInterrupted());
    }
}

輸出:

一、t1中斷標誌:false
二、t1中斷標誌:true
中斷標誌:false
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at com.itsoku.chat09.Demo4$T1.run(Demo4.java:19)

調用condition.await()以後,線程進入阻塞中,調用t1.interrupt(),給t1線程發送中斷信號,await()方法內部會檢測到線程中斷信號,而後觸發InterruptedException異常,線程中斷標誌被清除。從輸出結果中能夠看出,線程t1中斷標誌的變換過程:false->true->false

await(long time, TimeUnit unit)超時以後自動返回

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo5 {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",start");
                boolean r = condition.await(2, TimeUnit.SECONDS);
                System.out.println(r);
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
    }
}

輸出:

1563541624082,t1,start
false
1563541626085,t1,end

t1線程等待2秒以後,自動返回繼續執行,最後await方法返回false,await返回false表示超時以後自動返回

await(long time, TimeUnit unit)超時以前被喚醒

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo6 {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",start");
                boolean r = condition.await(5, TimeUnit.SECONDS);
                System.out.println(r);
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
        //休眠1秒以後,喚醒t1線程
        TimeUnit.SECONDS.sleep(1);
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

輸出:

1563542046046,t1,start
true
1563542047048,t1,end

t1線程中調用condition.await(5, TimeUnit.SECONDS);方法會釋放鎖,等待5秒,主線程休眠1秒,而後獲取鎖,以後調用signal()方法喚醒t1,輸出結果中發現await後過了1秒(一、3行輸出結果的時間差),await方法就返回了,而且返回值是true。true表示await方法超時以前被其餘線程喚醒了。

long awaitNanos(long nanosTimeout)超時返回

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo7 {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",start");
                long r = condition.awaitNanos(TimeUnit.SECONDS.toNanos(5));
                System.out.println(r);
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
    }
}

輸出:

1563542547302,t1,start
-258200
1563542552304,t1,end

awaitNanos參數爲納秒,能夠調用TimeUnit中的一些方法將時間轉換爲納秒。

t1調用await方法等待5秒超時返回,返回結果爲負數,表示超時以後返回的。

waitNanos(long nanosTimeout)超時以前被喚醒

package com.itsoku.chat09;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class Demo8 {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();

    public static class T1 extends Thread {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",start");
                long r = condition.awaitNanos(TimeUnit.SECONDS.toNanos(5));
                System.out.println(r);
                System.out.println(System.currentTimeMillis() + "," + this.getName() + ",end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        T1 t1 = new T1();
        t1.setName("t1");
        t1.start();
        //休眠1秒以後,喚醒t1線程
        TimeUnit.SECONDS.sleep(1);
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

輸出:

1563542915991,t1,start
3999988500
1563542916992,t1,end

t1中調用await休眠5秒,主線程休眠1秒以後,調用signal()喚醒線程t1,await方法返回正數,表示返回時距離超時時間還有多久,將近4秒,返回正數表示,線程在超時以前被喚醒了。

其餘幾個有參的await方法和無參的await方法同樣,線程調用interrupt()方法時,這些方法都會觸發InterruptedException異常,而且線程的中斷標誌會被清除。

同一個鎖支持建立多個Condition

使用兩個Condition來實現一個阻塞隊列的例子:

package com.itsoku.chat09;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 微信公衆號:javacode2018,獲取年薪50萬課程
 */
public class BlockingQueueDemo<E> {
    int size;//阻塞隊列最大容量

    ReentrantLock lock = new ReentrantLock();

    LinkedList<E> list = new LinkedList<>();//隊列底層實現

    Condition notFull = lock.newCondition();//隊列滿時的等待條件
    Condition notEmpty = lock.newCondition();//隊列空時的等待條件

    public BlockingQueueDemo(int size) {
        this.size = size;
    }

    public void enqueue(E e) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() == size)//隊列已滿,在notFull條件上等待
                notFull.await();
            list.add(e);//入隊:加入鏈表末尾
            System.out.println("入隊:" + e);
            notEmpty.signal(); //通知在notEmpty條件上等待的線程
        } finally {
            lock.unlock();
        }
    }

    public E dequeue() throws InterruptedException {
        E e;
        lock.lock();
        try {
            while (list.size() == 0)//隊列爲空,在notEmpty條件上等待
                notEmpty.await();
            e = list.removeFirst();//出隊:移除鏈表首元素
            System.out.println("出隊:" + e);
            notFull.signal();//通知在notFull條件上等待的線程
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingQueueDemo<Integer> queue = new BlockingQueueDemo<>(2);
        for (int i = 0; i < 10; i++) {
            int data = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        queue.enqueue(data);
                    } catch (InterruptedException e) {

                    }
                }
            }).start();
        }
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Integer data = queue.dequeue();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

代碼很是容易理解,建立了一個阻塞隊列,大小爲3,隊列滿的時候,會被阻塞,等待其餘線程去消費,隊列中的元素被消費以後,會喚醒生產者,生產數據進入隊列。上面代碼將隊列大小置爲1,能夠實現同步阻塞隊列,生產1個元素以後,生產者會被阻塞,待消費者消費隊列中的元素以後,生產者才能繼續工做。

Object的監視器方法與Condition接口的對比

對比項 Object 監視器方法 Condition
前置條件 獲取對象的鎖 調用Lock.lock獲取鎖,調用Lock.newCondition()獲取Condition對象
調用方式 直接調用,如:object.wait() 直接調用,如:condition.await()
等待隊列個數 一個 多個,使用多個condition實現
當前線程釋放鎖並進入等待狀態 支持 支持
當前線程釋放鎖進入等待狀態中不響應中斷 不支持 支持
當前線程釋放鎖並進入超時等待狀態 支持 支持
當前線程釋放鎖並進入等待狀態到未來某個時間 不支持 支持
喚醒等待隊列中的一個線程 支持 支持
喚醒等待隊列中的所有線程 支持 支持

總結

  1. 使用condition的步驟:建立condition對象,獲取鎖,而後調用condition的方法
  2. 一個ReentrantLock支持牀多個condition對象
  3. void await() throws InterruptedException;方法會釋放鎖,讓當前線程等待,支持喚醒,支持線程中斷
  4. void awaitUninterruptibly();方法會釋放鎖,讓當前線程等待,支持喚醒,不支持線程中斷
  5. long awaitNanos(long nanosTimeout) throws InterruptedException;參數爲納秒,此方法會釋放鎖,讓當前線程等待,支持喚醒,支持中斷。超時以後返回的,結果爲負數;超時以前返回的,結果爲正數(表示返回時距離超時時間相差的納秒數)
  6. boolean await(long time, TimeUnit unit) throws InterruptedException;方法會釋放鎖,讓當前線程等待,支持喚醒,支持中斷。超時以後返回的,結果爲false;超時以前返回的,結果爲true
  7. boolean awaitUntil(Date deadline) throws InterruptedException;參數表示超時的截止時間點,方法會釋放鎖,讓當前線程等待,支持喚醒,支持中斷。超時以後返回的,結果爲false;超時以前返回的,結果爲true
  8. void signal();會喚醒一個等待中的線程,而後被喚醒的線程會被加入同步隊列,去嘗試獲取鎖
  9. void signalAll();會喚醒全部等待中的線程,將全部等待中的線程加入同步隊列,而後去嘗試獲取鎖

java高併發系列連載中,總計估計會有四五十篇文章,能夠關注公衆號:javacode2018,獲取最新文章。

java高併發系列 - 第13天:JUC中的Condition對象

相關文章
相關標籤/搜索