先看一下我Netty的啓動類編程
private void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new LoggingHandler(LogLevel.INFO)) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES)); ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(ChannelRequestProto.ChannelRequest.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new HeartBeatServerHandler()); ch.pipeline().addLast(new XtsCoreServerHandler()); } }); ChannelFuture future = bootstrap.bind().sync(); future.channel().closeFuture().sync(); } catch (Exception e) { bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } }
Netty先建立了兩個事件循環組 EventLoopGroup ,這個就對應了上文提到的模型, 第一個事件循環組的職責是 負責接收新的客戶端鏈接 並 把客戶端Channel註冊到多路複用器上面。 第二個事件循環組的職責是 處理客戶端的讀寫等事件。bootstrap
Netty使用 ServerBootstrap 這個鏈式編程的方式把啓動服務端的代碼給串聯起來,使用很是方便和優雅。因此咱們在看源碼的同時也要去好好品讀Netty源碼書寫的巧妙,理解設計思想和方式,巧妙地運用到咱們本身的代碼中。接下來我會把源碼貼出來,並把重要的部分進行解釋和着重指出數組
看下 group 方法 oop
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); // 第一個事件循環組,咱們叫父循環組吧,它是繼續調用了父類的group方法,也就是 AbstractBootstrap 而且賦給 group 成員變量
if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; // 第二個事件循環組, 咱們叫子循環組吧, 直接賦值給 childGroup 這個成員變量 return this; // 返回本身,這就是鏈式編程的緣由 }
接下來是 channel 方法, 設置了 channal 的類型是 NioServerSocketChannel (固然客戶端是 NioSocketChannel),這裏在建立的時候用了反射的方法來建立實例,後面涉及到再具體說。this
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); // NioServerSocketChannel 使用了 ReflectiveChannelFactory 工廠封裝,而且設置到了 AbstractBootstrap
// 的 channelFactory 的這個成員變量中, 注意 這裏的channal 是 父級 的 channal
}
.option 方法 是在設置 父級 的一些參數信息,這些就不說了, 固然這裏 .handler(new LoggingHandler(LogLevel.INFO)) 方法 也是爲父級的事件循環器設置的 handler 。spa
.childHandler(new ChannelInitializer<SocketChannel>() { // 固然這裏是設置 子事件循環組器的 handlers ,可是這裏的 initChannel 方法不是在這裏調用的,這個後面會具體提到。 protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.MINUTES)); ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(ChannelRequestProto.ChannelRequest.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new HeartBeatServerHandler()); ch.pipeline().addLast(new XtsCoreServerHandler()); } });
好了,接下來進入正式部分。設計
ChannelFuture future = bootstrap.bind().sync(); // 就是從這裏的bind()進入
進入doBind方法3d
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } ... 省略一大波代碼 }
進入 initAndRegistercode
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); // 這裏就是建立一個父級的channel init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ...省略一大波代碼 }
這就是我前面爲何着重指出 NioServerSocketChannel 使用了 ReflectiveChannelFactory 工廠封裝 , 就是上圖這裏了。而後進入 ReflectiveChannelFactory blog
使用了 NioServerSocketChannel 的無參構造方法實例化這個類。ok,那咱們來看下NioServerSocketChannel 的 無參構造方法。
傳入一個默認的多路複用器建立器
使用它來調用 openServerSocketChannel() 方法來建立一個ServerSocketChannel, 具體這個方法就是NIO裏面的內容了,有興趣的本身去看一下。
看到這裏,咱們知道channel建立完成(爲父級使用),返回到這裏
你們千萬不要忽視一點,這裏有個this,我當時就沒注意到這裏,粗心了,致使其中有一步始終想不通,後來從新仔細看的時候,打本身的心都有了。
這裏繼續調用了另一個有參的構造方法。
不斷調用父類構造方法,就進入到
這裏設置了父級的成員變量channel,而且把感興趣的key設置爲16(接收新的客戶端),而且設置非阻塞。這裏在第一篇啓動NIO服務端的時候,也有這句,你們應該也記得。
咱們繼續看調用的父類構造方法。
咱們知道了爲Channel設置了一個ID,而且建立了Pipleline.而且初始化了兩個上下文分別爲頭和尾,經過鏈表連接。
咱們說到這裏,簡單回顧一下Pipeline. 咱們開看下ChannelPipeline官方說明
講到了, Pipeline 是 Channel中出站和入站操做的處理器或攔截器的一個列表。同時官方給出了一個表單我也貼出來
下圖說明了I/O讀寫事件是怎麼在PipeLine中的Handlers中傳遞的。須要經過 ChannelHandlerContext, 例如 ChannelHandlerContext#fireChannelRead(Object) 和 ChannelHandlerContext#write(Object)
這點了解Netty的一會兒應該就看得明白,後面設計到PipeLine的地方咱們再展開講解,繼續回到NioServerSocketChannel 的有參構造方法,繼續往下看。
這裏爲剛剛建立的channel建立了一個配置類,而且是一個內部類。傳入了channel和套接字。
不斷往下跟,看到這裏
這裏傳入了一個小內存分配器,也就是說爲這個channel初始化了一個分配器。
ok,咱們來簡單說下這個分配器,後面在Netty的內存模型部分,咱們再細說。
構造方法,傳入了三個默認值,而且說明了 默認分配緩衝區的大小爲1024 ,最小是64,最大是65536
通篇看一下,看到了一個很是重要的靜態代碼塊
依次往sizeTable添加元素:[16 , (512-16)]之間16的倍數。即,1六、3二、48...496
而後再往sizeTable中添加元素:[512 , 512 * (2^N)),N > 1; 直到數值超過Integer的限制(2^31 - 1);
根據sizeTable長度構建一個靜態成員常量數組SIZE_TABLE,並將sizeTable中的元素賦值給SIZE_TABLE數組。注意List是有序的,因此是根據插入元素的順序依次的賦值給SIZE_TABLE,SIZE_TABLE從下標0開始。SIZE_TABLE爲預約義好的以從小到大的順序設定的可分配緩衝區的大小值的數組。由於AdaptiveRecvByteBufAllocator做用是可自動適配每次讀事件使用的buffer的大小。這樣當須要對buffer大小作調整時,只要根據必定邏輯從SIZE_TABLE中取出值,而後根據該值建立新buffer便可。