Netty 拆包粘包和服務啓動流程分析

Netty 拆包粘包和服務啓動流程分析

經過本章學習,筆者但願你能掌握EventLoopGroup的工做流程,ServerBootstrap的啓動流程,ChannelPipeline是如何操做管理Channel。只有清楚這些,才能更好的瞭解和使用Netty。還在等什麼,快來學習吧!html

知識結構圖:java

Netty

技術:Netty,拆包粘包,服務啓動流程
說明:若你對NIO有必定的瞭解,對於本章知識來講有很大的幫助!NIO教程
源碼:https://github.com/ITDragonBl...git

Netty 重要組件

這裏讓你清楚瞭解 ChannelPipeline,ChannelHandlerContext,ChannelHandler,Channel 四者之間的關係。
這裏讓你清楚瞭解 NioEventLoopGroup,NioEventLoop,Channel 三者之間的關係。
這裏讓你清楚瞭解 ServerBootstrap,Channel 二者之間的關係。
看懂了這塊的理論知識,後面Netty拆包粘包的代碼就很是的簡單。github

Channel

Channel : Netty最核心的接口。NIO通信模式中經過Channel進行Socket套接字的讀,寫和同時讀寫操做。
ChannelHandler : 由於直接使用Channel會比較麻煩,因此在Netty編程中經過ChannelHandler間接操做Channel,從而簡化開發。
ChannelPipeline : 能夠理解爲一個管理ChandlerHandler的鏈表。對Channel進行操做時,Pipeline負責從尾部依次調用每個Handler進行處理。每一個Channel都有一個屬於本身的ChannelPipeline。
ChannelHandlerContext : ChannelPipeline經過ChannelHandlerContext間接管理每一個ChannelHandler。編程

以下圖所示,結合代碼,在服務器初始化和客戶端建立鏈接的過程當中加了四個Handler,分別是日誌事務,字符串分割解碼器,接受參數轉字符串解碼器,處理任務的Handler。
bootstrap

NioEventLoopGroup

EventLoopGroup : 本質是個線程池,繼承了ScheduledExecutorService 定時任務線程池。
NioEventLoopGroup : 是用來處理NIO通訊模式的線程池。每一個線程池有N個NioEventLoop來處理Channel事件,每個NioEventLoop負責處理N個Channel。
NioEventLoop : 負責不停地輪詢IO事件,處理IO事件和執行任務,類比多路複用器,細化分三件事。
1 輪詢註冊到Selector上全部的Channel的IO事件
2 處理產生網絡IO事件的Channel
3 處理隊列中的任務服務器

ServerBootstrap

本章重點,Netty是如何經過NIO輔助啓動類來初始化Channel的?先看下面的源碼。網絡

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

服務器啓動和鏈接過程:
第一步:是給Channel設置options和attrs,
第二步:複製childGroup,childHandler,childOptions和childAttrs等待服務器和客戶端鏈接,
第三步:實例化一個ChannelInitializer,添加到Pipeline的末尾。
第四步:當Channel註冊到NioEventLoop時,ChannelInitializer觸發initChannel方法,pipeline裝入自定義的Handler,給Channel設置一下child配置。多線程

小結:
1 group,options,attrs,handler,是在服務器端初始化時配置,是AbstractBootstrap的方法。
2 childGroup,childOption,childAttr,childHandler,是在服務器與客戶端創建Channel後配置,是ServerBootstrap的方法。
3 Bootstrap 和 ServerBootstrap 都繼承了AbstractBootstrap類。
4 若不設置childGroup,則默認取group值。
5 Bootstrap 和 ServerBootstrap 啓動服務時,都會執行驗證方法,判斷必填參數是否都有配置。socket

Netty 拆包粘包

這裏經過介紹Netty拆包粘包問題來對Netty進行入門學習。
在基於流的傳輸中,即使客戶端發送獨立的數據包,操做系統也會將其轉換成一串字節隊列,而服務端一次讀取到的字節數又不肯定。再加上網絡傳輸的快慢。服務端很難完整的接收到數據。
常見的拆包粘包方法有三種
1 服務端設置一次接收字節的長度。若服務端接收的字節長度不知足要求則一直處於等待。客戶端爲知足傳輸的字節長度合格,能夠考慮使用空格填充。
2 服務端設置特殊分隔符。客戶端經過特殊分隔符粘包,服務端經過特殊分隔符拆包。
3 自定義協議。數據傳輸通常分消息頭和消息體,消息頭中包含了數據的長度。服務端先接收到消息頭,得知須要接收N個數據,而後服務端接收直到數據爲N個爲止。
本章採用第二種,用特殊分隔符的方式。

建立服務端代碼流程

第一步:準備兩個線程池。一個用於接收事件的boss線程池,另外一個用於處理事件的worker線程池。
第二步:服務端實例化ServerBootstrap NIO服務輔助啓動類。用於簡化提升開發效率。
第三步:配置服務器啓動參數。好比channel的類型,接收channel的EventLoop,初始化的日誌打印事件,創建鏈接後的事件(拆包,對象轉字符串,自定義事件),初始化的配置和創建鏈接後的配置。
第四步:綁定端口,啓動服務。Netty會根據第三步配置的參數啓動服務。
第五步:關閉資源。

package com.itdragon.delimiter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;  
public class ITDragonServer {  
      
    private static final Integer PORT = 8888;                             // 被監聽端口號
    private static final String DELIMITER = "_$";                         // 拆包分隔符  
      
    public static void main(String[] args) {  
        EventLoopGroup bossGroup = new NioEventLoopGroup();             // 用於接收進來的鏈接
        EventLoopGroup workerGroup = new NioEventLoopGroup();             // 用於處理進來的鏈接
        try {  
            ServerBootstrap serverbootstrap = new ServerBootstrap();     // 啓動NIO服務的輔助啓動類
            serverbootstrap.group(bossGroup, workerGroup)                 // 分別設置bossGroup, workerGroup 順序不能反
            .channel(NioServerSocketChannel.class)                         // Channel的建立工廠,啓動服務時會經過反射的方式來建立一個NioServerSocketChannel對象
            .handler(new LoggingHandler(LogLevel.INFO))                    // handler在初始化時就會執行,能夠設置打印日誌級別
            .childHandler(new ChannelInitializer<SocketChannel>() {      // childHandler會在客戶端成功connect後才執行,這裏實例化ChannelInitializer
                @Override  
                protected void initChannel(SocketChannel socketChannel) throws Exception {     // initChannel方法執行後刪除實例ChannelInitializer,添加如下內容
                    ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());          // 獲取特殊分隔符的ByteBuffer
                    socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter)); // 設置特殊分隔符用於拆包 
//                    socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));  設置指定長度分割
                    socketChannel.pipeline().addLast(new StringDecoder());                  // 設置字符串形式的解碼  
                    socketChannel.pipeline().addLast(new ITDragonServerHandler());            // 自定義的服務器處理類,負責處理事件
                }  
            })  
            .option(ChannelOption.SO_BACKLOG, 128)                         // option在初始化時就會執行,設置tcp緩衝區  
            .childOption(ChannelOption.SO_KEEPALIVE, true);             // childOption會在客戶端成功connect後才執行,設置保持鏈接  
            ChannelFuture future = serverbootstrap.bind(PORT).sync();     // 綁定端口, 阻塞等待服務器啓動完成,調用sync()方法會一直阻塞等待channel的中止
            future.channel().closeFuture().sync();                         // 等待關閉 ,等待服務器套接字關閉
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            workerGroup.shutdownGracefully();                             // 關閉線程組,先打開的後關閉  
            bossGroup.shutdownGracefully();  
        }  
    }  
}

核心參數說明

NioEventLoopGroup : 是用來處理I/O操做的多線程事件循環器。 Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。
ServerBootstrap : 啓動NIO服務的輔助啓動類。先配置Netty服務端啓動參數,執行bind(PORT)方法纔算真正啓動服務。
group : 註冊EventLoopGroup
channel : channelFactory,用於配置通道的類型。
handler : 服務器始化時就會執行的事件。
childHandler : 服務器在和客戶端成功鏈接後會執行的事件。
initChannel : channelRegistered事件觸發後執行,刪除ChannelInitializer實例,添加該方法體中的handler。
option : 服務器始化的配置。
childOption : 服務器在和客戶端成功鏈接後的配置。
SocketChannel : 繼承了Channel,經過Channel能夠對Socket進行各類操做。
ChannelHandler : 經過ChannelHandler來間接操縱Channel,簡化了開發。
ChannelPipeline : 能夠當作是一個ChandlerHandler的鏈表。
ChannelHandlerContext : ChannelPipeline經過ChannelHandlerContext來間接管理ChannelHandler。

自定義服務器處理類

第一步:繼承 ChannelInboundHandlerAdapter,其父類已經實現了ChannelHandler接口,簡化了開發。
第二步:覆蓋 chanelRead()事件處理方法 ,每當服務器從客戶端收到新的數據時,該方法會在收到消息時被調用。
第三步:釋放 ByteBuffer,ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放。
第四步:異常處理,即當Netty因爲IO錯誤或者處理器在處理事件時拋出的異常時觸發。在大部分狀況下,捕獲的異常應該被記錄下來而且把關聯的channel給關閉掉。

package com.itdragon.delimiter;
import com.itdragon.utils.ITDragonUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  
  
public class ITDragonServerHandler extends ChannelInboundHandlerAdapter{  
    private static final String DELIMITER = "_$"; // 拆包分隔符  
    @Override  
    public void channelRead(ChannelHandlerContext chc, Object msg) {  
        try {  
            // 普通讀寫數據
            /* 設置字符串形式的解碼 new StringDecoder() 後能夠直接使用
            ByteBuf buf = (ByteBuf) msg;  
            byte[] req = new byte[buf.readableBytes()];  
            buf.readBytes(req);  
            String body = new String(req, "utf-8");  
            */
            System.out.println("Netty Server : " + msg.toString());
            // 分隔符拆包  
            String response = ITDragonUtil.cal(msg.toString())+ DELIMITER;  
            chc.channel().writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            ReferenceCountUtil.release(msg); // 寫入方法writeAndFlush ,Netty已經釋放了
        }  
    }  
    // 當出現Throwable對象纔會被調用
    @Override  
    public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {  
        // 這個方法的處理方式會在遇到不一樣異常的狀況下有不一樣的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。  
        cause.printStackTrace();  
        chc.close();  
    }  
}

客戶端啓動流程

第一步:建立一個用於發送請求的線程池。
第二步:客戶端實例化Bootstrap NIO服務啓動輔助類,簡化開發。
第三步:配置參數,粘包,發送請求。
第四步:關閉資源。
值得注意的是,和ServerBootstrap不一樣,它並無childHandler和childOption方法。

package com.itdragon.delimiter;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.ChannelOption;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;  
  
public class ITDragonClient {  
      
    private static final Integer PORT = 8888;  
    private static final String HOST = "127.0.0.1";  
    private static final String DELIMITER = "_$"; // 拆包分隔符  
      
    public static void main(String[] args) {  
        NioEventLoopGroup group = new NioEventLoopGroup();  
        try {  
            Bootstrap bootstrap = new Bootstrap();  
            bootstrap.group(group)  
            .channel(NioSocketChannel.class)  
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override  
                protected void initChannel(SocketChannel socketChannel) throws Exception { 
                    ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());  
                    // 設置特殊分隔符
                    socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));  
                    // 設置指定長度分割  不推薦,二者選其一
//                    socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));  
                    socketChannel.pipeline().addLast(new StringDecoder()); 
                    socketChannel.pipeline().addLast(new ITDragonClientHandler());  
                }  
            })  
            .option(ChannelOption.SO_KEEPALIVE, true);  
              
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 創建鏈接  
            future.channel().writeAndFlush(Unpooled.copiedBuffer(("1+1"+DELIMITER).getBytes()));  
            future.channel().writeAndFlush(Unpooled.copiedBuffer(("6+1"+DELIMITER).getBytes()));  
            future.channel().closeFuture().sync();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            group.shutdownGracefully();  
        }  
    }  
}

客戶端請求接收類

和服務器處理類同樣,這裏只負責打印數據。

package com.itdragon.delimiter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  
public class ITDragonClientHandler extends ChannelInboundHandlerAdapter{  
          
    @Override 
    public void channelRead(ChannelHandlerContext chc, Object msg) {  
        try {  
            /* 設置字符串形式的解碼 new StringDecoder() 後能夠直接使用
            ByteBuf buf = (ByteBuf) msg;  
            byte[] req = new byte[buf.readableBytes()];  
            buf.readBytes(req);  
            String body = new String(req, "utf-8");  
            */
            System.out.println("Netty Client :" + msg);  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            ReferenceCountUtil.release(msg);
        }  
    }  
    public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {  
        cause.printStackTrace();  
        chc.close();  
    }  
}

打印結果

一月 29, 2018 11:31:10 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xcf3a3ac1] REGISTERED
一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xcf3a3ac1] BIND: 0.0.0.0/0.0.0.0:8888
一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xf1b8096b, L:/127.0.0.1:8888 - R:/127.0.0.1:4777]
一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
Netty Server : 1+1
Netty Server : 6+1

Netty Client :2
Netty Client :7

從日誌中能夠看出Channel的狀態從REGISTERED ---> ACTIVE ---> READ ---> READ COMPLETE。服務端也是按照特殊分割符拆包。

總結

看完本章,你必需要掌握的三個知識點:NioEventLoopGroup,ServerBootstrap,ChannelHandlerAdapter
1 NioEventLoopGroup 本質就是一個線程池,管理多個NioEventLoop,一個NioEventLoop管理多個Channel。
2 NioEventLoop 負責不停地輪詢IO事件,處理IO事件和執行任務。
3 ServerBootstrap 是NIO服務的輔助啓動類,先配置服務參數,後執行bind方法啓動服務。
4 Bootstrap 是NIO客戶端的輔助啓動類,用法和ServerBootstrap相似。
5 Netty 使用FixedLengthFrameDecoder 固定長度拆包,DelimiterBasedFrameDecoder 分隔符拆包。

到這裏,Netty的拆包粘包,以及Netty的重要組件,服務器啓動流程到這裏就結束了,若是以爲不錯能夠點一個 "推薦" ,也能夠 "關注" 我哦。

優質文章

http://blog.csdn.net/spiderdo...
https://www.jianshu.com/p/c50...

相關文章
相關標籤/搜索