1、前言java
最近看了netty源碼,打算寫個博客記下來,方便後面再複習,同時但願也能方便看到的人,在研究netty的時候,多少能方便點。ios
2、環境搭建git
git clone netty的代碼下來,或者能夠fork到本身的git 倉庫,而後git clone下來。github
後面的版本統一用promise
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency>
3、例子研究app
以下是服務端標準的代碼案例,bossgroup主要是用來接收鏈接請求的,workergroup主要是用來處理讀寫請求的異步
1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 2 EventLoopGroup workerGroup = new NioEventLoopGroup(); 3 final EchoServerHandler serverHandler = new EchoServerHandler(); 4 try { 5 ServerBootstrap b = new ServerBootstrap(); 6 b.group(bossGroup, workerGroup) 7 .channel(NioServerSocketChannel.class) 8 .option(ChannelOption.SO_BACKLOG, 100) 9 .handler(new LoggingHandler(LogLevel.INFO)) 10 .childHandler(new ChannelInitializer<SocketChannel>() { 11 @Override 12 public void initChannel(SocketChannel ch) throws Exception { 13 ChannelPipeline p = ch.pipeline(); 14 if (sslCtx != null) { 15 p.addLast(sslCtx.newHandler(ch.alloc())); 16 } 17 //p.addLast(new LoggingHandler(LogLevel.INFO)); 18 p.addLast(serverHandler); 19 } 20 }); 21 22 // Start the server. 23 ChannelFuture f = b.bind(PORT).sync();
前面5-20都是初始化,咱們先看23行,bind方法,一路跟下去,分爲三部分socket
1 private ChannelFuture doBind(final SocketAddress localAddress) { 2 final ChannelFuture regFuture = initAndRegister(); 3 final Channel channel = regFuture.channel(); 4 if (regFuture.cause() != null) { 5 return regFuture; 6 } 7 8 if (regFuture.isDone()) { 9 // At this point we know that the registration was complete and successful. 10 ChannelPromise promise = channel.newPromise(); 11 doBind0(regFuture, channel, localAddress, promise); 12 return promise; 13 } else { 14 // Registration future is almost always fulfilled already, but just in case it's not. 15 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); 16 regFuture.addListener(new ChannelFutureListener() { 17 @Override 18 public void operationComplete(ChannelFuture future) throws Exception { 19 Throwable cause = future.cause(); 20 if (cause != null) { 21 // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an 22 // IllegalStateException once we try to access the EventLoop of the Channel. 23 promise.setFailure(cause); 24 } else { 25 // Registration was successful, so set the correct executor to use. 26 // See https://github.com/netty/netty/issues/2586 27 promise.registered(); 28 29 doBind0(regFuture, channel, localAddress, promise); 30 } 31 } 32 }); 33 return promise; 34 } 35 }
第一是 initAndRegister方法ide
在這裏,channelFactory啥時候初始化的?咱們回到標準案例那裏函數
跟進去看看
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
跟到底會發現下面這段
public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } //初始化cannelFactory this.channelFactory = channelFactory; return self(); }
因此很明顯,cannelFactory是ReflectiveChannelFactory,咱們繼續看ReflectiveChannelFactory的newChannel方法
public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
這就能夠看出,channel的建立是經過工廠模式,反射建立無參構造函數的,實例就是咱們初始化傳進去的 NioServerSocketChannel,咱們把channel的建立看完,繼續跟它的構造函數
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
DEFAULT_SELECTOR_PROVIDER 是根據操做系統選擇的provider,而newSocket其實就是根據provider到jdk底層去獲取對應的serversockerchannel,咱們繼續this,
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
調用父類的構造方法,注意參數 SelectionKey.OP_ACCEPT,繼續
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
這個構造函數就是把前面生成的channel和accept事件保存起來,並設置該channel爲非阻塞模式,是否是就是nio的代碼方式,咱們繼續看super
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
id咱們暫時無論,這裏會生成一個unsafe 來操做bytebuffer的,還生成了pipeline,這個主要是爲了執行咱們初始化設定的一些handdler,咱們後面分析;到這裏把channel的初始化分析完了,回到以前的initAndRegister方法,咱們繼續往下看有個init方法,它有兩個實現,一個是客戶端的BootStrap,一個是服務端的ServerBootStrap,作的事情都差很少,咱們看下ServerBootStrap的,
@Override void init(Channel channel) throws Exception { // 把代碼啓動的時候設置的參數放到它該有的位置上 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { // options設置到channel上 setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { // 遍歷attr事件,設置到channel上 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); ...... // 把全部handler組裝成pipeline p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
省略了些代碼,主要仍是把以前一開始初始化保存的對象綁到對應的channel上,而後放到一個 inboundhandler類型-ServerBootstrapAcceptor對象上,並給放到pipeline 鏈上。
咱們繼續看initAndRegister的另外一行代碼
ChannelFuture regFuture = config().group().register(channel);
config().group(),在這裏是 NioEventLoopGroup,register執行的是它的父類 MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) { return next().register(channel); }
next方法有兩種實現,在NioEventLoopGroup初始化的時候會調用它的父類構造函數,若是線程數是2的次方就實例化 PowerOfTwoEventExecutorChooser,不然就是GenericEventExecutorChooser,咱們看看兩個實現的有啥區別
PowerOfTwoEventExecutorChooser: public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } GenericEventExecutorChooser: public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; }
一個按位與,一個是取模運算,明顯按位與快一點,因此推薦設置2的n次方
講完chooser選擇後,繼續看register,由於咱們是 NioEventLoop,它繼承於 SingleThreadEventLoop,因此咱們看它的register
public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
從DefaultChannelPromise 拿到niosocketchannel,拿到對應的unsafe,而 AbstractUnsafe 是 AbstractChannel的內部類,
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } } }
繼續看 register0
private void register0(ChannelPromise promise) { ....... doRegister(); ....... // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. ....... }
繼續 doRegister
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } } }
調用底層的jdk channel來註冊selector,拿到一個selectionKey,initAndRegister方法算是結束了,主要是初始化channel和註冊selector的,接下來看看 doBind0 方法,
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 經過pipeline 從tail到head節點執行,最終在對應的NioServerSocketChannel執行bind方法 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
咱們繼續往下跟會發現,扔給線程池的任務異步執行,從AbstractChannel開始作bind操做,經過每一個channel對應的 DefaultChannelPipeline 來執行bind,最終會經過 pipeline 從tail執行到head節點,最終跑到NioServerSocketChannel 類
protected void doBind(SocketAddress localAddress) throws Exception { // 最終執行到jdk底層的 bind方法 if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
最終bind 方法執行完畢。