Netty official site: https://netty.io/
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Anyone working on the Java technology stack has probably heard that Netty is a wrapper around Java's nio (Non-Blocking IO) that lets us quickly build network applications with higher performance and better scalability. So what exactly does Netty's wrapping of nio look like? This article lifts that veil from the source-code perspective.
While tracing the source, I used a mind map in Alibaba's Yuque to record the main method calls; the image above is a partial screenshot. The full map is here:
https://www.yuque.com/docs/share/02fa3e3d-d485-48e1-9cfe-6722a3ad8915
Before taking a first look at the Netty source, you should at least understand the Reactor Pattern, basic java.nio usage, and basic Netty usage, so that later you can read Netty's source side by side with java.nio.
If you cannot see Netty's true face, it is only because you have not read this classic yet. In "Scalable IO in Java", Doug Lea (author of the java.util.concurrent package) walks step by step through how to build scalable, high-performance IO services and how the service model evolves. The Reactor Pattern described there has been adopted by Netty and most other high-performance IO frameworks, so reading "Scalable IO in Java" carefully helps a great deal in understanding Netty's architecture and design. See:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
The server opens a dedicated thread for every single client connection request; this is so-called BIO (Blocking IO), i.e. the java.net.ServerSocket API (see the sketch just below).
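A minimal sketch of that thread-per-connection model (the class name and port are made up for illustration; exceptions and resource cleanup are ignored, just like the demos later in this article):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BIOServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            // accept() blocks until a client connects
            Socket socket = serverSocket.accept();
            // one dedicated thread per connection
            new Thread(() -> {
                try {
                    InputStream in = socket.getInputStream();
                    OutputStream out = socket.getOutputStream();
                    byte[] buf = new byte[1024];
                    int len;
                    // read() blocks too, tying this thread to a single client
                    while ((len = in.read(buf)) != -1) {
                        out.write(buf, 0, len);
                    }
                } catch (IOException ignored) {
                }
            }).start();
        }
    }
}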
Reactor responds to IO events by dispatching the appropriate handler (Similar to AWT thread)
Handlers perform non-blocking actions (Similar to AWT ActionListeners)
Manage by binding handlers to events (Similar to AWT addActionListener)
(1) Single-threaded version
(2) Multi-threaded version
(3) Multi-Reactor version (one main with multiple subs, or multiple mains with multiple subs)
Netty borrows precisely this multi-Reactor design.
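As a quick preview (a sketch only; the complete demo appears further below, and the class name here is made up), the "one main Reactor, multiple sub-Reactors" variant maps directly onto the two event-loop groups you hand to Netty:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class ReactorMapping {
    public static void main(String[] args) {
        // mainReactor: the boss group accepts new connections (one thread per listening port is enough)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // subReactors: the worker group handles read/write IO on the accepted connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup);
        // ... channel/handler configuration continues as in the NettyServer demo below
    }
}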
Note: the demos below focus only on the main logic; they neither handle exceptions nor close resources.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

public class NIOServer {

    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public static void main(String[] args) throws IOException {
        // ServerSocketChannel.open()
        ServerSocketChannel serverSocketChannel = DEFAULT_SELECTOR_PROVIDER.openServerSocketChannel();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

        // Selector.open()
        Selector selector = DEFAULT_SELECTOR_PROVIDER.openSelector();

        // register this serverSocketChannel with the selector
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // selector.select()
        while (!Thread.interrupted()) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                // handle IO events
                handle(key);
            }
        }
    }

    private static void handle(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(key.selector(), SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // read client data
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int len = socketChannel.read(buffer);
            if (len != -1) {
                String msg = String.format("recv client[%s] data:%s",
                        socketChannel.getRemoteAddress(),
                        new String(buffer.array(), 0, len));
                System.out.println(msg);
            }
            // response client
            ByteBuffer data = ByteBuffer.wrap("Hello, NIOClient!".getBytes());
            socketChannel.write(data);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        } else if (key.isWritable()) {
            // ...
        }
    }
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

public class NIOClient {

    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public static void main(String[] args) throws IOException {
        // SocketChannel.open()
        SocketChannel socketChannel = DEFAULT_SELECTOR_PROVIDER.openSocketChannel();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

        // Selector.open()
        Selector selector = DEFAULT_SELECTOR_PROVIDER.openSelector();

        // register this socketChannel with the selector
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // selector.select()
        while (!Thread.interrupted()) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                // handle IO events
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (channel.isConnectionPending()) {
                        channel.finishConnect();
                    }
                    channel.configureBlocking(false);
                    // request server
                    ByteBuffer buffer = ByteBuffer.wrap("Hello, NIOServer!".getBytes());
                    channel.write(buffer);
                    channel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // read server data
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = channel.read(buffer);
                    if (len != -1) {
                        String msg = String.format("recv server[%s] data:%s",
                                channel.getRemoteAddress(),
                                new String(buffer.array(), 0, len));
                        System.out.println(msg);
                    }
                }
            }
        }
    }
}
For more official examples, see:
https://github.com/netty/netty/tree/4.1/example/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.bind(8080).sync();
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    static class NettyServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = String.format("recv client[%s] data:%s",
                    ctx.channel().remoteAddress(),
                    ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
            System.out.println(message);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ByteBuf buf = Unpooled.copiedBuffer("Hello, NettyClient!".getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(buf);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync();
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    static class NettyClientHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ByteBuf buf = Unpooled.copiedBuffer("Hello, NettyServer!".getBytes(CharsetUtil.UTF_8));
            ctx.writeAndFlush(buf);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String message = String.format("recv server[%s] data:%s",
                    ctx.channel().remoteAddress(),
                    ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
            System.out.println(message);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }
}
I suggest following the source-flow diagram I drew while reading the content below, ideally with the debugger on, stepping through anything you don't understand a few times. Here is the link again:
https://www.yuque.com/docs/share/02fa3e3d-d485-48e1-9cfe-6722a3ad8915
Note: the source traced here is the latest release at the time of writing, 4.1.58.Final.
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.58.Final</version>
</dependency>
This article is from the blog 行無際:
https://www.cnblogs.com/itwild/
Let's first get a rough picture of a few key classes to make the code easier to read. Netty programs against abstractions, and if you know nothing about the inheritance hierarchy of the common classes, reading the code will drive you crazy. You know what I mean!
Class definition:
io.netty.channel.nio.NioEventLoopGroup
/**
 * {@link MultithreadEventLoopGroup} implementations which is used for NIO {@link Selector} based {@link Channel}s.
 */
public class NioEventLoopGroup extends MultithreadEventLoopGroup
Class diagram:
Class definition:
io.netty.channel.nio.NioEventLoop
/**
 * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
 * {@link Selector} and so does the multi-plexing of these in the event loop.
 *
 */
public final class NioEventLoop extends SingleThreadEventLoop
Class diagram:
Class definition:
io.netty.channel.socket.nio.NioServerSocketChannel
/**
 * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
 * NIO selector based implementation to accept new connections.
 */
public class NioServerSocketChannel extends AbstractNioMessageChannel
                                    implements io.netty.channel.socket.ServerSocketChannel
Class diagram:
Class definition:
io.netty.channel.socket.nio.NioSocketChannel
/**
 * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
 */
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel
Class diagram:
Class definition:
io.netty.channel.ChannelInitializer
/**
 * A special {@link ChannelInboundHandler} which offers an easy way to initialize a {@link Channel} once it was
 * registered to its {@link EventLoop}.
 *
 * Implementations are most often used in the context of {@link Bootstrap#handler(ChannelHandler)} ,
 * {@link ServerBootstrap#handler(ChannelHandler)} and {@link ServerBootstrap#childHandler(ChannelHandler)} to
 * setup the {@link ChannelPipeline} of a {@link Channel}.
 *
 * <pre>
 *
 * public class MyChannelInitializer extends {@link ChannelInitializer} {
 *     public void initChannel({@link Channel} channel) {
 *         channel.pipeline().addLast("myHandler", new MyHandler());
 *     }
 * }
 *
 * {@link ServerBootstrap} bootstrap = ...;
 * ...
 * bootstrap.childHandler(new MyChannelInitializer());
 * ...
 * </pre>
 * Be aware that this class is marked as {@link Sharable} and so the implementation must be safe to be re-used.
 *
 * @param <C> A sub-type of {@link Channel}
 */
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
Class diagram:
Class definition:
io.netty.channel.ChannelInboundHandlerAdapter
/**
 * Abstract base class for {@link ChannelInboundHandler} implementations which provide
 * implementations of all of their methods.
 *
 * <p>
 * This implementation just forward the operation to the next {@link ChannelHandler} in the
 * {@link ChannelPipeline}. Sub-classes may override a method implementation to change this.
 * </p>
 * <p>
 * Be aware that messages are not released after the {@link #channelRead(ChannelHandlerContext, Object)}
 * method returns automatically. If you are looking for a {@link ChannelInboundHandler} implementation that
 * releases the received messages automatically, please see {@link SimpleChannelInboundHandler}.
 * </p>
 */
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
Class diagram:
Class definition:
io.netty.bootstrap.ServerBootstrap
/**
 * {@link Bootstrap} sub-class which allows easy bootstrap of {@link ServerChannel}
 *
 */
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
Class diagram:
Class definition:
io.netty.bootstrap.Bootstrap
/**
 * A {@link Bootstrap} that makes it easy to bootstrap a {@link Channel} to use
 * for clients.
 *
 * <p>The {@link #bind()} methods are useful in combination with connectionless transports such as datagram (UDP).
 * For regular TCP connections, please use the provided {@link #connect()} methods.</p>
 */
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel>
Class diagram:
Now let's formally start tracing the source code.
The creation of the Selector starts from this line: EventLoopGroup bossGroup = new NioEventLoopGroup(1)
io.netty.channel.nio.NioEventLoopGroup
/**
 * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
 * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
Here we see the familiar SelectorProvider.provider(); if it looks unfamiliar, go back to the java.nio quick-start code above.
Following the calls a few layers deeper, we arrive at NioEventLoopGroup's superclass MultithreadEventExecutorGroup.
io.netty.util.concurrent.MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(executor, args);
    }
}
Note: the nThreads argument used when creating NioEventLoopGroup(int nThreads) is what ends up in children = new EventExecutor[nThreads] above. Let's see what newChild(executor, args) does.
io.netty.channel.nio.NioEventLoopGroup
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(),
        (RejectedExecutionHandler) args[2], queueFactory);
}
io.netty.channel.nio.NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    final SelectorTuple selectorTuple = openSelector();
    this.selector = selectorTuple.selector;
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
The Selector is created right in this line: final SelectorTuple selectorTuple = openSelector(); let's step inside.
io.netty.channel.nio.NioEventLoop
private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }
    // other code omitted...
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
Here we see provider.openSelector(); at this point the newly created Selector is tied to the EventLoop.
Also, while the NioEventLoop is being constructed, let's see what super(parent, executor, false, newTaskQueue(queueFactory), ...) does in the parent class SingleThreadEventLoop.
io.netty.channel.SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                Queue<Runnable> tailTaskQueue,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
One level further down:
io.netty.util.concurrent.SingleThreadEventExecutor
private final Queue<Runnable> taskQueue;

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    this.executor = ThreadExecutorMap.apply(executor, this);
    this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Here we see the Queue<Runnable> taskQueue field being assigned.
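As an aside (a sketch of my own, not part of the trace; it assumes channel is an already-registered io.netty.channel.Channel): any task submitted to an EventLoop via execute() lands in this very taskQueue and is run by the single thread that also drives the Selector, which is how Netty keeps all IO and task processing for a channel on one thread.

// runs later on the event-loop thread, e.g. nioEventLoopGroup-2-1
channel.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("running on " + Thread.currentThread().getName());
    }
});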
The initAndRegister() method in AbstractBootstrap is the entry point for creating the ServerSocketChannel.
io.netty.bootstrap.AbstractBootstrap
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. create the ServerSocketChannel
        channel = channelFactory.newChannel();
        // 2. initialize the ServerSocketChannel
        init(channel);
    } catch (Throwable t) {
    }
    // 3. register the ServerSocketChannel with the Selector
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}
The heart of the server startup is the three commented steps above. In order, let's start with the creation of the ServerSocketChannel.
The ServerSocketChannel is created via the factory pattern plus reflection; see ReflectiveChannelFactory.
io.netty.channel.ReflectiveChannelFactory
/**
 * A {@link ChannelFactory} that instantiates a new {@link Channel} by invoking its default constructor reflectively.
 */
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        this.constructor = clazz.getConstructor();
    }

    @Override
    public T newChannel() {
        return constructor.newInstance();
    }
}
Remember the earlier line bootstrap.channel(NioServerSocketChannel.class)? The Class passed in there is exactly what gets instantiated reflectively to produce the Channel. Since this is the server side, we obviously need to step into NioServerSocketChannel to see how it is created.
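Incidentally, this is roughly how bootstrap.channel(...) wires that up (a simplified sketch of AbstractBootstrap#channel in 4.1.x with the null check dropped, not a verbatim quote):

public B channel(Class<? extends C> channelClass) {
    // the Class you pass in is wrapped in a ReflectiveChannelFactory, whose
    // newChannel() later invokes the no-arg constructor reflectively
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}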
io.netty.channel.socket.nio.NioServerSocketChannel
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
The line provider.openServerSocketChannel() is what actually creates the ServerSocketChannel. Now follow super(null, channel, SelectionKey.OP_ACCEPT) up the superclass chain and see what it does.
io.netty.channel.nio.AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}
this.readInterestOp = readInterestOp stores the operation we are interested in, which here is the SelectionKey.OP_ACCEPT passed down from above.
ch.configureBlocking(false) switches the just-created channel to non-blocking mode. Keep going up the superclass chain.
io.netty.channel.AbstractChannel
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
Here we see a ChannelPipeline being created and attached to the Channel. One more step down.
io.netty.channel.DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
At this point the ChannelPipeline looks roughly like this:
head --> tail
Back to important step 2 mentioned above: init(channel);
Note that the implementation class is ServerBootstrap, because this is the server side.
io.netty.bootstrap.ServerBootstrap
@Override
void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
This adds a ChannelHandler to the ChannelPipeline, which now looks roughly like this:
head --> ChannelInitializer --> tail
Once the serverSocketChannel is registered with an EventLoop (or, if you like, a Selector), the initChannel here gets triggered. To avoid getting lost, we won't chase the exact call path just yet; we will come back and examine it carefully when execution actually reaches this point.
Back to important step 3 mentioned above: config().group().register(channel);
By analyzing the class hierarchy (or simply debugging), the call can be traced to SingleThreadEventLoop's register method.
io.netty.channel.SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
Tracing further down, what is ultimately invoked is AbstractChannel's register method, shown below:
io.netty.channel.AbstractChannel
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
}
Follow eventLoop.execute() downward:
io.netty.util.concurrent.SingleThreadEventExecutor
private void execute(Runnable task, boolean immediate) {
    addTask(task);
    startThread();
}
addTask(task) puts the Runnable above into the Queue<Runnable> taskQueue mentioned earlier, as the following code shows:
io.netty.util.concurrent.SingleThreadEventExecutor
/**
 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
 * before.
 */
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
With the task in the taskQueue, execution reaches the startThread() line; let's take a look inside.
io.netty.util.concurrent.SingleThreadEventExecutor
private void startThread() {
    doStartThread();
}

private void doStartThread() {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            SingleThreadEventExecutor.this.run();
        }
    });
}
Keep following executor.execute; only here is a new thread actually created to run SingleThreadEventExecutor.this.run(). The thread name is roughly nioEventLoopGroup-2-1, see the code below:
io.netty.util.concurrent.ThreadPerTaskExecutor
@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}
The code that SingleThreadEventExecutor.this.run() actually executes is:
io.netty.channel.nio.NioEventLoop
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                // ...
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
        }
    }
}
A quick explanation of the code above (some details are nailed down later). The run() method is an infinite loop; roughly speaking (the description here is not perfectly precise, but it conveys the idea): if the taskQueue has tasks, the loop keeps polling and executing them, see runAllTasks(); otherwise it calls selector.select(), and when IO events arrive they are handled via processSelectedKeys(). A stripped-down analogue is sketched below.
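To make that structure easier to keep in mind, here is a stripped-down analogue built directly on java.nio (the class and field names are my own, not Netty's; it only mirrors the shape of the loop, not the select strategy, ioRatio, or wakeup details):

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// One thread alternates between waiting for IO events on its Selector
// and draining its task queue -- the essence of NioEventLoop.run().
public class MiniEventLoop implements Runnable {

    private final Selector selector;
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();

    public MiniEventLoop() throws IOException {
        this.selector = Selector.open();
    }

    public void execute(Runnable task) {
        taskQueue.add(task);
        selector.wakeup(); // wake the loop so the new task is picked up promptly
    }

    @Override
    public void run() {
        for (;;) {
            try {
                if (taskQueue.isEmpty()) {
                    selector.select();    // block until IO events arrive or wakeup() is called
                } else {
                    selector.selectNow(); // don't block while there are pending tasks
                }
                processSelectedKeys();    // handle ready accept/connect/read/write events
                runAllTasks();            // drain the task queue
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void processSelectedKeys() {
        // the real code iterates the selected keys and dispatches each one
        selector.selectedKeys().clear();
    }

    private void runAllTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }
}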
Speaking of which, we just put a Runnable into the taskQueue, remember? Here is that Runnable's code again:
new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
};
So now register0(promise) inside that Runnable gets executed.
io.netty.channel.AbstractChannel
private void register0(ChannelPromise promise) {
    // (1) register the ServerSocketChannel with the Selector
    doRegister();

    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
    // user may already fire events through the pipeline in the ChannelFutureListener.
    // (2) trigger the handlerAdded() callbacks of the ChannelHandlers in the pipeline
    pipeline.invokeHandlerAddedIfNeeded();

    safeSetSuccess(promise);
    // (3) trigger the channelRegistered() callbacks of the ChannelInboundHandlers in the pipeline
    pipeline.fireChannelRegistered();

    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (isActive()) {
        if (firstRegistration) {
            pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
            // This channel was registered before and autoRead() is set. This means we need to begin read
            // again so that we process inbound data.
            //
            // See https://github.com/netty/netty/issues/4805
            beginRead();
        }
    }
}
I added a few comments of my own to the code above; below I explain them one by one, in the order of those comments.
(1) doRegister()
io.netty.channel.nio.AbstractNioChannel
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        return;
    }
}
This quite clearly registers the ServerSocketChannel with the Selector.
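One small difference from the plain-nio demo at the top is worth noting: the demo registered with SelectionKey.OP_ACCEPT right away, whereas Netty registers with an interest set of 0 and only adds readInterestOp (OP_ACCEPT here) later, once the channel is active and doBeginRead() runs. The comparison below is a sketch based on my reading of the 4.1.x source, not a verbatim quote:

// plain java.nio demo above: interest ops declared at registration time
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// Netty (AbstractNioChannel.doRegister): register with 0 and attach the Netty channel
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

// later, roughly what AbstractNioChannel.doBeginRead() does once the channel is active
int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
    selectionKey.interestOps(interestOps | readInterestOp);
}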
(2) pipeline.invokeHandlerAddedIfNeeded()
Purpose: trigger the handlerAdded() callbacks of the ChannelHandlers in the pipeline.
io.netty.channel.DefaultChannelPipeline
final void invokeHandlerAddedIfNeeded() {
    if (firstRegistration) {
        firstRegistration = false;
        // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
        // that were added before the registration was done.
        callHandlerAddedForAllHandlers();
    }
}
The comment above tells us clearly: the ServerSocketChannel is now registered with the EventLoop, and it is time to call the ChannelHandlers in the pipeline. This is where things link back to the ServerSocketChannel initialization above; presumably the ChannelInitializer we added there is about to be triggered.
io.netty.channel.DefaultChannelPipeline
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}
First, why does PendingHandlerCallback suddenly show up here? The reason is that addLast(ChannelHandler... handlers) actually ends up calling the method below.
io.netty.channel.DefaultChannelPipeline
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
See the three-line comment above? It explains where the PendingHandlerCallback comes from. In plain words: when a ChannelHandler is added to the pipeline while the Channel has not yet been registered with an EventLoop, the current AbstractChannelHandlerContext is wrapped into a PendingHandlerCallback, waiting to be invoked later.
Back on track: after a few more hops, PendingHandlerCallback.execute() calls the ChannelHandler's handlerAdded(), as shown below:
io.netty.channel.AbstractChannelHandlerContext
final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}
Now look back at ChannelInitializer:
io.netty.channel.ChannelInitializer
/**
 * {@inheritDoc} If override this method ensure you call super!
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        initChannel((C) ctx.channel());
        return true;
    }
    return false;
}

/**
 * This method will be called once the {@link Channel} was registered. After the method returns this instance
 * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
 *
 * @param ch            the {@link Channel} which was registered.
 * @throws Exception    is thrown if an error occurs. In that case it will be handled by
 *                      {@link #exceptionCaught(ChannelHandlerContext, Throwable)} which will by default close
 *                      the {@link Channel}.
 */
protected abstract void initChannel(C ch) throws Exception;
So in the end initChannel does get triggered, which means the initChannel we overrode while initializing the ServerSocketChannel runs at this moment:
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});
After this initChannel has run, the ChannelPipeline looks roughly like this:
head --> tail
Note that at this point the ServerBootstrapAcceptor has not yet been placed into the ChannelPipeline; instead, it too was put into the Queue<Runnable> taskQueue mentioned earlier, like this:
ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});
What ServerBootstrapAcceptor actually does is covered in detail later.
Onward. With doRegister() and pipeline.invokeHandlerAddedIfNeeded() now clear, the next stop is pipeline.fireChannelRegistered().
(3) pipeline.fireChannelRegistered()
Purpose: trigger the channelRegistered() callbacks of the ChannelInboundHandlers in the pipeline.
Again, let's briefly trace into the source.
io.netty.channel.AbstractChannelHandlerContext
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            // this is where channelRegistered() gets invoked
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRegistered();
    }
}
At this point the register0() task has finished. But remember that while it ran, it added another Runnable to the taskQueue?
new Runnable() {
    @Override
    public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
}
That newly added task now gets polled, as the code below shows:
io.netty.util.concurrent.SingleThreadEventExecutor
protected boolean runAllTasks(long timeoutNanos) {
    for (;;) {
        safeExecute(task);
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}
After this newly added Runnable has run, the ChannelPipeline looks roughly like this:
head --> ServerBootstrapAcceptor --> tail
By now all tasks in the taskQueue have been executed, and the EventLoop thread goes back to selector.select(), waiting for client connections.
With that, the server has started successfully.
Exactly the same as on the server side.
The entry point is the same as on the server side; the difference is that the client calls bootstrap.channel(NioSocketChannel.class), so we need to look at the NioSocketChannel implementation instead. Not much more needs to be said here.
The client-side init is much simpler, as follows:
io.netty.bootstrap.Bootstrap
@Override
void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());
}
The process up to this point is basically the same as on the server side. After doRegister(), the pipeline.invokeHandlerAddedIfNeeded() step is less involved than on the server (because on the server side, initializing the ServerSocketChannel added a task that appends the ServerBootstrapAcceptor to the ChannelPipeline).
As analyzed earlier, this process triggers initChannel, so the user-written ChannelInitializer runs here; that is, ch.pipeline().addLast(new NettyClientHandler()) executes and inserts the user-written NettyClientHandler into the ChannelPipeline.
Once registration succeeds, the callback that connects to the server is executed.
io.netty.bootstrap.Bootstrap
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
We need to look at doResolveAndConnect0(), which in turn calls doConnect():
io.netty.bootstrap.Bootstrap
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
What ultimately gets called is:
io.netty.channel.socket.nio.NioSocketChannel#doConnect()
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}
Now look at SocketUtils.connect(javaChannel(), remoteAddress):
io.netty.util.internal.SocketUtils
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
        throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}
Here we see the familiar socketChannel.connect(remoteAddress).
The sections above walked through the server startup and the client startup in detail, and the client has now sent a connection request to the server. Time to switch back to the server side.
The server senses the IO event: inside io.netty.channel.nio.NioEventLoop's run() method it calls processSelectedKeys(), and each IO event is ultimately handled by processSelectedKey().
io.netty.channel.nio.NioEventLoop
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
Here the ready event is SelectionKey.OP_ACCEPT, so naturally execution goes down the unsafe.read() branch.
io.netty.channel.nio.AbstractNioMessageChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        } finally {
            // ...
        }
    }
}
Two important methods live here: doReadMessages(readBuf) and pipeline.fireChannelRead().
io.netty.channel.socket.nio.NioServerSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        // ...
    }
    return 0;
}
io.netty.util.internal.SocketUtils
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
            @Override
            public SocketChannel run() throws IOException {
                return serverSocketChannel.accept();
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}
After the serverSocketChannel accepts the client connection, the resulting socketChannel is put into a List.
The List is then iterated and each socketChannel is passed into pipeline.fireChannelRead(). A comparison with the plain-nio demo follows below.
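Contrast this with the accept branch of the plain-nio demo at the top: there, the accepted channel was registered for OP_READ on the same selector; here, Netty merely wraps it and fires it down the pipeline, and the actual registration happens later in ServerBootstrapAcceptor on a worker EventLoop, as we are about to see. A rough side-by-side (a sketch that reuses fragments already shown above, not new APIs):

// plain-nio demo: accept, then register on the *same* selector for reads
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(key.selector(), SelectionKey.OP_READ);

// Netty: accept, wrap in a NioSocketChannel, and hand it down the pipeline;
// ServerBootstrapAcceptor later calls childGroup.register(child), i.e. the
// registration lands on a *worker* EventLoop's Selector
buf.add(new NioSocketChannel(this, ch));
pipeline.fireChannelRead(readBuf.get(i));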
Remember which ChannelHandlers the serverSocketChannel's ChannelPipeline currently holds?
head --> ServerBootstrapAcceptor --> tail
Next we need to focus on the logic of ServerBootstrapAcceptor:
io.netty.bootstrap.ServerBootstrap#ServerBootstrapAcceptor
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Runnable enableAutoReadTask;

    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;

        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);

        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    }
}
ServerBootstrapAcceptor registers the newly connected socketChannel with one of the EventLoops (that is, Selectors) in the workerGroup, and adds the user-written childHandler to each socketChannel's ChannelPipeline. In effect, ServerBootstrapAcceptor plays the role of a dispatcher: once a connection is established, the Channel's actual read/write IO events are handled by an EventLoop in the workerGroup.
Now look back at the multi-Reactor version of the Reactor pattern (one main, multiple subs): can you see the correspondence?
Note: the childGroup in the code above is exactly the workerGroup we defined when writing the server-side NettyServer code:
EventLoopGroup workerGroup = new NioEventLoopGroup();
Anyone who has made it this far probably understands this already; I'm just belaboring the point.
At this point the client side hardly needs explaining anymore, but for the article's completeness let's finish it.
After the server accepts the connection request, the client now has an IO event of its own. It goes through the same processSelectedKey() method, just down a different branch.
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}
Ultimately doFinishConnect() is called:
io.netty.channel.socket.nio.NioSocketChannel
@Override
protected void doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}
From this point on, the client and server can read and write data through the Channel, and the ChannelHandlers in the ChannelPipeline decode, compute on, and encode that data. A sketch of such a pipeline follows below.
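For instance, a typical ChannelInitializer wires up decode/compute/encode stages like this (a sketch using Netty's built-in string codecs; the class name and the echo logic are made up for illustration):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class EchoChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new StringDecoder(CharsetUtil.UTF_8))       // decode: inbound ByteBuf -> String
          .addLast(new StringEncoder(CharsetUtil.UTF_8))       // encode: outbound String -> ByteBuf
          .addLast(new SimpleChannelInboundHandler<String>() { // compute: business logic
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                  ctx.writeAndFlush("echo: " + msg);
              }
          });
    }
}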
At this point, the article has roughly covered the whole process of Netty's server and client starting up and communicating, and how Netty wraps nio. Here is the widely circulated diagram of Netty's working principle; it should feel thoroughly familiar by now.
The whole journey is admittedly rather winding. But in hindsight, with a clear line of thought, constant comparison against the nio code, and a little patience, you can get through it; and wherever something doesn't make sense, stepping through with the debugger makes it much easier. Finally, my abilities are limited, so if there are mistaken interpretations or inappropriate descriptions in this article, please do point them out!