在數據傳輸中,咱們發送的數據包以下所示java
+-----+-----+-----+git
| ABC | DEF | GHI |github
+-----+-----+-----+json
而實際接收的包的格式爲:服務器
+----+-------+---+---+ | AB | CDEFG | H | I | +----+-------+---+---+app
產生的緣由爲:數據在傳輸過程當中,產生數據包碎片(TCP/IP數據傳輸時大數據包沒法一次傳輸,被拆分紅小數據包,小數據包即爲數據包碎片),這就形成了實際接收的數據包和發送的數據包不一致的狀況。框架
那麼通常狀況下咱們是如何解決這種問題的呢?我所知道的有這幾種方案:分佈式
首先看netty中消息定長如何處理的,首先在服務端設定當前服務端接受大小爲固定長度,sc.pipeline().addLast(new FixedLengthFrameDecoder(5));本例中指定長度爲5個字符串大小,此類是netty框架提供。
ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設置定長字符串接收 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //設置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } });
客戶端代碼在建立handler的時候也要指定長度大小,而且與服務器端指定大小一致便可ide
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5));//制定傳輸數據大小 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccccc".getBytes()));
運行結果以下:oop
嘗試了第一種方案後,下面嘗試下第二種方案,這種定長的方式直接限制了傳輸信息的大小,並且要服務端和客戶端同時指定大小感受並非太好,下面看下指定分隔符是如何處理的呢,首先服務端指定分隔符ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes());而後建立new DelimiterBasedFrameDecoder(1024, buf)分割符指定Decoder
ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設置特殊分隔符 ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //設置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } });
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("*_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbasfab*_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("ccsdfasfcc*_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("fffff*_".getBytes())); //等待客戶端端口關閉 cf.channel().closeFuture().sync();
下面介紹第三種處理方案,也是不少分佈式框架中使用的方式,
Netty4自己自帶了ObjectDecoder,ObjectEncoder來實現自定義對象的序列 化, 可是用的是java內置的序列化,因爲java序列化的性能並非很好, 因此不少時候咱們須要用其餘序列化方式,常見的有 Kryo,Jackson,fastjson,protobuf等。這裏要寫的其實用什麼序列化不是重點,而是咱們怎麼設計咱們的Decoder和 Encoder。
首先咱們寫一個Encoder,咱們繼承自MessageToByteEncoder<T> ,把對象轉換成byte,繼承這個對象,會要求咱們實現一個encode方法:
@Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { byte[] body = convertToBytes(msg); //將對象轉換爲byte,僞代碼,具體用什麼進行序列化,大家自行選擇。可使用我上面說的一些 int dataLength = body.length; //讀取消息的長度 out.writeInt(dataLength); //先將消息長度寫入,也就是消息頭 out.writeBytes(body); //消息體中包含咱們要發送的數據 }
那麼當咱們在Decode的時候,該怎麼處理髮送過來的數據呢?這裏咱們繼承ByteToMessageDecoder方法,繼承這個對象,會要求咱們實現一個decode方法
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < HEAD_LENGTH) { //這個HEAD_LENGTH是咱們用於表示頭長度的字節數。 因爲上面咱們傳的是一個int類型的值,因此這裏HEAD_LENGTH的值爲4. return; } in.markReaderIndex(); //咱們標記一下當前的readIndex的位置 int dataLength = in.readInt(); // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增長4 if (dataLength < 0) { // 咱們讀到的消息體長度爲0,這是不該該出現的狀況,這裏出現這狀況,關閉鏈接。 ctx.close(); } if (in.readableBytes() < dataLength) { //讀到的消息體長度若是小於咱們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; // 嗯,這時候,咱們讀到的長度,知足咱們的要求了,把傳送過來的數據,取出來吧~~ in.readBytes(body); // Object o = convertToObject(body); //將byte數據轉化爲咱們須要的對象。僞代碼,用什麼序列化,自行選擇 out.add(o); }