Netty 源碼解析系列-服務端啓動流程解析

netty源碼解析系列

1.服務端啓動例子(基於4.0.31.Final)

public class Server {
    private ServerBootstrap serverBootstrap;
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workGroup;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("服務啓動");
        Server server = new Server();
        server.start();
    }
    private void start() throws InterruptedException {
       try {
             serverBootstrap=new ServerBootstrap();
             bossGroup = new NioEventLoopGroup();
             workGroup = new NioEventLoopGroup(4);
             serverBootstrap.group(bossGroup, workGroup)
                               .channel(NioServerSocketChannel.class)
                               .option(ChannelOption.SO_BACKLOG, 128)
                               .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                               .childOption(ChannelOption.SO_KEEPALIVE, true)
			        .handler(new InitHandler())
                               .childHandler(new IOChannelInitialize());
             ChannelFuture future = serverBootstrap.bind(8802).sync();
             future.channel().closeFuture().sync();
          } finally {
                 bossGroup.shutdownGracefully();
                 workGroup.shutdownGracefully();
          }

}

    private class IOChannelInitialize extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("initChannel");
            ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
            ch.pipeline().addLast(new IOHandler());
        }
    }
}
複製代碼

步驟說明java

  • 1.1 建立 ServerBootstrap 實例,它是 netty 的啓動輔助類,提供了一系列的方法用於設置服務 端啓動相關的參數。底層經過門面模式對各類能力進行抽象和封裝,儘可能不須要用戶跟過 多的底層 API 打交道,下降用戶的開發難度react

  • 1.2 NioEventLoopGroupnetty Reactor 線程池,bossGroup 監聽和 accept 客戶端鏈接,workGroup 則處理 IO ,編解碼promise

  • 1.3 綁定服務端 NioServerSocketChannel安全

  • 1.4 設置一些參數bash

  • 1.5 初始化 pipeline 並綁定 handlerpipeline 是一個負責處理網絡事件的職責鏈,負責管理和執行 ChannelHandler ,設置系統提供的 IdleStateHandler 和自定義 IOHandler網絡

  • 1.6 serverBootstrap.bind(8802) 這裏纔是啓動服務端綁定端口異步

  • 1.7 future.channel().closeFuture().sync(); 等待服務端關閉socket

  • 1.8 優雅關閉ide

2. 源碼分析

2.1 NioEventLoopGroup

    NioEventLoopGroup 不單單是 I/O 線程,除了負責 I/O 的讀寫,還負責系統 Task 和定時任務oop

public NioEventLoopGroup(int nThreads) {
           this(nThreads, null);
       }
複製代碼
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
   this(nThreads, threadFactory, SelectorProvider.provider());
}
複製代碼
public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, selectorProvider);
}
複製代碼
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
複製代碼

    繼續,如下是精簡代碼

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
			...
	if (threadFactory == null) {
    		threadFactory = newDefaultThreadFactory();
	}
	children = new SingleThreadEventExecutor[nThreads];
	if (isPowerOfTwo(children.length)) {
    		chooser = new PowerOfTwoEventExecutorChooser();
	} else {
    		chooser = new GenericEventExecutorChooser();
	}
	for (int i = 0; i < nThreads; i ++) {
		...
        	children[i] = newChild(threadFactory, args);
		...
     }
    
複製代碼

     MultithreadEventExecutorGroup 實現了線程的建立和線程的選擇,咱們看看 newChild 方法( NioEventLoopGroup 類的方法),newChild 實例化線程

@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
複製代碼

    建立了一個 NioEventLoop

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}
複製代碼

    跟着 super

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}
複製代碼

    代碼有精簡,繼續

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    thread = threadFactory.newThread(new Runnable() {
        	@Override
        	public void run() {
    			SingleThreadEventExecutor.this.run();
	}
 }
});
複製代碼

    在這裏實例化了一個線程,並在 run 中調用 SingleThreadEventExecutorrun 方法,這個線程在哪裏啓動的呢,咱們繼續往下看
    總結:
          NioEventLoopGroup 實際就是 Reactor 線程池,負責調度和執行客戶端的接入、網絡讀寫事件的處理、用戶自定義任務和定時任務的執行。

2.2 ServerBootstrap

     ServerBootstrap 是服務端的啓動輔助類,父類是 AbstractBootstrap ,與之相對應的客戶端啓動輔助類是 Bootstrap

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
		volatile EventLoopGroup group;
		private volatile ChannelFactory<? extends C> channelFactory;
		private volatile SocketAddress localAddress;
		private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
		private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
		private volatile ChannelHandler handler;
	  }
複製代碼

2.2.1 設置booss和work線程池

     將 bossGroup 傳給父類,workGroup 賦值給 serverBootstrapchildGroup

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    		super.group(parentGroup);
    		if (childGroup == null) {
        		throw new NullPointerException("childGroup");
    		}
    		if (this.childGroup != null) {
        		throw new IllegalStateException("childGroup set already");
    		}
    		this.childGroup = childGroup;
    		return this;
}
複製代碼

2.2.2 設置NioServerSocketChannel處理鏈接請求

serverBootstrap.channel(NioServerSocketChannel.class) 
複製代碼
public B channel(Class<? extends C> channelClass) {
    			if (channelClass == null) {
        			throw new NullPointerException("channelClass");
   			 }
   			 return channelFactory(new BootstrapChannelFactory<C>(channelClass));
   }
複製代碼

     繼續跟 new BootstrapChannelFactory

private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
    			private final Class<? extends T> clazz;
    			BootstrapChannelFactory(Class<? extends T> clazz) {
        			this.clazz = clazz;
    			}
    			@Override
			public T newChannel() {
        			try {
            				return clazz.newInstance();
        			} catch (Throwable t) {
            				throw new ChannelException("Unable to create Channel from class " + clazz, t);
        			}
    			}
		}
複製代碼

     BootstrapChannelFactory 是一個繼承了 ChannelFactory 的內部類,從名稱上就能看出,這是一個 channel 工廠類,重寫了父類的 newChannel 方法,經過反射建立 NioServerSocketChannel 實例,後面會告訴你是在哪裏調用到的

2.2.3 設置channel通道塊的值

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128) 
複製代碼
public <T> B option(ChannelOption<T> option, T value) {
    			if (option == null) {
        			throw new NullPointerException("option");
    			}
    			if (value == null) {
        			synchronized (options) {
            				options.remove(option);
        			}
    			} else {
        			synchronized (options) {
            			options.put(option, value);
        			}
    			}
    			return (B) this;
          }
複製代碼

     這裏的 option 方法是父類 AbstractBootstrap 的方法,options 是一個有序的非線程安全的雙向鏈表,加鎖添加

2.2.4 serverBootstrap.childOption

public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    			if (childOption == null) {
       				 throw new NullPointerException("childOption");
    			}
    			if (value == null) {
        			synchronized (childOptions) {
            				childOptions.remove(childOption);
       				 }
    			} else {
        			synchronized (childOptions) {
            				childOptions.put(childOption, value);
        			}
    			}
    			return this;
		}
複製代碼

     childOption 是子類 serverBootstrap 的方法
     childOptionoption 的區別:
         option : 主要是設置 ServerChannel 的一些選項
         childOption : 主要設置 ServerChannel 的子 channel 的選項,即 option
                       針對的是 boss 線程而 childOption 針對的是 work 線程池

2.2.5 設置服務端NioServerSocketChannel的Handler

serverBootstrap.handler(new InitHandler())
複製代碼
public B handler(ChannelHandler handler) {
    			if (handler == null) {
        			throw new NullPointerException("handler");
    			}
    			this.handler = handler;
    			return (B) this;
         }
複製代碼

2.2.6 serverBootstrap.childHandler()

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    			   if (childHandler == null) {
        				throw new NullPointerException("childHandler");
    			   }
       			   this.childHandler = childHandler;
    			   return this;
               
           }
複製代碼

handlerchildHandler 的區別
    Handler 是屬於服務端 NioServerSocketChannel ,只會建立一次 childHandler 是屬於每個新建的 NioSocketChannel ,每當有一個鏈接上來,都會調用

2.2.7 真正的啓動過程是在這裏執行,咱們看看bind()方法

serverBootstrap.bind(8802).sync() 
複製代碼
public ChannelFuture bind(int inetPort) {
    			return bind(new InetSocketAddress(inetPort));
		}
複製代碼
  • (1) 經過端口號建立一個 InetSocketAddress ,繼續 bind
public ChannelFuture bind(SocketAddress localAddress) {
    			validate();
    			if (localAddress == null) {
        			throw new NullPointerException("localAddress");
    			}
    			return doBind(localAddress);
 }
複製代碼
  • (2) validate() 方法進行一些參數驗證,咱們直接看 doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
    			final ChannelFuture regFuture = initAndRegister();
    			final Channel channel = regFuture.channel();
			if (regFuture.cause() != null) {
    				return regFuture;
			}
			if (regFuture.isDone()) {
    				ChannelPromise promise = channel.newPromise();
    				doBind0(regFuture, channel, localAddress, promise);
    				return promise;
			} else {
				final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
				regFuture.addListener(new ChannelFutureListener() {
    					@Override
    					public void operationComplete(ChannelFuture future) throws Exception {
        					Throwable cause = future.cause();
        					if (cause != null) {
 							promise.setFailure(cause);
                				} else {
                    					promise.executor = channel.eventLoop();
                				}
                					doBind0(regFuture, channel, localAddress, promise);
            					}
        				});
        			return promise;
    			}
		}
複製代碼
  • (3.1) 先看 initAndRegister ( AbstractBootstrap 類 ),去掉了一些不重要的
final ChannelFuture initAndRegister() {
    				final Channel channel = channelFactory().newChannel();
        			init(channel);
				ChannelFuture regFuture = group().register(channel);
				return regFuture;
			}
複製代碼

     channelFactoryserverBootstrap.channel() 時建立的,在這裏調用反射建立 NioServerSocketChannel 實例

  • (3.2.1) 再看 init(channel) 方法( ServerBootstrap 類)
@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }
final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
    	for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
        	@SuppressWarnings("unchecked")
        	AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
        	channel.attr(key).set(e.getValue());
    	}
     }
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
            	ChannelHandler handler = handler();
            	if (handler != null) {
                	pipeline.addLast(handler);
            	}
            	pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }	
    });
}
複製代碼

options()serverBootstrap.option() 賦值的 AbstractBootstrap 類的 options 雙向鏈表成員變量,在這裏將 optionsattrs 注入 channelP.addLast()NioServerSocketChannel 加入新的 handler (處理器),這裏 pipeline 相似於 Servlet 的過濾器,管理全部 handler

  • (3.2.2) 再看 group().register() 方法
        這裏的 groupbossGroup(NioEventLoopGroup----▷MultithreadEventLoopGroup) ,屢次跳轉到 SingleThreadEventLoop 類的 register() 方法
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}
複製代碼
  • (3.2.3) 清除一些不重要的代碼,下面纔是真正的註冊
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
       try {
    	    eventLoop.execute(new OneTimeTask() {
        	@Override
        	public void run() {
            	register0(promise);
        	}
    	    });
        } catch (Throwable t) {
        }
    }
}
複製代碼

     eventLoop.inEventLoop() 用來判斷啓動線程與當前線程是否相同,相同表示已經啓動,不一樣則有兩種可能:未啓動或者線程不一樣。

  • (3.2.4) 這裏線程還未啓動,走 eventLoop.execute() ,這個 execute() 方法是 SingleThreadEventExecutor 類的
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
          reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
複製代碼
  • (3.2.5) 啓動線程
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }
}
複製代碼

     咱們在最開始2.1裏面 SingleThreadEventExecutor 構造方法內的 thread 就是在這裏啓動的,咱們再回到2.1的

protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    thread = threadFactory.newThread(new Runnable() {
        	@Override
        	public void run() {
    			SingleThreadEventExecutor.this.run();
	}
 }
});
複製代碼
  • (3.2.6) 打開 SingleThreadEventExecutor.this.run() ;
@Override
	protected void run() {
    		for (;;) {
        		boolean oldWakenUp = wakenUp.getAndSet(false);
        		try {
            			if (hasTasks()) {
                			selectNow();
            			} else {
                			select(oldWakenUp);
   					if (wakenUp.get()) {
        					selector.wakeup();
    					}
				}
			cancelledKeys = 0;
			needsToSelectAgain = false;
			final int ioRatio = this.ioRatio;
			if (ioRatio == 100) {
   				processSelectedKeys();
    				runAllTasks();
			} else {
    				final long ioStartTime = System.nanoTime();
    				processSelectedKeys();
    				final long ioTime = System.nanoTime() - ioStartTime;
    				runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
			}
			if (isShuttingDown()) {
    				closeAll();
    				if (confirmShutdown()) {
        				break;
    				}
			}
		} catch (Throwable t) {
            		try {
                		Thread.sleep(1000);
            		} catch (InterruptedException e) {
            		}
        	 }
    	   }
	  }
複製代碼

     在這裏異步執行,輪詢 select 客戶端的 accept ,而且 runAllTasks 全部的任務

  • (3.3) 咱們再看 (3.1) 裏面的 ChannelFuture regFuture = group().register(channel); 跳轉到 SingleThreadEventLoopregister 方法
@Override
public ChannelFuture register(Channel channel) {
	...
   	channel.unsafe().register(this, promise);
	return promise;
}
複製代碼

     如下是精簡後的代碼(位於 AbstractChannel 類的 AbstractUnsafe 內部類)

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
	...
	eventLoop.execute(new OneTimeTask() {
    	@Override
    	public void run() {
        	register0(promise);
    	}
	});
	...
}
複製代碼
private void register0(ChannelPromise promise) {
	...
	doRegister();
	...
	if (firstRegistration && isActive()) {
    		pipeline.fireChannelActive();
	}
	...
}
複製代碼

     繼續(位於 AbstractNioChannel 類)

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
	...
	selectionKey = javaChannel().register(eventLoop().selector, 0, this);
	...        
}
}        
複製代碼

     將 NioServerSocketChannel 註冊到 boss 線程池 NioEventLoopSelector 上。
在這裏應該註冊 OP_ACCEPT(16) 到多路複用器上
註冊0的緣由:
     (1)註冊方法是多態的,它既能夠被 NioServerSocketChannel 用來監聽客戶端的鏈接接入,也能夠註冊 SocketChannel 用來監聽網絡讀或寫操做
    (2)經過 SelectionKeyinterestOps(int ops) 方法能夠方便地修改監聽操做位

     再看 pipeline.fireChannelActive()

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

複製代碼
@Override
public Channel read() {
    pipeline.read();
    return this;
}
複製代碼
@Override
public ChannelPipeline read() {
    tail.read();
    return this;
}
複製代碼
@Override
public ChannelHandlerContext read() {
	...
        next.invokeRead();
	...
}
複製代碼
private void invokeRead() {
    try {
        ((ChannelOutboundHandler) handler()).read(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
複製代碼

     進到 HeadContextread

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
複製代碼

@Override public final void beginRead() { ... doBeginRead(); ... }

@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
    return;
}

final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
    return;
}
readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
複製代碼

     最終在這裏將 selectionKey 的監聽操做位改成 OP_READ

  • (4) 再看 doBind0( ) 方法
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
複製代碼

     將方法丟到 reactor 線程池任務隊列中執行,會先判斷註冊是否成功,成功則繼續執行bind方法

  • (5) 執行 bind( ) 方法
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
複製代碼
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
複製代碼
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
	...
	final AbstractChannelHandlerContext next = findContextOutbound();
	EventExecutor executor = next.executor();
	...
	next.invokeBind(localAddress, promise);
	...
}
複製代碼

    因爲 bind 事件是出站事件,尋找出站的 handler ,執行 invokeBind( ) 方法

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
複製代碼
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}
複製代碼
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
	...
	doBind(localAddress);
	...
}
複製代碼
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}
複製代碼

     通過多層 bind 深刻,最後在這裏能夠看到,仍是會調用Java底層的nio進行 socket bind 自此,服務端啓動流程解析完畢,咱們總結一下
     ① 經過 ServerBootstrap 輔助啓動類,配置了 reactor 線程池,服務端 Channel ,一些配置參數,客戶端鏈接後的 handler
     ② 將 ServerBootstrap 的值初始化,並註冊 OP_ACCEPT 到多路複用器
     ③ 啓動 reactor 線程池,不斷循環監聽鏈接,處理任務      ④ 綁定端口

相關文章
相關標籤/搜索