經過本章學習,筆者但願你能掌握EventLoopGroup的工做流程,ServerBootstrap的啓動流程,ChannelPipeline是如何操做管理Channel。只有清楚這些,才能更好的瞭解和使用Netty。還在等什麼,快來學習吧!html
知識結構圖:java
技術:Netty,拆包粘包,服務啓動流程
說明:若你對NIO有必定的瞭解,對於本章知識來講有很大的幫助!NIO教程
源碼:https://github.com/ITDragonBl...git
這裏讓你清楚瞭解 ChannelPipeline,ChannelHandlerContext,ChannelHandler,Channel 四者之間的關係。
這裏讓你清楚瞭解 NioEventLoopGroup,NioEventLoop,Channel 三者之間的關係。
這裏讓你清楚瞭解 ServerBootstrap,Channel 二者之間的關係。
看懂了這塊的理論知識,後面Netty拆包粘包的代碼就很是的簡單。github
Channel : Netty最核心的接口。NIO通信模式中經過Channel進行Socket套接字的讀,寫和同時讀寫操做。
ChannelHandler : 由於直接使用Channel會比較麻煩,因此在Netty編程中經過ChannelHandler間接操做Channel,從而簡化開發。
ChannelPipeline : 能夠理解爲一個管理ChandlerHandler的鏈表。對Channel進行操做時,Pipeline負責從尾部依次調用每個Handler進行處理。每一個Channel都有一個屬於本身的ChannelPipeline。
ChannelHandlerContext : ChannelPipeline經過ChannelHandlerContext間接管理每一個ChannelHandler。編程
以下圖所示,結合代碼,在服務器初始化和客戶端建立鏈接的過程當中加了四個Handler,分別是日誌事務,字符串分割解碼器,接受參數轉字符串解碼器,處理任務的Handler。
bootstrap
EventLoopGroup : 本質是個線程池,繼承了ScheduledExecutorService 定時任務線程池。
NioEventLoopGroup : 是用來處理NIO通訊模式的線程池。每一個線程池有N個NioEventLoop來處理Channel事件,每個NioEventLoop負責處理N個Channel。
NioEventLoop : 負責不停地輪詢IO事件,處理IO事件和執行任務,類比多路複用器,細化分三件事。
1 輪詢註冊到Selector上全部的Channel的IO事件
2 處理產生網絡IO事件的Channel
3 處理隊列中的任務服務器
本章重點,Netty是如何經過NIO輔助啓動類來初始化Channel的?先看下面的源碼。網絡
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
服務器啓動和鏈接過程:
第一步:是給Channel設置options和attrs,
第二步:複製childGroup,childHandler,childOptions和childAttrs等待服務器和客戶端鏈接,
第三步:實例化一個ChannelInitializer,添加到Pipeline的末尾。
第四步:當Channel註冊到NioEventLoop時,ChannelInitializer觸發initChannel方法,pipeline裝入自定義的Handler,給Channel設置一下child配置。多線程
小結:
1 group,options,attrs,handler,是在服務器端初始化時配置,是AbstractBootstrap的方法。
2 childGroup,childOption,childAttr,childHandler,是在服務器與客戶端創建Channel後配置,是ServerBootstrap的方法。
3 Bootstrap 和 ServerBootstrap 都繼承了AbstractBootstrap類。
4 若不設置childGroup,則默認取group值。
5 Bootstrap 和 ServerBootstrap 啓動服務時,都會執行驗證方法,判斷必填參數是否都有配置。socket
這裏經過介紹Netty拆包粘包問題來對Netty進行入門學習。
在基於流的傳輸中,即使客戶端發送獨立的數據包,操做系統也會將其轉換成一串字節隊列,而服務端一次讀取到的字節數又不肯定。再加上網絡傳輸的快慢。服務端很難完整的接收到數據。
常見的拆包粘包方法有三種
1 服務端設置一次接收字節的長度。若服務端接收的字節長度不知足要求則一直處於等待。客戶端爲知足傳輸的字節長度合格,能夠考慮使用空格填充。
2 服務端設置特殊分隔符。客戶端經過特殊分隔符粘包,服務端經過特殊分隔符拆包。
3 自定義協議。數據傳輸通常分消息頭和消息體,消息頭中包含了數據的長度。服務端先接收到消息頭,得知須要接收N個數據,而後服務端接收直到數據爲N個爲止。
本章採用第二種,用特殊分隔符的方式。
第一步:準備兩個線程池。一個用於接收事件的boss線程池,另外一個用於處理事件的worker線程池。
第二步:服務端實例化ServerBootstrap NIO服務輔助啓動類。用於簡化提升開發效率。
第三步:配置服務器啓動參數。好比channel的類型,接收channel的EventLoop,初始化的日誌打印事件,創建鏈接後的事件(拆包,對象轉字符串,自定義事件),初始化的配置和創建鏈接後的配置。
第四步:綁定端口,啓動服務。Netty會根據第三步配置的參數啓動服務。
第五步:關閉資源。
package com.itdragon.delimiter; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class ITDragonServer { private static final Integer PORT = 8888; // 被監聽端口號 private static final String DELIMITER = "_$"; // 拆包分隔符 public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用於接收進來的鏈接 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用於處理進來的鏈接 try { ServerBootstrap serverbootstrap = new ServerBootstrap(); // 啓動NIO服務的輔助啓動類 serverbootstrap.group(bossGroup, workerGroup) // 分別設置bossGroup, workerGroup 順序不能反 .channel(NioServerSocketChannel.class) // Channel的建立工廠,啓動服務時會經過反射的方式來建立一個NioServerSocketChannel對象 .handler(new LoggingHandler(LogLevel.INFO)) // handler在初始化時就會執行,能夠設置打印日誌級別 .childHandler(new ChannelInitializer<SocketChannel>() { // childHandler會在客戶端成功connect後才執行,這裏實例化ChannelInitializer @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // initChannel方法執行後刪除實例ChannelInitializer,添加如下內容 ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes()); // 獲取特殊分隔符的ByteBuffer socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter)); // 設置特殊分隔符用於拆包 // socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8)); 設置指定長度分割 socketChannel.pipeline().addLast(new StringDecoder()); // 設置字符串形式的解碼 socketChannel.pipeline().addLast(new ITDragonServerHandler()); // 自定義的服務器處理類,負責處理事件 } }) .option(ChannelOption.SO_BACKLOG, 128) // option在初始化時就會執行,設置tcp緩衝區 .childOption(ChannelOption.SO_KEEPALIVE, true); // childOption會在客戶端成功connect後才執行,設置保持鏈接 ChannelFuture future = serverbootstrap.bind(PORT).sync(); // 綁定端口, 阻塞等待服務器啓動完成,調用sync()方法會一直阻塞等待channel的中止 future.channel().closeFuture().sync(); // 等待關閉 ,等待服務器套接字關閉 } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); // 關閉線程組,先打開的後關閉 bossGroup.shutdownGracefully(); } } }
NioEventLoopGroup : 是用來處理I/O操做的多線程事件循環器。 Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。
ServerBootstrap : 啓動NIO服務的輔助啓動類。先配置Netty服務端啓動參數,執行bind(PORT)方法纔算真正啓動服務。
group : 註冊EventLoopGroup
channel : channelFactory,用於配置通道的類型。
handler : 服務器始化時就會執行的事件。
childHandler : 服務器在和客戶端成功鏈接後會執行的事件。
initChannel : channelRegistered事件觸發後執行,刪除ChannelInitializer實例,添加該方法體中的handler。
option : 服務器始化的配置。
childOption : 服務器在和客戶端成功鏈接後的配置。
SocketChannel : 繼承了Channel,經過Channel能夠對Socket進行各類操做。
ChannelHandler : 經過ChannelHandler來間接操縱Channel,簡化了開發。
ChannelPipeline : 能夠當作是一個ChandlerHandler的鏈表。
ChannelHandlerContext : ChannelPipeline經過ChannelHandlerContext來間接管理ChannelHandler。
第一步:繼承 ChannelInboundHandlerAdapter,其父類已經實現了ChannelHandler接口,簡化了開發。
第二步:覆蓋 chanelRead()事件處理方法 ,每當服務器從客戶端收到新的數據時,該方法會在收到消息時被調用。
第三步:釋放 ByteBuffer,ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放。
第四步:異常處理,即當Netty因爲IO錯誤或者處理器在處理事件時拋出的異常時觸發。在大部分狀況下,捕獲的異常應該被記錄下來而且把關聯的channel給關閉掉。
package com.itdragon.delimiter; import com.itdragon.utils.ITDragonUtil; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ITDragonServerHandler extends ChannelInboundHandlerAdapter{ private static final String DELIMITER = "_$"; // 拆包分隔符 @Override public void channelRead(ChannelHandlerContext chc, Object msg) { try { // 普通讀寫數據 /* 設置字符串形式的解碼 new StringDecoder() 後能夠直接使用 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); */ System.out.println("Netty Server : " + msg.toString()); // 分隔符拆包 String response = ITDragonUtil.cal(msg.toString())+ DELIMITER; chc.channel().writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); // 寫入方法writeAndFlush ,Netty已經釋放了 } } // 當出現Throwable對象纔會被調用 @Override public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) { // 這個方法的處理方式會在遇到不一樣異常的狀況下有不一樣的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。 cause.printStackTrace(); chc.close(); } }
第一步:建立一個用於發送請求的線程池。
第二步:客戶端實例化Bootstrap NIO服務啓動輔助類,簡化開發。
第三步:配置參數,粘包,發送請求。
第四步:關閉資源。
值得注意的是,和ServerBootstrap不一樣,它並無childHandler和childOption方法。
package com.itdragon.delimiter; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class ITDragonClient { private static final Integer PORT = 8888; private static final String HOST = "127.0.0.1"; private static final String DELIMITER = "_$"; // 拆包分隔符 public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes()); // 設置特殊分隔符 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter)); // 設置指定長度分割 不推薦,二者選其一 // socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new ITDragonClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 創建鏈接 future.channel().writeAndFlush(Unpooled.copiedBuffer(("1+1"+DELIMITER).getBytes())); future.channel().writeAndFlush(Unpooled.copiedBuffer(("6+1"+DELIMITER).getBytes())); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
和服務器處理類同樣,這裏只負責打印數據。
package com.itdragon.delimiter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; public class ITDragonClientHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext chc, Object msg) { try { /* 設置字符串形式的解碼 new StringDecoder() 後能夠直接使用 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); */ System.out.println("Netty Client :" + msg); } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } } public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) { cause.printStackTrace(); chc.close(); } }
打印結果
一月 29, 2018 11:31:10 上午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0xcf3a3ac1] REGISTERED 一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler bind 信息: [id: 0xcf3a3ac1] BIND: 0.0.0.0/0.0.0.0:8888 一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] ACTIVE 一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelRead 信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xf1b8096b, L:/127.0.0.1:8888 - R:/127.0.0.1:4777] 一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelReadComplete 信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE Netty Server : 1+1 Netty Server : 6+1 Netty Client :2 Netty Client :7
從日誌中能夠看出Channel的狀態從REGISTERED ---> ACTIVE ---> READ ---> READ COMPLETE。服務端也是按照特殊分割符拆包。
看完本章,你必需要掌握的三個知識點:NioEventLoopGroup,ServerBootstrap,ChannelHandlerAdapter
1 NioEventLoopGroup 本質就是一個線程池,管理多個NioEventLoop,一個NioEventLoop管理多個Channel。
2 NioEventLoop 負責不停地輪詢IO事件,處理IO事件和執行任務。
3 ServerBootstrap 是NIO服務的輔助啓動類,先配置服務參數,後執行bind方法啓動服務。
4 Bootstrap 是NIO客戶端的輔助啓動類,用法和ServerBootstrap相似。
5 Netty 使用FixedLengthFrameDecoder 固定長度拆包,DelimiterBasedFrameDecoder 分隔符拆包。
到這裏,Netty的拆包粘包,以及Netty的重要組件,服務器啓動流程到這裏就結束了,若是以爲不錯能夠點一個 "推薦" ,也能夠 "關注" 我哦。
http://blog.csdn.net/spiderdo...
https://www.jianshu.com/p/c50...