Netty系列-netty的初體驗

1、前言java

   最近看了netty源碼,打算寫個博客記下來,方便後面再複習,同時但願也能方便看到的人,在研究netty的時候,多少能方便點。ios

2、環境搭建git

   git clone netty的代碼下來,或者能夠fork到本身的git 倉庫,而後git clone下來。github

  後面的版本統一用promise

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>

3、例子研究app

以下是服務端標準的代碼案例,bossgroup主要是用來接收鏈接請求的,workergroup主要是用來處理讀寫請求的異步

 1    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 2         EventLoopGroup workerGroup = new NioEventLoopGroup();
 3         final EchoServerHandler serverHandler = new EchoServerHandler();
 4         try {
 5             ServerBootstrap b = new ServerBootstrap();
 6             b.group(bossGroup, workerGroup)
 7              .channel(NioServerSocketChannel.class)
 8              .option(ChannelOption.SO_BACKLOG, 100)
 9              .handler(new LoggingHandler(LogLevel.INFO))
10              .childHandler(new ChannelInitializer<SocketChannel>() {
11                  @Override
12                  public void initChannel(SocketChannel ch) throws Exception {
13                      ChannelPipeline p = ch.pipeline();
14                      if (sslCtx != null) {
15                          p.addLast(sslCtx.newHandler(ch.alloc()));
16                      }
17                      //p.addLast(new LoggingHandler(LogLevel.INFO));
18                      p.addLast(serverHandler);
19                  }
20              });
21 
22             // Start the server.
23             ChannelFuture f = b.bind(PORT).sync();

前面5-20都是初始化,咱們先看23行,bind方法,一路跟下去,分爲三部分socket

 1 private ChannelFuture doBind(final SocketAddress localAddress) {
 2         final ChannelFuture regFuture = initAndRegister();
 3         final Channel channel = regFuture.channel();
 4         if (regFuture.cause() != null) {
 5             return regFuture;
 6         }
 7 
 8         if (regFuture.isDone()) {
 9             // At this point we know that the registration was complete and successful.
10             ChannelPromise promise = channel.newPromise();
11             doBind0(regFuture, channel, localAddress, promise);
12             return promise;
13         } else {
14             // Registration future is almost always fulfilled already, but just in case it's not.
15             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
16             regFuture.addListener(new ChannelFutureListener() {
17                 @Override
18                 public void operationComplete(ChannelFuture future) throws Exception {
19                     Throwable cause = future.cause();
20                     if (cause != null) {
21                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
22                         // IllegalStateException once we try to access the EventLoop of the Channel.
23                         promise.setFailure(cause);
24                     } else {
25                         // Registration was successful, so set the correct executor to use.
26                         // See https://github.com/netty/netty/issues/2586
27                         promise.registered();
28 
29                         doBind0(regFuture, channel, localAddress, promise);
30                     }
31                 }
32             });
33             return promise;
34         }
35     }

第一是 initAndRegister方法ide

 

 

 在這裏,channelFactory啥時候初始化的?咱們回到標準案例那裏函數

 

 

 跟進去看看

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

跟到底會發現下面這段

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
        //初始化cannelFactory
        this.channelFactory = channelFactory;
        return self();
    }

因此很明顯,cannelFactory是ReflectiveChannelFactory,咱們繼續看ReflectiveChannelFactory的newChannel方法

 public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

這就能夠看出,channel的建立是經過工廠模式,反射建立無參構造函數的,實例就是咱們初始化傳進去的 NioServerSocketChannel,咱們把channel的建立看完,繼續跟它的構造函數

 
 
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

DEFAULT_SELECTOR_PROVIDER 是根據操做系統選擇的provider,而newSocket其實就是根據provider到jdk底層去獲取對應的serversockerchannel,咱們繼續this,

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

調用父類的構造方法,注意參數 SelectionKey.OP_ACCEPT,繼續

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

這個構造函數就是把前面生成的channel和accept事件保存起來,並設置該channel爲非阻塞模式,是否是就是nio的代碼方式,咱們繼續看super

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

id咱們暫時無論,這裏會生成一個unsafe 來操做bytebuffer的,還生成了pipeline,這個主要是爲了執行咱們初始化設定的一些handdler,咱們後面分析;到這裏把channel的初始化分析完了,回到以前的initAndRegister方法,咱們繼續往下看有個init方法,它有兩個實現,一個是客戶端的BootStrap,一個是服務端的ServerBootStrap,作的事情都差很少,咱們看下ServerBootStrap的,

@Override
    void init(Channel channel) throws Exception {
        // 把代碼啓動的時候設置的參數放到它該有的位置上
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
             // options設置到channel上
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            // 遍歷attr事件,設置到channel上
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();
            ......

        // 把全部handler組裝成pipeline
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }            

省略了些代碼,主要仍是把以前一開始初始化保存的對象綁到對應的channel上,而後放到一個 inboundhandler類型-ServerBootstrapAcceptor對象上,並給放到pipeline 鏈上。

咱們繼續看initAndRegister的另外一行代碼

ChannelFuture regFuture = config().group().register(channel);

 config().group(),在這裏是 NioEventLoopGroup,register執行的是它的父類 MultithreadEventLoopGroup

public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

next方法有兩種實現,在NioEventLoopGroup初始化的時候會調用它的父類構造函數,若是線程數是2的次方就實例化 PowerOfTwoEventExecutorChooser,不然就是GenericEventExecutorChooser,咱們看看兩個實現的有啥區別

PowerOfTwoEventExecutorChooser:
public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }


GenericEventExecutorChooser:
public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }

一個按位與,一個是取模運算,明顯按位與快一點,因此推薦設置2的n次方

講完chooser選擇後,繼續看register,由於咱們是 NioEventLoop,它繼承於 SingleThreadEventLoop,因此咱們看它的register

public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

從DefaultChannelPromise 拿到niosocketchannel,拿到對應的unsafe,而 AbstractUnsafe 是 AbstractChannel的內部類,

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           ......
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } 
            }
        }

繼續看 register0

private void register0(ChannelPromise promise) {
               .......
                doRegister();
               .......
                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
               .......
        }

繼續 doRegister

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            }
        }
    }

調用底層的jdk channel來註冊selector,拿到一個selectionKey,initAndRegister方法算是結束了,主要是初始化channel和註冊selector的,接下來看看 doBind0 方法,

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // 經過pipeline 從tail到head節點執行,最終在對應的NioServerSocketChannel執行bind方法
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

咱們繼續往下跟會發現,扔給線程池的任務異步執行,從AbstractChannel開始作bind操做,經過每一個channel對應的 DefaultChannelPipeline 來執行bind,最終會經過 pipeline 從tail執行到head節點,最終跑到NioServerSocketChannel 類

protected void doBind(SocketAddress localAddress) throws Exception {
        // 最終執行到jdk底層的 bind方法
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

最終bind 方法執行完畢。

相關文章
相關標籤/搜索