NioServerSocketChannel 初始化過程:java
1.建立 Java NIO的 ServerSocketChannel;ios
2.爲ServerSocketChannel 註冊感興趣的鍵以及設置爲非阻塞模式;git
3.建立默認的 pipeline 以及unsafe 對象;github
(1)建立 ServerSocketChannelpromise
/** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
newSocket(DEFAULT_SELECTOR_PROVIDER) :用來建立一個Java NIO的 ServerSocketChannel ;如下操做等於 ServerSocketChannel.open();爲了解決性能問題(每次回進行加鎖)
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
(2)爲ServerSocketChannel 註冊感興趣的鍵以及設置爲非阻塞模式;多線程
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; //感興趣的key this.readInterestOp = readInterestOp; try { //設置爲非阻塞模式 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
(3).建立默認的 pipeline 以及unsafe 對象;併發
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
不管在建立服務端仍是處理客戶端請求鏈接的時候,會對每個客戶端的鏈接建立一個NiosocketChannel 以及 pipeline組件;咱們來看一看pipeline的建立流程;socket
首先須要知道的是pipeline 建立是跟隨者NioServerSocketChannel 或者NioSocketChannel 一塊兒建立的;ide
NioSocketChannel 與NioServerSocketChannel 都繼承了AbstractNioChannel 這個抽象類,源碼可知因此對於一些方法的實現以及pipeline 實現都是相同的函數
(1).進入AbstractNioChannel 的父類 AbstractChannel 構造函數,這裏進行了pipeline 的建立
protected AbstractChannel(Channel parent) {
this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
(2).在構造pipeline 會將當前的對象this(NioServerSocketChannel 或者NioSocketChannel 傳進來),建立默認的channelpipeline;
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this); }
(3).接下來就是賦值操做
protected DefaultChannelPipeline(Channel channel) { //維護當前的channel 對象 this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); //建立尾巴哨兵 tail = new TailContext(this); //建立頭哨兵 head = new HeadContext(this); //鏈表連起來,之後的addLat 操做就是基於這個鏈表的 head.next = tail; tail.prev = head; }
pipeline 的addlast 方法會將咱們的handler 轉換爲一個handlercontext 對象;
//新構造一個channelHandlerContext 對象 newCtx = newContext(group, filterName(name, handler), handler); //鏈表 addLast0(newCtx);
用圖表示的話就是:
在上圖中,在初始化channel 中,進行了新鏈接接入的方法,看一下吧;
void init(Channel channel) throws Exception { //獲取用戶配置的option 屬性 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } //獲取用戶的attr屬性 final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //獲得NioServerSocketChannel pipeline ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加ServerBootstrapAcceptor 用來處理NioSocketChannel p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
通過這樣以後,pipeline的結構就是這樣的;能夠說這個ServerBootStrapAcceptor 就是一個橋樑
咱們看一下ServerBootStrapAcceptor 這個進站處理器一個核心方法,進行socketChannel 的註冊以及監聽
@Override @SuppressWarnings("unchecked") //這個傳過來的msg 實際上是NioSocketChannel 對象,這個我後面說 public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //爲NioSocketChannel 的pipeline 進行添加咱們定義的處理器 child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //socketChannel 交給work 線程管理,註冊監聽其Read事件,並事件傳播ad dhandler 等方法; childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
到這裏,估計疑惑的就是何時觸發執行這個channelread方法呢?顯而易見的,是咱們的selector 輪詢到有新連接接入的時候,即觸發accept 方法
主要從綁定端口的操做開始的
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. //execute 方法,會進行selector 的輪詢,能夠點進去這個execute 內部,startThread 方法 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
最終會運行到NioEventLoop 方法中的以下
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
咱們主要看這個unsafe.read() 方法;NioServerSocketChannel 的unsafe 實現類是NioMessageUnsafe;
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
accept 一個SocketChannel,也就是Java nio 操做,並構形成一個NioSocketChannel ;
int localRead = doReadMessages(readBuf);
protected int doReadMessages(List<Object> buf) throws Exception { //jdk SocketChannel SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //構形成一個NioSocketChannel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
這樣一個NioSocketChannel 就建立出來了;接下來就是鏈接的接入,監聽read 事件;
for (int i = 0; i < size; i ++) { readPending = false; //事件的傳播,就會執行到ServerBootStrapAcceptor 的channelread 方法中,參數就是Niosocketchannel pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete();
流程圖以下:
(一)咱們先看一看 EventLoopGroup 的初始化;
在此聲明,Netty 是基於java NIO 的,建議知道java NIO 運行機制(Selector,Channel ,ByteBuffer ,zeroCopy) ,再閱讀此篇文章,否則會一頭霧水;
netty 是對javanio 的頂層封裝擴展,因此對於一些底層的重要操做,仍是基於Javanio的;
EventLoopGroup :事件循環組:
EventLoopGroup bossGroup =new NioEventLoopGroup(); EventLoopGroup workGroup =new NioEventLoopGroup();
主要是完成一些變量的賦值
主要發生了什麼:Look,源碼就是一層一層的調用構造函數,往裏面賦值;
1.extends 多線程事件循環組, 被用於基於channel 的NIO selector 實現
/** * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s. */ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
2.咱們一步一步點擊下來,發現就是在它的構造函數裏面調來調去,咱們發現他建立了一個空的 Executor 對象 (java 1. 5 併發庫重要內容,執行器,進行線程執行),以及 nThreads 線程數量爲 0(這裏的0並非說咱們給咱們建立 0個線程,後面會有判斷);
*/ public NioEventLoopGroup() { this(0); } /** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); }
3.咱們能夠看到在這個構造方法裏調用了 SelectorProvider.provider(),這個不陌生吧,在java nio 中建立selector 的Selector.open() 方法中其實調用的就是這個
/** * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }
4.繼續點,能夠看到它又添加了一些新的內容 DefaultSelectStrategyFactory 工廠 Factory which uses the default select strategy. 默認的選擇策略
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } //在這個構造方法裏用添加了一個參數 DefaultSelectStrategyFactory.INSTANCE ,提供一個默認選擇策略,工廠模式 /** * Factory which uses the default select strategy. */ public final class DefaultSelectStrategyFactory implements SelectStrategyFactory { public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory(); private DefaultSelectStrategyFactory() { } @Override public SelectStrategy newSelectStrategy() { return DefaultSelectStrategy.INSTANCE; } }
5. 繼續走,這裏就開始調用父類super(MultithreadEventLoopGroup)方法了,在這裏咱們就能夠知道默認給咱們建立多少線程了;
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
//分析線程數量 //線程數量判斷,若是是0的話,就是 DEFAULT_EVENT_LOOP_THREADS ,是多少呢?咱們點進去看一看,咱們會看到一個靜態代碼塊 static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } //調用Runtime.availableProcessors將會獲取 可用的處理器 @SuppressForbidden(reason = "to obtain default number of available processors") synchronized int availableProcessors() { if (this.availableProcessors == 0) { final int availableProcessors = SystemPropertyUtil.getInt( "io.netty.availableProcessors", Runtime.getRuntime().availableProcessors()); setAvailableProcessors(availableProcessors); } return this.availableProcessors; } 由此能夠看到 默認建立的線程數不是0 而是根據不一樣電腦的處理器個數*2
6,接下來就是Excutor 的賦值了,由於從第二部能夠看到,初始的Excutor 的null;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
//**********源碼解釋***********//
//咱們看紅色部分,就是對 executor 進行初始化操做,這裏咱們須要瞭解的是Excutor 接口 以及ThreadFactory 接口的做用
//在netty 裏實現了ThreadFactory關於本身的DefaultThreadFactory
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);