In the first article of this series I took the creation of the Netty client in Spark as the entry point and analyzed how the parameters of Netty's client bootstrap class, Bootstrap, are set up and how it is started. Obviously there is another important part we have not explored yet: the initialization and startup of the server side. So in this article we will walk through the source code of the startup process of Netty's server bootstrap class, ServerBootstrap.
As before, we use Spark's usage of Netty as the entry point for the source analysis. First, let's look at the code in Spark's NettyRpc module that creates the Netty server bootstrap:
The constructor of TransportServer calls its init method; the ServerBootstrap instance is created, initialized and started inside init.
This method roughly breaks down into three parts: configuring the ServerBootstrap (the boss/worker groups, the channel class, the allocator and the socket options), installing the childHandler that initializes the pipeline of every accepted channel, and finally binding to the address and waiting for the bind to complete.
Evidently, the entry point for starting a ServerBootstrap is its bind method.
// Initialize the netty server
private void init(String hostToBind, int portToBind) {

  // IO mode, two options: NIO or EPOLL
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  // Create the bossGroup and the workerGroup, i.e. the parent (boss) group and the child (worker) group
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  // Buffer allocator, supporting heap memory and direct memory
  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  // Create a netty server bootstrap object and set its parameters
  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  // Metrics object for memory usage
  this.metrics = new NettyMemoryMetrics(
    allocator, conf.getModuleName() + "-server", conf);

  // Maximum number of queued (pending) connections
  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  // Socket receive buffer size
  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  // Socket send buffer size
  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  // Handler for the child channels
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
      new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
  // Bind to the IP address and port
  channelFuture = bootstrap.bind(address);
  // Wait synchronously until the bind succeeds
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}
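Stripped of the Spark-specific pieces (the RpcHandler, the list of TransportServerBootstrap objects, the memory metrics), the skeleton above is the standard Netty server setup. For readers less familiar with it, here is a minimal, self-contained sketch of the same pattern; the class name and the port are made up purely for illustration:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MinimalServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts new connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles the accepted channels
        try {
            ServerBootstrap b = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // per-connection pipeline setup goes here
                        }
                    });
            // bind and wait for the bind to complete, just like syncUninterruptibly above
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

The Spark code follows exactly this shape: configure the bootstrap, install a childHandler that wires up each accepted connection's pipeline, then bind and wait.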
The validation here is mainly a non-null check on the group and the channelFactory.
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
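For reference, the check performed by validate is roughly the following (paraphrased from the Netty source, so the exact messages may differ between versions; ServerBootstrap#validate additionally verifies that a childHandler was set and falls back to the parent group when no childGroup was given):

// Rough paraphrase of AbstractBootstrap#validate
public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}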
We already touched on this method when analyzing the startup of Bootstrap; its main job is to create and initialize a Channel through initAndRegister, register it with an EventLoop, and then bind it to the given local address.
Previously we analyzed the construction of NioSocketChannel and how Bootstrap initializes a channel; in this article we will analyze the construction of NioServerSocketChannel and the implementation of ServerBootstrap's init method.
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Create a channel and do some initialization work on it
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // Bind this channel to the given address
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // If the registration has not finished yet, proceed asynchronously by adding a callback
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
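The first line of doBind is initAndRegister, which we already covered in the client article. As a reminder, a heavily simplified sketch of it (failure handling omitted) looks like this:

// Heavily simplified sketch of AbstractBootstrap#initAndRegister (failure handling omitted)
final ChannelFuture initAndRegister() {
    // 1. Reflectively create the channel configured via channel(...)/channelFactory(...);
    //    for a ServerBootstrap this is a NioServerSocketChannel
    Channel channel = channelFactory.newChannel();
    // 2. Let the concrete bootstrap initialize it; ServerBootstrap's init is analyzed below
    init(channel);
    // 3. Register the channel with the (boss) EventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}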
In the parameterless constructor of NioServerSocketChannel, a ServerSocketChannel is created by calling the JDK API.
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
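The newSocket call seen above is just a thin wrapper around the JDK's SelectorProvider; roughly (paraphrased from the Netty source):

// Roughly what NioServerSocketChannel#newSocket does: open a JDK ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}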
Like NioSocketChannelConfig, NioServerSocketChannelConfig is also a facade: it wraps the parameter-related interface of NioServerSocketChannel. In addition, note that the initial interest set of NioServerSocketChannel is specified here as the ACCEPT event, i.e. by default it listens for connection-establishment requests, whereas the initial interest of NioSocketChannel is the read event. So the main difference from the construction of NioSocketChannel is the initial set of interested events.
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
This first calls the parent-class constructors, ultimately ending up in the constructor of AbstractNioChannel. We covered that process in detail when analyzing the initialization of NioSocketChannel; its main work is creating the channel's internal Unsafe object and its ChannelPipeline.
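As a quick reminder, the relevant part of that constructor chain, simplified, is where the initial interest set is recorded; NioServerSocketChannel passes SelectionKey.OP_ACCEPT into it, while NioSocketChannel ends up passing SelectionKey.OP_READ:

// Simplified sketch of the AbstractNioChannel constructor
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);                        // AbstractChannel: creates the Unsafe object and the ChannelPipeline
    this.ch = ch;                         // the underlying JDK channel
    this.readInterestOp = readInterestOp; // OP_ACCEPT for the server channel, OP_READ for a socket channel
    try {
        // NIO channels must be non-blocking to be used with a Selector
        ch.configureBlocking(false);
    } catch (IOException e) {
        // the real code closes the channel and rethrows a ChannelException here
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}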
Having finished the construction of the channel, let's now look at how ServerBootstrap initializes the channel object.
As the code below shows, init ends by adding a special handler, ServerBootstrapAcceptor, to the server channel's pipeline; so, obviously, the next thing to look at after that is how ServerBootstrapAcceptor handles read events.
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    // Set the options on the channel
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // Set the attributes
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    // The group, handler, options and attributes configured for the child channels
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // Add a handler to the server channel's pipeline
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // Normally the user does not need to set a handler on the ServerBootstrap itself
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // Here the crucial handler is added, and as a side effect
            // the thread of the corresponding EventLoop is started
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
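One detail worth noting: the ChannelInitializer added above is a one-shot handler. Once its initChannel callback has run, it removes itself from the pipeline. Schematically, the pattern it implements looks like this (an illustrative sketch, not the actual Netty implementation):

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Illustrative sketch of the "one-shot initializer" pattern used by ChannelInitializer
public abstract class OneShotInitializer extends ChannelInboundHandlerAdapter {

    // the one-time setup to perform on the channel
    protected abstract void initChannel(Channel ch) throws Exception;

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel(ctx.channel());   // run the setup exactly once
        ctx.pipeline().remove(this);  // then remove this initializer from the pipeline
        ctx.fireChannelRegistered();  // and let the event continue to the handlers just added
    }
}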
Before analyzing ServerBootstrapAcceptor, let's first review how the NioEventLoop loop handles accept events; here is a small excerpt of that code:
// Handle read and accept events
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}
As you can see, an accept event is handled exactly like a read event: the NioUnsafe's read method is called. Since NioServerSocketChannel extends AbstractNioMessageChannel, and the read method is implemented in AbstractNioMessageChannel, that is the implementation shown below.
From the earlier analysis of the ChannelPipeline we also know that read events propagate from the head node towards the tail, and as mentioned above, ServerBootstrap's init method added a ServerBootstrapAcceptor to the handler chain of the initial NioServerSocketChannel; so after looking at the read implementation, the obvious next step is to analyze how ServerBootstrapAcceptor handles the read event it fires.
public void read() {
    // Assert that this code runs in the EventLoop's thread
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // What gets "read" here are the channels of newly established connections;
                // the JDK SocketChannel is wrapped into a netty NioSocketChannel
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // Each received channel is treated as a message,
            // and a read event is fired for it in the ChannelPipeline
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // Finally fire a read-complete event
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
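The doReadMessages call inside the loop is where the JDK-level accept actually happens. In NioServerSocketChannel it looks roughly like this (paraphrased and trimmed; the real code goes through a SocketUtils helper and logs failures):

// Roughly NioServerSocketChannel#doReadMessages: accept a JDK SocketChannel and wrap it
// into a NioSocketChannel, which is then handed to the pipeline as a "message"
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}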
The logic here is fairly straightforward. Thanks to the groundwork laid earlier, i.e. when ServerBootstrap's init method initialized the original server channel it passed the user-configured child options, attributes, child handler and child group to ServerBootstrapAcceptor as constructor arguments, they can simply be used directly here.
In fact, the initialization and registration of the child channel here is basically the same as the way Bootstrap initializes a newly created channel. The difference is that in Bootstrap the channel is constructed reflectively in initAndRegister when user code calls connect, whereas on the server side, by listening for the ServerSocketChannel's accept events, a SocketChannel is created automatically (by the JDK implementation) whenever a new connection request arrives; NioServerSocketChannel then wraps it into a NioSocketChannel and passes it to the handlers as a message. So for a ServerSocketChannel, the creation of the child channels is done by the underlying JDK library.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // The cast here is safe; this is guaranteed by the concrete subclasses
    // of AbstractNioMessageChannel, whose read implementations make sure that
    // what they read and eventually hand over is always a Channel
    final Channel child = (Channel) msg;

    // Add the configured handler to the child channel
    child.pipeline().addLast(childHandler);

    // Set the options on the child channel
    setChannelOptions(child, childOptions, logger);

    // Set the attributes on the child channel
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the child channel with the child group
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
Back in the doBind method: after the channel has been constructed, initialized and registered, the next step is to bind this server channel to an address so that it can accept connection requests from clients.
As the code shows, the binding is carried out by calling the channel's bind method.
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // channel.bind is called to carry out the actual binding
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
The bind operation is propagated starting from the tail node and moving towards the head, so ultimately we only need to look at HeadContext's implementation of bind. The call first goes through Channel.bind and the pipeline:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
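Starting from the tail, the call is then forwarded from one outbound handler context to the next one towards the head until it reaches HeadContext. The forwarding logic in AbstractChannelHandlerContext is roughly the following (simplified; newer Netty versions select the next context with an outbound mask):

// Roughly how AbstractChannelHandlerContext forwards an outbound bind towards the head
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // find the next context towards the head whose handler handles outbound events
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // invoke the handler's bind(...) directly if we are already on its event loop
        next.invokeBind(localAddress, promise);
    } else {
        // otherwise schedule the invocation on that handler's executor
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}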
When the call reaches HeadContext, it simply calls the unsafe's bind method.
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
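That unsafe is the channel's internal AbstractUnsafe. Its bind method, heavily trimmed, is where the actual JDK bind happens and where the subsequent channelActive event comes from (paraphrased from the Netty source):

// Trimmed sketch of AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
        doBind(localAddress);  // template method; for the server this is NioServerSocketChannel#doBind
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        // the channel just became active (it is now bound and listening),
        // so fire channelActive through the pipeline on the event loop
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}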
Several more events are triggered after this (channelActive, and then the read that follows from auto-read). Each of them is fired through the channel's methods and then propagated along the ChannelPipeline, and in the end most of them are handled by HeadContext. So here I will only briefly describe the remaining logic; the code is rather verbose and the call patterns largely repeat, so I won't paste all of it.
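For completeness, the endpoint of the bind itself is tiny: NioServerSocketChannel#doBind simply delegates to the JDK channel (roughly, per the Netty source):

// Roughly NioServerSocketChannel#doBind: delegate the bind to the JDK ServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

The channelActive event that follows the successful bind is what eventually leads to beginRead and the doBeginRead method shown below.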
As the code below shows, the JDK API is ultimately called to add the interested event type to the SelectionKey. From the earlier analysis we know that for NioSocketChannel the interested read-type event is SelectionKey.OP_READ, i.e. the read event, while for NioServerSocketChannel, as we saw when analyzing its constructor, the interested event is SelectionKey.OP_ACCEPT, i.e. the connection-establishment event.
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    // Add the read-type event to the SelectionKey's interest set,
    // so that the underlying JDK selector will listen for events of this type
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
This completes our walk through the main functional code of ServerBootstrap. It covers three main aspects: the construction and initialization of the NioServerSocketChannel and its registration with the boss EventLoop; the binding of that channel to a local address; and the ServerBootstrapAcceptor, which initializes the child channel created for each accepted connection and registers it with the child (worker) EventLoopGroup.