dubbo

dubbo原理結構網絡

組件角色:併發

Registry註冊中心app

  經過將服務統一管理起來,能夠有效地優化內部應用對服務發佈/使用的流程和管理。服務註冊中心能夠經過特定協議來完成服務對外的統一。Dubbo提供的註冊中心有以下幾種類型可供選擇:框架

  • Multicast註冊中心
  • Zookeeper註冊中心
  • Redis註冊中心
  • Simple註冊中心

Provider服務提供者:異步

  提供具體服務的提供方。socket

Consumer消費者:ide

  服務的消費方。優化

Monitor監控中心:ui

  統計服務的調用次調和調用時間的監控中心。
Container: 服務運行容器。this

 

具體的框架

Dubbo缺省協議採用單一長鏈接和NIO異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況。

Dubbo缺省協議,使用基於mina1.1.7+hessian3.2.1的tbremoting交互。

  • 鏈接個數:單鏈接
  • 鏈接方式:長鏈接
  • 傳輸協議:TCP
  • 傳輸方式:NIO異步傳輸
  • 序列化:Hessian二進制序列化
  • 適用範圍:傳入傳出參數數據包較小(建議小於100K),消費者比提供者個數多,單一消費者沒法壓滿提供者,儘可能不要用dubbo協議傳輸大文件或超大字符串。
  • 適用場景:常規遠程服務方法調用

主要原理就是消費方經過服務方提供接口的動態代理實現遠程調用,動態代理中將接口名稱、方法名稱、參數、進行打包,將其序列化再經過底層網絡通訊組件mina/netty發送到服務方。服務方將請求的結果返回給服務端。

大體步驟以下:

  1. client一個線程調用遠程接口,生成一個惟一的ID(好比一段隨機字符串,UUID等),Dubbo是使用AtomicLong從0開始累計數字的
  2. 將打包的方法調用信息(如調用的接口名稱,方法名稱,參數值列表等),和處理結果的回調對象callback,所有封裝在一塊兒,組成一個對象object
  3. 向專門存放調用信息的全局ConcurrentHashMap裏面put(ID, object)
  4. ID和打包的方法調用信息封裝成一對象connRequest,使用IoSession.write(connRequest)異步發送出去
  5. 當前線程再使用callback的get()方法試圖獲取遠程返回的結果,在get()內部,則使用synchronized獲取回調對象callback的鎖, 再先檢測是否已經獲取到結果,若是沒有,而後調用callback的wait()方法,釋放callback上的鎖,讓當前線程處於等待狀態。
  6. 服務端接收到請求並處理後,將結果(此結果中包含了前面的ID,即回傳)發送給客戶端,客戶端socket鏈接上專門監聽消息的線程收到消息,分析結果,取到ID,再從前面的ConcurrentHashMap裏面get(ID),從而找到callback,將方法調用結果設置到callback對象裏。
  7. 監聽線程接着使用synchronized獲取回調對象callback的鎖(由於前面調用過wait(),那個線程已釋放callback的鎖了),再notifyAll(),喚醒前面處於等待狀態的線程繼續執行(callback的get()方法繼續執行就能拿到調用結果了),至此,整個過程結束。

思考:

  • 問:當前線程怎麼讓它「暫停」,等結果回來後,再向後執行?

     答:先生成一個對象obj,在一個全局map裏put(ID,obj)存放起來,再用synchronized獲取obj鎖,再調用obj.wait()讓當前線程處於等待狀態,而後另外一消息監聽線程等到服務端結果來了後,再map.get(ID)找到obj,再用synchronized獲取obj鎖,再調用obj.notifyAll()喚醒前面處於等待狀態的線程。

  • 問:正如前面所說,Socket通訊是一個全雙工的方式,若是有多個線程同時進行遠程方法調用,這時創建在client server之間的socket鏈接上會有不少雙方發送的消息傳遞,先後順序也多是亂七八糟的,server處理完結果後,將結果消息發送給client,client收到不少消息,怎麼知道哪一個消息結果是原先哪一個線程調用的?
      答:使用一個 ID,讓其惟一,而後傳遞給服務端,再服務端又回傳回來,這樣就知道結果是原先哪一個線程的了。

關於回調對象

客戶端代碼:

 1 //同步調用遠程接口
 2 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
 3         byte protocol = getProtocol(control);
 4         if (!TRConstants.isValidProtocol(protocol)) {
 5             throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
 6         }
 7         ResponseFuture future = invokeWithFuture(appRequest, control);
 8         return future.get();  //獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback
 9 }
10 public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
11          byte protocol = getProtocol(control);
12          long timeout = getTimeout(control);
13          ConnectionRequest request = new ConnectionRequest(appRequest);
14          request.setSerializeProtocol(protocol);
15          Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
16          connection.sendRequestWithCallback(request, adapter, timeout);
17          return adapter;
18 }
 1 Callback2FutureAdapter implements ResponseFuture
 2 public Object get() throws RemotingException, InterruptedException {
 3     synchronized (this) {  // 旋鎖
 4         while (!isDone) {  // 是否有結果了
 5             wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態
 6         }
 7     }
 8     if (errorCode == TRConstants.RESULT_TIMEOUT) {
 9          throw new TimeoutException("Wait response timeout, request["
10          + connectionRequest.getAppRequest() + "].");
11     }
12     else if (errorCode > 0) {
13         throw new RemotingException(errorMsg);
14     }
15     else {
16          return appResp;
17     }
18 }
19 客戶端收到服務端結果後,回調時相關方法,即設置isDone = true並notifyAll()
20 public void handleResponse(Object _appResponse) {
21          appResp = _appResponse; //將遠程調用結果設置到callback中來
22          setDone();
23 }
24 public void onRemotingException(int _errorType, String _errorMsg) {
25          errorCode = _errorType;
26          errorMsg = _errorMsg;
27          setDone();
28 }
29 private void setDone() {
30          isDone = true;
31          synchronized (this) { //獲取鎖,由於前面wait()已經釋放了callback的鎖了
32              notifyAll(); // 喚醒處於等待的線程
33          }
34 }

通訊部分源碼

 1  
 2 // 用來存放請求和回調的MAP
 3 private final ConcurrentHashMap<Long, Object[]> requestResidents;
 4  
 5 //發送消息出去
 6 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {
 7          long requestId = connRequest.getId();
 8          long waitBegin = System.currentTimeMillis();
 9          long waitEnd = waitBegin + timeoutMs;
10          Object[] queue = new Object[4];
11          int idx = 0;
12          queue[idx++] = waitEnd;
13          queue[idx++] = waitBegin;   //用於記錄日誌
14          queue[idx++] = connRequest; //用於記錄日誌
15          queue[idx++] = callback;
16          requestResidents.put(requestId, queue); // 記錄響應隊列
17          write(connRequest);
18  
19          // 埋點記錄等待響應的Map的大小
20          StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),
21                    1L);
22 }
23 public void write(final Object connectionMsg) {
24 //mina裏的IoSession.write()發送消息
25          WriteFuture writeFuture = ioSession.write(connectionMsg);
26          // 註冊FutureListener,當請求發送失敗後,可以當即作出響應
27          writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));
28 }
29  
30 /**
31 * 在獲得響應後,刪除對應的請求隊列,並執行回調
32 * 調用者:MINA線程
33 */
34 public void putResponse(final ConnectionResponse connResp) {
35          final long requestId = connResp.getRequestId();
36          Object[] queue = requestResidents.remove(requestId);
37          if (null == queue) {
38              Object appResp = connResp.getAppResponse();
39              String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();
40              StringBuilder sb = new StringBuilder();
41              sb.append("Not found response receiver for requestId=[").append(requestId).append("],");
42              sb.append("from [").append(connResp.getHost()).append("],");
43              sb.append("response type [").append(appRespClazz).append("].");
44              LOGGER.warn(sb.toString());
45              return;
46          }
47          int idx = 0;
48          idx++;
49          long waitBegin = (Long) queue[idx++];
50          ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];
51          ResponseCallback callback = (ResponseCallback) queue[idx++];
52          // ** 把回調任務交給業務提供的線程池執行 **
53          Executor callbackExecutor = callback.getExecutor();
54          callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
55  
56          long duration = System.currentTimeMillis() - waitBegin; // 實際讀響應時間
57          logIfResponseError(connResp, duration, connRequest.getAppRequest());
58 }
相關文章
相關標籤/搜索