本節大綱:java
一、Handler的執行順序
二、自定義二進制協議(每條完整數據的組成),從而解決拆包和粘包。
三、經過爲每一個channel建立新的handler,從而解決即便handler中使用全局變量,也能夠避免競態條件。併發
一、Handler的執行順序。app
client中pipeline順序: //first,add codec pipeline.addLast(new BigIntegerDecoder()); pipeline.addLast(new NumberEncoder()); //then,add business logic pipeline.addLast(new FactorialClientHandler());
server中pipeline順序: //first,codec pipeline.addLast(new BigIntegerDecoder()); pipeline.addLast(new NumberEncoder()); //then,business logic pipeline.addLast(new FactorialServerHandler());
總結: 寫(outbound):自下而上,跳過inbound 讀(inbound): 自上而下,跳過outbound Codec放在上邊,業務邏輯handler放在下邊。
二、自定義二進制協議(每條完整數據的組成),從而解決拆包和粘包。ide
每條完整數據的組成:'F'+4個字節的長度+數據 將傳進來的number編碼爲二進制,在其前邊加上'F'和4個字節的長度,做爲前綴。 例如:42被編碼爲:'F',0,0,0,1,42
客戶端:oop
public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { StringBuilder sb = new StringBuilder(); for (int j = 0; j < 1024; j++) { sb.append(j); } BigInteger bigInt = new BigInteger(sb.toString()); ChannelFuture future = ctx.writeAndFlush(bigInt);//只發送1次 log.info("send:{}", bigInt); log.info("send字節數:{}", bigInt.toByteArray().length); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info("發送成功"); } } }); }
/** * <pre> * 自定義二進制協議:F+4字節長度+具體數值 * 例如:'F',0,0,0,1,42 解碼爲new BigInteger("42") * </pre> */ @Slf4j public class BigIntegerDecoder extends ByteToMessageDecoder { private int splitCount = 0; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { log.info(">>>>>splitCount:{},可讀字節數:{}",++splitCount,in.readableBytes()); //wait until the length prefix is available if (in.readableBytes() < 5) { return ; } in.markReaderIndex(); int magicNumber = in.readUnsignedByte(); if (magicNumber !='F') { throw new CorruptedFrameException("Invalid:"+magicNumber); } //wait until the whole data is available int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return ; } //convert the received data into a new BigInteger byte [] decoded = new byte[dataLength]; in.readBytes(decoded); out.add(new BigInteger(decoded)); } }
客戶端發送了1240個字節的BigInteger,服務端接收:ui
18:15:13.121 [nioEventLoopGroup-3-1] >>>>>splitCount:1,可讀字節數:1024
18:15:13.126 [nioEventLoopGroup-3-1] >>>>>splitCount:2,可讀字節數:1245編碼
雖然客戶端只發送了1次,但服務端分2次接收。在BigIntegerDecoder中都接收完後,才調用FactorialServerHandler的channelRead0方法。url
注意,spa
in.markReaderIndex(); 。。。 if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return ; }
以上的流程:.net
當第一次時,in.readableBytes()=1024,而dataLength=1245,因此進入該方法,將readerIndex復位到以前mark處,此例爲0。捨棄該部分包數據。
當第二次時,in.readableBytes()=1245(說明,從0開始讀的),讀取到了完整的報文。
若是去掉以上代碼,則會報錯:
18:09:51.465 [nioEventLoopGroup-3-1] >>>>>splitCount:1,可讀字節數:1024 io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(5) + length(1240) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 5, widx: 1024, cap: 1024)
固然,服務端處理完並把原文發給客戶端後,客戶端也是分2次讀取的:
18:15:13.119 [nioEventLoopGroup-2-1] send字節數:1240 18:15:13.153 [nioEventLoopGroup-2-1] >>>>>splitCount:1,可讀字節數:1024 18:15:13.154 [nioEventLoopGroup-2-1] >>>>>splitCount:2,可讀字節數:1245
三、經過爲每一個channel建立新的handler,從而解決即便handler中使用全局變量,也能夠避免競態條件。
併發發送數據包,且每一個數據包超過1024個字節,以下代碼中的成員變量:
public class FactorialServerHandler extends SimpleChannelInboundHandler<BigInteger> { private BigInteger lastMultiplier = new BigInteger("1"); private BigInteger factorial = new BigInteger("1"); @Override protected void channelRead0(ChannelHandlerContext ctx, BigInteger msg) throws Exception { //計算階乘併發送到客戶端 lastMultiplier = msg; factorial = factorial.multiply(msg); ctx.writeAndFlush(factorial); } 。。。
客戶端調用:
public static void main(String[] args) throws Exception { Thread t1 = new Thread(new Runnable() { @Override public void run() { executor(2,Thread.currentThread().getName());// 2*3*4*5=120 } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { executor(3,Thread.currentThread().getName());// 3*4*5=60 } }); t2.start(); Thread t3 = new Thread(new Runnable() { @Override public void run() { executor(4,Thread.currentThread().getName());// 4*5=20 } }); t3.start(); } public static void executor(int next,String threadName) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new FactorialClientInitializer(next)); // make a new connection ChannelFuture f = b.connect(HOST, PORT).sync(); // get the handler instance to retrieve the answer. FactorialClientHandler handler = (FactorialClientHandler) f.channel().pipeline().last(); // print out the answer log.info("threadName:{},開始:{},結束:{},結果:{}", threadName,next,COUNT, handler.getFactorial()); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }
結果:不亂,各自打印各自的。由於,每發送1條數據則建立1個Channel和handler ,因此不會亂。
四、codec是netty封裝好了的handler,簡化代碼開發。
本例中涉及的是:
ByteToMessageDecoder(inbound):必須實現decode方法
MessageToByteEncoder<Number>(outbound):必須實現encode方法
最後,
以上的3參考代碼:userguide-04-factorial。一、2參考代碼:userguide-04-2-factorial