This article analyzes the implementation details of NettyServer and HeaderExchangeServer.
The overall class diagram of NettyServer is shown below. First, let's take a high-level look at the fields a NettyServer object holds, starting with its constructor:
```java
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
```
It delegates directly to the parent constructor `public AbstractServer(URL url, ChannelHandler handler)`. As covered in earlier articles in this series, `ChannelHandlers.wrap` wraps the given `ChannelHandler`, mainly to layer in the event dispatch policy (Dispatcher).
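The wrapping above is essentially the decorator pattern: the raw handler is enclosed in a dispatching handler that hands work off to a thread pool instead of running it on the IO thread. The sketch below illustrates the idea with deliberately simplified interfaces; `ChannelHandler`, `DispatchingHandler`, and `wrap` here are stand-ins for illustration, not Dubbo's actual classes.

```java
public class WrapSketch {
    // Simplified stand-in for Dubbo's ChannelHandler (not the real interface).
    interface ChannelHandler { void received(String message); }

    // Stand-in for a dispatching decorator such as Dubbo's AllChannelHandler:
    // it forwards each event to a thread pool so the IO thread is never blocked.
    static class DispatchingHandler implements ChannelHandler {
        private final ChannelHandler inner;
        private final java.util.concurrent.ExecutorService executor =
                java.util.concurrent.Executors.newFixedThreadPool(2, r -> {
                    Thread t = new Thread(r, "dispatch-sketch");
                    t.setDaemon(true); // daemon so the JVM can exit
                    return t;
                });
        DispatchingHandler(ChannelHandler inner) { this.inner = inner; }
        @Override public void received(String message) {
            executor.submit(() -> inner.received(message)); // dispatch off the IO thread
        }
    }

    // Analogous in spirit to ChannelHandlers.wrap: returns the decorated handler.
    static ChannelHandler wrap(ChannelHandler handler) {
        return new DispatchingHandler(handler);
    }

    public static void main(String[] args) throws Exception {
        ChannelHandler business = msg -> System.out.println("handled: " + msg);
        ChannelHandler wrapped = wrap(business);
        wrapped.received("hello");
        Thread.sleep(200); // give the pool thread time to run
    }
}
```

The real dispatch policy is selected by the `dispatcher` URL parameter (e.g. `all`, `direct`, `message`); the sketch hard-codes a single "dispatch everything to a pool" behavior.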
```java
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler); // @1
    localAddress = getUrl().toInetSocketAddress(); // @2
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = NetUtils.ANYHOST;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort); // @3
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); // @4
    try {
        doOpen(); // @5
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
                    + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress()
                        + ", cause: " + t.getMessage(), t);
    }
    // fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
            Integer.toString(url.getPort()));
}
```
Code @1: calls the parent constructors, which mainly initialize AbstractPeer (channelHandler, url) and AbstractEndpoint (codec (a Codec2), timeout, idleTimeout).
Code @2: builds localAddress from the host and port in the URL.
Code @3: if `<dubbo:parameter key="bind.ip" value=""/>` and `<dubbo:parameter key="bind.port" />` are configured, that IP and port are used to build bindAddress; this is typically used on multi-homed (multi-NIC) hosts. If they are not configured, bindAddress uses the same IP and port as localAddress.
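The fallback logic of @2/@3 can be restated as a small pure function. `resolveBindAddress` below is a hypothetical helper written for illustration (Dubbo inlines this logic in the constructor, and its `NetUtils.isInvalidLocalHost` check is omitted here):

```java
import java.net.InetSocketAddress;

public class BindAddressSketch {
    // Restates AbstractServer's bind-address resolution: bind.ip/bind.port
    // override the advertised host/port; anyhost=true falls back to 0.0.0.0.
    static InetSocketAddress resolveBindAddress(String host, int port,
                                                String bindIp, Integer bindPort,
                                                boolean anyhost) {
        String ip = (bindIp != null) ? bindIp : host;   // bind.ip defaults to the URL host
        int p = (bindPort != null) ? bindPort : port;   // bind.port defaults to the URL port
        if (anyhost) {
            ip = "0.0.0.0";                             // NetUtils.ANYHOST
        }
        return new InetSocketAddress(ip, p);
    }

    public static void main(String[] args) {
        // No overrides: bindAddress mirrors localAddress.
        InetSocketAddress a = resolveBindAddress("192.168.1.10", 20880, null, null, false);
        // Multi-NIC override via bind.ip / bind.port.
        InetSocketAddress b = resolveBindAddress("192.168.1.10", 20880, "10.0.0.5", 20881, false);
        System.out.println(a.getHostString() + ":" + a.getPort());
        System.out.println(b.getHostString() + ":" + b.getPort());
    }
}
```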
Code @4: initializes accepts and idleTimeout. accepts caps the number of client connections the server will keep: when it is greater than 0, `AbstractServer#connected` closes any new channel that would exceed the limit.
Code @5: calls doOpen, which actually starts listening on the configured port.
```java
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ServerBootstrap(); // @1
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // @2
    workerGroup = new NioEventLoopGroup(
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true)); // @3
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // @4
    channels = nettyServerHandler.getChannels();
    bootstrap.group(bossGroup, workerGroup) // @5
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()
                            //.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // @6
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
```
Code @1: creates ServerBootstrap, Netty's server-side bootstrap helper.
Code @2: creates the server-side boss event-loop group (thread name NettyServerBoss), which is responsible for accepting client connections; in the master/worker multi-Reactor threading model this is the master (accept) side.
Code @3: creates the server-side worker event-loop group (thread names NettyServerWorker-&lt;n&gt;). The thread count comes from the iothreads parameter and defaults to the smaller of (CPU cores + 1) and 32. As the name suggests, these are the IO threads: they handle read/write events, and encoding and decoding also run on them.
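The default worker count can be computed directly; this mirrors the `min(cores + 1, 32)` formula Dubbo uses for `Constants.DEFAULT_IO_THREADS`:

```java
public class IoThreadsDefault {
    public static void main(String[] args) {
        // Dubbo's default for the iothreads parameter: min(CPU cores + 1, 32).
        int cores = Runtime.getRuntime().availableProcessors();
        int ioThreads = Math.min(cores + 1, 32);
        System.out.println("default iothreads = " + ioThreads);
    }
}
```

On an 8-core machine this yields 9; the cap of 32 only matters on very large hosts.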
Code @4: creates the user handler, here NettyServerHandler.

Code @5: the standard Netty bootstrap setup; the part worth noting is the pipeline:

- `addLast("decoder", adapter.getDecoder())`: adds the decoder.
- `addLast("encoder", adapter.getEncoder())`: adds the encoder.
- `addLast("handler", nettyServerHandler)`: adds the business handler.
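The `addLast` order matters: inbound events traverse inbound handlers in registration order, so incoming bytes reach "decoder" before "handler", while "encoder" sits on the outbound path for responses. The toy model below simulates only the inbound traversal (the names and `UnaryOperator` stages are illustrative, not Netty APIs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class PipelineOrderSketch {
    public static void main(String[] args) {
        // Toy model of a Netty pipeline: inbound events visit handlers in
        // addLast order, so bytes are decoded before the business handler runs.
        List<Map.Entry<String, UnaryOperator<String>>> pipeline = new ArrayList<>();
        pipeline.add(Map.entry("decoder", msg -> "decoded(" + msg + ")"));
        pipeline.add(Map.entry("handler", msg -> "handled(" + msg + ")"));

        String event = "raw-bytes";
        for (Map.Entry<String, UnaryOperator<String>> stage : pipeline) {
            event = stage.getValue().apply(event); // each stage wraps the previous result
        }
        System.out.println(event); // handled(decoded(raw-bytes))
    }
}
```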
The flow in brief: the boss group accepts new connections and registers them with a worker; the worker IO threads then run each channel through decode, the business handler, and encode. For an in-depth study of Netty itself, see the author's "Netty Source Code Analysis" series.
From the Dubbo server-side initialization flow we know that, in order to abstract over the different transport implementations (Netty, Mina, etc.), Dubbo introduces the Exchange layer: ExchangeServer implements Server and internally holds the concrete transport Server, for example a NettyServer.
Next, let's focus on HeaderExchangeServer. Its core fields and constructor are as follows:
```java
public HeaderExchangeServer(Server server) {
    if (server == null) {
        throw new IllegalArgumentException("server == null");
    }
    this.server = server;
    this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    startHeartbeatTimer();
}
```
In short, the heartbeat parameter sets the heartbeat interval (defaulting to 0, i.e. heartbeat detection is not started unless configured), and heartbeatTimeout defaults to three times the interval. As shown above, HeaderExchangeServer holds an inner Server and adds heartbeat support on top of it; the heartbeat machinery itself is not analyzed in detail here.
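The essence of the heartbeat task started by `startHeartbeatTimer` can be sketched as a per-tick decision on how long a channel has been idle. The `check` helper and the string outputs below are illustrative, not Dubbo's actual code; the real task sends a heartbeat request or closes the channel instead of printing:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatSketch {
    // Per-tick decision analogous to the heartbeat task's logic: ping the peer
    // once the channel is idle past `heartbeat`, close it past `heartbeatTimeout`.
    static String check(long idleMillis, long heartbeat, long heartbeatTimeout) {
        if (idleMillis >= heartbeatTimeout) return "close channel";
        if (idleMillis >= heartbeat) return "send heartbeat";
        return "ok";
    }

    public static void main(String[] args) throws Exception {
        long heartbeat = 60_000;               // from the heartbeat URL parameter
        long heartbeatTimeout = heartbeat * 3; // default: 3 x heartbeat

        System.out.println(check(10_000, heartbeat, heartbeatTimeout));
        System.out.println(check(70_000, heartbeat, heartbeatTimeout));
        System.out.println(check(200_000, heartbeat, heartbeatTimeout));

        // The task runs on a scheduled timer, as startHeartbeatTimer does
        // (daemon thread so the JVM can exit normally):
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-sketch");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleWithFixedDelay(
                () -> System.out.println(check(0, heartbeat, heartbeatTimeout)),
                0, heartbeat, TimeUnit.MILLISECONDS);
        Thread.sleep(50); // let the first tick run
    }
}
```

The constructor's `heartbeatTimeout < heartbeat * 2` guard ensures at least one missed heartbeat is tolerated before the channel is closed.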
> About the author: Ding Wei (丁威), author of "Inside RocketMQ" (《RocketMQ技術內幕》), an outstanding RocketMQ community evangelist and a CSDN 2019 Top-10 blog star. He maintains the WeChat public account 中間件興趣圈, which has published source-code analysis columns on Java collections, the Java concurrency package (JUC), Netty, Mycat, Dubbo, RocketMQ, MyBatis, and more, covering high-concurrency and distributed service architecture.