多線程保護性暫停模式詳解:DUBBO請求和響應是怎樣對應上的

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習java

1 文章概述

在多線程編程實踐中,咱們確定會面臨線程間數據交互的問題。在處理這類問題時須要使用一些設計模式,從而保證程序的正確性和健壯性。編程

保護性暫停設計模式就是解決多線程間數據交互問題的一種模式。本文先從基礎案例介紹保護性暫停基本概念和實踐,再由淺入深,最終分析DUBBO源碼中保護性暫停設計模式使用場景。設計模式

2 什麼是保護性暫停

咱們設想這樣一種場景:線程A生產數據,線程B讀取數據這個數據。服務器

可是有一種狀況:線程B準備讀取數據時,此時線程A尚未生產出數據。微信

在這種狀況下線程B不能一直空轉,也不能當即退出,線程B要等到生產數據完成並拿到數據以後才退出。多線程

那麼在數據沒有生產出這段時間,線程B須要執行一種等待機制,這樣能夠達到對系統保護目的,這就是保護性暫停。架構

保護性暫停有多種實現方式,本文咱們用synchronized/wait/notify的方式實現。併發

@Getter
@Setter
public class MyData implements Serializable {
    private static final long serialVersionUID = 1L;
    private String message;

    public MyData(String message) {
        this.message = message;
    }
}

class Resource1 {
    private MyData data;
    private Object lock = new Object();

    public MyData getData() {
        synchronized (lock) {
            while (data == null) {
                try {
                    // 沒有數據則釋放鎖並暫停等待被喚醒
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            // 生產數據後喚醒消費線程
            this.data = data;
            lock.notifyAll();
        }
    }
}

/** * 保護性暫停實例一 * * @author 今日頭條號「JAVA前線」 */
public class ProtectDesignTest1 {

    public static void main(String[] args) {
        Resource1 resource = new Resource1();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生產數據=" + data);
                // 模擬發送耗時
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData();
            System.out.println(Thread.currentThread().getName() + "接收到數據=" + data);
        }, "t2").start();
    }
}

在上述實例中線程1生產數據,線程2消費數據。Resource1類中經過wait/notify實現了保護性暫停設計模式。app

3 加一個超時時間

上述實例中若是線程2沒有獲取到數據,那麼線程2直到拿到數據纔會退出。如今咱們給獲取數據指定一個超時時間,若是在這個時間內沒有獲取到數據則拋出超時異常。雖然只是加一個參數,可是其中有不少細節須要注意。框架

3.1 一段有問題的代碼

咱們分析下面這段代碼

class Resource2 {
    private MyData data;
    private Object lock = new Object();

    public MyData getData(int timeOut) {
        synchronized (lock) {
            while (data == null) {
                try {
                    // 代碼1
                    lock.wait(timeOut);
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超時未獲取到結果");
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}


/** * 保護性暫停實例二 * * @author 今日頭條號「JAVA前線」 */
public class ProtectDesignTest2 {

    public static void main(String[] args) {
        Resource2 resource = new Resource2();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生產數據=" + data);
                // 模擬發送耗時
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData(1000);
            System.out.println(Thread.currentThread().getName() + "接收到數據=" + data);
        }, "t2").start();
    }
}

這段代碼看似沒有問題,使用的也是wait帶有超時時間的參數,那麼問題可能出在哪裏呢?

問題是線程虛假喚醒帶來的。若是尚未到超時時間代碼1就被虛假喚醒,此時data尚未值就會直接跳出循環,這樣沒有達到咱們預期的超時時間才跳出循環的預期。

關於虛假喚醒這個概念,咱們看看JDK官方文檔相關介紹。

A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops, like this one:

 

synchronized (obj) {

while (<condition does not hold>)

obj.wait(timeout);

}

官方文檔告訴咱們一個線程可能會在沒有被notify時虛假喚醒,因此判斷是否繼續wait時必須用while循環。咱們在寫代碼時必定也要注意線程虛假喚醒問題。

3.2 正確實例

上面咱們明白了虛假喚醒問題,如今咱們對代碼進行修改,說明參看代碼註釋。

class Resource3 {
    private MyData data;
    private Object lock = new Object();

    public MyData getData(int timeOut) {
        synchronized (lock) {
            // 運行時長
            long timePassed = 0;
            // 開始時間
            long begin = System.currentTimeMillis();
            // 若是結果爲空
            while (data == null) {
                try {
                    // 若是運行時長大於超時時間退出循環
                    if (timePassed > timeOut) {
                        break;
                    }
                    // 若是運行時長小於超時時間表示虛假喚醒 -> 只需再等待時間差值
                    long waitTime = timeOut - timePassed;

                    // 等待時間差值
                    lock.wait(waitTime);

                    // 結果不爲空直接返回
                    if (data != null) {
                        break;
                    }
                    // 被喚醒後計算運行時長
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超時未獲取到結果");
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

/** * 保護性暫停實例三 * * @author 今日頭條號「JAVA前線」 */
public class ProtectDesignTest3 {

    public static void main(String[] args) {
        Resource3 resource = new Resource3();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生產數據=" + data);
                // 模擬發送耗時
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData(1000);
            System.out.println(Thread.currentThread().getName() + "接收到數據=" + data);
        }, "t2").start();
    }
}

4 加一個編號

如今再來設想一個場景:如今有三個生產數據的線程一、二、3,三個獲取數據的線程四、五、6,咱們但願每一個獲取數據線程都只拿到其中一個生產線程的數據,不能多拿也不能少拿。

這裏引入一個Futures模型,這個模型爲每一個資源進行編號並存儲在容器中,例如線程1生產的數據被拿走則從容器中刪除,一直到容器爲空結束。

@Getter
@Setter
public class MyNewData implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final AtomicLong ID = new AtomicLong(0);
    private Long id;
    private String message;

    public MyNewData(String message) {
        this.id = newId();
        this.message = message;
    }

    /** * 自增到最大值會回到最小值(負值能夠做爲識別ID) */
    private static long newId() {
        return ID.getAndIncrement();
    }

    public Long getId() {
        return this.id;
    }
}

class MyResource {
    private MyNewData data;
    private Object lock = new Object();

    public MyNewData getData(int timeOut) {
        synchronized (lock) {
            long timePassed = 0;
            long begin = System.currentTimeMillis();
            while (data == null) {
                try {
                    if (timePassed > timeOut) {
                        break;
                    }
                    long waitTime = timeOut - timePassed;
                    lock.wait(waitTime);
                    if (data != null) {
                        break;
                    }
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超時未獲取到結果");
            }
            return data;
        }
    }

    public void sendData(MyNewData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

class MyFutures {
    private static final Map<Long, MyResource> FUTURES = new ConcurrentHashMap<>();

    public static MyResource newResource(MyNewData data) {
        final MyResource future = new MyResource();
        FUTURES.put(data.getId(), future);
        return future;
    }

    public static MyResource getResource(Long id) {
        return FUTURES.remove(id);
    }

    public static Set<Long> getIds() {
        return FUTURES.keySet();
    }
}


/** * 保護性暫停實例四 * * @author 今日頭條號「JAVA前線」 */
public class ProtectDesignTest4 {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    MyNewData data = new MyNewData("hello_" + index);
                    MyResource resource = MyFutures.newResource(data);
                    // 模擬發送耗時
                    TimeUnit.SECONDS.sleep(1);
                    resource.sendData(data);
                    System.out.println("生產數據data=" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
        TimeUnit.SECONDS.sleep(1);

        for (Long i : MyFutures.getIds()) {
            final long index = i;
            new Thread(() -> {
                MyResource resource = MyFutures.getResource(index);
                int timeOut = 3000;
                System.out.println("接收數據data=" + resource.getData(timeOut));
            }).start();
        }
    }
}

5 DUBBO應用實例

咱們順着這一個鏈路跟蹤代碼:消費者發送請求 > 提供者接收請求並執行,而且將運行結果發送給消費者 >消費者接收結果。

(1) 消費者發送請求

消費者發送的數據包含請求ID,而且將關係維護進FUTURES容器

final class HeaderExchangeChannel implements ExchangeChannel {

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // 代碼1
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

class DefaultFuture implements ResponseFuture {

    // FUTURES容器
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 請求ID
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}

(2) 提供者接收請求並執行,而且將運行結果發送給消費者

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        // response與請求ID對應
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res);
            return;
        }
        // message = RpcInvocation包含方法名、參數名、參數值等
        Object msg = req.getData();
        try {

            // DubboProtocol.reply執行實際業務方法
            CompletableFuture<Object> future = handler.reply(channel, msg);

            // 若是請求已經完成則發送結果
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
}

(3) 消費者接收結果

如下DUBBO源碼很好體現了保護性暫停這個設計模式,說明參看註釋

class DefaultFuture implements ResponseFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    public static void received(Channel channel, Response response) {
        try {
            // 取出對應的請求對象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                               + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


    @Override public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {

                    // 放棄鎖並使當前線程阻塞,直到發出信號中斷它或者達到超時時間
                    done.await(timeout, TimeUnit.MILLISECONDS);

                    // 阻塞結束後再判斷是否完成
                    if (isDone()) {
                        break;
                    }

                    // 阻塞結束後判斷是否超時
                    if(System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // response對象仍然爲空則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 接收到服務器響應賦值response
            response = res;
            if (done != null) {
                // 喚醒get方法中處於等待的代碼塊
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

6 文章總結

本文咱們從基礎案例介紹保護性暫停基本概念和實踐,最終分析DUBBO源碼中保護性暫停設計模式使用場景。咱們在設計併發框架時要注意虛假喚醒問題,以及請求和響應關係對應問題,但願本文對你們有所幫助。

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習

相關文章
相關標籤/搜索