【java併發】條件阻塞Condition的應用

問題1:有兩個線程,子線程先執行10次,而後主線程執行5次,而後再切換到子線程執行10,再主線程執行5次……如此往返執行50次。java

Condition將Object監視器方法(wait、notify 和 notifyAll)分解成大相徑庭的對象,以便經過將這些對象與任意Lock實現組合使用,爲每一個對象提供多個等待 set(wait-set)。其中,Lock 替代了synchronized方法和語句的使用,Condition替代了Object監視器方法的使用。數組

1. Condition的基本使用

因爲Condition能夠用來替代wait、notify等方法,因此能夠對比着以前寫過的線程間通訊的代碼來看,來實現摘要中的問題,以前用wait和notify來實現的,如今用Condition來改寫一下,代碼以下:安全

package com.jie.thread.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * \* Created with IntelliJ IDEA.
 * \* User: wugong.jie
 * \* Date: 2018/3/9 12:41
 * \* To change this template use File | Settings | File Templates.
 * \* Description:
 * \
 */
public class Business {
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition(); //Condition是在具體的lock之上的
    private boolean bShouldSub = true;

    public void sub(int i) {
        lock.lock();
        try {
            while (!bShouldSub) {
                try {
                    condition.await(); //用condition來調用await方法
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = false;
            condition.signal(); //用condition來發出喚醒信號,喚醒某一個
        } finally {
            lock.unlock();
        }
    }

    /**
     *
     * @author wugong
     * @date 2018/3/9 12:54
     * @modify if true,please enter your name or update time
     * @param
     */
    public void mainThreadMethod(int i) {
        // 鎖住當前的線程,保證屬性變量bShouldSub安全性
        lock.lock();
        try {
            while (bShouldSub) {
                try {
                    condition.await(); //用condition來調用await方法
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 主線程執行5次
            for (int j = 1; j <= 5; j++) {
                System.out.println("main thread sequence of " + j
                        + ", loop of " + i);
            }
            // 容許喚醒子線程
            bShouldSub = true;
            condition.signal(); //用condition來發出喚醒信號麼,喚醒某一個
        } finally {
            lock.unlock();
        }
    }

}
package com.jie.thread.condition;

/**
 * \* Created with IntelliJ IDEA.
 * \* User: wugong.jie
 * \* Date: 2018/3/9 12:40
 * \* To change this template use File | Settings | File Templates.
 * \* Description:
 * \
 */
public class ConditionCommunication {

    public static void main(String[] args) {
        Business bussiness = new Business();
        // 開啓子線程
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {
                    bussiness.sub(i);
                }
            }
        }).start();
        // 主線程執行
        for (int i = 1; i <= 50; i++) {
            bussiness.mainThreadMethod(i);
        }
    }

}

從代碼來看,Condition的使用時和Lock一塊兒的,沒有Lock就無法使用Condition,由於Condition是經過Lock來new出來的,這種用法很簡單,只要掌握了synchronized和wait、notify的使用,徹底能夠掌握Lock和Condition的使用。併發

2. Condition的擴展

2.1 緩衝區的阻塞隊列

上面使用Lock和Condition來代替synchronized和Object監視器方法實現了兩個線程之間的通訊,如今再來寫個稍微高級點應用:模擬緩衝區的阻塞隊列。 
什麼叫緩衝區呢?舉個例子,如今有不少人要發消息,我是中轉站,我要幫別人把消息發出去,那麼如今我  就須要作兩件事,一件事是接收用戶發過來的消息,並按順序放到緩衝區,另外一件事是從緩衝區中按順序取出用戶發過來的消息,併發送出去。 
  如今把這個實際的問題抽象一下:緩衝區即一個數組,咱們能夠向數組中寫入數據,也能夠從數組中把數據取走,我要作的兩件事就是開啓兩個線程,一個存數據,一個取數據。可是問題來了,若是緩衝區滿了,說明接收的消息太多了,即發送過來的消息太快了,我另外一個線程還來不及發完,致使如今緩衝區沒地方放了,那麼此時就得阻塞存數據這個線程,讓其等待;相反,若是我轉發的太快,如今緩衝區全部內容都被我發完了,尚未用戶發新的消息來,那麼此時就得阻塞取數據這個線程。dom

好了,分析完了這個緩衝區的阻塞隊列,下面就用Condition技術來實現一下:ide

package com.jie.thread.condition.buffer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * \* Created with IntelliJ IDEA.
 * \* User: wugong.jie
 * \* Date: 2018/3/9 13:04
 * \* To change this template use File | Settings | File Templates.
 * \* Description:
 * \
 */
public class Buffer {
    final Lock lock = new ReentrantLock(); //定義一個鎖
    final Condition notFull = lock.newCondition(); //定義阻塞隊列滿了的Condition
    final Condition notEmpty = lock.newCondition();//定義阻塞隊列空了的Condition

    final Object[] items = new Object[10]; //爲了下面模擬,設置阻塞隊列的大小爲10,不要設太大

    int putptr, takeptr, count; //數組下標,用來標定位置的

    //往隊列中存數據
    public void put(Object x) throws InterruptedException {
        lock.lock(); //上鎖
        try {
            while (count == items.length) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時沒法存數據!");
                notFull.await();    //若是隊列滿了,那麼阻塞存數據這個線程,等待被喚醒
            }
            //若是沒滿,按順序往數組中存
            items[putptr] = x;
            if (++putptr == items.length) //這是到達數組末端的判斷,若是到了,再回到始端
                putptr = 0;
            ++count;    //消息數量
            System.out.println(Thread.currentThread().getName() + "存入:" + x+";還有:"+(10-count)+"個位置");
            notEmpty.signal(); //好了,如今隊列中有數據了,喚醒隊列空的那個線程,能夠取數據啦
        } finally {
            lock.unlock(); //放鎖
        }
    }

    //從隊列中取數據
    public Object take() throws InterruptedException {
        lock.lock(); //上鎖
        try {
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " 被阻塞了,暫時沒法取數據!");
                notEmpty.await();  //若是隊列是空,那麼阻塞取數據這個線程,等待被喚醒
            }
            //若是沒空,按順序從數組中取
            Object x = items[takeptr];
            if (++takeptr == items.length) //判斷是否到達末端,若是到了,再回到始端
                takeptr = 0;
            --count; //消息數量
            System.out.println(Thread.currentThread().getName() + "取出:" + x+";還有"+(10-count)+"個位置");
            notFull.signal(); //好了,如今隊列中有位置了,喚醒隊列滿的那個線程,能夠存數據啦
            return x;
        } finally {
            lock.unlock(); //放鎖
        }
    }
}

這個程序很經典,我從官方JDK文檔中拿出來的,而後加了註釋。程序中定義了兩個Condition,分別針對兩個線程,等待和喚醒分別用不一樣的Condition來執行,思路很清晰,程序也很健壯。能夠考慮一個問題,爲啥要用兩個Codition呢?之因此這麼設計確定是有緣由的,若是用一個Condition,如今假設隊列滿了,可是有2個線程A和B同時存數據,那麼都進入了睡眠,好,如今另外一個線程取走一個了,而後喚醒了其中一個線程A,那麼A能夠存了,存完後,A又喚醒一個線程,若是B被喚醒了,那就出問題了,由於此時隊列是滿的,B不能存的,B存的話就會覆蓋原來還沒被取走的值,就由於使用了一個Condition,存和取都用這個Condition來睡眠和喚醒,就亂了套。到這裏,就能體會到這個Condition的用武之地了,如今來測試一下上面的阻塞隊列的效果:oop

package com.jie.thread.condition.buffer;

import java.util.Random;

/**
 * \* Created with IntelliJ IDEA.
 * \* User: wugong.jie
 * \* Date: 2018/3/9 13:06
 * \* To change this template use File | Settings | File Templates.
 * \* Description:
 * \
 */
public class BoundedBuffer {

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        for (int i = 0; i < 5; i++) { //開啓5個線程往緩衝區存數據
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        buffer.put(new Random().nextInt(1000)); //隨機存數據
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        for (int i = 0; i < 10; i++) { //開啓10個線程從緩衝區中取數據
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        buffer.take(); //從緩衝區取數據
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

}

結果:測試

Thread-0存入:730;還有:9個位置
Thread-4存入:304;還有:8個位置
Thread-1存入:803;還有:7個位置
Thread-8取出:730;還有8個位置
Thread-5取出:304;還有9個位置
Thread-12取出:803;還有10個位置
Thread-9 被阻塞了,暫時沒法取數據!
Thread-13 被阻塞了,暫時沒法取數據!
Thread-3存入:351;還有:9個位置
Thread-9取出:351;還有10個位置
Thread-7 被阻塞了,暫時沒法取數據!
Thread-11 被阻塞了,暫時沒法取數據!
Thread-2存入:411;還有:9個位置
Thread-13取出:411;還有10個位置
Thread-6 被阻塞了,暫時沒法取數據!
Thread-10 被阻塞了,暫時沒法取數據!
Thread-14 被阻塞了,暫時沒法取數據!

從結果中能夠看出,線程5和10搶先執行,發現隊列中沒有,因而就被阻塞了,睡在那了,直到隊列中有新的值存入才能夠取,可是它們兩運氣很差,存的數據又被其餘線程給搶先取走了,哈哈……能夠多運行幾回。若是想要看到存數據被阻塞,能夠將取數據的線程設置少一點,這裏我就不設了。this

2.2 兩個以上線程之間的喚醒

題目2:有三個線程,子線程1先執行10次,而後子線程2執行10次,而後主線程執行5次,而後再切換到子線程1執行10次,子線程2執行10次,主線程執行5次……如此往返執行50次。spa

如過不用Condition,還真很差弄,可是用Condition來作的話,就很是方便了,原理很簡單,定義三個Condition,子線程1執行完喚醒子線程2,子線程2執行完喚醒主線程,主線程執行完喚醒子線程1。喚醒機制和上面那個緩衝區道理差很少,下面看看代碼吧,很容易理解。

package com.jie.thread.condition.threeBusiness;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * \* Created with IntelliJ IDEA.
 * \* User: wugong.jie
 * \* Date: 2018/3/9 13:14
 * \* To change this template use File | Settings | File Templates.
 * \* Description:
 * \
 */
public class ThreeBusiness {

    Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition(); //Condition是在具體的lock之上的
    Condition condition2 = lock.newCondition();
    Condition conditionMain = lock.newCondition();

    private int bShouldSub = 0;

    public void sub1(int i) {
        lock.lock();
        try {
            while (bShouldSub != 0) {
                try {
                    condition1.await(); //用condition來調用await方法
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub1 thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = 1;
            condition2.signal(); //讓線程2執行
        } finally {
            lock.unlock();
        }
    }

    public void sub2(int i) {
        lock.lock();
        try {
            while (bShouldSub != 1) {
                try {
                    condition2.await(); //用condition來調用await方法
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub2 thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = 2;
            conditionMain.signal(); //讓主線程執行
        } finally {
            lock.unlock();
        }
    }

    public void main(int i) {
        lock.lock();
        try {
            while (bShouldSub != 2) {
                try {
                    conditionMain.await(); //用condition來調用await方法
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (int j = 1; j <= 5; j++) {
                System.out.println("main thread sequence of " + j
                        + ", loop of " + i);
            }
            bShouldSub = 0;
            condition1.signal(); //讓線程1執行
        } finally {
            lock.unlock();
        }
    }

}
package com.jie.thread.condition.threeBusiness;

/**
 * \* Created with IntelliJ IDEA.
 * \* User: wugong.jie
 * \* Date: 2018/3/9 13:16
 * \* To change this template use File | Settings | File Templates.
 * \* Description:
 * \
 */
public class ThreeConditionCommunication {

    public static void main(String[] args) {
        ThreeBusiness threeBusiness = new ThreeBusiness();

        new Thread(new Runnable() {// 開啓一個子線程
            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {
                    threeBusiness.sub1(i);
                }

            }
        }).start();

        new Thread(new Runnable() {// 開啓另外一個子線程

            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {

                    threeBusiness.sub2(i);
                }
            }
        }).start();

        // main方法主線程
        for (int i = 1; i <= 50; i++) {
            threeBusiness.main(i);
        }

    }
}
相關文章
相關標籤/搜索