Dubbo缺省協議採用單一長鏈接和NIO異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況。java
Dubbo缺省協議,使用基於mina1.1.7+hessian3.2.1的tbremoting交互。apache
一般,一個典型的同步遠程調用應該是這樣的:併發
3, 客戶端收到結果,而後當前線程繼續日後執行app
須要注意的是,這裏的callback對象是每次調用產生一個新的,不能共享,不然會有問題;另外ID必需至少保證在一個Socket鏈接裏面是惟一的。框架
另外:異步
服務端在處理客戶端的消息,而後再處理時,使用了線程池來並行處理,不用一個一個消息的處理socket
一樣,客戶端接收到服務端的消息,也是使用線程池來處理消息,再回調學習
com.taobao.remoting.impl.DefaultClient.java //同步調用遠程接口 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { byte protocol = getProtocol(control); if (!TRConstants.isValidProtocol(protocol)) { throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); } ResponseFuture future = invokeWithFuture(appRequest, control); return future.get(); //獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback } public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) { byte protocol = getProtocol(control); long timeout = getTimeout(control); ConnectionRequest request = new ConnectionRequest(appRequest); request.setSerializeProtocol(protocol); Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); connection.sendRequestWithCallback(request, adapter, timeout); return adapter; }
Callback2FutureAdapter implements ResponseFuture public Object get() throws RemotingException, InterruptedException { synchronized (this) { // 旋鎖 while (!isDone) { // 是否有結果了 wait(); //沒結果是釋放鎖,讓當前線程處於等待狀態 } } if (errorCode == TRConstants.RESULT_TIMEOUT) { throw new TimeoutException("Wait response timeout, request[" + connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { throw new RemotingException(errorMsg); } else { return appResp; } } 客戶端收到服務端結果後,回調時相關方法,即設置isDone = true並notifyAll() public void handleResponse(Object _appResponse) { appResp = _appResponse; //將遠程調用結果設置到callback中來 setDone(); } public void onRemotingException(int _errorType, String _errorMsg) { errorCode = _errorType; errorMsg = _errorMsg; setDone(); } private void setDone() { isDone = true; synchronized (this) { //獲取鎖,由於前面wait()已經釋放了callback的鎖了 notifyAll(); // 喚醒處於等待的線程 } }
CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { final ConnectionResponse resp; final ResponseCallback callback; final Thread createThread; CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { resp = _resp; callback = _cb; createThread = Thread.currentThread(); } public void run() { // 預防這種狀況:業務提供的Executor,讓調用者線程來執行任務 if (createThread == Thread.currentThread() && callback.getExecutor() != DIYExecutor.getInstance()) { StringBuilder sb = new StringBuilder(); sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); sb.append("Can not callback task on the network io thhread."); LOGGER.warn(sb.toString()); return; } if (TRConstants.RESULT_SUCCESS == resp.getResult()) { callback.handleResponse(resp.getAppResponse()); //設置調用結果 } else { callback.onRemotingException(resp.getResult(), resp .getErrorMsg()); //處理調用異常 } } }