[Java併發-9]Lock和Condition(下) Dubbo如何用管程實現異步轉同步?

在上一篇文章中,咱們講到 Java SDK 併發包裏的 Lock 有別於 synchronized 隱式鎖的三個特性:可以響應中斷、支持超時和非阻塞地獲取鎖。那今天咱們接着再來詳細聊聊 Java SDK 併發包裏的 Condition。編程

Condition 實現了管程模型裏面的條件變量

在以前咱們詳細講過, Java 語言內置的管程裏只有一個條件變量,而 Lock&Condition 實現的管程是支持多個條件變量的,這是兩者的一個重要區別。併發

在不少併發場景下,支持多個條件變量可以讓咱們的併發程序可讀性更好,實現起來也更容易。例如,實現一個阻塞隊列,就須要兩個條件變量。框架

這裏咱們溫故知新下前面的內容。異步

public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 條件變量:隊列不滿  
  final Condition notFull =
    lock.newCondition();
  // 條件變量:隊列不空  
  final Condition notEmpty =
    lock.newCondition();

  // 入隊
  void enq(T x) {
    lock.lock();
    try {
      while (隊列已滿){
        // 等待隊列不滿
        notFull.await();
      }  
      // 省略入隊操做...
      // 入隊後, 通知可出隊
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出隊
  void deq(){
    lock.lock();
    try {
      while (隊列已空){
        // 等待隊列不空
        notEmpty.await();
      }  
      // 省略出隊操做...
      // 出隊後,通知可入隊
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

不過,這裏你須要注意,Lock 和 Condition 實現的管程,線程等待和通知須要調用 await()、signal()、signalAll(),它們的語義和 wait()、notify()、notifyAll() 是相同的, 不要相互使用。源碼分析

下面咱們就來看看在知名項目 Dubbo 中,Lock 和 Condition 是怎麼用的。不過在開始介紹源碼以前,我還先要介紹兩個概念:同步和異步。線程

通俗點來說就是調用方是否須要等待結果,若是須要等待結果,就是同步;若是不須要等待結果,就是異步

Dubbo 源碼分析

其實在編程領域,異步的場景仍是挺多的,好比 TCP 協議自己就是異步的,咱們工做中常常用到的 RPC 調用,在 TCP 協議層面,發送完 RPC 請求後,線程是不會等待 RPC 的響應結果的 。可能你會以爲奇怪,平時工做中的 RPC 調用大多數都是同步的啊?這是怎麼回事呢?code

其實很簡單,必定是有人幫你作了異步轉同步的事情。例如目前知名的 RPC 框架 Dubbo 就給咱們作了異步轉同步的事情,那它是怎麼作的呢?下面咱們就來分析一下 Dubbo 的相關源碼。隊列

對於下面一個簡單的 RPC 調用,默認狀況下 sayHello() 方法,是個同步方法,也就是說,執行 service.sayHello(「dubbo」) 的時候,線程會停下來等結果。get

DemoService service = 初始化部分省略
String message = 
  service.sayHello("dubbo");
System.out.println(message);

不過爲了理清先後關係,仍是有必要分析一下調用 DefaultFuture.get() 以前發生了什麼。DubboInvoker 的 108 行調用了 DefaultFuture.get()。這一行先調用了 request(inv, timeout) 方法,這個方法其實就是發送 RPC 請求,以後經過調用 get() 方法等待 RPC 返回結果。同步

public class DubboInvoker{
  Result doInvoke(Invocation inv){
    // 下面這行就是源碼中 108 行
    // 爲了便於展現,作了修改
    return currentClient 
      .request(inv, timeout)
      .get();
  }
}

DefaultFuture 這個類是很關鍵,代碼精簡以後內容以下。
不過在看代碼以前,你仍是有必要重複一下咱們的需求:

  1. 當 RPC 返回結果以前,阻塞調用線程,讓調用線程等待;
  2. 當 RPC 返回結果後,喚醒調用線程,讓調用線程從新執行。

不知道你有沒有似曾相識的感受,這需求其實就是經典的等待 - 通知機制嗎?這個時候想必你的腦海裏應該可以浮現出管程的解決方案了。有了本身的方案以後,咱們再來看看 Dubbo 是怎麼實現的。

// 建立鎖與條件變量
private final Lock lock 
    = new ReentrantLock();
private final Condition done 
    = lock.newCondition();

// 調用方經過該方法等待結果
Object get(int timeout){
  long start = System.nanoTime();
  lock.lock();
  try {
    while (!isDone()) {
      done.await(timeout);
      long cur=System.nanoTime();
      if (isDone() || 
          cur-start > timeout){
        break;
      }
    }
  } finally {
    lock.unlock();
  }
  if (!isDone()) {
    throw new TimeoutException();
  }
  return returnFromResponse();
}
// RPC 結果是否已經返回
boolean isDone() {
  return response != null;
}
// RPC 結果返回時調用該方法   
private void doReceived(Response res) {
  lock.lock();
  try {
    response = res;
    if (done != null) {
      done.signal();
    }
  } finally {
    lock.unlock();
  }
}

調用線程經過調用 get() 方法等待 RPC 返回結果,這個方法裏面,你看到的都是熟悉的「面孔」:調用 lock() 獲取鎖,在 finally 裏面調用 unlock() 釋放鎖;獲取鎖後,經過經典的在循環中調用 await() 方法來實現等待。

當 RPC 結果返回時,會調用 doReceived() 方法,這個方法裏面,調用 lock() 獲取鎖,在 finally 裏面調用 unlock() 釋放鎖,獲取鎖後經過調用 signal() 來通知調用線程,結果已經返回,不用繼續等待了。

小結

Lock&Condition 是管程的一種實現,因此可否用好 Lock 和 Condition 要看你對管程模型理解得怎麼樣。管程的技術前面咱們已經專門用了一篇文章作了介紹,你能夠結合着來學,理論聯繫實踐,有助於加深理解。

Lock&Condition 實現的管程相對於 synchronized 實現的管程來講更加靈活、功能也更豐富。

但若是你對實現感興趣,Java SDK 併發包裏鎖和條件變量是如何實現的,能夠參考《Java 併發編程的藝術》一書的第 5 章《Java 中的鎖》,裏面詳細介紹了實現原理。

相關文章
相關標籤/搜索