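I. Basic concepts
1. Channel: an abstraction over a network connection (here, a TCP socket) that exposes read and write operations.
2. Buffer: a region of memory. A channel reads incoming data into a buffer and writes outgoing data from a buffer.
3. Selector: lets a single thread service many channels. A channel must be registered with the selector before it can be used this way.
4. Bootstrap: a convenience helper class provided by Netty, used to configure and initialize a Netty client (or, with ServerBootstrap, a server).
5. EventLoopGroup: a group of threads (more precisely, a group of EventLoops).
6. EventLoop: can be thought of as an abstraction of a single thread.
7. ChannelPipeline: a chain of handlers, an instance of the chain-of-responsibility pattern. Business logic is implemented by adding our own handlers to this chain.
8. Unsafe: an object inside the channel that wraps the low-level socket operations.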
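The first three concepts come straight from Java NIO, so they can be tried without Netty. The following is a minimal sketch using only the JDK: it pushes a small message through an in-process java.nio.channels.Pipe and reads it back with a Selector. The class name NioBasicsSketch and the method roundTrip are made up for this illustration, and the code is only meant for small messages that fit in the pipe's internal buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class NioBasicsSketch {

    /** Sends a small message through an in-process pipe and reads it back via a selector. */
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // A channel must be non-blocking before it can be registered with a selector.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Outgoing data is written from a buffer into the channel.
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                // Incoming data is read from the channel into a buffer.
                ByteBuffer in = ByteBuffer.allocate(out.capacity());
                while (in.hasRemaining()) {
                    selector.select();               // block until the source is readable
                    selector.selectedKeys().clear(); // reset the ready set for the next round
                    if (source.read(in) < 0) {
                        break;                       // end of stream
                    }
                }
                in.flip(); // switch the buffer from filling to draining
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

Netty's Channel, ByteBuf and EventLoop wrap these same primitives, which is why the selector registration step shows up again later in this walkthrough.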
II. Flow analysis
1. Client:
This walkthrough follows EchoClient from the examples shipped with the Netty project.
The client code is as follows:

    public final class EchoClient {

        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }

            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(new EchoClientHandler());
                     }
                 });

                // Start the client.
                ChannelFuture f = b.connect(HOST, PORT).sync();

                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
        }
    }
We start with the initialization steps:
1. NioSocketChannel initialization:
Creating the channel requires a ChannelFactory. In the client code, b.group(group).channel(NioSocketChannel.class) installs a ReflectiveChannelFactory for NioSocketChannel on the bootstrap (the call itself returns the bootstrap, not the factory). The code is as follows:

    b.group(group)
     .channel(NioSocketChannel.class)

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    @SuppressWarnings({ "unchecked", "deprecation" })
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    // In the end, AbstractBootstrap stores the ChannelFactory in a field.
    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }
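The ChannelFactory interface declares a single method:

    @Deprecated
    public interface ChannelFactory<T extends Channel> {
        /**
         * Creates a new channel.
         */
        T newChannel();
    }

newChannel() is the method that creates the channel. The next step is to find where it is called.

    ChannelFuture f = b.connect(HOST, PORT).sync();

This call creates a channel. The call chain is as follows: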
    public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }

    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        validate();
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // The channel is created here.
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

    final ChannelFuture regFuture = initAndRegister();

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
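The channelFactory.newChannel() call in initAndRegister() is where the class passed to channel(...) finally gets instantiated. As a rough illustration of the idea behind a reflective factory, here is a JDK-only sketch. The names ReflectiveFactorySketch, Factory and ReflectiveFactory are invented for this example; it is not Netty's ReflectiveChannelFactory source, only the same technique applied to an arbitrary class with a no-argument constructor.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactorySketch {

    /** Hypothetical stand-in for a single-method factory interface. */
    interface Factory<T> {
        T newInstance();
    }

    /** Creates instances of a class through its no-argument constructor. */
    static final class ReflectiveFactory<T> implements Factory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> type) {
            try {
                // Look the constructor up once, when the factory is configured.
                this.constructor = type.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(type + " has no no-arg constructor", e);
            }
        }

        @Override
        public T newInstance() {
            try {
                // Each call produces a fresh object, like newChannel() per connect().
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance", e);
            }
        }
    }

    /** Builds a factory for StringBuilder and reports the created type. */
    static String demo() {
        Factory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        return factory.newInstance().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Passing a Class object instead of an instance lets the bootstrap defer creation until connect() is called, and create a new channel for every connection attempt.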
newChannel() creates the NioSocketChannel reflectively, so the no-argument constructor of NioSocketChannel is invoked. The constructor chain is as follows:

    // Step 1:
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

    public NioSocketChannel(SocketChannel socket) {
        this(null, socket);
    }

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

    // Step 2: call the superclass constructor
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

    // Step 3: call the next superclass constructor
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

    // Step 4: and the superclass constructor again
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
At this point the channel is fully constructed.
Summary:
NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) opens a new Java NIO SocketChannel.
AbstractChannel:
parent is null
unsafe is instantiated through newUnsafe()
pipeline is an instance created by new DefaultChannelPipeline(this), which also tells us that a channel and its ChannelPipeline are in a one-to-one relationship.
AbstractNioChannel:
ch (the SelectableChannel) is the SocketChannel that was just opened
readInterestOp is SelectionKey.OP_READ
the SelectableChannel is switched to non-blocking mode.
NioSocketChannel:
SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
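The two JDK-level steps in this summary (opening the socket through the SelectorProvider and switching it to non-blocking mode) can be reproduced in isolation. A small sketch, assuming nothing beyond the JDK; the class and method names are invented for the example:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class NewSocketSketch {

    /**
     * Opens a SocketChannel the same way newSocket(provider) does and then
     * mirrors the configureBlocking(false) call made in AbstractNioChannel.
     * Returns true if the channel started blocking and ended non-blocking.
     */
    static boolean opensNonBlocking() {
        try (SocketChannel ch = SelectorProvider.provider().openSocketChannel()) {
            boolean blockingByDefault = ch.isBlocking(); // new channels start in blocking mode
            ch.configureBlocking(false);                 // required before selector registration
            return blockingByDefault && !ch.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(opensNonBlocking());
    }
}
```

The channel is not connected at this point; as in Netty, construction and connection are separate steps.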
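unsafe initialization:

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

pipeline initialization:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

The pipeline is a doubly linked list that starts out with only a head and a tail node; our own handlers are inserted between them and process events there. Note that tail (TailContext) is an inbound handler, while head (HeadContext) is the one that handles outbound operations. In Netty 4.1, which this code appears to come from, HeadContext implements ChannelInboundHandler as well as ChannelOutboundHandler.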
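To make the linked-list structure concrete, here is a stripped-down sketch with head and tail sentinels and an addLast operation. It stores only names rather than handlers, and the class PipelineListSketch is made up for this illustration; it is not DefaultChannelPipeline, just the same list layout.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineListSketch {

    /** One node of the doubly linked list. */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    PipelineListSketch() {
        // Same wiring as the constructor above: an empty pipeline is head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a new node just before the tail sentinel. */
    PipelineListSketch addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Walks the list from head to tail and collects the node names. */
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(new PipelineListSketch().addLast("ssl").addLast("echo").names());
    }
}
```

Because both ends are sentinels, addLast never has to special-case an empty list, which is one reason to keep a permanent head and tail.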
2. EventLoop initialization:
EventLoop initialization ends up in the following constructors:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
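During initialization:

    EventLoopGroup group = new NioEventLoopGroup();

When no thread count is passed and the io.netty.eventLoopThreads system property is not set, the group uses twice the number of available processor cores.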
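The default can be written down as a small pure function. This is a sketch of the rule just described, not Netty's source: the class DefaultThreadsSketch and its methods are invented names, and the clamp to a minimum of 1 and the fallback on an unparsable value are assumptions about how Netty 4.1 treats those cases.

```java
final class DefaultThreadsSketch {

    /**
     * Resolves the event loop thread count.
     *
     * @param propertyValue       value of io.netty.eventLoopThreads, or null if unset
     * @param availableProcessors number of cores reported by the runtime
     */
    static int resolve(String propertyValue, int availableProcessors) {
        int fallback = availableProcessors * 2; // rule stated in the text
        int configured = fallback;
        if (propertyValue != null) {
            try {
                configured = Integer.parseInt(propertyValue.trim());
            } catch (NumberFormatException e) {
                configured = fallback; // assumption: ignore values that are not integers
            }
        }
        return Math.max(1, configured); // assumption: never go below one thread
    }

    /** Applies the rule to the current JVM. */
    static int resolveFromSystem() {
        return resolve(System.getProperty("io.netty.eventLoopThreads"),
                Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        System.out.println(resolveFromSystem());
    }
}
```

Passing an explicit count to the NioEventLoopGroup constructor bypasses this default entirely.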
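The logic is: first create an array of size nThreads, then call newChild() to fill each slot. A Netty group is built on exactly this: it maintains an array of EventExecutors, and whenever Netty needs an EventLoop it calls next() to pick one. (For this purpose an EventLoopGroup can be thought of as a MultithreadEventExecutorGroup.) The first time execute() is called on an EventLoop, it calls startThread(), which in turn calls doStartThread(). That method stores the thread that runs the loop in the thread field of SingleThreadEventExecutor, so each EventLoop ends up bound to exactly one thread. It then calls

    SingleThreadEventExecutor.this.run();

which keeps the loop running and processing tasks.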
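Both ideas from this paragraph, the array of children selected through next() and the thread that is started lazily on the first execute(), fit in a short JDK-only sketch. EventLoopGroupSketch and MiniEventLoop are hypothetical names; the round-robin selection shown is one plausible chooser strategy, and details such as shutdown and wake-up handling are deliberately left out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class EventLoopGroupSketch {

    /** A single-threaded task loop whose thread starts on the first execute(). */
    static final class MiniEventLoop {
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        private final AtomicBoolean started = new AtomicBoolean();
        private volatile Thread thread;

        void execute(Runnable task) {
            tasks.add(task);
            if (started.compareAndSet(false, true)) { // only the first call starts the thread
                Thread t = new Thread(this::run, "mini-event-loop");
                t.setDaemon(true);                    // do not keep the JVM alive in this demo
                t.start();
            }
        }

        private void run() {
            thread = Thread.currentThread();          // bind the loop to the thread running it
            try {
                for (;;) {
                    tasks.take().run();               // process tasks one at a time, forever
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }
    }

    private final MiniEventLoop[] children;
    private final AtomicInteger index = new AtomicInteger();

    EventLoopGroupSketch(int nThreads) {
        children = new MiniEventLoop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new MiniEventLoop();
        }
    }

    /** Picks the next child in round-robin order. */
    MiniEventLoop next() {
        return children[Math.floorMod(index.getAndIncrement(), children.length)];
    }

    /** Checks round-robin selection and that tasks run on the loop's own thread. */
    static boolean demo() {
        EventLoopGroupSketch group = new EventLoopGroupSketch(2);
        MiniEventLoop first = group.next();
        MiniEventLoop second = group.next();
        MiniEventLoop third = group.next();
        CompletableFuture<Boolean> onLoopThread = new CompletableFuture<>();
        first.execute(() -> onLoopThread.complete(first.inEventLoop()));
        return first != second && first == third && onLoopThread.join() && !first.inEventLoop();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The inEventLoop() check is what allows code such as the register() method shown later to decide between running immediately and submitting a task.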
3. Channel registration:
The call chain is as follows:

    // AbstractBootstrap.initAndRegister()
    initAndRegister();
    ChannelFuture regFuture = config().group().register(channel);

    // MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    // SingleThreadEventLoop
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
In the unsafe class (AbstractUnsafe):

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // Store the chosen EventLoop in the channel's eventLoop field.
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
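The call ends in AbstractNioChannel.doRegister(), which associates the channel with the selector owned by the EventLoop. It also does one more thing: it stores the current Channel on the SelectionKey as its attachment. The interest set passed here is 0, so no I/O events are selected yet at this stage.
Registration chain: AbstractBootstrap.initAndRegister -> group().register(channel) -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> channel.unsafe().register(this, promise) -> AbstractUnsafe.register -> register0 -> AbstractNioChannel.doRegister -> javaChannel().register(eventLoop().unwrappedSelector(), 0, this).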
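The final call in that chain is plain Java NIO and can be tried on its own. The sketch below registers an unconnected SocketChannel with interest set 0 and an attachment, then enables OP_READ afterwards. The owner object stands in for the Netty Channel passed as this, and the class name RegisterAttachmentSketch is invented for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class RegisterAttachmentSketch {

    /** Returns true if registration with ops=0 and an attachment behaves as described. */
    static boolean demo() {
        Object owner = new Object(); // hypothetical stand-in for the Netty Channel ("this")
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);

            // Register without any interest ops, attaching the owner object.
            SelectionKey key = ch.register(selector, 0, owner);
            boolean noInterestYet = key.interestOps() == 0;

            // Interest in reads can be switched on later through the same key.
            key.interestOps(SelectionKey.OP_READ);

            return noInterestYet
                    && key.attachment() == owner
                    && key.interestOps() == SelectionKey.OP_READ;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The attachment is what lets a selector loop map a ready SelectionKey back to the higher-level object that owns the socket.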
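After registration completes,

    pipeline.fireChannelRegistered();

is called, which loads our own business handlers into the pipeline. (In the register0() shown above, pipeline.invokeHandlerAddedIfNeeded() runs first; depending on the Netty version, ChannelInitializer may already have done this work in handlerAdded().)
Once the connection has been established,

    pipeline().fireChannelActive();

is called.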
4. Adding handlers:
Bootstrap.handler() is the method through which the handler to be added is supplied.
The call chain is:
register
register0
pipeline.fireChannelRegistered();
AbstractChannelHandlerContext.invokeChannelRegistered(head);
next.invokeChannelRegistered();
((ChannelInboundHandler) handler()).channelRegistered(this);
initChannel(ctx)
initChannel((C) ctx.channel());
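This is where the handler setup logic we wrote in initChannel() runs. Finally the initializer calls

    remove(ctx);

remove(ctx) is needed because init(channel), called from initAndRegister(), added our ChannelInitializer itself to the pipeline. Once it has installed the real handlers it has no further use, so it removes itself.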
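The add-then-remove behaviour of the initializer can be mimicked with an ordinary list. This is a toy model under loud assumptions: ToyPipeline is just a list of handler names, register(...) compresses init(channel) and the registration callback into one method, and none of it is Netty API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class InitializerSketch {

    /** A toy pipeline: an ordered list of handler names. */
    static final class ToyPipeline {
        final List<String> handlers = new ArrayList<>();
    }

    /**
     * Simulates the lifecycle described above.
     *
     * @param initChannel user callback that adds the real handlers
     * @return the handler names left in the pipeline afterwards
     */
    static List<String> register(Consumer<ToyPipeline> initChannel) {
        ToyPipeline pipeline = new ToyPipeline();

        // init(channel): the initializer itself is placed in the pipeline first.
        pipeline.handlers.add("initializer");

        // On registration the initializer runs the user's initChannel logic ...
        initChannel.accept(pipeline);

        // ... and then removes itself, like remove(ctx).
        pipeline.handlers.remove("initializer");
        return pipeline.handlers;
    }

    /** Adds a single "echo" handler, as the EchoClient example does. */
    static List<String> demo() {
        return register(p -> p.handlers.add("echo"));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The initializer acts as a placeholder: it exists only because the pipeline has to be configured after the channel is created but before events start flowing.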
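Netty's event propagation mechanism, by example:

    public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("Connected!");
            ctx.fireChannelActive();
        }
    }

When channelActive() is invoked on a handler, the handler must call ctx.fireChannelActive() itself if the event should continue on to the next handler; otherwise propagation stops there.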
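The rule that each handler decides whether an event travels further can be shown with a tiny chain in plain Java. Handler and Context here are invented miniature stand-ins for ChannelInboundHandler and ChannelHandlerContext, kept just large enough to show the propagation logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PropagationSketch {

    /** Miniature inbound handler with a single event. */
    interface Handler {
        void channelActive(Context ctx);
    }

    /** Miniature context: knows its position and how to reach the next handler. */
    static final class Context {
        private final List<Handler> handlers;
        private final int index;

        Context(List<Handler> handlers, int index) {
            this.handlers = handlers;
            this.index = index;
        }

        /** Passes the event to the next handler, if there is one. */
        void fireChannelActive() {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).channelActive(new Context(handlers, next));
            }
        }
    }

    /** Runs a three-handler chain in which the second handler does not forward. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Handler forwarding = ctx -> {
            log.add("first");
            ctx.fireChannelActive(); // explicitly continue propagation
        };
        Handler swallowing = ctx -> log.add("second"); // no fire call: the event stops here
        Handler unreachable = ctx -> log.add("third");

        List<Handler> chain = Arrays.asList(forwarding, swallowing, unreachable);
        new Context(chain, -1).fireChannelActive(); // index -1 plays the role of the head
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The third handler never runs because the second one does not forward the event, which is the situation a handler creates when it omits the fire call.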