源碼分析Dubbo網絡通訊篇NettyServer、HeaderExchangeServer

本文主要分析一下NettyServer,HeaderExchangeServer實現細節。ios

一、NettyServer

NettyServer整個類圖以下: 這裏寫圖片描述 首先從全貌上大概看一下NettyServer對象所持有的屬性:bootstrap

  • AbstractPeer
    1. private final ChannelHandler handler 事件處理Handler。
    2. private volatile URL url 該協議的第一個服務提供者的URL, Server只須要用到 URL中的參數,與具體某一個服務沒什麼關係。
  • AbstractEndpoint
    1. private Codec2 codec 編碼解碼器。
    2. private int timeout 超時時間
    3. private int connectTimeout 鏈接超時時間
  • AbstractServer
    1. private InetSocketAddress localAddress :url host:port地址。
    2. private InetSocketAddress bindAddress:若是是多網卡,而且指定了 bind.ip、bind.port,若是爲空,與localAddress相同。
    3. private int accepts : AbstractServer#accepts未使用到。
    4. private int idleTimeout = 600; AbstractServer#accepts未使用到。
  • NettyServer
    1. private Map< String, Channel> channels:< ip:port, channel> 全部通道。
    2. private ServerBootstrap bootstrap : netty 服務端啓動器。
    3. private io.netty.channel.Channel channel:服務端監聽通道。
    4. private EventLoopGroup bossGroup;Netty boss線程組(負責鏈接事件)
    5. private EventLoopGroup workerGroup : nety work線程組(負責IO事件)

1.1 NettyServer 構造方法

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

直接調用父類的public AbstractServer(URL url, ChannelHandler handler)方法,從前面的文章中得知, ChannelHandlers.wrap方法會對ChannelHandler handler進行封裝,主要是加入事件分發模式(Dispatch)。網絡

1.1.1 AbstractServer構造方法
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);       // @1
        localAddress = getUrl().toInetSocketAddress();   // @2

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);   // @3
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);  
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); // @4
        try {
            doOpen();   // @5
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

代碼@1:調用父類的構造方法,主要初始化AbstractPeer(channelHandler、url)和AbstractEndpoint(codec二、timeout、idleTimeout )架構

代碼@2:根據URL中的host與端口,建立localAddress。併發

代碼@3:若是配置了< dubbo:parameter key = "bind.ip" value = ""/> 與 < dubbo:parameter key = "bind.port" />,則用該IP與端口建立bindAddress,一般用於多網卡,若是未配置,bindAddress與 localAddress綁定的IP與端口同樣。socket

代碼@4:初始化accepts與idleTimeout ,這兩個參數未被其餘地方使用。分佈式

代碼@5,調用doOpen方法,正式在相應端口創建網絡監聽。ide

1.二、源碼分析NettyServer#doOpen

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ServerBootstrap();       // @1
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));    // @2
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));    // @3
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);   // @4
        channels = nettyServerHandler.getChannels();
        bootstrap.group(bossGroup, workerGroup)                                                                        // @5
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<niosocketchannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug                       
                                .addLast("decoder", adapter.getDecoder())   
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());     // @6
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

代碼@1:建立Netty服務端啓動幫助類ServerBootstrap.函數

代碼@2:建立服務端Boss線程,線程名:.NettyServerBoss,主要負責客戶端的鏈接事件,主從多Reactor線程模型中的主線程(鏈接事件)。高併發

代碼@3:建立服務端Work線程組,線程名:NettyServerWorker-序號,線程個數取自參數:iothreads,默認爲(CPU核數+1)與32取小值,顧名思義,IO線程數,主要處理讀寫事件,編碼、解碼都在IO線程中完成。

代碼@4:建立用戶Handler,這裏是NettyServerHandler。 代碼@5:Netty啓動的常規寫法,關注以下內容:

addLast("decoder", adapter.getDecoder())  : 添加解碼器
 addLast("encoder", adapter.getEncoder()) :添加編碼器
 addLast("handler", nettyServerHandler) :添加業務Handler。

這裏簡單介紹一下流程:

  1. 客戶端創建與服務端鏈接,此時Boss線程的鏈接事件觸發,創建TCP鏈接,並向IO線程註冊該通道(Channel0)的讀事件。
  2. 當客戶端向服務端發送請求消息後,IO線程中的讀事件觸發,會首先調用adapter.getDecoder() 根據對應的請求協議(例如dubbo)從二進制流中解碼出一個完整的請求對象,而後傳入到業務handler,例如nettyServerHandler,執行相應的事件方法,例如recive方法。
  3. 當服務端向Channel寫入響應結果時,首先編碼器會按照協議編碼成二進制流,供客戶端解碼。

若是對Netty想深刻學習的話,請移步到做者的《源碼分析Netty系列》

二、HeaderExchangeServer

根據 Dubbo 服務端初始化流程,咱們可知,Dubbo 爲了封裝各類不一樣的網絡實現客戶端(netty、mina)等,引入了 Exchangers 層,存在 ExchangeServer,其實現 Server 並內部持有具體的 Server 實現端,例如 NettyServer。 這裏寫圖片描述

接下來,咱們重點來關注一下 HeaderExchangeServer. 核心屬性以下:

  • ScheduledExecutorService scheduled:心跳線程數,線程名稱前綴,dubbo-remoting-server-heartbeat-thread-序號
  • private final Server server:具體的Server實現類,例如NettyServer。
  • private ScheduledFuture< ?> heartbeatTimer:心跳調度Future,能夠經過future取消心跳等動做。
  • private int heartbeat:心跳間隔時間
  • private int heartbeatTimeout:心跳超時時間,至少爲heartbeat的兩倍

2.1 構造函數

public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout &lt; heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout &lt; heartbeatInterval * 2");
        }
        startHeartbeatTimer();
    }

說明,主要是經過heartbeat參數設置心跳間隔,若是不配置,則不啓動心跳檢測。從上面看來HeaderExchangeServer內部持有Server,並封裝了心跳的功能,在這裏就不細細分析了。


>做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區優秀佈道師、CSDN2019博客之星TOP10,維護公衆號:中間件興趣圈目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。

在這裏插入圖片描述</niosocketchannel>

相關文章
相關標籤/搜索