dubbo原理結構網絡
組件角色:併發
Registry註冊中心:app
經過將服務統一管理起來,能夠有效地優化內部應用對服務發佈/使用的流程和管理。服務註冊中心能夠經過特定協議來完成服務對外的統一。Dubbo提供的註冊中心有以下幾種類型可供選擇:框架
Provider服務提供者:異步
提供具體服務的提供方。socket
Consumer消費者:ide
服務的消費方。優化
Monitor監控中心:ui
統計服務的調用次調和調用時間的監控中心。
Container: 服務運行容器。this
具體的框架
Dubbo缺省協議採用單一長鏈接和NIO異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況。
Dubbo缺省協議,使用基於mina1.1.7+hessian3.2.1的tbremoting交互。
主要原理就是消費方經過服務方提供接口的動態代理實現遠程調用,動態代理中將接口名稱、方法名稱、參數、進行打包,將其序列化再經過底層網絡通訊組件mina/netty發送到服務方。服務方將請求的結果返回給服務端。
大體步驟以下:
思考:
答:先生成一個對象obj,在一個全局map裏put(ID,obj)存放起來,再用synchronized獲取obj鎖,再調用obj.wait()讓當前線程處於等待狀態,而後另外一消息監聽線程等到服務端結果來了後,再map.get(ID)找到obj,再用synchronized獲取obj鎖,再調用obj.notifyAll()喚醒前面處於等待狀態的線程。
關於回調對象
客戶端代碼:
1 //同步調用遠程接口 2 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { 3 byte protocol = getProtocol(control); 4 if (!TRConstants.isValidProtocol(protocol)) { 5 throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); 6 } 7 ResponseFuture future = invokeWithFuture(appRequest, control); 8 return future.get(); //獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback 9 } 10 public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { 11 byte protocol = getProtocol(control); 12 long timeout = getTimeout(control); 13 ConnectionRequest request = new ConnectionRequest(appRequest); 14 request.setSerializeProtocol(protocol); 15 Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); 16 connection.sendRequestWithCallback(request, adapter, timeout); 17 return adapter; 18 }
1 Callback2FutureAdapter implements ResponseFuture 2 public Object get() throws RemotingException, InterruptedException { 3 synchronized (this) { // 旋鎖 4 while (!isDone) { // 是否有結果了 5 wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態 6 } 7 } 8 if (errorCode == TRConstants.RESULT_TIMEOUT) { 9 throw new TimeoutException("Wait response timeout, request[" 10 + connectionRequest.getAppRequest() + "]."); 11 } 12 else if (errorCode > 0) { 13 throw new RemotingException(errorMsg); 14 } 15 else { 16 return appResp; 17 } 18 } 19 客戶端收到服務端結果後,回調時相關方法,即設置isDone = true並notifyAll() 20 public void handleResponse(Object _appResponse) { 21 appResp = _appResponse; //將遠程調用結果設置到callback中來 22 setDone(); 23 } 24 public void onRemotingException(int _errorType, String _errorMsg) { 25 errorCode = _errorType; 26 errorMsg = _errorMsg; 27 setDone(); 28 } 29 private void setDone() { 30 isDone = true; 31 synchronized (this) { //獲取鎖,由於前面wait()已經釋放了callback的鎖了 32 notifyAll(); // 喚醒處於等待的線程 33 } 34 }
通訊部分源碼
1 2 // 用來存放請求和回調的MAP 3 private final ConcurrentHashMap<Long, Object[]> requestResidents; 4 5 //發送消息出去 6 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { 7 long requestId = connRequest.getId(); 8 long waitBegin = System.currentTimeMillis(); 9 long waitEnd = waitBegin + timeoutMs; 10 Object[] queue = new Object[4]; 11 int idx = 0; 12 queue[idx++] = waitEnd; 13 queue[idx++] = waitBegin; //用於記錄日誌 14 queue[idx++] = connRequest; //用於記錄日誌 15 queue[idx++] = callback; 16 requestResidents.put(requestId, queue); // 記錄響應隊列 17 write(connRequest); 18 19 // 埋點記錄等待響應的Map的大小 20 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 21 1L); 22 } 23 public void write(final Object connectionMsg) { 24 //mina裏的IoSession.write()發送消息 25 WriteFuture writeFuture = ioSession.write(connectionMsg); 26 // 註冊FutureListener,當請求發送失敗後,可以當即作出響應 27 writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); 28 } 29 30 /** 31 * 在獲得響應後,刪除對應的請求隊列,並執行回調 32 * 調用者:MINA線程 33 */ 34 public void putResponse(final ConnectionResponse connResp) { 35 final long requestId = connResp.getRequestId(); 36 Object[] queue = requestResidents.remove(requestId); 37 if (null == queue) { 38 Object appResp = connResp.getAppResponse(); 39 String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); 40 StringBuilder sb = new StringBuilder(); 41 sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); 42 sb.append("from [").append(connResp.getHost()).append("],"); 43 sb.append("response type [").append(appRespClazz).append("]."); 44 LOGGER.warn(sb.toString()); 45 return; 46 } 47 int idx = 0; 48 idx++; 49 long waitBegin = (Long) queue[idx++]; 50 ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; 51 ResponseCallback callback = (ResponseCallback) queue[idx++]; 52 // ** 把回調任務交給業務提供的線程池執行 ** 53 Executor callbackExecutor = callback.getExecutor(); 54 callbackExecutor.execute(new CallbackExecutorTask(connResp, callback)); 55 56 long duration = System.currentTimeMillis() - waitBegin; // 實際讀響應時間 57 logIfResponseError(connResp, duration, connRequest.getAppRequest()); 58 }