package com.dlb.note.client; import com.dlb.note.constant.ConstantValue; import com.dlb.note.doj.Request; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.MessageToByteEncoder; /** * 功能:自定義編碼器客戶端 * 版本:1.0 * 日期:2016/12/19 19:55 * 做者:馟蘇 */ public class MySelfEncoderClient { /** * 主函數 */ public static void main(String []args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new MySelfEncoder()); channel.pipeline().addLast(new MySelfEncoderHandler()); } }); // 等待客戶端連接成功 ChannelFuture future = bootstrap.connect("localhost", 8888).sync(); System.out.println("客戶端連接成功!"); // 等待客戶端連接關閉 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } /** * 請求編碼器 * <pre> * 數據包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包頭 | 模塊號 | 命令號 | 長度 | 數據 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包頭4字節 * 模塊號2字節short * 命令號2字節short * 長度4字節(描述數據部分字節長度) * @author 馟蘇 */ class MySelfEncoder extends MessageToByteEncoder { /** * 編碼 * @param channelHandlerContext * @param rs * @param byteBuf * @throws Exception */ protected void encode(ChannelHandlerContext channelHandlerContext, Object rs, ByteBuf byteBuf) throws Exception { Request request = (Request)(rs); //包頭 byteBuf.writeInt(ConstantValue.FLAG); //module byteBuf.writeShort(request.getModule()); //cmd byteBuf.writeShort(request.getCmd()); //長度 byteBuf.writeInt(request.getDataLength()); //data if(request.getData() != null){ byteBuf.writeBytes(request.getData()); } } } /** * 自定義處理器 */ class 
MySelfEncoderHandler extends ChannelHandlerAdapter { // 可讀 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } // 鏈接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { Request request = new Request(); request.setCmd((short) 100); request.setData("nihao".getBytes()); request.setModule((short) 9000); ctx.writeAndFlush(request); } } // 關閉 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip:" + ctx.channel().remoteAddress()); ctx.close(); } // 異常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.toString()); ctx.close(); } }
package com.dlb.note.server; import com.dlb.note.doj.Request; import com.dlb.note.server.decode.MySelfDecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 功能:自定義解碼器服務端 * 版本:1.0 * 日期:2016/12/15 12:47 * 做者:馟蘇 */ public class MySelfDecoderServer { /** * 主函數 */ public static void main(String []args) { // 配置服務端的NIO線程池 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度 .option(ChannelOption.SO_BACKLOG, 1024) // 爲管道添加處理器 .childHandler(new ChannelInitializer() { // 初始化管道 protected void initChannel(Channel channel) throws Exception { channel.pipeline() .addLast(new MySelfDecoder()) .addLast(new MySelfDecoderHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture future = serverBootstrap.bind(8888).sync(); System.out.println("服務器在8888端口監聽hello"); // 等待服務端監聽端口關閉 future.channel().closeFuture().sync(); System.out.println("服務器關閉bye"); } catch (Exception e) { e.printStackTrace(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 自定義處理器 */ class MySelfDecoderHandler extends ChannelHandlerAdapter { // 可讀 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request) msg; System.out.println(request.toString()); } // 鏈接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client come,ip:" + ctx.channel().remoteAddress()); } // 關閉 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip:" + ctx.channel().remoteAddress()); ctx.close(); } // 異常 @Override public void 
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.toString()); ctx.close(); } }
package com.dlb.note.server.decode; import com.dlb.note.constant.ConstantValue; import com.dlb.note.doj.Request; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 請求解碼器 * <pre> * 數據包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包頭 | 模塊號 | 命令號 | 長度 | 數據 | * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包頭4字節 * 模塊號2字節short * 命令號2字節short * 長度4字節(描述數據部分字節長度) */ /** * 功能:自定義消息解碼器 * 版本:1.0 * 日期:2016/12/19 19:42 * 做者:馟蘇 */ public class MySelfDecoder extends ByteToMessageDecoder { /** * 數據包基本長度 */ public static int BASE_LENTH = 4 + 2 + 2 + 4; /** * 解碼器 * @param channelHandlerContext * @param byteBuf * @param list * @throws Exception */ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { // 可讀長度必須大於基本長度 if(byteBuf.readableBytes() >= BASE_LENTH){ // 防止socket字節流攻擊 if(byteBuf.readableBytes() > 2048){ byteBuf.skipBytes(byteBuf.readableBytes()); } // 記錄包頭開始的index int beginReader; while(true){ beginReader = byteBuf.readerIndex(); byteBuf.markReaderIndex(); if(byteBuf.readInt() == ConstantValue.FLAG){ break; } // 未讀到包頭,略過一個字節 byteBuf.resetReaderIndex(); byteBuf.readByte(); // 長度又變得不知足 if(byteBuf.readableBytes() < BASE_LENTH){ return; } } // 模塊號 short module = byteBuf.readShort(); // 命令號 short cmd = byteBuf.readShort(); // 長度 int length = byteBuf.readInt(); // 判斷請求數據包數據是否到齊 if(byteBuf.readableBytes() < length){ // 還原讀指針 byteBuf.readerIndex(beginReader); return; } // 讀取data數據 byte[] data = new byte[length]; byteBuf.readBytes(data); Request request = new Request(); request.setModule(module); request.setCmd(cmd); request.setData(data); // 繼續往下傳遞 list.add(request); } } }
/**
 * Handles an inbound message. A {@code ByteBuf} is merged into the cumulation
 * buffer and passed to {@code callDecode}; every object collected in the output
 * list is then fired down the pipeline. Any other message type is forwarded
 * untouched.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception a {@code DecoderException} when decoding fails (other
 *                   throwables raised while decoding are wrapped in one)
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof ByteBuf)) {
        ctx.fireChannelRead(msg);
        return;
    }

    RecyclableArrayList decoded = RecyclableArrayList.newInstance();
    try {
        ByteBuf incoming = (ByteBuf) msg;
        this.first = this.cumulation == null;
        if (this.first) {
            // Nothing buffered so far: the incoming buffer becomes the cumulation.
            this.cumulation = incoming;
        } else {
            int incomingBytes = incoming.readableBytes();
            if (this.cumulation.writerIndex() > this.cumulation.maxCapacity() - incomingBytes) {
                this.expandCumulation(ctx, incoming.readableBytes());
            }
            this.cumulation.writeBytes(incoming);
            incoming.release();
        }
        this.callDecode(ctx, this.cumulation, decoded);
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable t) {
        throw new DecoderException(t);
    } finally {
        // Runs on both the normal and the failure path.
        if (this.cumulation != null && !this.cumulation.isReadable()) {
            this.cumulation.release();
            this.cumulation = null;
        }

        int count = decoded.size();
        this.decodeWasNull = count == 0;
        for (int idx = 0; idx < count; ++idx) {
            ctx.fireChannelRead(decoded.get(idx));
        }
        decoded.recycle();
    }
}
/**
 * Repeatedly invokes {@code decode} while {@code in} has readable bytes.
 * The loop stops when the context is removed, when a replay signal is caught
 * (the reader index is then rewound to the checkpoint if it is non-negative),
 * or when {@code isSingleDecode()} is true after a successful pass.
 *
 * @param ctx the handler context
 * @param in  the cumulated inbound bytes
 * @param out the list that receives decoded objects
 * @throws DecoderException if {@code decode} makes no progress, or wrapping
 *                          any other throwable raised while decoding
 */
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    this.replayable.setCumulation(in);
    try {
        while (in.isReadable()) {
            int startIndex = in.readerIndex();
            this.checkpoint = startIndex;
            int producedBefore = out.size();
            Object stateBefore = this.state;
            int readableBefore = in.readableBytes();

            try {
                this.decode(ctx, this.replayable, out);
                if (ctx.isRemoved()) {
                    break;
                }

                if (producedBefore == out.size()) {
                    // Nothing was produced: decode() must at least have consumed
                    // input or changed state, otherwise the loop would spin.
                    boolean noProgress = readableBefore == in.readableBytes() && stateBefore == this.state;
                    if (noProgress) {
                        throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything.");
                    }
                    continue;
                }
            } catch (Signal replay) {
                replay.expect(REPLAY);
                if (!ctx.isRemoved()) {
                    int rewindTo = this.checkpoint;
                    if (rewindTo >= 0) {
                        in.readerIndex(rewindTo);
                    }
                }
                break;
            }

            // Something was produced, yet neither the reader index nor the state moved.
            if (startIndex == in.readerIndex() && stateBefore == this.state) {
                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something.");
            }

            if (this.isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable t) {
        throw new DecoderException(t);
    }
}