目標:介紹遠程調用中跟dubbo協議相關的設計和實現,介紹dubbo-rpc-dubbo的源碼。
Dubbo 缺省協議採用單一長鏈接和 NIO 異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況。反之,Dubbo 缺省協議不適合傳送大數據量的服務,好比傳文件,傳視頻等,除非請求量很低。這是官方文檔的原話,而且官方文檔還介紹了爲何使用單一長鏈接和 NIO 異步通信以及爲何不適合傳輸大數據的服務。我就不贅述了。java
咱們先來看看dubbo-rpc-dubbo下的包結構:git
該類是dubbo協議獨自實現的的invoker,其中實現了調用方法的三種模式,分別是異步發送、單向發送和同步發送,具體在下面介紹。github
/** * 信息交換客戶端數組 */ private final ExchangeClient[] clients; /** * 客戶端數組位置 */ private final AtomicPositiveInteger index = new AtomicPositiveInteger(); /** * 版本號 */ private final String version; /** * 銷燬鎖 */ private final ReentrantLock destroyLock = new ReentrantLock(); /** * Invoker對象集合 */ private final Set<Invoker<?>> invokers;
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { // rpc會話域 RpcInvocation inv = (RpcInvocation) invocation; // 得到方法名 final String methodName = RpcUtils.getMethodName(invocation); // 把path放入到附加值中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 把版本號放入到附加值 inv.setAttachment(Constants.VERSION_KEY, version); // 當前的客戶端 ExchangeClient currentClient; // 若是數組內就一個客戶端,則直接取出 if (clients.length == 1) { currentClient = clients[0]; } else { // 取模輪詢 從數組中取,當取到最後一個時,從頭開始 currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否啓用異步 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是不是單向發送 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 得到超時時間 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 若是是單項發送 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 單向發送只負責發送消息,不等待服務端應答,因此沒有返回值 currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { // 異步調用 ResponseFuture future = currentClient.request(inv, timeout); // 保存future,方便後期處理 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { // 同步調用,等待返回結果 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
在調用invoker的時候,經過遠程通訊將Invocation信息傳遞給服務端,服務端在接收到該invocation信息後,要找到對應的本地方法,而後經過反射執行該方法,將方法的執行結果返回給客戶端,在這裏,客戶端發送有三種模式:segmentfault
@Override public boolean isAvailable() { if (!super.isAvailable()) return false; for (ExchangeClient client : clients) { // 只要有一個客戶端鏈接而且不是隻讀,則表示存活 if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) { //cannot write == not Available ? return true; } } return false; }
該方法是檢查服務端是否存活。api
@Override public void destroy() { // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be // closed. if (super.isDestroyed()) { return; } else { // double check to avoid dup close // 得到銷燬鎖 destroyLock.lock(); try { if (super.isDestroyed()) { return; } // 銷燬 super.destroy(); // 從集合中移除 if (invokers != null) { invokers.remove(this); } for (ExchangeClient client : clients) { try { // 關閉每個客戶端 client.close(ConfigUtils.getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } finally { // 釋放鎖 destroyLock.unlock(); } } }
該方法是銷燬服務端,關閉全部鏈接到遠程通訊客戶端。數組
該類繼承了AbstractExporter,是dubbo協議中獨有的服務暴露者。緩存
/** * 服務key */ private final String key; /** * 服務暴露者集合 */ private final Map<String, Exporter<?>> exporterMap; public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; } @Override public void unexport() { super.unexport(); // 從集合中移除該key exporterMap.remove(key); }
其中對於服務暴露者用集合作了緩存,而且只重寫了了unexport。服務器
該類是dubbo協議的核心實現,其中增長了好比延遲加載等處理。 而且其中還包括了對服務暴露和服務引用的邏輯處理。併發
public static final String NAME = "dubbo"; /** * 默認端口號 */ public static final int DEFAULT_PORT = 20880; /** * 回調名稱 */ private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke"; /** * dubbo協議的單例 */ private static DubboProtocol INSTANCE; /** * 信息交換服務器集合 key:host:port value:ExchangeServer */ private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger> /** * 信息交換客戶端集合 */ private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger> /** * 懶加載的客戶端集合 */ private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>(); /** * 鎖集合 */ private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>(); /** * 序列化類名集合 */ private final Set<String> optimizers = new ConcurrentHashSet<String>(); //consumer side export a stub service for dispatching event //servicekey-stubmethods /** * 本地存根服務方法集合 */ private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>(); /** * 新建一個請求處理器 */ private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { /** * 回覆請求結果,返回的是請求結果 * @param channel * @param message * @return * @throws RemotingException */ @Override public Object reply(ExchangeChannel channel, Object message) throws RemotingException { // 若是請求消息屬於會話域 if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 得到暴露的invoker Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback // 若是是回調服務 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { // 得到 方法定義 String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; // 判斷看是否有會話域中的方法 if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { // 若是方法不止一個,則分割後遍歷查詢,找到了則設置爲true String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } // 若是沒有該方法,則打印告警日誌 if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } // 設置遠程地址 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 調用下一個調用鏈 return invoker.invoke(inv); } // 不然拋出異常 throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } /** * 接收消息 * @param channel * @param message * @throws RemotingException */ @Override public void received(Channel channel, Object message) throws RemotingException { // 若是消息是會話域中的消息,則調用reply方法。 if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override public void connected(Channel channel) throws RemotingException { // 接收鏈接事件 invoke(channel, Constants.ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isInfoEnabled()) { logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } // 接收斷開鏈接事件 invoke(channel, Constants.ON_DISCONNECT_KEY); } /** * 接收事件 * @param channel * @param methodKey */ private void invoke(Channel channel, String methodKey) { // 建立會話域 Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { // 接收事件 received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } /** * 建立會話域, 把url內的值加入到會話域的附加值中 * @param channel * @param url * @param methodKey * @return */ private Invocation createInvocation(Channel channel, URL url, String methodKey) { // 得到方法,methodKey是onconnect或者ondisconnect String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } // 建立一個rpc會話域 RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); // 加入附加值path invocation.setAttachment(Constants.PATH_KEY, url.getPath()); // 加入附加值group invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); // 加入附加值interface invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); // 加入附加值version invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); // 若是是本地存根服務,則加入附加值dubbo.stub.event爲true if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } };
該屬性中關鍵的是實例化了一個請求處理器,其中實現了基於dubbo協議等鏈接、取消鏈接、回覆請求結果等方法。app
@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. // 獲得服務key group+"/"+serviceName+":"+serviceVersion+":"+port String key = serviceKey(url); // 建立exporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 加入到集合 exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); // 若是是本地存根事件而不是回調服務 if (isStubSupportEvent && !isCallbackservice) { // 得到本地存根的方法 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); // 若是爲空,則拋出異常 if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { // 加入集合 stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 打開服務 openServer(url); // 序列化 optimizeSerialization(url); return exporter; }
該方法是基於dubbo協議的服務暴露,除了對於存根服務和本地服務進行標記之外,打開服務和序列化分別在openServer和optimizeSerialization中實現。
private void openServer(URL url) { // find server. String key = url.getAddress(); //client can export a service which's only for server to invoke // 客戶端是否能夠暴露僅供服務器調用的服務 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // 若是是的話 if (isServer) { // 得到信息交換服務器 ExchangeServer server = serverMap.get(key); if (server == null) { // 從新建立服務器對象,而後放入集合 serverMap.put(key, createServer(url)); } else { // server supports reset, use together with override // 重置 server.reset(url); } } }
該方法就是打開服務。其中的邏輯實際上是把服務對象放入集合中進行緩存,若是該地址對應的服務器不存在,則調用createServer建立一個服務器對象。
private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default // 服務器關閉時發送readonly事件,默認狀況下啓用 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default // 心跳默認間隔一分鐘 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 得到遠程通信服務端實現方式,默認用netty3 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); /** * 若是沒有該配置,則拋出異常 */ if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); /** * 添加編解碼器DubboCodec實現 */ url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 啓動服務器 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } // 得到客戶端側設置的遠程通訊方式 str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 得到遠程通訊的實現集合 Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 若是客戶端側設置的遠程通訊方式不在支持的方式中,則拋出異常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
該方法就是根據url攜帶的遠程通訊實現方法來建立一個服務器對象。
private void optimizeSerialization(URL url) throws RpcException { // 得到類名 String className = url.getParameter(Constants.OPTIMIZER_KEY, ""); if (StringUtils.isEmpty(className) || optimizers.contains(className)) { return; } logger.info("Optimizing the serialization process for Kryo, FST, etc..."); try { // 加載類 Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className); if (!SerializationOptimizer.class.isAssignableFrom(clazz)) { throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName()); } // 強制類型轉化爲SerializationOptimizer SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance(); if (optimizer.getSerializableClasses() == null) { return; } // 遍歷序列化的類,把該類放入到集合進行緩存 for (Class c : optimizer.getSerializableClasses()) { SerializableClassRegistry.registerClass(c); } // 加入到集合 optimizers.add(className); } catch (ClassNotFoundException e) { throw new RpcException("Cannot find the serialization optimizer class: " + className, e); } catch (InstantiationException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } catch (IllegalAccessException e) { throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e); } }
該方法是把序列化的類放入到集合,以便進行序列化
@Override public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // 序列化 optimizeSerialization(url); // create rpc invoker. 建立一個DubboInvoker對象 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); // 把該invoker放入集合 invokers.add(invoker); return invoker; }
該方法是服務引用,其中就是新建一個DubboInvoker對象後把它放入到集合。
private ExchangeClient[] getClients(URL url) { // whether to share connection // 一個鏈接是否對於一個服務 boolean service_share_connect = false; // 得到url中歡愉鏈接共享的配置 默認爲0 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // if not configured, connection is shared, otherwise, one connection for one service // 若是爲0,則是共享類,而且鏈接數爲1 if (connections == 0) { service_share_connect = true; connections = 1; } // 建立數組 ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { // 若是共享,則得到共享客戶端對象,不然新建客戶端 if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
該方法是得到客戶端集合的方法,分爲共享客戶端和非共享客戶端。共享客戶端是共用同一個鏈接,非共享客戶端是每一個客戶端都有本身的一個鏈接。
private ExchangeClient getSharedClient(URL url) { String key = url.getAddress(); // 從集合中取出客戶端對象 ReferenceCountExchangeClient client = referenceClientMap.get(key); // 若是不爲空而且沒關閉鏈接,則計數器加1,返回 if (client != null) { if (!client.isClosed()) { client.incrementAndGetCount(); return client; } else { // 若是鏈接斷開,則從集合中移除 referenceClientMap.remove(key); } } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { // 若是集合中有該key if (referenceClientMap.containsKey(key)) { // 則直接返回client return referenceClientMap.get(key); } // 不然新建一個鏈接 ExchangeClient exchangeClient = initClient(url); client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); // 存入集合 referenceClientMap.put(key, client); // 從ghostClientMap中移除 ghostClientMap.remove(key); // 從對象鎖中移除 locks.remove(key); return client; } }
該方法是得到分享的客戶端鏈接。
private ExchangeClient initClient(URL url) { // client type setting. // 得到客戶端的實現方法 默認netty3 String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); // 添加編碼器 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default // 默認開啓心跳 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client; try { // connection should be lazy // 是否須要延遲鏈接,,默認不開啓 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 建立延遲鏈接的客戶端 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 不然就直接鏈接 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
該方法是新建一個客戶端鏈接
@Override public void destroy() { // 遍歷服務器逐個關閉 for (String key : new ArrayList<String>(serverMap.keySet())) { ExchangeServer server = serverMap.remove(key); if (server != null) { try { if (logger.isInfoEnabled()) { logger.info("Close dubbo server: " + server.getLocalAddress()); } server.close(ConfigUtils.getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } // 遍歷客戶端集合逐個關閉 for (String key : new ArrayList<String>(referenceClientMap.keySet())) { ExchangeClient client = referenceClientMap.remove(key); if (client != null) { try { if (logger.isInfoEnabled()) { logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } client.close(ConfigUtils.getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } // 遍歷懶加載的集合,逐個關閉客戶端 for (String key : new ArrayList<String>(ghostClientMap.keySet())) { ExchangeClient client = ghostClientMap.remove(key); if (client != null) { try { if (logger.isInfoEnabled()) { logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress()); } client.close(ConfigUtils.getServerShutdownTimeout()); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } stubServiceMethodsMap.clear(); super.destroy(); }
該方法是銷燬的方法重寫。
該類是對當前通道內的客戶端調用消息進行包裝
/** * 通道 */ private final Channel channel; /** * 服務key */ private final String serviceKey; /** * 當前的客戶端 */ private final ExchangeClient currentClient;
@Override protected Result doInvoke(Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; // use interface's name as service path to export if it's not found on client side // 設置服務path,默認用接口名稱 inv.setAttachment(Constants.PATH_KEY, getInterface().getName()); // 設置回調的服務key inv.setAttachment(Constants.CALLBACK_SERVICE_KEY, serviceKey); try { // 若是是異步的 if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // may have concurrency issue // 直接發送請求消息 currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), Constants.SENT_KEY, false)); return new RpcResult(); } // 得到超時時間 int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (timeout > 0) { return (Result) currentClient.request(inv, timeout).get(); } else { return (Result) currentClient.request(inv).get(); } } catch (RpcException e) { throw e; } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e); } catch (Throwable e) { // here is non-biz exception, wrap it. throw new RpcException(e.getMessage(), e); } }
該方法是在invoker調用的時候對發送請求消息進行了包裝。
該類是個內部沒,繼承了ClientDelegate,其中將編碼器變成了dubbo的編碼器,其餘方法比較簡單。
該類主要作了對於會話域內的數據進行序列化和解碼。
private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class); /** * 通道 */ private Channel channel; /** * 序列化類型 */ private byte serializationType; /** * 輸入流 */ private InputStream inputStream; /** * 請求 */ private Request request; /** * 是否解碼 */ private volatile boolean hasDecoded;
@Override public void decode() throws Exception { // 若是沒有解碼,則進行解碼 if (!hasDecoded && channel != null && inputStream != null) { try { decode(channel, inputStream); } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Decode rpc invocation failed: " + e.getMessage(), e); } request.setBroken(true); request.setData(e); } finally { // 設置已經解碼 hasDecoded = true; } } } @Override public Object decode(Channel channel, InputStream input) throws IOException { // 對數據進行反序列化 ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); // dubbo版本 String dubboVersion = in.readUTF(); // 請求中放入dubbo版本 request.setVersion(dubboVersion); // 附加值內加入dubbo版本,path以及版本號 setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion); setAttachment(Constants.PATH_KEY, in.readUTF()); setAttachment(Constants.VERSION_KEY, in.readUTF()); // 設置方法名稱 setMethodName(in.readUTF()); try { // 方法參數數組 Object[] args; // 方法參數類型數組 Class<?>[] pts; // 描述 String desc = in.readUTF(); // 若是爲空,則方法參數數組和對方法參數類型數組都設置爲空 if (desc.length() == 0) { pts = DubboCodec.EMPTY_CLASS_ARRAY; args = DubboCodec.EMPTY_OBJECT_ARRAY; } else { // 分割成類,得到類數組 pts = ReflectUtils.desc2classArray(desc); // 建立等長等數組 args = new Object[pts.length]; for (int i = 0; i < args.length; i++) { try { // 讀取對象放入數組中 args[i] = in.readObject(pts[i]); } catch (Exception e) { if (log.isWarnEnabled()) { log.warn("Decode argument failed: " + e.getMessage(), e); } } } } // 設置參數類型 setParameterTypes(pts); Map<String, String> map = (Map<String, String>) in.readObject(Map.class); if (map != null && map.size() > 0) { // 得到全部附加值 Map<String, String> attachment = getAttachments(); if (attachment == null) { attachment = new HashMap<String, String>(); } // 把流中讀到的配置放入附加值 attachment.putAll(map); // 放回去 setAttachments(attachment); } //decode argument ,may be callback for (int i = 0; i < args.length; i++) { // 若是是回調,則再一次解碼 args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]); } setArguments(args); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read invocation data failed.", e)); } finally { if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } } return this; }
該方法就是處理Invocation內數據的邏輯,其中主要是作了序列化和解碼。把讀取出來的設置放入對對應位置傳遞給後面的調用。
該類是作了基於dubbo協議對prc結果的解碼
private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class); /** * 通道 */ private Channel channel; /** * 序列化類型 */ private byte serializationType; /** * 輸入流 */ private InputStream inputStream; /** * 響應 */ private Response response; /** * 會話域 */ private Invocation invocation; /** * 是否解碼 */ private volatile boolean hasDecoded;
@Override public Object decode(Channel channel, InputStream input) throws IOException { // 反序列化 ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); byte flag = in.readByte(); // 根據返回的不一樣結果來進行處理 switch (flag) { case DubboCodec.RESPONSE_NULL_VALUE: // 返回結果爲空 break; case DubboCodec.RESPONSE_VALUE: // try { // 得到返回類型數組 Type[] returnType = RpcUtils.getReturnTypes(invocation); // 根據返回類型讀取返回結果而且放入RpcResult setValue(returnType == null || returnType.length == 0 ? in.readObject() : (returnType.length == 1 ? in.readObject((Class<?>) returnType[0]) : in.readObject((Class<?>) returnType[0], returnType[1]))); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; case DubboCodec.RESPONSE_WITH_EXCEPTION: // 返回結果有異常 try { Object obj = in.readObject(); // 把異常放入RpcResult if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); setException((Throwable) obj); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: // 返回值爲空,可是有附加值 try { // 把附加值加入到RpcResult setAttachments((Map<String, String>) in.readObject(Map.class)); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS: // 返回值 try { // 設置返回結果 Type[] returnType = RpcUtils.getReturnTypes(invocation); setValue(returnType == null || returnType.length == 0 ? in.readObject() : (returnType.length == 1 ? in.readObject((Class<?>) returnType[0]) : in.readObject((Class<?>) returnType[0], returnType[1]))); // 設置附加值 setAttachments((Map<String, String>) in.readObject(Map.class)); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: // 返回結果有異常而且有附加值 try { // 設置異常 Object obj = in.readObject(); if (obj instanceof Throwable == false) throw new IOException("Response data error, expect Throwable, but get " + obj); setException((Throwable) obj); // 設置附加值 setAttachments((Map<String, String>) in.readObject(Map.class)); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read response data failed.", e)); } break; default: throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); } if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } return this; } @Override public void decode() throws Exception { // 若是沒有解碼 if (!hasDecoded && channel != null && inputStream != null) { try { // 進行解碼 decode(channel, inputStream); } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Decode rpc result failed: " + e.getMessage(), e); } response.setStatus(Response.CLIENT_ERROR); response.setErrorMessage(StringUtils.toString(e)); } finally { hasDecoded = true; } } }
該方法是對響應結果的解碼,其中根據不一樣的返回結果來對RpcResult設置不一樣的值。
該類實現了ExchangeClient接口,是ExchangeClient的裝飾器,用到了裝飾模式,是延遲鏈接的客戶端實現類。
// when this warning rises from invocation, program probably have bug. /** * 延遲鏈接請求錯誤key */ static final String REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning"; private final static Logger logger = LoggerFactory.getLogger(LazyConnectExchangeClient.class); /** * 是否在延遲鏈接請求時錯誤 */ protected final boolean requestWithWarning; /** * url對象 */ private final URL url; /** * 請求處理器 */ private final ExchangeHandler requestHandler; /** * 鏈接鎖 */ private final Lock connectLock = new ReentrantLock(); // lazy connect, initial state for connection /** * 初始化狀態 */ private final boolean initialState; /** * 客戶端對象 */ private volatile ExchangeClient client; /** * 錯誤次數 */ private AtomicLong warningcount = new AtomicLong(0);
能夠看到有屬性ExchangeClient client,該類中不少方法就直接調用了client的方法。
public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) { // lazy connect, need set send.reconnect = true, to avoid channel bad status. // 默認有重連 this.url = url.addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString()); this.requestHandler = requestHandler; // 默認延遲鏈接初始化成功 this.initialState = url.getParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE); // 默認沒有錯誤 this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false); }
private void initClient() throws RemotingException { // 若是客戶端已經初始化,則直接返回 if (client != null) return; if (logger.isInfoEnabled()) { logger.info("Lazy connect to " + url); } // 得到鏈接鎖 connectLock.lock(); try { // 二次判空 if (client != null) return; // 新建一個客戶端 this.client = Exchangers.connect(url, requestHandler); } finally { // 釋放鎖 connectLock.unlock(); } }
該方法是初始化客戶端的方法。
@Override public ResponseFuture request(Object request) throws RemotingException { warning(request); initClient(); return client.request(request); }
該方法在調用client.request前調用了前面兩個方法,initClient我在上面講到了,就是用來初始化客戶端的。而warning是用來報錯的。
private void warning(Object request) { if (requestWithWarning) { // 每5000次報錯一次 if (warningcount.get() % 5000 == 0) { logger.warn(new IllegalStateException("safe guard client , should not be called ,must have a bug.")); } warningcount.incrementAndGet(); } }
每5000次記錄報錯一次。
該類也是對ExchangeClient的裝飾,其中加強了調用次數多功能。
/** * url對象 */ private final URL url; /** * 計數 */ private final AtomicInteger refenceCount = new AtomicInteger(0); // private final ExchangeHandler handler; /** * 延遲鏈接客戶端集合 */ private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap; /** * 客戶端對象 */ private ExchangeClient client;
// ghost client private LazyConnectExchangeClient replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false // 設置延遲鏈接初始化狀態、是否重連、是否已經重連等配置 URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE) .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE) .addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString()) .addParameter("warning", Boolean.TRUE.toString()) .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient"); // 得到服務地址 String key = url.getAddress(); // in worst case there's only one ghost connection. // 從集合中獲取客戶端 LazyConnectExchangeClient gclient = ghostClientMap.get(key); // 若是對應等客戶端不存在或者已經關閉鏈接,則從新建立一個延遲鏈接等客戶端,而且放入集合 if (gclient == null || gclient.isClosed()) { gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler()); ghostClientMap.put(key, gclient); } return gclient; }
該方法是用延遲鏈接替代,該方法在close方法中被調用。
@Override public void close(int timeout) { if (refenceCount.decrementAndGet() <= 0) { if (timeout == 0) { client.close(); } else { client.close(timeout); } client = replaceWithLazyClient(); } }
該類實現了Future接口,是響應的Future適配器。其中是基於ResponseFuture作適配。其中比較簡單,我就很少講解了。
該類是針對回調服務的編解碼器。
/** * 代理工廠 */ private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); /** * dubbo協議 */ private static final DubboProtocol protocol = DubboProtocol.getDubboProtocol(); /** * 回調的標誌 */ private static final byte CALLBACK_NONE = 0x0; /** * 回調的建立標誌 */ private static final byte CALLBACK_CREATE = 0x1; /** * 回調的銷燬標誌 */ private static final byte CALLBACK_DESTROY = 0x2; /** * 回調參數key */ private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";
public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException { // get URL directly // 直接得到url URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl(); // 設置回調標誌 byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex); // 得到參數集合 Object[] args = inv.getArguments(); // 得到參數類型集合 Class<?>[] pts = inv.getParameterTypes(); // 根據不一樣的回調狀態來設置附加值和返回參數 switch (callbackstatus) { case CallbackServiceCodec.CALLBACK_NONE: return args[paraIndex]; case CallbackServiceCodec.CALLBACK_CREATE: inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true)); return null; case CallbackServiceCodec.CALLBACK_DESTROY: inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false)); return null; default: return args[paraIndex]; } }
該方法是對會話域的信息進行編碼。
public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class<?>[] pts, int paraIndex, Object inObject) throws IOException { // if it's a callback, create proxy on client side, callback interface on client side can be invoked through channel // need get URL from channel and env when decode URL url = null; try { // 得到url url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl(); } catch (RemotingException e) { if (logger.isInfoEnabled()) { logger.info(e.getMessage(), e); } return inObject; } // 得到回調狀態 byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex); // 根據回調狀態來返回結果 switch (callbackstatus) { case CallbackServiceCodec.CALLBACK_NONE: return inObject; case CallbackServiceCodec.CALLBACK_CREATE: try { return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true); } catch (Exception e) { logger.error(e.getMessage(), e); throw new IOException(StringUtils.toString(e)); } case CallbackServiceCodec.CALLBACK_DESTROY: try { return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false); } catch (Exception e) { throw new IOException(StringUtils.toString(e)); } default: return inObject; } }
該方法是對會話域內的信息進行解碼。
private static byte isCallBack(URL url, String methodName, int argIndex) { // parameter callback rule: method-name.parameter-index(starting from 0).callback // 參數的規則:ethod-name.parameter-index(starting from 0).callback byte isCallback = CALLBACK_NONE; if (url != null) { // 得到回調的值 String callback = url.getParameter(methodName + "." + argIndex + ".callback"); if (callback != null) { // 若是爲true,則設置爲建立標誌 if (callback.equalsIgnoreCase("true")) { isCallback = CALLBACK_CREATE; // 若是爲false,則設置爲銷燬標誌 } else if (callback.equalsIgnoreCase("false")) { isCallback = CALLBACK_DESTROY; } } } return isCallback; }
該方法是根據url攜帶的參數設置回調的標誌,以供執行不一樣的編解碼邏輯。
private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException { // 返回對象的hashCode int instid = System.identityHashCode(inst); Map<String, String> params = new HashMap<String, String>(3); // no need to new client again // 設置不是服務端標誌爲否 params.put(Constants.IS_SERVER_KEY, Boolean.FALSE.toString()); // mark it's a callback, for troubleshooting // 設置是回調服務標誌爲true params.put(Constants.IS_CALLBACK_SERVICE, Boolean.TRUE.toString()); String group = url.getParameter(Constants.GROUP_KEY); if (group != null && group.length() > 0) { // 設置是消費側仍是提供側 params.put(Constants.GROUP_KEY, group); } // add method, for verifying against method, automatic fallback (see dubbo protocol) // 添加方法,在dubbo的協議裏面用到 params.put(Constants.METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ",")); Map<String, String> tmpmap = new HashMap<String, String>(url.getParameters()); tmpmap.putAll(params); // 移除版本信息 tmpmap.remove(Constants.VERSION_KEY);// doesn't need to distinguish version for callback // 設置接口名 tmpmap.put(Constants.INTERFACE_KEY, clazz.getName()); // 建立服務暴露的url URL exporturl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpmap); // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide. // 得到緩存的key String cacheKey = getClientSideCallbackServiceCacheKey(instid); // 得到計數的key String countkey = getClientSideCountKey(clazz.getName()); // 若是是暴露服務 if (export) { // one channel can have multiple callback instances, no need to re-export for different instance. if (!channel.hasAttribute(cacheKey)) { if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) { // 得到代理對象 Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl); // should destroy resource? // 暴露服務 Exporter<?> exporter = protocol.export(invoker); // this is used for tracing if instid has published service or not. // 放到通道 channel.setAttribute(cacheKey, exporter); logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url); // 計數器加1 increaseInstanceCount(channel, countkey); } } } else { // 若是通道內已經有該服務的緩存 if (channel.hasAttribute(cacheKey)) { // 則得到該暴露者 Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey); // 取消暴露 exporter.unexport(); // 移除該緩存 channel.removeAttribute(cacheKey); // 計數器減1 decreaseInstanceCount(channel, countkey); } } return String.valueOf(instid); }
該方法是在客戶端側暴露服務和取消暴露服務。
private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) { Object proxy = null; // 得到服務調用的緩存key String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid); // 得到代理緩存key String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid); // 從通道內得到代理對象 proxy = channel.getAttribute(proxyCacheKey); // 得到計數器key String countkey = getServerSideCountKey(channel, clazz.getName()); // 若是是服務引用 if (isRefer) { // 若是代理對象爲空 if (proxy == null) { // 得到服務引用的url URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + Constants.INTERFACE_KEY + "=" + clazz.getName()); referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(Constants.METHODS_KEY); if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) { @SuppressWarnings("rawtypes") Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid)); // 得到代理類 proxy = proxyFactory.getProxy(invoker); // 設置代理類 channel.setAttribute(proxyCacheKey, proxy); // 設置實體域 channel.setAttribute(invokerCacheKey, invoker); // 計數器加1 increaseInstanceCount(channel, countkey); //convert error fail fast . //ignore concurrent problem. Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); if (callbackInvokers == null) { // 建立回調的服務實體域集合 callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1); // 把該實體域加入集合中 callbackInvokers.add(invoker); channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers); } logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created."); } } } else { // 銷燬 if (proxy != null) { Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey); try { Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY); if (callbackInvokers != null) { // 從集合中移除 callbackInvokers.remove(invoker); } // 銷燬該調用 invoker.destroy(); } catch (Exception e) { logger.error(e.getMessage(), e); } // cancel refer, directly remove from the map // 取消引用,直接從集合中移除 channel.removeAttribute(proxyCacheKey); channel.removeAttribute(invokerCacheKey); // 計數器減1 decreaseInstanceCount(channel, countkey); } } return proxy; }
該方法是在服務端側進行服務引用或者銷燬回調服務。
該類是dubbo的編解碼器,分別針對dubbo協議的request和response進行編碼和解碼。
/** * dubbo名稱 */ public static final String NAME = "dubbo"; /** * 協議版本號 */ public static final String DUBBO_VERSION = Version.getProtocolVersion(); /** * 響應攜帶着異常 */ public static final byte RESPONSE_WITH_EXCEPTION = 0; /** * 響應 */ public static final byte RESPONSE_VALUE = 1; /** * 響應結果爲空 */ public static final byte RESPONSE_NULL_VALUE = 2; /** * 響應結果有異常而且帶有附加值 */ public static final byte RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS = 3; /** * 響應結果有附加值 */ public static final byte RESPONSE_VALUE_WITH_ATTACHMENTS = 4; /** * 響應結果爲空並帶有附加值 */ public static final byte RESPONSE_NULL_VALUE_WITH_ATTACHMENTS = 5; /** * 對象空集合 */ public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; /** * 空的類集合 */ public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0]; private static final Logger log = LoggerFactory.getLogger(DubboCodec.class);
@Override protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // get request id. long id = Bytes.bytes2long(header, 4); // 若是是response if ((flag & FLAG_REQUEST) == 0) { // decode response. // 建立一個response Response res = new Response(id); // 若是是事件,則設置事件,這裏有個問題,我提交了pr在新版本已經修復 if ((flag & FLAG_EVENT) != 0) { res.setEvent(Response.HEARTBEAT_EVENT); } // get status. // 設置狀態 byte status = header[3]; res.setStatus(status); try { // 反序列化 ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); // 若是狀態是響應成功 if (status == Response.OK) { Object data; // 若是是心跳事件,則按照心跳事件解碼 if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, in); } else if (res.isEvent()) { // 若是是事件,則 data = decodeEventData(channel, in); } else { // 不然對結果進行解碼 DecodeableRpcResult result; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); result.decode(); } else { result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } // 把結果從新放入response中 res.setResult(data); } else { // 不然設置錯誤信息 res.setErrorMessage(in.readUTF()); } } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode response failed: " + t.getMessage(), t); } res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } return res; } else { // decode request. // 若是該消息是request Request req = new Request(id); // 設置版本 req.setVersion(Version.getProtocolVersion()); // 設置是不是雙向請求 req.setTwoWay((flag & FLAG_TWOWAY) != 0); // 設置是不是事件,該地方問題也在新版本修復 if ((flag & FLAG_EVENT) != 0) { req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; // 反序列化 ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto); // 進行解碼 if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, in); } else if (req.isEvent()) { data = decodeEventData(channel, in); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } // 把body數據設置到response req.setData(data); } catch (Throwable t) { if (log.isWarnEnabled()) { log.warn("Decode request failed: " + t.getMessage(), t); } // bad request req.setBroken(true); req.setData(t); } return req; } }
該方法是對request和response進行解碼,用位運算來進行解碼,其中的邏輯跟我在 《dubbo源碼解析(十)遠程通訊——Exchange層》中講到的編解碼器邏輯差很少。
@Override protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; // 輸出版本 out.writeUTF(version); // 輸出path out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); // 輸出版本號 out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); // 輸出方法名稱 out.writeUTF(inv.getMethodName()); // 輸出參數類型 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); // 輸出參數 Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++) { out.writeObject(encodeInvocationArgument(channel, inv, i)); } // 輸出附加值 out.writeObject(inv.getAttachments()); }
該方法是對請求數據的編碼。
@Override protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException { Result result = (Result) data; // currently, the version value in Response records the version of Request boolean attach = Version.isSupportResponseAttatchment(version); // 得到異常 Throwable th = result.getException(); if (th == null) { Object ret = result.getValue(); // 根據結果的不一樣輸出不一樣的值 if (ret == null) { out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE); } else { out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE); out.writeObject(ret); } } else { // 若是有異常,則輸出異常 out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION); out.writeObject(th); } if (attach) { // returns current version of Response to consumer side. // 在附加值中加入版本號 result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); // 輸出版本號 out.writeObject(result.getAttachments()); } }
該方法是對響應數據的編碼。
該類是對DubboCodec的功能加強,增長了消息長度的限制。
public final class DubboCountCodec implements Codec2 { private DubboCodec codec = new DubboCodec(); @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { codec.encode(channel, buffer, msg); } @Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 保存讀取的標誌 int save = buffer.readerIndex(); MultiMessage result = MultiMessage.create(); do { Object obj = codec.decode(channel, buffer); // 粘包拆包 if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { buffer.readerIndex(save); break; } else { // 增長消息 result.addMessage(obj); // 記錄消息長度 logMessageLength(obj, buffer.readerIndex() - save); save = buffer.readerIndex(); } } while (true); // 若是結果爲空,則返回須要更多的輸入 if (result.isEmpty()) { return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1) { return result.get(0); } return result; } private void logMessageLength(Object result, int bytes) { if (bytes <= 0) { return; } // 若是是request類型 if (result instanceof Request) { try { // 設置附加值 ((RpcInvocation) ((Request) result).getData()).setAttachment( Constants.INPUT_KEY, String.valueOf(bytes)); } catch (Throwable e) { /* ignore */ } } else if (result instanceof Response) { try { // 設置附加值 輸出的長度 ((RpcResult) ((Response) result).getResult()).setAttachment( Constants.OUTPUT_KEY, String.valueOf(bytes)); } catch (Throwable e) { /* ignore */ } } } }
該過濾器是加強的功能是通道的跟蹤,會在通道內把最大的調用次數和如今的調用數量放進去。方便使用telnet來跟蹤服務的調用次數等。
/** * 跟蹤數量的最大值key */ private static final String TRACE_MAX = "trace.max"; /** * 跟蹤的數量 */ private static final String TRACE_COUNT = "trace.count"; /** * 通道集合 */ private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<String, Set<Channel>>();
public static void addTracer(Class<?> type, String method, Channel channel, int max) { // 設置最大的數量 channel.setAttribute(TRACE_MAX, max); // 設置當前的數量 channel.setAttribute(TRACE_COUNT, new AtomicInteger()); // 得到key String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName(); // 得到通道集合 Set<Channel> channels = tracers.get(key); // 若是爲空,則新建 if (channels == null) { tracers.putIfAbsent(key, new ConcurrentHashSet<Channel>()); channels = tracers.get(key); } channels.add(channel); }
該方法是對某一個通道進行跟蹤,把如今的調用數量放到屬性裏面
public static void removeTracer(Class<?> type, String method, Channel channel) { // 移除最大值屬性 channel.removeAttribute(TRACE_MAX); // 移除數量屬性 channel.removeAttribute(TRACE_COUNT); String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName(); Set<Channel> channels = tracers.get(key); if (channels != null) { // 集合中移除該通道 channels.remove(channel); } }
該方法是移除通道的跟蹤。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 開始時間 long start = System.currentTimeMillis(); // 調用下一個調用鏈 得到結果 Result result = invoker.invoke(invocation); // 調用結束時間 long end = System.currentTimeMillis(); // 若是通道跟蹤大小大於0 if (tracers.size() > 0) { // 服務key String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); // 得到通道集合 Set<Channel> channels = tracers.get(key); if (channels == null || channels.isEmpty()) { key = invoker.getInterface().getName(); channels = tracers.get(key); } if (channels != null && !channels.isEmpty()) { // 遍歷通道集合 for (Channel channel : new ArrayList<Channel>(channels)) { // 若是通道是鏈接的 if (channel.isConnected()) { try { // 得到跟蹤的最大數 int max = 1; Integer m = (Integer) channel.getAttribute(TRACE_MAX); if (m != null) { max = (int) m; } // 得到跟蹤數量 int count = 0; AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT); if (c == null) { c = new AtomicInteger(); channel.setAttribute(TRACE_COUNT, c); } count = c.getAndIncrement(); // 若是數量小於最大數量則發送 if (count < max) { String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT); channel.send("\r\n" + RpcContext.getContext().getRemoteAddress() + " -> " + invoker.getInterface().getName() + "." + invocation.getMethodName() + "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue()) + "\r\nelapsed: " + (end - start) + " ms." + "\r\n\r\n" + prompt); } // 若是數量大於等於max - 1,則移除該通道 if (count >= max - 1) { channels.remove(channel); } } catch (Throwable e) { channels.remove(channel); logger.warn(e.getMessage(), e); } } else { // 若是未鏈接,也移除該通道 channels.remove(channel); } } } } return result; }
該方法是當服務被調用時,進行跟蹤或者取消跟蹤的處理邏輯,是核心的功能加強邏輯。
該類是處理異步和同步調用結果的過濾器。
@Override public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { // 是不是異步的調用 final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); fireInvokeCallback(invoker, invocation); // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. Result result = invoker.invoke(invocation); if (isAsync) { // 調用異步處理 asyncCallback(invoker, invocation); } else { // 調用同步結果處理 syncCallback(invoker, invocation, result); } return result; }
該方法中根據是否爲異步調用來分別執行asyncCallback和syncCallback方法。
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) { // 若是有異常 if (result.hasException()) { // 則調用異常的結果處理 fireThrowCallback(invoker, invocation, result.getException()); } else { // 調用正常的結果處理 fireReturnCallback(invoker, invocation, result.getValue()); } }
該方法是同步調用的返回結果處理,比較簡單。
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) { Future<?> f = RpcContext.getContext().getFuture(); if (f instanceof FutureAdapter) { ResponseFuture future = ((FutureAdapter<?>) f).getFuture(); // 設置回調 future.setCallback(new ResponseCallback() { @Override public void done(Object rpcResult) { // 若是結果爲空,則打印錯誤日誌 if (rpcResult == null) { logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); return; } ///must be rpcResult // 若是不是Result則打印錯誤日誌 if (!(rpcResult instanceof Result)) { logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); return; } Result result = (Result) rpcResult; if (result.hasException()) { // 若是有異常,則調用異常處理方法 fireThrowCallback(invoker, invocation, result.getException()); } else { // 若是正常的返回結果,則調用正常的處理方法 fireReturnCallback(invoker, invocation, result.getValue()); } } @Override public void caught(Throwable exception) { fireThrowCallback(invoker, invocation, exception); } }); } }
該方法是異步調用的結果處理,把異步返回結果的邏輯寫在回調函數裏面。
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) { // 得到調用的方法 final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); // 得到調用的服務 final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); if (onInvokeMethod == null && onInvokeInst == null) { return; } if (onInvokeMethod == null || onInvokeInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } // 若是不能夠訪問,則設置爲可訪問 if (!onInvokeMethod.isAccessible()) { onInvokeMethod.setAccessible(true); } // 得到參數數組 Object[] params = invocation.getArguments(); try { // 調用方法 onInvokeMethod.invoke(onInvokeInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
該方法是調用方法的執行。
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) { final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY)); final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY)); //not set onreturn callback if (onReturnMethod == null && onReturnInst == null) { return; } if (onReturnMethod == null || onReturnInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onReturnMethod.isAccessible()) { onReturnMethod.setAccessible(true); } Object[] args = invocation.getArguments(); Object[] params; // 得到返回結果類型 Class<?>[] rParaTypes = onReturnMethod.getParameterTypes(); // 設置參數和返回結果 if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = result; params[1] = args; } else { params = new Object[args.length + 1]; params[0] = result; System.arraycopy(args, 0, params, 1, args.length); } } else { params = new Object[]{result}; } try { // 調用方法 onReturnMethod.invoke(onReturnInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
該方法是正常的返回結果的處理。
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) { final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY)); final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY)); //onthrow callback not configured if (onthrowMethod == null && onthrowInst == null) { return; } if (onthrowMethod == null || onthrowInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onthrowMethod.isAccessible()) { onthrowMethod.setAccessible(true); } // 得到拋出異常的類型 Class<?>[] rParaTypes = onthrowMethod.getParameterTypes(); if (rParaTypes[0].isAssignableFrom(exception.getClass())) { try { Object[] args = invocation.getArguments(); Object[] params; // 把類型和拋出的異常值放入返回結果 if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = exception; params[1] = args; } else { params = new Object[args.length + 1]; params[0] = exception; System.arraycopy(args, 0, params, 1, args.length); } } else { params = new Object[]{exception}; } // 調用下一個調用連 onthrowMethod.invoke(onthrowInst, params); } catch (Throwable e) { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e); } } else { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception); } }
該方法是異常拋出時的結果處理。
該類是對於服務狀態的監控設置。
public class ServerStatusChecker implements StatusChecker { @Override public Status check() { // 得到服務集合 Collection<ExchangeServer> servers = DubboProtocol.getDubboProtocol().getServers(); // 若是爲空則返回UNKNOWN的狀態 if (servers == null || servers.isEmpty()) { return new Status(Status.Level.UNKNOWN); } // 設置狀態爲ok Status.Level level = Status.Level.OK; StringBuilder buf = new StringBuilder(); // 遍歷集合 for (ExchangeServer server : servers) { // 若是服務沒有綁定到本地端口 if (!server.isBound()) { // 狀態改成error level = Status.Level.ERROR; // 加入服務本地地址 buf.setLength(0); buf.append(server.getLocalAddress()); break; } if (buf.length() > 0) { buf.append(","); } // 若是服務綁定了本地端口,拼接clients數量 buf.append(server.getLocalAddress()); buf.append("(clients:"); buf.append(server.getChannels().size()); buf.append(")"); } return new Status(level, buf.toString()); } }
該類是對於線程池的狀態進行監控。
@Activate public class ThreadPoolStatusChecker implements StatusChecker { @Override public Status check() { // 得到數據中心 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); // 得到線程池集合 Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY); StringBuilder msg = new StringBuilder(); // 設置爲ok Status.Level level = Status.Level.OK; // 遍歷線程池集合 for (Map.Entry<String, Object> entry : executors.entrySet()) { String port = entry.getKey(); ExecutorService executor = (ExecutorService) entry.getValue(); if (executor != null && executor instanceof ThreadPoolExecutor) { ThreadPoolExecutor tp = (ThreadPoolExecutor) executor; boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1; Status.Level lvl = Status.Level.OK; // 若是活躍數量超過了最大的線程數量,則設置warn if (!ok) { level = Status.Level.WARN; lvl = Status.Level.WARN; } if (msg.length() > 0) { msg.append(";"); } // 輸出線程池相關信息 msg.append("Pool status:" + lvl + ", max:" + tp.getMaximumPoolSize() + ", core:" + tp.getCorePoolSize() + ", largest:" + tp.getLargestPoolSize() + ", active:" + tp.getActiveCount() + ", task:" + tp.getTaskCount() + ", service port: " + port); } } return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString()); } }
邏輯比較簡單,我就不贅述了。
關於telnet下的相關實現請感興趣的朋友直接查看,裏面都是對於telnet命令的實現,內容比較獨立。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了遠程調用中關於dubbo協議的部分,dubbo協議是官方推薦使用的協議,而且對於telnet命令也作了很好的支持,要看懂這部分的邏輯,必須先對於以前的一些接口設計瞭解的很清楚。接下來我將開始對rpc模塊關於hessian協議部分進行講解。