dubbo源碼解析(二十四)遠程調用——dubbo協議

遠程調用——dubbo協議

目標:介紹遠程調用中跟dubbo協議相關的設計和實現,介紹dubbo-rpc-dubbo的源碼。

前言

Dubbo 缺省協議採用單一長鏈接和 NIO 異步通信,適合於小數據量大併發的服務調用,以及服務消費者機器數遠大於服務提供者機器數的狀況。反之,Dubbo 缺省協議不適合傳送大數據量的服務,好比傳文件,傳視頻等,除非請求量很低。這是官方文檔的原話,而且官方文檔還介紹了爲何使用單一長鏈接和 NIO 異步通信以及爲何不適合傳輸大數據的服務。我就不贅述了。java

咱們先來看看dubbo-rpc-dubbo下的包結構:git

dubbo-rpc-dubbo

  1. filter:該包下面是對於dubbo協議獨有的兩個過濾器
  2. status:該包下是作了對於服務和線程池狀態的檢測
  3. telnet:該包下是對於telnet命令的支持
  4. 最外層:最外層是dubbo協議的核心

源碼分析

(一)DubboInvoker

該類是dubbo協議獨自實現的的invoker,其中實現了調用方法的三種模式,分別是異步發送、單向發送和同步發送,具體在下面介紹。github

1.屬性

/**
 * 信息交換客戶端數組
 */
private final ExchangeClient[] clients;

/**
 * 客戶端數組位置
 */
private final AtomicPositiveInteger index = new AtomicPositiveInteger();

/**
 * 版本號
 */
private final String version;

/**
 * 銷燬鎖
 */
private final ReentrantLock destroyLock = new ReentrantLock();

/**
 * Invoker對象集合
 */
private final Set<Invoker<?>> invokers;

2.doInvoke

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    // rpc會話域
    RpcInvocation inv = (RpcInvocation) invocation;
    // 得到方法名
    final String methodName = RpcUtils.getMethodName(invocation);
    // 把path放入到附加值中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    // 把版本號放入到附加值
    inv.setAttachment(Constants.VERSION_KEY, version);

    // 當前的客戶端
    ExchangeClient currentClient;
    // 若是數組內就一個客戶端,則直接取出
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        // 取模輪詢 從數組中取,當取到最後一個時,從頭開始
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否啓用異步
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 是不是單向發送
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 得到超時時間
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 若是是單項發送
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 單向發送只負責發送消息,不等待服務端應答,因此沒有返回值
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // 異步調用
            ResponseFuture future = currentClient.request(inv, timeout);
            // 保存future,方便後期處理
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // 同步調用,等待返回結果
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

在調用invoker的時候,經過遠程通訊將Invocation信息傳遞給服務端,服務端在接收到該invocation信息後,要找到對應的本地方法,而後經過反射執行該方法,將方法的執行結果返回給客戶端,在這裏,客戶端發送有三種模式:segmentfault

  1. 異步發送,也就是當我發送調用後,我不阻塞等待結果,直接返回,將返回的future保存到上下文,方便後期使用。
  2. 單向發送,執行方法不須要返回結果。
  3. 同步發送,執行方法後,等待結果返回,不然一直阻塞。

3.isAvailable

@Override
public boolean isAvailable() {
    if (!super.isAvailable())
        return false;
    for (ExchangeClient client : clients) {
        // 只要有一個客戶端鏈接而且不是隻讀,則表示存活
        if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) {
            //cannot write == not Available ?
            return true;
        }
    }
    return false;
}

該方法是檢查服務端是否存活。api

4.destroy

@Override
public void destroy() {
    // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every
    // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be
    // closed.
    if (super.isDestroyed()) {
        return;
    } else {
        // double check to avoid dup close
        // 得到銷燬鎖
        destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            // 銷燬
            super.destroy();
            // 從集合中移除
            if (invokers != null) {
                invokers.remove(this);
            }
            for (ExchangeClient client : clients) {
                try {
                    // 關閉每個客戶端
                    client.close(ConfigUtils.getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }

        } finally {
            // 釋放鎖
            destroyLock.unlock();
        }
    }
}

該方法是銷燬服務端,關閉全部鏈接到遠程通訊客戶端。數組

(二)DubboExporter

該類繼承了AbstractExporter,是dubbo協議中獨有的服務暴露者。緩存

/**
 * 服務key
 */
private final String key;

/**
 * 服務暴露者集合
 */
private final Map<String, Exporter<?>> exporterMap;

public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
    super(invoker);
    this.key = key;
    this.exporterMap = exporterMap;
}

@Override
public void unexport() {
    super.unexport();
    // 從集合中移除該key
    exporterMap.remove(key);
}

其中對於服務暴露者用集合作了緩存,而且只重寫了了unexport。服務器

(三)DubboProtocol

該類是dubbo協議的核心實現,其中增長了好比延遲加載等處理。 而且其中還包括了對服務暴露和服務引用的邏輯處理。併發

1.屬性

public static final String NAME = "dubbo";

/**
 * 默認端口號
 */
public static final int DEFAULT_PORT = 20880;
/**
 * 回調名稱
 */
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
/**
 * dubbo協議的單例
 */
private static DubboProtocol INSTANCE;
/**
 * 信息交換服務器集合 key:host:port  value:ExchangeServer
 */
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
/**
 * 信息交換客戶端集合
 */
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
/**
 * 懶加載的客戶端集合
 */
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
/**
 * 鎖集合
 */
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
/**
 * 序列化類名集合
 */
private final Set<String> optimizers = new ConcurrentHashSet<String>();
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
/**
 * 本地存根服務方法集合
 */
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
/**
 * 新建一個請求處理器
 */
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
    /**
     * 回覆請求結果,返回的是請求結果
     * @param channel
     * @param message
     * @return
     * @throws RemotingException
     */
    @Override
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        // 若是請求消息屬於會話域
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            // 得到暴露的invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            // 若是是回調服務
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                // 得到 方法定義
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                // 判斷看是否有會話域中的方法
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    // 若是方法不止一個,則分割後遍歷查詢,找到了則設置爲true
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                // 若是沒有該方法,則打印告警日誌
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            // 設置遠程地址
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            // 調用下一個調用鏈
            return invoker.invoke(inv);
        }
        // 不然拋出異常
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    /**
     * 接收消息
     * @param channel
     * @param message
     * @throws RemotingException
     */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 若是消息是會話域中的消息,則調用reply方法。
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        // 接收鏈接事件
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        // 接收斷開鏈接事件
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    /**
     * 接收事件
     * @param channel
     * @param methodKey
     */
    private void invoke(Channel channel, String methodKey) {
        // 建立會話域
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                // 接收事件
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * 建立會話域, 把url內的值加入到會話域的附加值中
     * @param channel
     * @param url
     * @param methodKey
     * @return
     */
    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        // 得到方法,methodKey是onconnect或者ondisconnect
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        // 建立一個rpc會話域
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        // 加入附加值path
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        // 加入附加值group
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        // 加入附加值interface
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        // 加入附加值version
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        // 若是是本地存根服務,則加入附加值dubbo.stub.event爲true
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};

該屬性中關鍵的是實例化了一個請求處理器,其中實現了基於dubbo協議等鏈接、取消鏈接、回覆請求結果等方法。app

2.export

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // 獲得服務key  group+"/"+serviceName+":"+serviceVersion+":"+port
    String key = serviceKey(url);
    // 建立exporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 加入到集合
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    // 若是是本地存根事件而不是回調服務
    if (isStubSupportEvent && !isCallbackservice) {
        // 得到本地存根的方法
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        // 若是爲空,則拋出異常
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            // 加入集合
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }


    // 打開服務
    openServer(url);
    // 序列化
    optimizeSerialization(url);
    return exporter;
}

該方法是基於dubbo協議的服務暴露,除了對於存根服務和本地服務進行標記之外,打開服務和序列化分別在openServer和optimizeSerialization中實現。

3.openServer

private void openServer(URL url) {
    // find server.
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    // 客戶端是否能夠暴露僅供服務器調用的服務
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
    // 若是是的話
    if (isServer) {
        // 得到信息交換服務器
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            // 從新建立服務器對象,而後放入集合
            serverMap.put(key, createServer(url));
        } else {
            // server supports reset, use together with override
            // 重置
            server.reset(url);
        }
    }
}

該方法就是打開服務。其中的邏輯實際上是把服務對象放入集合中進行緩存,若是該地址對應的服務器不存在,則調用createServer建立一個服務器對象。

4.createServer

private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    // 服務器關閉時發送readonly事件,默認狀況下啓用
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    // 心跳默認間隔一分鐘
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // 得到遠程通信服務端實現方式,默認用netty3
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

    /**
     * 若是沒有該配置,則拋出異常
     */
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);

    /**
     * 添加編解碼器DubboCodec實現
     */
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // 啓動服務器
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    // 得到客戶端側設置的遠程通訊方式
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 得到遠程通訊的實現集合
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 若是客戶端側設置的遠程通訊方式不在支持的方式中,則拋出異常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

該方法就是根據url攜帶的遠程通訊實現方法來建立一個服務器對象。

5.optimizeSerialization

private void optimizeSerialization(URL url) throws RpcException {
    // 得到類名
    String className = url.getParameter(Constants.OPTIMIZER_KEY, "");
    if (StringUtils.isEmpty(className) || optimizers.contains(className)) {
        return;
    }

    logger.info("Optimizing the serialization process for Kryo, FST, etc...");

    try {
        // 加載類
        Class clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
        if (!SerializationOptimizer.class.isAssignableFrom(clazz)) {
            throw new RpcException("The serialization optimizer " + className + " isn't an instance of " + SerializationOptimizer.class.getName());
        }

        // 強制類型轉化爲SerializationOptimizer
        SerializationOptimizer optimizer = (SerializationOptimizer) clazz.newInstance();

        if (optimizer.getSerializableClasses() == null) {
            return;
        }

        // 遍歷序列化的類,把該類放入到集合進行緩存
        for (Class c : optimizer.getSerializableClasses()) {
            SerializableClassRegistry.registerClass(c);
        }

        // 加入到集合
        optimizers.add(className);
    } catch (ClassNotFoundException e) {
        throw new RpcException("Cannot find the serialization optimizer class: " + className, e);
    } catch (InstantiationException e) {
        throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
    } catch (IllegalAccessException e) {
        throw new RpcException("Cannot instantiate the serialization optimizer class: " + className, e);
    }
}

該方法是把序列化的類放入到集合,以便進行序列化

6.refer

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // 序列化
    optimizeSerialization(url);
    // create rpc invoker. 建立一個DubboInvoker對象
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    // 把該invoker放入集合
    invokers.add(invoker);
    return invoker;
}

該方法是服務引用,其中就是新建一個DubboInvoker對象後把它放入到集合。

7.getClients

private ExchangeClient[] getClients(URL url) {
    // whether to share connection
    // 一個鏈接是否對於一個服務
    boolean service_share_connect = false;
    // 得到url中歡愉鏈接共享的配置 默認爲0
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if not configured, connection is shared, otherwise, one connection for one service
    // 若是爲0,則是共享類,而且鏈接數爲1
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    // 建立數組
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        // 若是共享,則得到共享客戶端對象,不然新建客戶端
        if (service_share_connect) {
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

該方法是得到客戶端集合的方法,分爲共享客戶端和非共享客戶端。共享客戶端是共用同一個鏈接,非共享客戶端是每一個客戶端都有本身的一個鏈接。

8.getSharedClient

private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    // 從集合中取出客戶端對象
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    // 若是不爲空而且沒關閉鏈接,則計數器加1,返回
    if (client != null) {
        if (!client.isClosed()) {
            client.incrementAndGetCount();
            return client;
        } else {
            // 若是鏈接斷開,則從集合中移除
            referenceClientMap.remove(key);
        }
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        // 若是集合中有該key
        if (referenceClientMap.containsKey(key)) {
            // 則直接返回client
            return referenceClientMap.get(key);
        }

        // 不然新建一個鏈接
        ExchangeClient exchangeClient = initClient(url);
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        // 存入集合
        referenceClientMap.put(key, client);
        // 從ghostClientMap中移除
        ghostClientMap.remove(key);
        // 從對象鎖中移除
        locks.remove(key);
        return client;
    }
}

該方法是得到分享的客戶端鏈接。

9.initClient

private ExchangeClient initClient(URL url) {

    // client type setting.
    // 得到客戶端的實現方法 默認netty3
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    // 添加編碼器
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    // 默認開啓心跳
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // connection should be lazy
        // 是否須要延遲鏈接,,默認不開啓
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 建立延遲鏈接的客戶端
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 不然就直接鏈接
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

該方法是新建一個客戶端鏈接

10.destroy

@Override
public void destroy() {
    // 遍歷服務器逐個關閉
    for (String key : new ArrayList<String>(serverMap.keySet())) {
        ExchangeServer server = serverMap.remove(key);
        if (server != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Close dubbo server: " + server.getLocalAddress());
                }
                server.close(ConfigUtils.getServerShutdownTimeout());
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }

    // 遍歷客戶端集合逐個關閉
    for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
        ExchangeClient client = referenceClientMap.remove(key);
        if (client != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                }
                client.close(ConfigUtils.getServerShutdownTimeout());
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }

    // 遍歷懶加載的集合,逐個關閉客戶端
    for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
        ExchangeClient client = ghostClientMap.remove(key);
        if (client != null) {
            try {
                if (logger.isInfoEnabled()) {
                    logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                }
                client.close(ConfigUtils.getServerShutdownTimeout());
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    stubServiceMethodsMap.clear();
    super.destroy();
}

該方法是銷燬的方法重寫。

(四)ChannelWrappedInvoker

該類是對當前通道內的客戶端調用消息進行包裝

1.屬性

/**
 * 通道
 */
private final Channel channel;
/**
 * 服務key
 */
private final String serviceKey;
/**
 * 當前的客戶端
 */
private final ExchangeClient currentClient;

2.doInvoke

@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    // use interface's name as service path to export if it's not found on client side
    // 設置服務path,默認用接口名稱
    inv.setAttachment(Constants.PATH_KEY, getInterface().getName());
    // 設置回調的服務key
    inv.setAttachment(Constants.CALLBACK_SERVICE_KEY, serviceKey);

    try {
        // 若是是異步的
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // may have concurrency issue
            // 直接發送請求消息
            currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), Constants.SENT_KEY, false));
            return new RpcResult();
        }
        // 得到超時時間
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (timeout > 0) {
            return (Result) currentClient.request(inv, timeout).get();
        } else {
            return (Result) currentClient.request(inv).get();
        }
    } catch (RpcException e) {
        throw e;
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, e.getMessage(), e);
    } catch (Throwable e) { // here is non-biz exception, wrap it.
        throw new RpcException(e.getMessage(), e);
    }
}

該方法是在invoker調用的時候對發送請求消息進行了包裝。

3.ChannelWrapper

該類是個內部沒,繼承了ClientDelegate,其中將編碼器變成了dubbo的編碼器,其餘方法比較簡單。

(五)DecodeableRpcInvocation

該類主要作了對於會話域內的數據進行序列化和解碼。

1.屬性

private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class);

/**
 * 通道
 */
private Channel channel;

/**
 * 序列化類型
 */
private byte serializationType;

/**
 * 輸入流
 */
private InputStream inputStream;

/**
 * 請求
 */
private Request request;

/**
 * 是否解碼
 */
private volatile boolean hasDecoded;

2.decode

@Override
public void decode() throws Exception {
    // 若是沒有解碼,則進行解碼
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
            }
            request.setBroken(true);
            request.setData(e);
        } finally {
            // 設置已經解碼
            hasDecoded = true;
        }
    }
}

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    // 對數據進行反序列化
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    // dubbo版本
    String dubboVersion = in.readUTF();
    // 請求中放入dubbo版本
    request.setVersion(dubboVersion);
    // 附加值內加入dubbo版本,path以及版本號
    setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

    setAttachment(Constants.PATH_KEY, in.readUTF());
    setAttachment(Constants.VERSION_KEY, in.readUTF());

    // 設置方法名稱
    setMethodName(in.readUTF());
    try {
        // 方法參數數組
        Object[] args;
        // 方法參數類型數組
        Class<?>[] pts;
        // 描述
        String desc = in.readUTF();
        // 若是爲空,則方法參數數組和對方法參數類型數組都設置爲空
        if (desc.length() == 0) {
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {
            // 分割成類,得到類數組
            pts = ReflectUtils.desc2classArray(desc);
            // 建立等長等數組
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    // 讀取對象放入數組中
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        // 設置參數類型
        setParameterTypes(pts);

        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
        if (map != null && map.size() > 0) {
            // 得到全部附加值
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            // 把流中讀到的配置放入附加值
            attachment.putAll(map);
            // 放回去
            setAttachments(attachment);
        }
        //decode argument ,may be callback
        for (int i = 0; i < args.length; i++) {
            // 若是是回調,則再一次解碼
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

該方法就是處理Invocation內數據的邏輯,其中主要是作了序列化和解碼。把讀取出來的設置放入對對應位置傳遞給後面的調用。

(六)DecodeableRpcResult

該類是作了基於dubbo協議對prc結果的解碼

1.屬性

private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);

/**
 * 通道
 */
private Channel channel;

/**
 * 序列化類型
 */
private byte serializationType;

/**
 * 輸入流
 */
private InputStream inputStream;

/**
 * 響應
 */
private Response response;

/**
 * 會話域
 */
private Invocation invocation;

/**
 * 是否解碼
 */
private volatile boolean hasDecoded;

2.decode

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    // 反序列化
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);
    
    byte flag = in.readByte();
    // 根據返回的不一樣結果來進行處理
    switch (flag) {
        case DubboCodec.RESPONSE_NULL_VALUE:
            // 返回結果爲空
            break;
        case DubboCodec.RESPONSE_VALUE:
            //
            try {
                // 得到返回類型數組
                Type[] returnType = RpcUtils.getReturnTypes(invocation);
                // 根據返回類型讀取返回結果而且放入RpcResult
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION:
            // 返回結果有異常
            try {
                Object obj = in.readObject();
                // 把異常放入RpcResult
                if (obj instanceof Throwable == false)
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                setException((Throwable) obj);
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
            // 返回值爲空,可是有附加值
            try {
                // 把附加值加入到RpcResult
                setAttachments((Map<String, String>) in.readObject(Map.class));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
            // 返回值
            try {
                // 設置返回結果
                Type[] returnType = RpcUtils.getReturnTypes(invocation);
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
                // 設置附加值
                setAttachments((Map<String, String>) in.readObject(Map.class));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
            // 返回結果有異常而且有附加值
            try {
                // 設置異常
                Object obj = in.readObject();
                if (obj instanceof Throwable == false)
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                setException((Throwable) obj);
                // 設置附加值
                setAttachments((Map<String, String>) in.readObject(Map.class));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
    }
    if (in instanceof Cleanable) {
        ((Cleanable) in).cleanup();
    }
    return this;
}

@Override
public void decode() throws Exception {
    // 若是沒有解碼
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            // 進行解碼
            decode(channel, inputStream);
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc result failed: " + e.getMessage(), e);
            }
            response.setStatus(Response.CLIENT_ERROR);
            response.setErrorMessage(StringUtils.toString(e));
        } finally {
            hasDecoded = true;
        }
    }
}

該方法是對響應結果的解碼,其中根據不一樣的返回結果來對RpcResult設置不一樣的值。

(七)LazyConnectExchangeClient

該類實現了ExchangeClient接口,是ExchangeClient的裝飾器,用到了裝飾模式,是延遲鏈接的客戶端實現類。

1.屬性

// when this warning rises from invocation, program probably have bug.
/**
 * 延遲鏈接請求錯誤key
 */
static final String REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
private final static Logger logger = LoggerFactory.getLogger(LazyConnectExchangeClient.class);
/**
 * 是否在延遲鏈接請求時錯誤
 */
protected final boolean requestWithWarning;
/**
 * url對象
 */
private final URL url;
/**
 * 請求處理器
 */
private final ExchangeHandler requestHandler;
/**
 * 鏈接鎖
 */
private final Lock connectLock = new ReentrantLock();
// lazy connect, initial state for connection
/**
 * 初始化狀態
 */
private final boolean initialState;
/**
 * 客戶端對象
 */
private volatile ExchangeClient client;
/**
 * 錯誤次數
 */
private AtomicLong warningcount = new AtomicLong(0);

能夠看到有屬性ExchangeClient client,該類中不少方法就直接調用了client的方法。

2.構造方法

public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) {
    // lazy connect, need set send.reconnect = true, to avoid channel bad status.
    // 默認有重連
    this.url = url.addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString());
    this.requestHandler = requestHandler;
    // 默認延遲鏈接初始化成功
    this.initialState = url.getParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE);
    // 默認沒有錯誤
    this.requestWithWarning = url.getParameter(REQUEST_WITH_WARNING_KEY, false);
}

3.initClient

private void initClient() throws RemotingException {
    // 若是客戶端已經初始化,則直接返回
    if (client != null)
        return;
    if (logger.isInfoEnabled()) {
        logger.info("Lazy connect to " + url);
    }
    // 得到鏈接鎖
    connectLock.lock();
    try {
        // 二次判空
        if (client != null)
            return;
        // 新建一個客戶端
        this.client = Exchangers.connect(url, requestHandler);
    } finally {
        // 釋放鎖
        connectLock.unlock();
    }
}

該方法是初始化客戶端的方法。

4.request

@Override
public ResponseFuture request(Object request) throws RemotingException {
    warning(request);
    initClient();
    return client.request(request);
}

該方法在調用client.request前調用了前面兩個方法,initClient我在上面講到了,就是用來初始化客戶端的。而warning是用來報錯的。

5.warning

private void warning(Object request) {
    if (requestWithWarning) {
        // 每5000次報錯一次
        if (warningcount.get() % 5000 == 0) {
            logger.warn(new IllegalStateException("safe guard client , should not be called ,must have a bug."));
        }
        warningcount.incrementAndGet();
    }
}

每5000次記錄報錯一次。

(八)ReferenceCountExchangeClient

該類也是對ExchangeClient的裝飾,其中加強了調用次數多功能。

1.屬性

/**
 * url對象
 */
private final URL url;
/**
 * 計數
 */
private final AtomicInteger refenceCount = new AtomicInteger(0);

//    private final ExchangeHandler handler;
/**
 * 延遲鏈接客戶端集合
 */
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
/**
 * 客戶端對象
 */
private ExchangeClient client;

2.replaceWithLazyClient

// ghost client
private LazyConnectExchangeClient replaceWithLazyClient() {
    // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false
    // 設置延遲鏈接初始化狀態、是否重連、是否已經重連等配置
    URL lazyUrl = url.addParameter(Constants.LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.FALSE)
            .addParameter(Constants.RECONNECT_KEY, Boolean.FALSE)
            .addParameter(Constants.SEND_RECONNECT_KEY, Boolean.TRUE.toString())
            .addParameter("warning", Boolean.TRUE.toString())
            .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)
            .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient");

    // 得到服務地址
    String key = url.getAddress();
    // in worst case there's only one ghost connection.
    // 從集合中獲取客戶端
    LazyConnectExchangeClient gclient = ghostClientMap.get(key);
    // 若是對應等客戶端不存在或者已經關閉鏈接,則從新建立一個延遲鏈接等客戶端,而且放入集合
    if (gclient == null || gclient.isClosed()) {
        gclient = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
        ghostClientMap.put(key, gclient);
    }
    return gclient;
}

該方法是用延遲鏈接替代,該方法在close方法中被調用。

3.close

@Override
public void close(int timeout) {
    if (refenceCount.decrementAndGet() <= 0) {
        if (timeout == 0) {
            client.close();
        } else {
            client.close(timeout);
        }
        client = replaceWithLazyClient();
    }
}

(九)FutureAdapter

該類實現了Future接口,是響應的Future適配器。其中是基於ResponseFuture作適配。其中比較簡單,我就很少講解了。

(十)CallbackServiceCodec

該類是針對回調服務的編解碼器。

1.屬性

/**
 * 代理工廠
 */
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/**
 * dubbo協議
 */
private static final DubboProtocol protocol = DubboProtocol.getDubboProtocol();
/**
 * 回調的標誌
 */
private static final byte CALLBACK_NONE = 0x0;
/**
 * 回調的建立標誌
 */
private static final byte CALLBACK_CREATE = 0x1;
/**
 * 回調的銷燬標誌
 */
private static final byte CALLBACK_DESTROY = 0x2;
/**
 * 回調參數key
 */
private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";

2.encodeInvocationArgument

public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException {
    // get URL directly
    // 直接得到url
    URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl();
    // 設置回調標誌
    byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
    // 得到參數集合
    Object[] args = inv.getArguments();
    // 得到參數類型集合
    Class<?>[] pts = inv.getParameterTypes();
    // 根據不一樣的回調狀態來設置附加值和返回參數
    switch (callbackstatus) {
        case CallbackServiceCodec.CALLBACK_NONE:
            return args[paraIndex];
        case CallbackServiceCodec.CALLBACK_CREATE:
            inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
            return null;
        case CallbackServiceCodec.CALLBACK_DESTROY:
            inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
            return null;
        default:
            return args[paraIndex];
    }
}

該方法是對會話域的信息進行編碼。

3.decodeInvocationArgument

public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class<?>[] pts, int paraIndex, Object inObject) throws IOException {
    // if it's a callback, create proxy on client side, callback interface on client side can be invoked through channel
    // need get URL from channel and env when decode
    URL url = null;
    try {
        // 得到url
        url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl();
    } catch (RemotingException e) {
        if (logger.isInfoEnabled()) {
            logger.info(e.getMessage(), e);
        }
        return inObject;
    }
    // 得到回調狀態
    byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
    // 根據回調狀態來返回結果
    switch (callbackstatus) {
        case CallbackServiceCodec.CALLBACK_NONE:
            return inObject;
        case CallbackServiceCodec.CALLBACK_CREATE:
            try {
                return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new IOException(StringUtils.toString(e));
            }
        case CallbackServiceCodec.CALLBACK_DESTROY:
            try {
                return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false);
            } catch (Exception e) {
                throw new IOException(StringUtils.toString(e));
            }
        default:
            return inObject;
    }
}

該方法是對會話域內的信息進行解碼。

4.isCallBack

private static byte isCallBack(URL url, String methodName, int argIndex) {
    // parameter callback rule: method-name.parameter-index(starting from 0).callback
    // 參數的規則:ethod-name.parameter-index(starting from 0).callback
    byte isCallback = CALLBACK_NONE;
    if (url != null) {
        // 得到回調的值
        String callback = url.getParameter(methodName + "." + argIndex + ".callback");
        if (callback != null) {
            // 若是爲true,則設置爲建立標誌
            if (callback.equalsIgnoreCase("true")) {
                isCallback = CALLBACK_CREATE;
                // 若是爲false,則設置爲銷燬標誌
            } else if (callback.equalsIgnoreCase("false")) {
                isCallback = CALLBACK_DESTROY;
            }
        }
    }
    return isCallback;
}

該方法是根據url攜帶的參數設置回調的標誌,以供執行不一樣的編解碼邏輯。

5.exportOrunexportCallbackService

private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
    // 返回對象的hashCode
    int instid = System.identityHashCode(inst);

    Map<String, String> params = new HashMap<String, String>(3);
    // no need to new client again
    // 設置不是服務端標誌爲否
    params.put(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
    // mark it's a callback, for troubleshooting
    // 設置是回調服務標誌爲true
    params.put(Constants.IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
    String group = url.getParameter(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        // 設置是消費側仍是提供側
        params.put(Constants.GROUP_KEY, group);
    }
    // add method, for verifying against method, automatic fallback (see dubbo protocol)
    // 添加方法,在dubbo的協議裏面用到
    params.put(Constants.METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));

    Map<String, String> tmpmap = new HashMap<String, String>(url.getParameters());
    tmpmap.putAll(params);
    // 移除版本信息
    tmpmap.remove(Constants.VERSION_KEY);// doesn't need to distinguish version for callback
    // 設置接口名
    tmpmap.put(Constants.INTERFACE_KEY, clazz.getName());
    // 建立服務暴露的url
    URL exporturl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpmap);

    // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide.

    // 得到緩存的key
    String cacheKey = getClientSideCallbackServiceCacheKey(instid);
    // 得到計數的key
    String countkey = getClientSideCountKey(clazz.getName());
    // 若是是暴露服務
    if (export) {
        // one channel can have multiple callback instances, no need to re-export for different instance.
        if (!channel.hasAttribute(cacheKey)) {
            if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
                // 得到代理對象
                Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl);
                // should destroy resource?
                // 暴露服務
                Exporter<?> exporter = protocol.export(invoker);
                // this is used for tracing if instid has published service or not.
                // 放到通道
                channel.setAttribute(cacheKey, exporter);
                logger.info("export a callback service :" + exporturl + ", on " + channel + ", url is: " + url);
                // 計數器加1
                increaseInstanceCount(channel, countkey);
            }
        }
    } else {
        // 若是通道內已經有該服務的緩存
        if (channel.hasAttribute(cacheKey)) {
            // 則得到該暴露者
            Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
            // 取消暴露
            exporter.unexport();
            // 移除該緩存
            channel.removeAttribute(cacheKey);
            // 計數器減1
            decreaseInstanceCount(channel, countkey);
        }
    }
    return String.valueOf(instid);
}

該方法是在客戶端側暴露服務和取消暴露服務。

6.referOrdestroyCallbackService

private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) {
    Object proxy = null;
    // 得到服務調用的緩存key
    String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
    // 得到代理緩存key
    String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
    // 從通道內得到代理對象
    proxy = channel.getAttribute(proxyCacheKey);
    // 得到計數器key
    String countkey = getServerSideCountKey(channel, clazz.getName());
    // 若是是服務引用
    if (isRefer) {
        // 若是代理對象爲空
        if (proxy == null) {
            // 得到服務引用的url
            URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + Constants.INTERFACE_KEY + "=" + clazz.getName());
            referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(Constants.METHODS_KEY);
            if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) {
                @SuppressWarnings("rawtypes")
                Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid));
                // 得到代理類
                proxy = proxyFactory.getProxy(invoker);
                // 設置代理類
                channel.setAttribute(proxyCacheKey, proxy);
                // 設置實體域
                channel.setAttribute(invokerCacheKey, invoker);
                // 計數器加1
                increaseInstanceCount(channel, countkey);

                //convert error fail fast .
                //ignore concurrent problem. 
                Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                if (callbackInvokers == null) {
                    // 建立回調的服務實體域集合
                    callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1);
                    // 把該實體域加入集合中
                    callbackInvokers.add(invoker);
                    channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers);
                }
                logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created.");
            }
        }
    } else {
        // 銷燬
        if (proxy != null) {
            Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey);
            try {
                Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                if (callbackInvokers != null) {
                    // 從集合中移除
                    callbackInvokers.remove(invoker);
                }
                // 銷燬該調用
                invoker.destroy();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // cancel refer, directly remove from the map
            // 取消引用,直接從集合中移除
            channel.removeAttribute(proxyCacheKey);
            channel.removeAttribute(invokerCacheKey);
            // 計數器減1
            decreaseInstanceCount(channel, countkey);
        }
    }
    return proxy;
}

該方法是在服務端側進行服務引用或者銷燬回調服務。

(十一)DubboCodec

該類是dubbo的編解碼器,分別針對dubbo協議的request和response進行編碼和解碼。

1.屬性

/**
 * dubbo名稱
 */
public static final String NAME = "dubbo";
/**
 * 協議版本號
 */
public static final String DUBBO_VERSION = Version.getProtocolVersion();
/**
 * 響應攜帶着異常
 */
public static final byte RESPONSE_WITH_EXCEPTION = 0;
/**
 * 響應
 */
public static final byte RESPONSE_VALUE = 1;
/**
 * 響應結果爲空
 */
public static final byte RESPONSE_NULL_VALUE = 2;
/**
 * 響應結果有異常而且帶有附加值
 */
public static final byte RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS = 3;
/**
 * 響應結果有附加值
 */
public static final byte RESPONSE_VALUE_WITH_ATTACHMENTS = 4;
/**
 * 響應結果爲空並帶有附加值
 */
public static final byte RESPONSE_NULL_VALUE_WITH_ATTACHMENTS = 5;
/**
 * 對象空集合
 */
public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
/**
 * 空的類集合
 */
public static final Class<?>[] EMPTY_CLASS_ARRAY = new Class<?>[0];
private static final Logger log = LoggerFactory.getLogger(DubboCodec.class);

2.decodeBody

@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    // 若是是response
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        // 建立一個response
        Response res = new Response(id);
        // 若是是事件,則設置事件,這裏有個問題,我提交了pr在新版本已經修復
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // get status.
        // 設置狀態
        byte status = header[3];
        res.setStatus(status);
        try {
            // 反序列化
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
            // 若是狀態是響應成功
            if (status == Response.OK) {
                Object data;
                // 若是是心跳事件,則按照心跳事件解碼
                if (res.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, in);
                } else if (res.isEvent()) {
                    // 若是是事件,則
                    data = decodeEventData(channel, in);
                } else {
                    // 不然對結果進行解碼
                    DecodeableRpcResult result;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        result.decode();
                    } else {
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                // 把結果從新放入response中
                res.setResult(data);
            } else {
                // 不然設置錯誤信息
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode response failed: " + t.getMessage(), t);
            }
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request.
        // 若是該消息是request
        Request req = new Request(id);
        // 設置版本
        req.setVersion(Version.getProtocolVersion());
        // 設置是不是雙向請求
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        // 設置是不是事件,該地方問題也在新版本修復
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            // 反序列化
            ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
            // 進行解碼
            if (req.isHeartbeat()) {
                data = decodeHeartbeatData(channel, in);
            } else if (req.isEvent()) {
                data = decodeEventData(channel, in);
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            // 把body數據設置到response
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}

該方法是對request和response進行解碼,用位運算來進行解碼,其中的邏輯跟我在 《dubbo源碼解析(十)遠程通訊——Exchange層》中講到的編解碼器邏輯差很少。

3.encodeRequestData

@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    // 輸出版本
    out.writeUTF(version);
    // 輸出path
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    // 輸出版本號
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    // 輸出方法名稱
    out.writeUTF(inv.getMethodName());
    // 輸出參數類型
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    // 輸出參數
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    // 輸出附加值
    out.writeObject(inv.getAttachments());
}

該方法是對請求數據的編碼。

4.encodeResponseData

@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    Result result = (Result) data;
    // currently, the version value in Response records the version of Request
    boolean attach = Version.isSupportResponseAttatchment(version);
    // 得到異常
    Throwable th = result.getException();
    if (th == null) {
        Object ret = result.getValue();
        // 根據結果的不一樣輸出不一樣的值
        if (ret == null) {
            out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
        } else {
            out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
            out.writeObject(ret);
        }
    } else {
        // 若是有異常,則輸出異常
        out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
        out.writeObject(th);
    }

    if (attach) {
        // returns current version of Response to consumer side.
        // 在附加值中加入版本號
        result.getAttachments().put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
        // 輸出版本號
        out.writeObject(result.getAttachments());
    }
}

該方法是對響應數據的編碼。

(十二)DubboCountCodec

該類是對DubboCodec的功能加強,增長了消息長度的限制。

public final class DubboCountCodec implements Codec2 {

    private DubboCodec codec = new DubboCodec();

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        codec.encode(channel, buffer, msg);
    }

    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 保存讀取的標誌
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do {
            Object obj = codec.decode(channel, buffer);
            // 粘包拆包
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            } else {
                // 增長消息
                result.addMessage(obj);
                // 記錄消息長度
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            }
        } while (true);
        // 若是結果爲空,則返回須要更多的輸入
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }

    private void logMessageLength(Object result, int bytes) {
        if (bytes <= 0) {
            return;
        }
        // 若是是request類型
        if (result instanceof Request) {
            try {
                // 設置附加值
                ((RpcInvocation) ((Request) result).getData()).setAttachment(
                        Constants.INPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        } else if (result instanceof Response) {
            try {
                // 設置附加值 輸出的長度
                ((RpcResult) ((Response) result).getResult()).setAttachment(
                        Constants.OUTPUT_KEY, String.valueOf(bytes));
            } catch (Throwable e) {
                /* ignore */
            }
        }
    }

}

(十三)TraceFilter

該過濾器是加強的功能是通道的跟蹤,會在通道內把最大的調用次數和如今的調用數量放進去。方便使用telnet來跟蹤服務的調用次數等。

1.屬性

/**
 * 跟蹤數量的最大值key
 */
private static final String TRACE_MAX = "trace.max";

/**
 * 跟蹤的數量
 */
private static final String TRACE_COUNT = "trace.count";

/**
 * 通道集合
 */
private static final ConcurrentMap<String, Set<Channel>> tracers = new ConcurrentHashMap<String, Set<Channel>>();

2.addTracer

public static void addTracer(Class<?> type, String method, Channel channel, int max) {
    // 設置最大的數量
    channel.setAttribute(TRACE_MAX, max);
    // 設置當前的數量
    channel.setAttribute(TRACE_COUNT, new AtomicInteger());
    // 得到key
    String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
    // 得到通道集合
    Set<Channel> channels = tracers.get(key);
    // 若是爲空,則新建
    if (channels == null) {
        tracers.putIfAbsent(key, new ConcurrentHashSet<Channel>());
        channels = tracers.get(key);
    }
    channels.add(channel);
}

該方法是對某一個通道進行跟蹤,把如今的調用數量放到屬性裏面

3.removeTracer

public static void removeTracer(Class<?> type, String method, Channel channel) {
    // 移除最大值屬性
    channel.removeAttribute(TRACE_MAX);
    // 移除數量屬性
    channel.removeAttribute(TRACE_COUNT);
    String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
    Set<Channel> channels = tracers.get(key);
    if (channels != null) {
        // 集合中移除該通道
        channels.remove(channel);
    }
}

該方法是移除通道的跟蹤。

4.invoke

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // 開始時間
    long start = System.currentTimeMillis();
    // 調用下一個調用鏈 得到結果
    Result result = invoker.invoke(invocation);
    // 調用結束時間
    long end = System.currentTimeMillis();
    // 若是通道跟蹤大小大於0
    if (tracers.size() > 0) {
        // 服務key
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        // 得到通道集合
        Set<Channel> channels = tracers.get(key);
        if (channels == null || channels.isEmpty()) {
            key = invoker.getInterface().getName();
            channels = tracers.get(key);
        }
        if (channels != null && !channels.isEmpty()) {
            // 遍歷通道集合
            for (Channel channel : new ArrayList<Channel>(channels)) {
                // 若是通道是鏈接的
                if (channel.isConnected()) {
                    try {
                        // 得到跟蹤的最大數
                        int max = 1;
                        Integer m = (Integer) channel.getAttribute(TRACE_MAX);
                        if (m != null) {
                            max = (int) m;
                        }
                        // 得到跟蹤數量
                        int count = 0;
                        AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
                        if (c == null) {
                            c = new AtomicInteger();
                            channel.setAttribute(TRACE_COUNT, c);
                        }
                        count = c.getAndIncrement();
                        // 若是數量小於最大數量則發送
                        if (count < max) {
                            String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
                            channel.send("\r\n" + RpcContext.getContext().getRemoteAddress() + " -> "
                                    + invoker.getInterface().getName()
                                    + "." + invocation.getMethodName()
                                    + "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
                                    + "\r\nelapsed: " + (end - start) + " ms."
                                    + "\r\n\r\n" + prompt);
                        }
                        // 若是數量大於等於max - 1,則移除該通道
                        if (count >= max - 1) {
                            channels.remove(channel);
                        }
                    } catch (Throwable e) {
                        channels.remove(channel);
                        logger.warn(e.getMessage(), e);
                    }
                } else {
                    // 若是未鏈接,也移除該通道
                    channels.remove(channel);
                }
            }
        }
    }
    return result;
}

該方法是當服務被調用時,進行跟蹤或者取消跟蹤的處理邏輯,是核心的功能加強邏輯。

(十四)FutureFilter

該類是處理異步和同步調用結果的過濾器。

1.invoke

@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    // 是不是異步的調用
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        // 調用異步處理
        asyncCallback(invoker, invocation);
    } else {
        // 調用同步結果處理
        syncCallback(invoker, invocation, result);
    }
    return result;
}

該方法中根據是否爲異步調用來分別執行asyncCallback和syncCallback方法。

2.syncCallback

private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
    // 若是有異常
    if (result.hasException()) {
        // 則調用異常的結果處理
        fireThrowCallback(invoker, invocation, result.getException());
    } else {
        // 調用正常的結果處理
        fireReturnCallback(invoker, invocation, result.getValue());
    }
}

該方法是同步調用的返回結果處理,比較簡單。

3.asyncCallback

private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
    Future<?> f = RpcContext.getContext().getFuture();
    if (f instanceof FutureAdapter) {
        ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
        // 設置回調
        future.setCallback(new ResponseCallback() {
            @Override
            public void done(Object rpcResult) {
                // 若是結果爲空,則打印錯誤日誌
                if (rpcResult == null) {
                    logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                    return;
                }
                ///must be rpcResult
                // 若是不是Result則打印錯誤日誌
                if (!(rpcResult instanceof Result)) {
                    logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                    return;
                }
                Result result = (Result) rpcResult;
                if (result.hasException()) {
                    // 若是有異常,則調用異常處理方法
                    fireThrowCallback(invoker, invocation, result.getException());
                } else {
                    // 若是正常的返回結果,則調用正常的處理方法
                    fireReturnCallback(invoker, invocation, result.getValue());
                }
            }

            @Override
            public void caught(Throwable exception) {
                fireThrowCallback(invoker, invocation, exception);
            }
        });
    }
}

該方法是異步調用的結果處理,把異步返回結果的邏輯寫在回調函數裏面。

4.fireInvokeCallback

private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
    // 得到調用的方法
    final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
    // 得到調用的服務
    final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));

    if (onInvokeMethod == null && onInvokeInst == null) {
        return;
    }
    if (onInvokeMethod == null || onInvokeInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    // 若是不能夠訪問,則設置爲可訪問
    if (!onInvokeMethod.isAccessible()) {
        onInvokeMethod.setAccessible(true);
    }

    // 得到參數數組
    Object[] params = invocation.getArguments();
    try {
        // 調用方法
        onInvokeMethod.invoke(onInvokeInst, params);
    } catch (InvocationTargetException e) {
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}

該方法是調用方法的執行。

5.fireReturnCallback

private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
    final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
    final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));

    //not set onreturn callback
    if (onReturnMethod == null && onReturnInst == null) {
        return;
    }

    if (onReturnMethod == null || onReturnInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    if (!onReturnMethod.isAccessible()) {
        onReturnMethod.setAccessible(true);
    }

    Object[] args = invocation.getArguments();
    Object[] params;
    // 得到返回結果類型
    Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
    // 設置參數和返回結果
    if (rParaTypes.length > 1) {
        if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
            params = new Object[2];
            params[0] = result;
            params[1] = args;
        } else {
            params = new Object[args.length + 1];
            params[0] = result;
            System.arraycopy(args, 0, params, 1, args.length);
        }
    } else {
        params = new Object[]{result};
    }
    try {
        // 調用方法
        onReturnMethod.invoke(onReturnInst, params);
    } catch (InvocationTargetException e) {
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}

該方法是正常的返回結果的處理。

6.fireThrowCallback

private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
    final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
    final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));

    //onthrow callback not configured
    if (onthrowMethod == null && onthrowInst == null) {
        return;
    }
    if (onthrowMethod == null || onthrowInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    if (!onthrowMethod.isAccessible()) {
        onthrowMethod.setAccessible(true);
    }
    // 得到拋出異常的類型
    Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
    if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
        try {
            Object[] args = invocation.getArguments();
            Object[] params;

            // 把類型和拋出的異常值放入返回結果
            if (rParaTypes.length > 1) {
                if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                    params = new Object[2];
                    params[0] = exception;
                    params[1] = args;
                } else {
                    params = new Object[args.length + 1];
                    params[0] = exception;
                    System.arraycopy(args, 0, params, 1, args.length);
                }
            } else {
                params = new Object[]{exception};
            }
            // 調用下一個調用連
            onthrowMethod.invoke(onthrowInst, params);
        } catch (Throwable e) {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
        }
    } else {
        logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
    }
}

該方法是異常拋出時的結果處理。

(十五)ServerStatusChecker

該類是對於服務狀態的監控設置。

public class ServerStatusChecker implements StatusChecker {

    @Override
    public Status check() {
        // 得到服務集合
        Collection<ExchangeServer> servers = DubboProtocol.getDubboProtocol().getServers();
        // 若是爲空則返回UNKNOWN的狀態
        if (servers == null || servers.isEmpty()) {
            return new Status(Status.Level.UNKNOWN);
        }
        // 設置狀態爲ok
        Status.Level level = Status.Level.OK;
        StringBuilder buf = new StringBuilder();
        // 遍歷集合
        for (ExchangeServer server : servers) {
            // 若是服務沒有綁定到本地端口
            if (!server.isBound()) {
                // 狀態改成error
                level = Status.Level.ERROR;
                // 加入服務本地地址
                buf.setLength(0);
                buf.append(server.getLocalAddress());
                break;
            }
            if (buf.length() > 0) {
                buf.append(",");
            }
            // 若是服務綁定了本地端口,拼接clients數量
            buf.append(server.getLocalAddress());
            buf.append("(clients:");
            buf.append(server.getChannels().size());
            buf.append(")");
        }
        return new Status(level, buf.toString());
    }

}

(十六)ThreadPoolStatusChecker

該類是對於線程池的狀態進行監控。

@Activate
public class ThreadPoolStatusChecker implements StatusChecker {

    @Override
    public Status check() {
        // 得到數據中心
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        // 得到線程池集合
        Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);

        StringBuilder msg = new StringBuilder();
        // 設置爲ok
        Status.Level level = Status.Level.OK;
        // 遍歷線程池集合
        for (Map.Entry<String, Object> entry : executors.entrySet()) {
            String port = entry.getKey();
            ExecutorService executor = (ExecutorService) entry.getValue();

            if (executor != null && executor instanceof ThreadPoolExecutor) {
                ThreadPoolExecutor tp = (ThreadPoolExecutor) executor;
                boolean ok = tp.getActiveCount() < tp.getMaximumPoolSize() - 1;
                Status.Level lvl = Status.Level.OK;
                // 若是活躍數量超過了最大的線程數量,則設置warn
                if (!ok) {
                    level = Status.Level.WARN;
                    lvl = Status.Level.WARN;
                }

                if (msg.length() > 0) {
                    msg.append(";");
                }
                // 輸出線程池相關信息
                msg.append("Pool status:" + lvl
                        + ", max:" + tp.getMaximumPoolSize()
                        + ", core:" + tp.getCorePoolSize()
                        + ", largest:" + tp.getLargestPoolSize()
                        + ", active:" + tp.getActiveCount()
                        + ", task:" + tp.getTaskCount()
                        + ", service port: " + port);
            }
        }
        return msg.length() == 0 ? new Status(Status.Level.UNKNOWN) : new Status(level, msg.toString());
    }

}

邏輯比較簡單,我就不贅述了。

關於telnet下的相關實現請感興趣的朋友直接查看,裏面都是對於telnet命令的實現,內容比較獨立。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了遠程調用中關於dubbo協議的部分,dubbo協議是官方推薦使用的協議,而且對於telnet命令也作了很好的支持,要看懂這部分的邏輯,必須先對於以前的一些接口設計瞭解的很清楚。接下來我將開始對rpc模塊關於hessian協議部分進行講解。

相關文章
相關標籤/搜索