dubbo源碼解析(九)遠程通訊——Transport層

遠程通信——Transport層

目標:介紹Transport層的相關設計和邏輯、介紹dubbo-remoting-api中的transport包內的源碼解析。

前言

先預警一下,該文篇幅會很長,作好心理準備。Transport層也就是網絡傳輸層,在遠程通訊中必然會涉及到傳輸。它在dubbo 的框架設計中也處於倒數第二層,固然最底層是序列化,這個後面介紹。官方文檔對Transport層的解釋是抽象 mina 和 netty 爲統一接口,以 Message 爲中心,擴展接口爲 Channel、Transporter、Client、Server、Codec。那咱們如今先來看這個包下面的類圖:java

transport包類圖

能夠看到有四個包繼承了AbstractChannel、AbstractServer、AbstractClient。也就是說如今Transport層是抽象mina、netty以及grizzly爲統一接口。看完類圖,再來看看包結構:git

transport包結構

下面的講解大體會按照類圖中類的順序往下講,儘可能把client、server、channel、codec、dispacher五部分涉及到的內容一塊兒講解。github

源碼解析

(一)AbstractPeer

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    private final ChannelHandler handler;

    private volatile URL url;
    /**
     * 是否正在關閉
     */
    // closing closed means the process is being closed and close is finished
    private volatile boolean closing;
    /**
     * 是否關閉完成
     */
    private volatile boolean closed;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }
}

該類實現了Endpoint和ChannelHandler兩個接口,要關注的兩個點:segmentfault

  1. 實現ChannelHandler接口而且有在屬性中還有一個handler,下面不少實現方法也是直接調用了handler方法,這種模式叫作裝飾模式,這樣作能夠對裝飾對象靈活的加強功能。對裝飾模式不懂的朋友能夠google一下。有不少例子介紹。
  2. 在該類中有closing和closed屬性,在Endpoint中有不少關於關閉通道的操做,會有關閉中和關閉完成的狀態區分,在該類中就緩存了這兩個屬性來判斷關閉的狀態。

下面我就介紹該類中的send方法,其餘方法比較好理解,到時候能夠直接看源碼:api

@Override
public void send(Object message) throws RemotingException {
    // url中sent的配置項
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

該配置項是選擇是否等待消息發出:緩存

  1. sent值爲true,等待消息發出,消息發送失敗將拋出異常。
  2. sent值爲false,不等待消息發出,將消息放入 IO 隊列,即刻返回。

對該類還有點糊塗的朋友,記住在ChannelHandler接口,該類就作了裝飾模式中裝飾角色,在Endpoint接口,只是維護了通道的正在關閉和關閉完成兩個狀態。服務器

(二)AbstractEndpoint

public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    /**
     * 日誌記錄
     */
    private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);

    /**
     * 編解碼器
     */
    private Codec2 codec;

    /**
     * 超時時間
     */
    private int timeout;

    /**
     * 鏈接超時時間
     */
    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);
        // 優先從url配置中取,若是沒有,默認爲1s
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 優先從url配置中取,若是沒有,默認爲3s
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }

    /**
     * 從url中得到編解碼器的配置,而且返回該實例
     * @param url
     * @return
     */
    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        // 優先從Codec2的擴展類中找
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }

}

該類是端點的抽象類,其中封裝了編解碼器以及兩個超時時間。基於dubbo 的SPI機制,得到相應的編解碼器實現對象,編解碼器優先從Codec2的擴展類中尋找。微信

下面來看看該類中的reset方法:網絡

@Override
public void reset(URL url) {
    if (isClosed()) {
        throw new IllegalStateException("Failed to reset parameters "
                + url + ", cause: Channel closed. channel: " + getLocalAddress());
    }
    try {
        // 判斷重置的url中有沒有攜帶timeout,有的話重置
        if (url.hasParameter(Constants.TIMEOUT_KEY)) {
            int t = url.getParameter(Constants.TIMEOUT_KEY, 0);
            if (t > 0) {
                this.timeout = t;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 判斷重置的url中有沒有攜帶connect.timeout,有的話重置
        if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) {
            int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0);
            if (t > 0) {
                this.connectTimeout = t;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 判斷重置的url中有沒有攜帶codec,有的話重置
        if (url.hasParameter(Constants.CODEC_KEY)) {
            this.codec = getChannelCodec(url);
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
}

@Deprecated
public void reset(com.alibaba.dubbo.common.Parameters parameters) {
    reset(getUrl().addParameters(parameters.getParameters()));
}

這個方法是Resetable接口中的方法,能夠看到之前的reset實現方法都加上了@Deprecated註解,不推薦使用了,由於這種實現方式重置太複雜,須要把全部參數都設置一遍,好比我只想重置一個超時時間,可是其餘值不變,若是用之前的reset,我須要在url中把全部值都帶上,就會不少餘。如今用新的reset,每次只關心我須要重置的值,只更改成須要重置的值。好比上面的代碼所示,只想修改超時時間,那我就只在url中攜帶超時時間的參數。app

(三)AbstractServer

該類繼承了AbstractEndpoint而且實現Server接口,是服務器抽象類。重點實現了服務器的公共邏輯,好比發送消息,關閉通道,鏈接通道,斷開鏈接等。而且抽象了打開和關閉服務器兩個方法。

1.屬性

/**
 * 服務器線程名稱
 */
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
/**
 * 線程池
 */
ExecutorService executor;
/**
 * 服務地址,也就是本地地址
 */
private InetSocketAddress localAddress;
/**
 * 綁定地址
 */
private InetSocketAddress bindAddress;
/**
 * 最大可接受的鏈接數
 */
private int accepts;
/**
 * 空閒超時時間,單位是s
 */
private int idleTimeout = 600; //600 seconds

該類的屬性比較好理解,就是稍微注意一下idleTimeout的單位是s。

2.構造函數

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    // 從url中得到本地地址
    localAddress = getUrl().toInetSocketAddress();

    // 從url配置中得到綁定的ip
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    // 從url配置中得到綁定的端口號
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    // 判斷url中配置anyhost是否爲true或者判斷host是否爲不可用的本地Host
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = NetUtils.ANYHOST;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 從url中獲取配置,默認值爲0
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    // 從url中獲取配置,默認600s
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        // 開啓服務器
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    // 得到線程池
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

構造函數大部分邏輯就是從url中取配置,存到緩存中,而且作了開啓服務器的操做。具體的看上面的註釋,仍是比較清晰的。

3.reset方法

@Override
public void reset(URL url) {
    if (url == null) {
        return;
    }
    try {
        // 重置accepts的值
        if (url.hasParameter(Constants.ACCEPTS_KEY)) {
            int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
            if (a > 0) {
                this.accepts = a;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 重置idle.timeout的值
        if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
            int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
            if (t > 0) {
                this.idleTimeout = t;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 重置線程數配置
        if (url.hasParameter(Constants.THREADS_KEY)
                && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
            // 得到url配置中的線程數
            int threads = url.getParameter(Constants.THREADS_KEY, 0);
            // 得到線程池容許的最大線程數
            int max = threadPoolExecutor.getMaximumPoolSize();
            // 返回核心線程數
            int core = threadPoolExecutor.getCorePoolSize();
            // 設置最大線程數和核心線程數
            if (threads > 0 && (threads != max || threads != core)) {
                if (threads < core) {
                    // 若是設置的線程數比核心線程數少,則直接設置核心線程數
                    threadPoolExecutor.setCorePoolSize(threads);
                    if (core == max) {
                        // 當核心線程數和最大線程數相等的時候,把最大線程數也重置
                        threadPoolExecutor.setMaximumPoolSize(threads);
                    }
                } else {
                    // 當大於核心線程數時,直接設置最大線程數
                    threadPoolExecutor.setMaximumPoolSize(threads);
                    // 只有當核心線程數和最大線程數相等的時候才設置核心線程數
                    if (core == max) {
                        threadPoolExecutor.setCorePoolSize(threads);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    // 重置url
    super.setUrl(getUrl().addParameters(url.getParameters()));
}

該類中的reset方法作了三個值的重置,分別是最大可鏈接的客戶端數量、空閒超時時間以及線程池的兩個配置參數。其中要注意核心線程數和最大線程數的區別。舉個例子,核心線程數就像是工廠正式工,最大線程數,就是工廠臨時工做量加大,請了一批臨時工,臨時工加正式工的和就是最大線程數,等這批任務結束後,臨時工要辭退的,而正式工會留下。

還有send、close、connected、disconnected等方法比較簡單,若是有興趣,能夠到個人GitHub查看,地址文章末尾會給出。

(四)AbstractClient

該類是客戶端的抽象類,繼承了AbstractEndpoint類,實現了Client接口,該類中也是作了客戶端公用的重連邏輯,抽象了打開客戶端、關閉客戶端、鏈接服務器、斷開服務器鏈接以及得到通道方法,讓子類去重點關注這幾個方法。

1.屬性

/**
 * 客戶端線程名稱
 */
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
/**
 * 線程池id
 */
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
/**
 * 重連定時任務執行器
 */
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
/**
 * 鏈接鎖
 */
private final Lock connectLock = new ReentrantLock();
/**
 * 發送消息時,若斷開,是否重連
 */
private final boolean send_reconnect;
/**
 * 重連次數
 */
private final AtomicInteger reconnect_count = new AtomicInteger(0);
/**
 * 在這以前是否調用從新鏈接的錯誤日誌
 */
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
/**
 * 重連 warning 的間隔.(waring多少次以後,warning一次),也就是錯誤多少次後告警一次錯誤
 */
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
/**
 * 關閉超時時間
 */
private final long shutdown_timeout;
/**
 * 線程池
 */
protected volatile ExecutorService executor;
/**
 * 重連執行任務
 */
private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
// the last successed connected time
/**
 * 最後成功鏈接的時間
 */
private long lastConnectedTime = System.currentTimeMillis();

上述屬性大部分跟重連有關,該類最重要的也是封裝了重連的邏輯。

2.構造函數

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);

    // 從url中得到是否重連的配置,默認爲false
    send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

    // 從url中得到關閉超時時間,默認爲900s
    shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

    // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
    // 重連的默認值是2s,重連 warning 的間隔默認是1800,當出錯的時候,每隔1800*2=3600s報警一次
    reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

    try {
        // 打開客戶端
        doOpen();
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    try {
        // connect.
        // 鏈接服務器
        connect();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException t) {
        if (url.getParameter(Constants.CHECK_KEY, true)) {
            close();
            throw t;
        } else {
            logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
        }
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }

    // 從緩存中得到線程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    // 清楚線程池緩存
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

該構造函數中作了一些屬性值的設置,而且作了打開客戶端和鏈接服務器的操做。

3.wrapChannelHandler

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    // 加入線程名稱
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    // 設置使用的線程池類型
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    // 包裝
    return ChannelHandlers.wrap(handler, url);
}

該方法是包裝通道處理器,設置使用的線程池類型是可緩存線程池。

4.initConnectStatusCheckCommand

private synchronized void initConnectStatusCheckCommand() {
    //reconnect=false to close reconnect
    int reconnect = getReconnectParam(getUrl());
    // 有鏈接頻率的值,而且當前沒有鏈接任務
    if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
        Runnable connectStatusCheckCommand = new Runnable() {
            @Override
            public void run() {
                try {
                    if (!isConnected()) {
                        // 重連
                        connect();
                    } else {
                        // 記錄最後一次重連的時間
                        lastConnectedTime = System.currentTimeMillis();
                    }
                } catch (Throwable t) {
                    String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                    // wait registry sync provider list
                    if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                        // 若是以前沒有打印太重連的誤日誌
                        if (!reconnect_error_log_flag.get()) {
                            reconnect_error_log_flag.set(true);
                            // 打印日誌
                            logger.error(errorMsg, t);
                            return;
                        }
                    }
                    // 若是到達一次重連日誌告警週期,則打印告警日誌
                    if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                        logger.warn(errorMsg, t);
                    }
                }
            }
        };
        // 開啓重連定時任務
        reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
    }
}

該方法是初始化重連線程,其中作了重連失敗後的告警日誌和錯誤日誌打印策略。

5.reconnect

@Override
public void reconnect() throws RemotingException {
    disconnect();
    connect();
}

單獨放該方法是由於這是該類關注的重點。實現了客戶端的重連邏輯。

6.其餘

connect、disconnect、close等方法都是調用了對應的抽象方法,而具體的邏輯須要看具體的子類如何去實現相關的抽象方法,這幾個方法邏輯比較簡單,我不在這裏貼出源碼,有興趣能夠看個人GitHub,地址文章末尾會給出。

(四)AbstractChannel

該類是通道的抽象類,該類裏面作的邏輯很簡單,具體的發送消息邏輯在它 的子類中實現。

@Override
public void send(Object message, boolean sent) throws RemotingException {
    // 檢測通道是否關閉
    if (isClosed()) {
        throw new RemotingException(this, "Failed to send message "
                + (message == null ? "" : message.getClass().getName()) + ":" + message
                + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
    }
}

能夠看到send方法,其中只作了檢測通道是否關閉的狀態檢測,沒有實現具體的發送消息的邏輯。

(五)ChannelHandlerDelegate

該類繼承了ChannelHandler,從它的名字能夠看出是ChannelHandler的表明,它就是做爲裝飾模式中的Component角色,後面講到的AbstractChannelHandlerDelegate做爲裝飾模式中的Decorator角色。

public interface ChannelHandlerDelegate extends ChannelHandler {
    /**
     * 得到通道
     * @return
     */
    ChannelHandler getHandler();
}

(六)AbstractChannelHandlerDelegate

屬性:

protected ChannelHandler handler

該類實現了ChannelHandlerDelegate接口,而且有一個屬性是ChannelHandler,上述已經說到這是裝飾模式中的裝飾角色,其中的全部實現方法都直接調用被裝飾的handler屬性的方法。

(七)DecodeHandler

該類爲解碼處理器,繼承了AbstractChannelHandlerDelegate,對接收到的消息進行解碼,在父類處理接收消息的功能上疊加了解碼功能。

咱們來看看received方法:

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 若是是Decodeable類型的消息,則對整個消息解碼
    if (message instanceof Decodeable) {
        decode(message);
    }

    // 若是是Request請求類型消息,則對請求中對請求數據解碼
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }

    // 若是是Response返回類型的消息,則對返回消息中對結果進行解碼
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }

    // 繼續將消息委託給handler,繼續處理
    handler.received(channel, message);
}

能夠看到作了三次判斷,根據消息的不一樣會對消息的不一樣數據作解碼。能夠看到,這裏用到裝飾模式後,在處理消息的前面作了解碼的處理,而且還能繼續委託給handler來處理消息,經過組合作到了功能的疊加。

private void decode(Object message) {
    // 若是消息類型是Decodeable,進一步調用Decodeable的decode來解碼
    if (message != null && message instanceof Decodeable) {
        try {
            ((Decodeable) message).decode();
            if (log.isDebugEnabled()) {
                log.debug("Decode decodeable message " + message.getClass().getName());
            }
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
            }
        } // ~ end of catch
    } // ~ end of if
} // ~ end of method decode

能夠看到這是解析消息的邏輯,當消息是Decodeable類型,還會繼續調用Decodeable的decode方法來進行解析。它的實現類後續會講解到。

(八)MultiMessageHandler

該類是多消息處理器的抽象類。一樣繼承了AbstractChannelHandlerDelegate類,咱們來看看它的received方法:

@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 當消息爲多消息時 循環交給handler處理接收到當消息
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // 若是是單消息,就直接交給handler處理器
        handler.received(channel, message);
    }
}

邏輯很簡單,當消息是多消息類型時,也就是一次性接收到多條消息的狀況,循環去處理消息,當消息是單消息時候,直接交給handler去處理。

(九)WrappedChannelHandler

該類跟AbstractChannelHandlerDelegate的做用相似,都是裝飾模式中的裝飾角色,其中的全部實現方法都直接調用被裝飾的handler屬性的方法,該類是爲了添加線程池的功能,它的子類都是去關心哪些消息是須要分發到線程池的,哪些消息直接由I / O線程執行,如今版本有四種場景,也就是它的四個子類,下面我一一描述。

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;
    // 建立線程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    // 設置組件的key
    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
        componentKey = Constants.CONSUMER_SIDE;
    }
    // 得到dataStore實例
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    // 把線程池放到dataStore中緩存
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

能夠看到構造方法除了屬性的填充之外,線程池是基於dubbo 的SPI Adaptive機制建立的,在dataStore中把線程池加進去, 該線程池就是AbstractClient 或 AbstractServer 從 DataStore 得到的線程池。

public ExecutorService getExecutorService() {
    // 首先返回的不是共享線程池,是該類的線程池
    ExecutorService cexecutor = executor;
    // 若是該類的線程池關閉或者爲空,則返回的是共享線程池
    if (cexecutor == null || cexecutor.isShutdown()) {
        cexecutor = SHARED_EXECUTOR;
    }
    return cexecutor;
}

該方法是得到線程池的實例,不過該類裏面有兩個線程池,還加入了一個共享線程池,共享線程池優先級較低。

(十)ExecutionChannelHandler

該類繼承了WrappedChannelHandler,也是加強了功能,處理的是接收請求消息時,把請求消息分發到線程池,而除了請求消息之外,其餘消息類型都直接經過I / O線程直接執行。

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 得到線程池實例
    ExecutorService cexecutor = getExecutorService();
    // 若是消息是request類型,纔會分發到線程池,其餘消息,如響應,鏈接,斷開鏈接,心跳將由I / O線程直接執行。
    if (message instanceof Request) {
        try {
            // 把請求消息分發到線程池
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
            // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
            // this scenario from happening, but a better solution should be considered later.
            // 當線程池滿了,SERVER_THREADPOOL_EXHAUSTED_ERROR錯誤沒法正常返回
            // 所以消費者方必須等到超時。這是一種預防的臨時解決方案,因此這裏直接返回該錯誤
            if (t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort()
                            + ") thread pool is exhausted, detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
        }
    } else {
        // 若是消息不是request類型,則直接處理
        handler.received(channel, message);
    }
}

上述就能夠都看到對於請求消息的處理,其中有個打補丁的方式是當線程池滿了的時候,消費者只能等待請求超時,因此這裏直接返回線程池滿的錯誤。

(十一)AllChannelHandler

該類也繼承了WrappedChannelHandler,也是爲了加強功能,處理的是鏈接、斷開鏈接、捕獲異常以及接收到的全部消息都分發到線程池。

@Override
public void connected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把鏈接操做分發到線程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    }
}

@Override
public void disconnected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把斷開鏈接操做分發到線程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
    }
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把全部消息分發到線程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
        // 這裏處理線程池滿的問題,只有在請求時候會出現。
        //複線程池已滿,拒絕調用,不返回,並致使使用者等待超時
       if(message instanceof Request && t instanceof RejectedExecutionException){
          Request request = (Request)message;
          if(request.isTwoWay()){
             String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
             Response response = new Response(request.getId(), request.getVersion());
             response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
             response.setErrorMessage(msg);
             channel.send(response);
             return;
          }
       }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把捕獲異常做分發到線程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    } catch (Throwable t) {
        throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
    }
}

能夠看到,全部操做以及消息都分到到線程池中。而且注意操做不一樣,傳入的狀態也不一樣。

(十二)ConnectionOrderedChannelHandler

該類也是繼承了WrappedChannelHandler,加強功能,該類是把鏈接、取消鏈接以及接收到的消息都分發到線程池,可是不一樣的是,該類本身建立了一個跟鏈接相關的線程池,把鏈接操做和斷開鏈接操分發到該線程池,而接收到的消息則分發到WrappedChannelHandler的線程池中。來看看具體的實現。

/**
 * 鏈接線程池
 */
protected final ThreadPoolExecutor connectionExecutor;
/**
 * 鏈接隊列大小限制
 */
private final int queuewarninglimit;

public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
    super(handler, url);
    // 得到線程名,默認是Dubbo
    String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    // 建立鏈接線程池
    connectionExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
            new NamedThreadFactory(threadName, true),
            new AbortPolicyWithReport(threadName, url)
    );  // FIXME There's no place to release connectionExecutor!
    // 設置工做隊列限制,默認是1000
    queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}

能夠屬性中有一個鏈接線程池,看到在構造函數裏建立了該線程池,而queuewarninglimit是用來限制鏈接線程池的工做隊列長度,比較簡單。來看看鏈接和斷開鏈接到邏輯。

@Override
public void connected(Channel channel) throws RemotingException {
    try {
        // 覈對工做隊列長度
        checkQueueLength();
        // 分發鏈接操做
        connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    }
}

@Override
public void disconnected(Channel channel) throws RemotingException {
    try {
        // 覈對工做隊列長度
        checkQueueLength();
        // 分發斷開鏈接操做
        connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
    }
}

能夠看到,這兩個操做都是分發到鏈接線程池connectionExecutor中,和AllChannelHandle類r中的分發的線程池不是同一個。而ConnectionOrderedChannelHandler的received方法跟AllChannelHandle同樣,我就不貼出來。

(十三)MessageOnlyChannelHandler

該類也是繼承了WrappedChannelHandler,是WrappedChannelHandler的最後一個子類,也是加強功能,不過該類只是處理了全部的消息分發到線程池。能夠看到源碼,比較簡單:

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 得到線程池實例
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把消息分發到線程池
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

下面我講講解五種線程池的調度策略,也就是我在《dubbo源碼解析(八)遠程通訊——開篇》中提到的Dispatcher接口的五種實現,分別是AllDispatcher、DirectDispatcher、MessageOnlyDispatcher、ExecutionDispatcher、ConnectionOrderedDispatcher。

(十四)AllDispatcher

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 線程池調度方法:任何消息以及操做都分發到線程池中
        return new AllChannelHandler(handler, url);
    }

}

對照着上述講到的AllChannelHandler,是否是很清晰這種線程池的調度方法。而且該調度方法是默認的調度方法。

(十五)ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 線程池調度方法:鏈接、斷開鏈接分發到到線程池和其餘消息分發到線程池不是同一個
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}

對照上述講到的ConnectionOrderedChannelHandler,也很清晰該線程池調度方法。

(十六)DirectDispatcher

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 直接處理消息,不分發到線程池
        return handler;
    }

}

該線程池調度方法是不調度線程池,直接執行。

(十七)ExecutionDispatcher

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 線程池調度方法:只有請求消息分發到線程池,其餘都直接執行
        return new ExecutionChannelHandler(handler, url);
    }

}

對照着上述的ExecutionChannelHandler講解,也能夠很清晰的看出該線程池調度策略。

(十八)MessageOnlyDispatcher

public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 只要是接收到的消息,都分發到線程池
        return new MessageOnlyChannelHandler(handler, url);
    }

}

對照着上述講到的MessageOnlyChannelHandler,能夠很清晰該線程池調度策略。

(十九)ChannelHandlers

該類是通道處理器工廠,會對傳入的handler進行一次包裝,不管是Client仍是Server都會作這樣的處理,也就是作了一些功能上的加強,就像上述我說到的裝飾模式中的那些功能。

咱們來看看源碼:

public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    // 調用了多消息處理器,對心跳消息進行了功能增強
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

最關鍵的是這兩個方法,看第二個方法,其實就是包裝了MultiMessageHandler功能,增長了多消息處理的功能,以及對心跳消息作了功能加強。

(二十)AbstractCodec

實現 Codec2 接口,,其中實現了一些編解碼的公共邏輯。

1.checkPayload

protected static void checkPayload(Channel channel, long size) throws IOException {
    // 默認長度
    int payload = Constants.DEFAULT_PAYLOAD;
    if (channel != null && channel.getUrl() != null) {
        // 優先從url中得到消息長度配置,若是沒有則用默認長度
        payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
    }
    // 若是消息長度過長,則報錯
    if (payload > 0 && size > payload) {
        ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
        logger.error(e);
        throw e;
    }
}

該方法是檢驗消息長度。

2.getSerialization

protected Serialization getSerialization(Channel channel) {
    return CodecSupport.getSerialization(channel.getUrl());
}

該方法是得到序列化對象。

3.isClientSide

protected boolean isClientSide(Channel channel) {
    // 得到是side對應的value
    String side = (String) channel.getAttribute(Constants.SIDE_KEY);
    if ("client".equals(side)) {
        return true;
    } else if ("server".equals(side)) {
        return false;
    } else {
        InetSocketAddress address = channel.getRemoteAddress();
        URL url = channel.getUrl();
        // 判斷url的主機地址是否和遠程地址同樣,若是是,則判斷爲client,若是不是,則判斷爲server
        boolean client = url.getPort() == address.getPort()
                && NetUtils.filterLocalHost(url.getIp()).equals(
                NetUtils.filterLocalHost(address.getAddress()
                        .getHostAddress()));
        // 把value設置進去
        channel.setAttribute(Constants.SIDE_KEY, client ? "client"
                : "server");
        return client;
    }
}

該方法是判斷是否爲客戶端側的通道。

4.isServerSide

protected boolean isServerSide(Channel channel) {
    return !isClientSide(channel);
}

該方法是判斷是否爲服務端側的通道。

(二十一)TransportCodec

該類是傳輸編解碼器,使用 Serialization 進行序列化/反序列化,直接編解碼。關於序列化爲會在後續文章中介紹。

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
    // 得到序列化的 ObjectOutput 對象
    OutputStream output = new ChannelBufferOutputStream(buffer);
    ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
    // 寫入 ObjectOutput
    encodeData(channel, objectOutput, message);
    objectOutput.flushBuffer();
    // 釋放
    if (objectOutput instanceof Cleanable) {
        ((Cleanable) objectOutput).cleanup();
    }
}

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 得到反序列化的 ObjectInput 對象
    InputStream input = new ChannelBufferInputStream(buffer);
    ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);
    // 讀取 ObjectInput
    Object object = decodeData(channel, objectInput);
    // 釋放
    if (objectInput instanceof Cleanable) {
        ((Cleanable) objectInput).cleanup();
    }
    return object;
}

該類關鍵方法就是編碼和解碼,比較好理解,直接進行了序列化和反序列化。

(二十二)CodecAdapter

該類是Codec 的適配器,用到了適配器模式,把Codec適配成Codec2。將Codec的編碼和解碼方法都適配成Codec2。好比不少時候都只能用Codec2的編解碼器,可是有的時候須要用Codec,可是不能知足致使只能加入適配器來完成使用。

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message)
        throws IOException {
    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(1024);
    // 調用舊的編解碼器的編碼
    codec.encode(channel, os, message);
    buffer.writeBytes(os.toByteArray());
}

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    byte[] bytes = new byte[buffer.readableBytes()];
    int savedReaderIndex = buffer.readerIndex();
    buffer.readBytes(bytes);
    UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes);
    // 調用舊的編解碼器的解碼
    Object result = codec.decode(channel, is);
    buffer.readerIndex(savedReaderIndex + is.position());
    return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result;
}

能夠看到,在編碼和解碼的方法中都調用了codec的方法。

(二十三)ChannelDelegate、ServerDelegate、ClientDelegate

ChannelDelegate實現類Channel,ServerDelegate實現了Server,ClientDelegate實現了Client,都用到了裝飾模式,都做爲裝飾模式中的裝飾角色,因此類中的全部實現方法都調用了屬性的方法。具體代碼就不貼了,朋友們能夠自行查看。

(二十四)ChannelHandlerAdapter

該類實現了ChannelHandler接口,是通道處理器適配類,該類中全部實現方法都是空的,全部想實現ChannelHandler接口的類能夠直接繼承該類,選擇須要實現的方法進行實現,不須要實現ChannelHandler接口中全部方法。

(二十五)ChannelHandlerDispatcher

該類是通道處理器調度器,其中緩存了全部通道處理器,有一個通道處理器集合。而且每一個操做都會去遍歷該集合,執行相應的操做,例如:

@Override
public void connected(Channel channel) {
    // 遍歷通道處理器集合
    for (ChannelHandler listener : channelHandlers) {
        try {
            // 鏈接
            listener.connected(channel);
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}

(二十六)CodecSupport

該類是編解碼工具類,提供查詢 Serialization 的功能。

/**
 * 序列化對象集合 key爲序列化類型編號
 */
private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();
/**
 * 序列化擴展名集合 key爲序列化類型編號 value爲序列化擴展名
 */
private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>();

static {
    // 利用dubbo 的SPI機制得到序列化擴展名
    Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
    for (String name : supportedExtensions) {
        // 得到相應擴展名的序列化實現
        Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
        byte idByte = serialization.getContentTypeId();
        if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
            logger.error("Serialization extension " + serialization.getClass().getName()
                    + " has duplicate id to Serialization extension "
                    + ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
                    + ", ignore this Serialization extension");
            continue;
        }
        // 緩存序列化實現
        ID_SERIALIZATION_MAP.put(idByte, serialization);
        // 緩存序列化編號和擴展名
        ID_SERIALIZATIONNAME_MAP.put(idByte, name);
    }
}

能夠看到該類中緩存了全部的序列化對象和序列化擴展名。能夠從中拿到Serialization。

(二十七)ExceedPayloadLimitException

該類是消息長度限制異常。

public class ExceedPayloadLimitException extends IOException {
    private static final long serialVersionUID = -1112322085391551410L;

    public ExceedPayloadLimitException(String message) {
        super(message);
    }
}

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了Transport層的相關設計和邏輯、介紹dubbo-remoting-api中的transport包內的源碼解,其中關鍵的是整個設計都在使用裝飾模式,傳輸層中關鍵的編解碼器以及客戶端、服務的、通道的抽象,還有關鍵的就是線程池的調度方法,熟悉那五種調度方法,對消息的處理。整個傳輸層核心的消息,不少操做圍繞着消息展開。下一篇我會講解交換層exchange部分。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。

相關文章
相關標籤/搜索