使用ReentrantLock和Condition來代替內置鎖和wait(),notify(),notifyAll()

使用ReentrantLock能夠替代內置鎖,當使用內置鎖的時候,咱們可使用wait() nitify()和notifyAll()來控制線程之間的協做,那麼,當咱們使用ReentrantLock的時候,咱們怎麼來處理線程之間的寫做呢?
JDK5.0爲咱們提供了Condition對象來替代內置鎖的 wait(),notify()和notifyAll()方法

內置鎖的話,就只能有一個等待隊列,全部的在某個對象上執行wait()方法的線程都會被加入到該對象的等待隊列中去(線程會被掛起),須要其餘的線程在同一個對象上調用notify()或者是notifyAll()方法來喚醒等待隊列中的線程java

而使用Condition的話,可使用不一樣的等待隊列,只須要使用lock.newCondition()便可定義一個Condition對象,每個Condition對象上都會有一個等待隊列(底層使用AQS),調用某個Condition對象的await()方法,就能夠把當前線程加入到這個Condition對象的等待隊列上dom

其餘的線程調用同一個Condition對象的sinal()或者是signalAll()方法則會喚醒等待隊列上的線程,使其可以繼續執行ui

咱們以一個現實中的例子來講明若何使用ReentrantLock和Condition如何替代synchronized和wait(),notify(),notifyAll():this

咱們模擬兩個線程,一個線程執行登陸操做,該登陸操做會阻塞,而後等待另一個線程將其喚醒(相似掃描登陸的場景,頁面會阻塞,等待掃碼和確認,而後頁面纔會跳轉)線程

首先是使用內置鎖的例子:日誌

package com.jiaoyiping.baseproject.condition;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * Created with Intellij IDEA
 *
 * @author: jiaoyiping
 * Mail: jiaoyiping@gmail.com
 * Date: 2019/04/12
 * Time: 15:29
 * To change this template use File | Settings | Editor | File and Code Templates
 */

//使用內置鎖來實現的等待/通知模型

public class LoginServiceUseInnerLock {
    private ConcurrentHashMap<String, Result> loginMap = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        LoginServiceUseInnerLock loginService = new LoginServiceUseInnerLock();
        String uuid = UUID.randomUUID().toString();
        System.out.println("[" + Thread.currentThread().getName() + "] 使用的UUID是: " + uuid);
        new Thread(() -> {
            loginService.login(uuid, 20_000);
            countDownLatch.countDown();
        }, "登陸線程").start();
        Thread.sleep(2_000);
        new Thread(() -> {
            loginService.confirm(uuid);
            countDownLatch.countDown();
        }, "確認線程").start();
        countDownLatch.await();
        System.out.println("[" + Thread.currentThread().getName() + "] 兩個線程都執行完畢了");

    }


    public void login(String code, int timeout) {
        Result result = new Result();
        result.setMessage("超時");
        loginMap.put(code, result);
        synchronized (result) {
            try {
                //超時的話,會自動返回,程序繼續
                System.out.println("[" + Thread.currentThread().getName() + "] 登陸線程掛起");
                result.wait(timeout);
                System.out.println("[" + Thread.currentThread().getName() + "] 登陸線程繼續執行,獲得的結果是:" + result.getMessage());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                loginMap.remove(code);
            }
        }
    }

    public void confirm(String code) {
        assert code != null;
        Result result = loginMap.get(code);
        if (result == null) {
            System.out.println("[" + Thread.currentThread().getName() + "] 請求不存在或者已通過期");
            return;
        }
        result.setMessage("成功");
        synchronized (result) {
            //喚醒等待隊列上的線程
            System.out.println("[" + Thread.currentThread().getName() + "] 確認線程開始喚醒阻塞的線程");
            result.notify();
        }

    }

    class Result implements Serializable {
        private static final long serialVersionUID = -4279280559711939661L;
        String message;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public Result() {
        }
        public Result(String message) {
            this.message = message;
        }
    }

使用內置鎖的時候,咱們把random生成的key和一個本身定義的Result對象放置到ConcurrentHashMap中去,登陸線程調用 Result對象的wait(timeout) 方法將當前線程掛起,並加入到Result對象的等待隊列上去code

確認線程根據key值,找到對應的Result對象,設置好message,而後調用Result對象的notify()方法喚醒等待隊列上的線程,登陸線程得以繼續執行對象

那咱們如何使用ReentrantLock和Condition來重寫這個例子:blog

package com.jiaoyiping.baseproject.condition;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with Intellij IDEA
 *
 * @author: jiaoyiping
 * Mail: jiaoyiping@gmail.com
 * Date: 2019/04/12
 * Time: 14:56
 * To change this template use File | Settings | Editor | File and Code Templates
 */

//使用ReentrantLock和Condition來實現的等待/通知模型

public class LoginServiceUseCondition {

    private ReentrantLock lock = new ReentrantLock();
    ConcurrentHashMap<String, Result> conditions = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        LoginServiceUseCondition loginService = new LoginServiceUseCondition();
        String uuid = UUID.randomUUID().toString();
        System.out.println("[" + Thread.currentThread().getName() + "] 使用的UUID是:" + uuid);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(() -> {
            loginService.login(uuid, 30_000);
            countDownLatch.countDown();
        }, "登陸線程").start();
        Thread.sleep(5_000);
        new Thread(() -> {
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            loginService.confirm(uuid);
            countDownLatch.countDown();
        }, "確認線程").start();
        countDownLatch.await();
        System.out.println("[" + Thread.currentThread().getName() + "] 兩個線程都執行完畢了,退出");
    }


    /**
     * 過了超時時間以後,鎖會自動釋放
     *
     * @param code
     * @param timeout
     */
    public void login(String code, int timeout) {
        assert code != null;
        try {
            lock.tryLock(timeout, TimeUnit.MILLISECONDS);
            Condition condition = lock.newCondition();
            Result result = new Result("超時", condition);
            conditions.put(code, result);
            System.out.println("[" + Thread.currentThread().getName() + "] login()的請求開始阻塞");
            condition.await();
            System.out.println("[" + Thread.currentThread().getName() + "] 結束等待,繼續執行,拿到的結果是" + result.getMessage());

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }

    //確認線程(拿這個UUID,去找到對應的Condition,喚醒上邊的等待隊列,並把Condition對象移除掉)

    public void confirm(String code) {
        assert code != null;
        Result result = conditions.get(code);
        Condition condition = result.getCondition();
        if (condition != null) {
            try {
                System.out.println("[" + Thread.currentThread().getName() + "] 找到對應的Condition對象,將其等待隊列中的線程喚醒");
                lock.lock();
                result.setMessage("成功");
                condition.signal();
                conditions.remove(code);
            } finally {
                lock.unlock();
            }
        }
    }

    class Result implements Serializable {
        String message;
        final Condition condition;

        public Result(String message, Condition condition) {
            this.message = message;
            this.condition = condition;
        }

        public Result(Condition condition) {
            this.condition = condition;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public Condition getCondition() {
            return condition;
        }
    }

}

上邊的例子說明了怎麼使用ReentrantLock和Condition來代替內置鎖和wait(),notify(),notifyAll()隊列

下邊的一個來自jdk中的例子,演示瞭如何使用同一個ReentrantLock上的多個等待隊列的狀況

來自JDK文檔中的示例(我稍加改造,加上了main方法和一些日誌):

package com.jiaoyiping.baseproject.condition;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

/**
 * Created with Intellij IDEA
 *
 * @author: jiaoyiping
 * Mail: jiaoyiping@gmail.com
 * Date: 2019/04/12
 * Time: 21:17
 * To change this template use File | Settings | Editor | File and Code Templates
 */

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[10];
    int putptr, takeptr, count;

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer boundedBuffer = new BoundedBuffer();
        CountDownLatch countDownLatch = new CountDownLatch(40);
        //分別啓動20個put線程和20個take線程

        IntStream.rangeClosed(1, 20).forEach(i -> {
            new Thread(() -> {
                try {
                    boundedBuffer.put(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "put線程 - " + i).start();
        });

        IntStream.rangeClosed(1, 20).forEach(i -> {
            new Thread(() -> {
                try {
                    boundedBuffer.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "take線程-" + i).start();
        });

        countDownLatch.await();
        System.out.println("[" + Thread.currentThread().getName() + "] 全部線程都執行完畢,退出");
    }

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            //put的線程,當隊列滿的時候掛起
            while (count == items.length) {
                System.out.println("[" + Thread.currentThread().getName() + "] 線程掛起");
                notFull.await();
            }
            Thread.sleep(1_000);
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            System.out.println("[" + Thread.currentThread().getName() + "] 執行完畢寫操做,喚醒take線程");
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            //take的線程,當隊列爲空的時候,掛起
            while (count == 0) {
                System.out.println("[" + Thread.currentThread().getName() + "] 線程掛起");
                notEmpty.await();
            }
            Thread.sleep(1_000);
            Object x = items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            System.out.println("[" + Thread.currentThread().getName() + "] 執行完畢讀操做,喚醒put線程");
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

咱們看到,以上代碼中,使用到了兩個Condition:notFull和notEmpty,都是經過lock對象的newCondition()方法得來的

items被放滿以後,put的線程會在notFull的等待隊列上進行等待(執行了notFull.await()方法) put線程執行完操做以後,會調用 notEmpty.signal()來試圖喚醒在notEmpty上等待的線程(也就是給take線程發了一個信號,告訴它,items不是空的了,你能夠過來take了)

當item空了以後,take線程會在notEmpty的等待隊列上進行等待(執行了notEmpty的await()方法) 當take線程執行完操做以後,會調用notFull.signal()來喚醒在notFull上等待的線程(也就是給put線程發一個信號,告訴它,items不滿了,你能夠進行put操做了)

和內置方法相似,在調用await(),signal(),signalAll()等方法的時候,也必需要得到鎖,也就是必須在 lock.lock()和lock.unlock()代碼塊兒之間才能調用這些方法,不然就會拋出IllegalMonitorStateException

相關文章
相關標籤/搜索