首先《Netty權威指南》私有協議開發那一章的樣例代碼是編譯不經過的(可是這絲絕不影響本書的價值)
處理方案能夠參考:http://www.itnose.net/detail/6112870.htmlhtml
另外這一章的私有協議開發案例也過於理想化,爲何這麼說呢?若是說服務端或客戶端有一方基於歷史緣由是用其餘語言實現的呢,好比C,Java和C的int類型字節長度是不同的
那LineBasedFrameDecoder就用不上了,讓咱們看一個實際的私有協議案例:java
這個協議在C/++程序員看來是個再正常不過的了,尤爲注意協議對長度字段是採用字符串方式描述的(最多支持9999),若是用Netty來實現,又該如何處理呢?程序員
public class NettyMessage { private Header header; private Object body; //檢驗和 private byte crcCode; public byte getCrcCode() { return crcCode; } public void setCrcCode(byte crcCode) { this.crcCode = crcCode; } public Header getHeader() { return header; } public void setHeader(Header header) { this.header = header; } public Object getBody() { return body; } public void setBody(Object body) { this.body = body; } public String toString(){ return ToStringBuilder.reflectionToString(this); } }
public class Header { //固定頭 private byte startTag; //命令碼,4位 private byte[] cmdCode; //版本 2位 private byte[] version; private int length; public byte[] getVersion() { return version; } public void setVersion(byte[] version) { this.version = version; } public byte[] getCmdCode() { return cmdCode; } public void setCmdCode(byte[] cmdCode) { this.cmdCode = cmdCode; } public byte getStartTag() { return startTag; } public void setStartTag(byte startTag) { this.startTag = startTag; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public String toString(){ return ToStringBuilder.reflectionToString(this); } }
public class MessageEncoder extends MessageToByteEncoder<NettyMessage>{ @Override protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception { try{ if(msg == null || msg.getHeader() == null){ throw new Exception("The encode message is null"); } out.writeByte(msg.getHeader().getStartTag()); out.writeBytes(msg.getHeader().getCmdCode()); //佔位 byte[] lengthBytes = new byte[]{0, 0, 0, 0}; out.writeBytes(lengthBytes); out.writeBytes(msg.getHeader().getVersion()); String body = (String)msg.getBody(); int length = 0; if(body != null){ byte[] bodyBytes = body.getBytes(); out.writeBytes(bodyBytes); length = bodyBytes.length; if(Constants.CRCCODE_DEFAULT != msg.getCrcCode()){ msg.setCrcCode(CRC8.calcCrc8(bodyBytes)); } } //長度從int轉換爲byte[4] byte l1 = getIndexToByte(length, 3); byte l2 = getIndexToByte(length, 2); byte l3 = getIndexToByte(length, 1); byte l4 = getIndexToByte(length, 0); lengthBytes = new byte[]{l1, l2, l3, l4}; out.setBytes(5, lengthBytes); out.writeByte(msg.getCrcCode()); }catch(Exception e){ e.printStackTrace(); throw e; } } public static byte getIndexToByte(int i, int index){ if(index == 0){ return (byte)(i % 10); }else{ int num = (int)Math.pow(10, index); return (byte)((i / num) % 10); } } }
public class MessageDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { try{ if(in.readableBytes() < 12){ return; } in.markReaderIndex(); NettyMessage message = new NettyMessage(); Header header = new Header(); header.setStartTag(in.readByte()); byte[] cmdCode = new byte[4]; in.readBytes(cmdCode); header.setCmdCode(cmdCode); //長度從byte[4]轉int byte[] lengthBytes = new byte[4]; in.readBytes(lengthBytes); int length = toInt(lengthBytes); header.setLength(length); if(length < 0 || length > 10240){//過長消息或不合法消息 throw new IllegalArgumentException("wrong message length"); } byte[] version = new byte[2]; in.readBytes(version); header.setVersion(version); if(header.getLength() > 0){ if(in.readableBytes() < length + 1){ in.resetReaderIndex(); return; } byte[] bodyBytes = new byte[header.getLength()]; in.readBytes(bodyBytes); message.setBody(new String(bodyBytes)); } message.setCrcCode(in.readByte()); message.setHeader(header); out.add(message); }catch(Exception e){ e.printStackTrace(); throw e; } } public static int toInt(byte[] bytes){ int value = 0; for(int i=0; i<bytes.length; i++){ int num = (int)Math.pow(10, bytes.length - 1 - i); value += num * bytes[i]; } return value; } }
服務端代碼:ide
public class NettyServer implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); @Autowired Config config; public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); ChannelFuture f = b.bind(port).sync(); logger.info("Push server started on port " + port); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MessageDecoder()) .addLast(new MessageEncoder()) } } @Override public void run() { try { this.bind(config.getServerPort()); } catch (Exception e) { logger.error(e.getMessage()); System.exit(1); } } }
客戶端代碼:oop
/** * 客戶端 * @author peng */ public class NettyClient { public void connect(String remoteServer, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChildChannelHandler()); ChannelFuture f = b.connect(remoteServer,port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()) .addLast(new MessageEncoder()) } } public static void main(String[] args){ try { new NettyClient().connect("127.0.0.1", 9080); } catch (Exception e) { e.printStackTrace(); } } }
總結:關鍵在於Encoder和Decoder的編碼實現ui