Netty

 netty 服務啓動流程 

NioServerSocketChannel  初始化過程:java

1.建立 Java NIO的 ServerSocketChannel;ios

2.爲ServerSocketChannel 註冊感興趣的鍵以及設置爲非阻塞模式;git

3.建立默認的 pipeline 以及unsafe 對象;github

 

(1)建立 ServerSocketChannelpromise

 /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
newSocket(DEFAULT_SELECTOR_PROVIDER) :用來建立一個Java NIO的 ServerSocketChannel ;如下操做等於 ServerSocketChannel.open();爲了解決性能問題(每次回進行加鎖)
 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

(2)爲ServerSocketChannel 註冊感興趣的鍵以及設置爲非阻塞模式;多線程

 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        //感興趣的key
        this.readInterestOp = readInterestOp;
        try {
        //設置爲非阻塞模式
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }    

(3).建立默認的 pipeline 以及unsafe 對象;併發

  protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

 

pipeline建立流程

不管在建立服務端仍是處理客戶端請求鏈接的時候,會對每個客戶端的鏈接建立一個NiosocketChannel 以及 pipeline組件;咱們來看一看pipeline的建立流程;socket

首先須要知道的是pipeline 建立是跟隨者NioServerSocketChannel 或者NioSocketChannel 一塊兒建立的;ide

 

NioSocketChannel 與NioServerSocketChannel 都繼承了AbstractNioChannel 這個抽象類,源碼可知因此對於一些方法的實現以及pipeline 實現都是相同的函數

(1).進入AbstractNioChannel 的父類 AbstractChannel 構造函數,這裏進行了pipeline 的建立

protected AbstractChannel(Channel parent) {
        this.parent = parent; id = newId(); unsafe = newUnsafe();  pipeline = newChannelPipeline(); }

(2).在構造pipeline 會將當前的對象this(NioServerSocketChannel 或者NioSocketChannel 傳進來),建立默認的channelpipeline;

protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this); }

(3).接下來就是賦值操做

protected DefaultChannelPipeline(Channel channel) {
    //維護當前的channel 對象
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        //建立尾巴哨兵
        tail = new TailContext(this);
        //建立頭哨兵
        head = new HeadContext(this);
        //鏈表連起來,之後的addLat 操做就是基於這個鏈表的
        head.next = tail;
        tail.prev = head;
    }

 

pipeline 的addlast 方法會將咱們的handler 轉換爲一個handlercontext 對象;

//新構造一個channelHandlerContext 對象            
newCtx = newContext(group, filterName(name, handler), handler);
//鏈表
 addLast0(newCtx);

 

 

用圖表示的話就是:

  

 

 

netty新鏈接接入(即建立NioSocketChannel)

在上圖中,在初始化channel 中,進行了新鏈接接入的方法,看一下吧;

void init(Channel channel) throws Exception {
    //獲取用戶配置的option 屬性
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
        //獲取用戶的attr屬性
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //獲得NioServerSocketChannel pipeline
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        //添加ServerBootstrapAcceptor 用來處理NioSocketChannel
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

 

通過這樣以後,pipeline的結構就是這樣的;能夠說這個ServerBootStrapAcceptor 就是一個橋樑

咱們看一下ServerBootStrapAcceptor 這個進站處理器一個核心方法,進行socketChannel 的註冊以及監聽

@Override
        @SuppressWarnings("unchecked")
        //這個傳過來的msg 實際上是NioSocketChannel 對象,這個我後面說
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
        //爲NioSocketChannel 的pipeline 進行添加咱們定義的處理器
            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                //socketChannel 交給work 線程管理,註冊監聽其Read事件,並事件傳播ad dhandler 等方法;
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

 

到這裏,估計疑惑的就是何時觸發執行這個channelread方法呢?顯而易見的,是咱們的selector 輪詢到有新連接接入的時候,即觸發accept 方法

主要從綁定端口的操做開始的

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        //execute 方法,會進行selector 的輪詢,能夠點進去這個execute 內部,startThread 方法
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

 

最終會運行到NioEventLoop 方法中的以下

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

 

咱們主要看這個unsafe.read() 方法;NioServerSocketChannel 的unsafe 實現類是NioMessageUnsafe;

  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

accept 一個SocketChannel,也就是Java nio 操做,並構形成一個NioSocketChannel ;

int localRead = doReadMessages(readBuf);
protected int doReadMessages(List<Object> buf) throws Exception {
        //jdk SocketChannel 
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
            //構形成一個NioSocketChannel
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

 

這樣一個NioSocketChannel 就建立出來了;接下來就是鏈接的接入,監聽read 事件;

 for (int i = 0; i < size; i ++) {
                    readPending = false;
//事件的傳播,就會執行到ServerBootStrapAcceptor 的channelread 方法中,參數就是Niosocketchannel

                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

 流程圖以下: 

 

 

線程輪詢

(一)咱們先看一看 EventLoopGroup 的初始化;

在此聲明,Netty 是基於java NIO 的,建議知道java NIO 運行機制(Selector,Channel ,ByteBuffer ,zeroCopy) ,再閱讀此篇文章,否則會一頭霧水;

netty 是對javanio 的頂層封裝擴展,因此對於一些底層的重要操做,仍是基於Javanio的;

EventLoopGroup :事件循環組:

EventLoopGroup bossGroup =new NioEventLoopGroup();
EventLoopGroup workGroup =new NioEventLoopGroup();

主要是完成一些變量的賦值

主要發生了什麼:Look,源碼就是一層一層的調用構造函數,往裏面賦值;

1.extends 多線程事件循環組, 被用於基於channel 的NIO selector 實現

/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

2.咱們一步一步點擊下來,發現就是在它的構造函數裏面調來調去,咱們發現他建立了一個空的 Executor 對象 (java 1. 5 併發庫重要內容,執行器,進行線程執行),以及 nThreads 線程數量爲 0(這裏的0並非說咱們給咱們建立 0個線程,後面會有判斷);

  */
    public NioEventLoopGroup() {
        this(0);
    }

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

3.咱們能夠看到在這個構造方法裏調用了  SelectorProvider.provider(),這個不陌生吧,在java nio 中建立selector 的Selector.open() 方法中其實調用的就是這個

/**
  * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
  * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
  */  

public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}

4.繼續點,能夠看到它又添加了一些新的內容 DefaultSelectStrategyFactory 工廠 Factory which uses the default select strategy.  默認的選擇策略

public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

//在這個構造方法裏用添加了一個參數  DefaultSelectStrategyFactory.INSTANCE ,提供一個默認選擇策略,工廠模式
/**
* Factory which uses the default select strategy.
*/
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();

private DefaultSelectStrategyFactory() { }

@Override
public SelectStrategy newSelectStrategy() {
return DefaultSelectStrategy.INSTANCE;
}
}

5. 繼續走,這裏就開始調用父類super(MultithreadEventLoopGroup)方法了,在這裏咱們就能夠知道默認給咱們建立多少線程了;

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}



protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { 
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }


//分析線程數量 //線程數量判斷,若是是0的話,就是 DEFAULT_EVENT_LOOP_THREADS ,是多少呢?咱們點進去看一看,咱們會看到一個靜態代碼塊 static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } //調用Runtime.availableProcessors將會獲取 可用的處理器 @SuppressForbidden(reason = "to obtain default number of available processors") synchronized int availableProcessors() { if (this.availableProcessors == 0) { final int availableProcessors = SystemPropertyUtil.getInt( "io.netty.availableProcessors", Runtime.getRuntime().availableProcessors()); setAvailableProcessors(availableProcessors); } return this.availableProcessors; } 由此能夠看到 默認建立的線程數不是0 而是根據不一樣電腦的處理器個數*2

6,接下來就是Excutor 的賦值了,由於從第二部能夠看到,初始的Excutor 的null;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}


//**********源碼解釋***********//
//咱們看紅色部分,就是對 executor 進行初始化操做,這裏咱們須要瞭解的是Excutor 接口 以及ThreadFactory 接口的做用
//在netty 裏實現了ThreadFactory關於本身的DefaultThreadFactory
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
                                               
相關文章
相關標籤/搜索