dubbo_遠程同步調用原理

Dubbo缺省協議採用單一長鏈接和NIO異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況。java

 

【原創】Alibaba Dubbo框架同步調用原理分析-1 - sun - 學無止境

Dubbo缺省協議,使用基於mina1.1.7+hessian3.2.1的tbremoting交互。apache

  • 鏈接個數:單鏈接
  • 鏈接方式:長鏈接
  • 傳輸協議:TCP
  • 傳輸方式:NIO異步傳輸
  • 序列化:Hessian二進制序列化
  • 適用範圍:傳入傳出參數數據包較小(建議小於100K),消費者比提供者個數多,單一消費者沒法壓滿提供者,儘可能不要用dubbo協議傳輸大文件或超大字符串
  • 適用場景:常規遠程服務方法調用

 一般,一個典型的同步遠程調用應該是這樣的:併發

1,  客戶端線程調用遠程接口,向服務端發送請求,同時當前線程應該處於「暫停「狀態,即線程不能向後執行了,必須要拿到服務端給本身的結果後才能向後執行
2, 服務端接到客戶端請求後,處理請求,將結果給客戶端

3, 客戶端收到結果,而後當前線程繼續日後執行app

 

Dubbo裏使用到了Socket(採用apache mina框架作底層調用)來創建長鏈接,發送、接收數據,底層使用apache mina框架的IoSession進行發送消息。  
Dubbo底層使用Socket發送消息的形式進行數據傳遞,結合了mina框架,使用IoSession.write()方法,這個方法調用後對於整個遠程調用(從發出請求到接收到結果)來講是一個異步的,即對於當前線程來講,將請求發送出來,線程就能夠日後執行了,至於服務端的結果,是服務端處理完成後,再以消息的形式發送給客戶端的。
因而這裏出現了2個問題:
  • 當前線程怎麼讓它「暫停」,等結果回來後,再向後執行?
  • 正如前面所說,Socket通訊是一個全雙工的方式,若是有多個線程同時進行遠程方法調用,這時創建在client server之間的socket鏈接上會有不少雙方發送的消息傳遞,先後順序也多是亂七八糟的,server處理完結果後,將結果消息發送給client,client收到不少消息,怎麼知道哪一個消息結果是原先哪一個線程調用的?
   基本原理以下:
  1. client一個線程調用遠程接口,生成一個惟一的ID(好比一段隨機字符串,UUID等),Dubbo是使用AtomicLong從0開始累計數字的
  2. 將打包的方法調用信息(如調用的接口名稱,方法名稱,參數值列表等),和處理結果的回調對象callback,所有封裝在一塊兒,組成一個對象object
  3. 向專門存放調用信息的全局ConcurrentHashMap裏面put(ID, object)
  4. ID和打包的方法調用信息封裝成一對象connRequest,使用IoSession.write(connRequest)異步發送出去
  5. 當前線程再使用callback的get()方法試圖獲取遠程返回的結果,在get()內部,則使用synchronized獲取回調對象callback的鎖, 再先檢測是否已經獲取到結果,若是沒有,而後調用callback的wait()方法,釋放callback上的鎖,讓當前線程處於等待狀態。
  6. 服務端接收到請求並處理後,將結果(此結果中包含了前面的ID,即回傳)發送給客戶端,客戶端socket鏈接上專門監聽消息的線程收到消息,分析結果,取到ID,再從前面的ConcurrentHashMap裏面get(ID),從而找到callback,將方法調用結果設置到callback對象裏。
  7. 監聽線程接着使用synchronized獲取回調對象callback的鎖(由於前面調用過wait(),那個線程已釋放callback的鎖了),再notifyAll(),喚醒前面處於等待狀態的線程繼續執行(callback的get()方法繼續執行就能拿到調用結果了),至此,整個過程結束。

   須要注意的是,這裏的callback對象是每次調用產生一個新的,不能共享,不然會有問題;另外ID必需至少保證在一個Socket鏈接裏面是惟一的。框架

 

如今,前面兩個問題已經有答案了,

 

  • 當前線程怎麼讓它「暫停」,等結果回來後,再向後執行?
     答:先生成一個對象obj,在一個全局map裏put(ID,obj)存放起來,再用synchronized獲取obj鎖,再調用obj.wait()讓當前線程處於等待狀態,而後另外一消息監聽線程等到服務端結果來了後,再map.get(ID)找到obj,再用synchronized獲取obj鎖,再調用obj.notifyAll()喚醒前面處於等待狀態的線程。
  • 正如前面所說,Socket通訊是一個全雙工的方式,若是有多個線程同時進行遠程方法調用,這時創建在client server之間的socket鏈接上會有不少雙方發送的消息傳遞,先後順序也多是亂七八糟的,server處理完結果後,將結果消息發送給client,client收到不少消息,怎麼知道哪一個消息結果是原先哪一個線程調用的?

 

       答:使用一個ID,讓其惟一,而後傳遞給服務端,再服務端又回傳回來,這樣就知道結果是原先哪一個線程的了。
 

另外:異步

   服務端在處理客戶端的消息,而後再處理時,使用了線程池來並行處理,不用一個一個消息的處理socket

   一樣,客戶端接收到服務端的消息,也是使用線程池來處理消息,再回調學習

   消息中間件rabbitmq遠程接口調用,同步調用的原理跟這相似,詳見:rabbitmq 學習-9- RpcClient發送消息和同步接收消息原理
 
關鍵代碼:
com.taobao.remoting.impl.DefaultClient.java
//同步調用遠程接口
public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
        byte protocol = getProtocol(control);
        if (!TRConstants.isValidProtocol(protocol)) {
            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
        }
        ResponseFuture future = invokeWithFuture(appRequest, control);
        return future.get();  //獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback
}
public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
         byte protocol = getProtocol(control);
         long timeout = getTimeout(control);
         ConnectionRequest request = new ConnectionRequest(appRequest);
         request.setSerializeProtocol(protocol);
         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
         connection.sendRequestWithCallback(request, adapter, timeout);
         return adapter;
}
Callback2FutureAdapter implements ResponseFuture
public Object get() throws RemotingException, InterruptedException {
 synchronized (this) {  // 旋鎖
     while (!isDone) {  // 是否有結果了
  wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態
     }
 }
 if (errorCode == TRConstants.RESULT_TIMEOUT) {
     throw new TimeoutException("Wait response timeout, request["
      + connectionRequest.getAppRequest() + "].");
 }
 else if (errorCode > 0) {
     throw new RemotingException(errorMsg);
 }
 else {
     return appResp;
 }
}
客戶端收到服務端結果後,回調時相關方法,即設置isDone = true並notifyAll()
public void handleResponse(Object _appResponse) {
         appResp = _appResponse; //將遠程調用結果設置到callback中來
         setDone();
}
public void onRemotingException(int _errorType, String _errorMsg) {
         errorCode = _errorType;
         errorMsg = _errorMsg;
         setDone();
}
private void setDone() {
         isDone = true;
         synchronized (this) { //獲取鎖,由於前面wait()已經釋放了callback的鎖了
             notifyAll(); // 喚醒處於等待的線程
         }
}
CallbackExecutorTask
static private class CallbackExecutorTask implements Runnable {
         final ConnectionResponse resp;
         final ResponseCallback callback;
         final Thread createThread;
 
         CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {
             resp = _resp;
             callback = _cb;
             createThread = Thread.currentThread();
         }
 
         public void run() {
             // 預防這種狀況:業務提供的Executor,讓調用者線程來執行任務
             if (createThread == Thread.currentThread()
                       && callback.getExecutor() != DIYExecutor.getInstance()) {
                   StringBuilder sb = new StringBuilder();
                   sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");
                   sb.append("Can not callback task on the network io thhread.");
                   LOGGER.warn(sb.toString());
                   return;
             }
 
             if (TRConstants.RESULT_SUCCESS == resp.getResult()) {
                   callback.handleResponse(resp.getAppResponse()); //設置調用結果
             }
             else {
                   callback.onRemotingException(resp.getResult(), resp
                            .getErrorMsg());  //處理調用異常
             }
         }
}

 

 
轉自http://blog.163.com/tsing_hua/blog/static/1396222242012819557547/
相關文章
相關標籤/搜索