ChannelHandler 是 Netty 程序的關鍵元素,因此完全地測試它們應該是你的開發過程當中的一個標準部分,EmbeddedChannel 是 Netty 專門爲改進針對 ChannelHandler 的單元測試而提供的。Netty 提供了它所謂的 Embedded 傳輸,這個傳輸是 EmbeddedChannel 的功能,提供了經過 ChannelPipeline 傳播事件的簡便方法java
這個方法是:將入站數據或者出站數據寫入到 EmbeddedChannel 中,而後檢查是否有任何東西到達 CHannelPipeline 的尾端。經過這種方式,你能夠肯定消息是否已經被編碼或者解碼過,以及是否觸發了任何 ChannelHandler 動做ide
下表列出了 EmbeddedChannel 的相關方法單元測試
入站數據由 ChannelInboundHandler 處理,表明從遠程節點讀取的數據。出站數據由 ChannelOutboundHandler 處理,表明將要寫到遠程節點的數據。根據你要測試的 ChannelHandler,你可使用 Inbound() 或者 Outbound() 方法對,或者兼而有之測試
下圖展現了使用 EmbeddedChannel 的方法,數據是如何流經 ChannelPipeline 的。 你可使用 writeOutbound()方法將消息寫到 Channel 中,並經過 ChannelPipeline 沿 着出站的方向傳遞。隨後,你可使用 readOutbound()方法來讀取已被處理過的消息,以確 定結果是否和預期同樣。相似地,對於入站數據,你須要使用 writeInbound()和 readInbound() 方法
![](G:\SSS\Java\Java SE\博客\Netty\EmbeddedChannel 的數據流.png)this
下述代碼展現了一個簡單的 ByteToMessageDecoder 實現,給定足夠的數據,這個實現將產生固定大小的幀。若是沒有足夠的數據可供讀取,它將等待下一個數據塊的到來,並將再次檢查是否可以產生一個新的幀編碼
public class FixedLengthFrameDecoder extends ByteToMessageDecoder { // 指定要生成的幀的長度 private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException("frameLength must be a positive integer:" + frameLength); } this.frameLength = frameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //檢查是否有足夠的字節能夠被讀取,以生成下一個幀 while (in.readableBytes() >= frameLength) { //從 ByteBuf 中讀取一個新幀 ByteBuf buf = in.readBytes(frameLength); //將該幀添加到已被解碼的消息列表中 out.add(buf); } } }
下述代碼展現了使用 EmbeddedChannel 的對於前面代碼的測試code
public class FixedLengthFrameDecoderTest { @Test public void testFrameDecoded() { //建立一個 ByteBuf,並存儲 9 字節 ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3)); //將數據寫入 EmbeddedChannel System.out.println(channel.writeInbound(input.retain()));//true //標記 Channel 爲已完成狀態 System.out.println(channel.finish());//true //讀取所生成的消息,而且驗證是否有 3 幀,其中每幀都爲 3 字節 ByteBuf read = channel.readInbound(); System.out.println(buf.readSlice(3).equals(read));// true read = channel.readInbound(); System.out.println(buf.readSlice(3).equals(read));// true read.release(); read = channel.readInbound(); System.out.println(buf.readSlice(3).equals(read));// true read.release(); System.out.println(channel.readInbound() == null);// true buf.release(); } @Test public void testFramesDescode2() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3)); //返回 false,由於沒有一個完整的可供讀取的幀 System.out.println(channel.writeInbound(input.readBytes(2)));// false System.out.println(channel.writeInbound(input.readBytes(7)));// true System.out.println(channel.finish());// true ByteBuf read = channel.readInbound(); System.out.println(buf.readSlice(3) == read);// false read.release(); read = channel.readInbound(); System.out.println(buf.readSlice(3) == read);// false read.release(); read = channel.readInbound(); System.out.println(buf.readSlice(3) == read);// false read.release(); System.out.println(channel.readInbound() == null);// true buf.release(); } }
測試出站消息的處理過程和剛纔所看到的相似,在下面的例子中,咱們將會展現如何使用 EmbeddedChannel 來測試另外一個編碼器形式的 ChannelOutboundHandler,編碼器是一種將一種消息格式轉換爲另外一種的組件blog
該示例將會按照下列方式工做:事件
下述代碼展現了這個邏輯ip
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { while (msg.readableBytes() >= 4) { //從輸入的 ByteBuf 中讀取下一個整數,而且計算其絕對值 int value = Math.abs(msg.readInt()); //將該整數寫入到編碼消息的 List 中 out.add(value); } } }
使用 EmbeddedChannel 來測試代碼
public class AbsIntegerEncoderTest { @Test public void testEncoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 1; i < 10; i++) { buf.writeInt(i * -1); } // 建立一個 EmbeddedChanel,並安裝一個要測試的 AbsIntegerEncoder EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder()); // 寫入 ByteBuf,調用 readOutbound() 方法將會產生數據 System.out.println(channel.writeOutbound(buf)); System.out.println(channel.finish()); channel.readOutbound(); for (int i = 1; i < 10; i++) { int temp = channel.readOutbound(); System.out.println(temp); } System.out.println(channel.readOutbound() == null); } }
下面是代碼中執行的步驟。
應用程序一般須要執行比轉換數據更加複雜的任務。例如,你可能須要處理格式不正確的輸 入或者過量的數據。在下一個示例中,若是所讀取的字節數超出了某個特定的限制,咱們將會拋出一個 TooLongFrameException,這是一種常常用來防範資源被耗盡的方法
實現的代碼以下
// 擴展 ByteToMessageDecoder 以將入站字節碼爲消息 public class FrameChunkDecoder extends ByteToMessageDecoder { private final int maxFrameSize; public FrameChunkDecoder(int maxFrameSize) { this.maxFrameSize = maxFrameSize; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readableBytes = in.readableBytes(); if (readableBytes > maxFrameSize) { // 若是該幀太大,則丟棄它並拋出一個 TooLongFrameException in.clear(); throw new TooLongFrameException(); } // 不然,從 ByteBuf 中讀取一個新的幀 ByteBuf buf = in.readBytes(readableBytes); // 該幀添加到解碼消息的List中 out.add(buf); } }
再使用 EmbeddedChannel 來測試這段代碼
public class FrameChunkDecoderTest { @Test public void testFramesDecoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); // 建立一個 EmbeddedChannel,並向其安裝一個幀大小爲 3 字節的 FixedLengthFrameDecoder EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3)); System.out.println(channel.writeInbound(input.readBytes(2))); try { // 寫入一個 4 字節大小的幀,並捕獲預期的異常 channel.writeInbound(input.readBytes(4)); } catch (TooLongFrameException e) { e.printStackTrace(); } // 寫入剩餘的 2 字節,會產生一個有效幀 System.out.println(channel.writeInbound(input.readBytes(3)));//true System.out.println(channel.finish()); // 讀取產生的消息,而且驗證值 ByteBuf read = channel.readInbound(); System.out.println(read.equals(buf.readSlice(2)));//true read.release(); read = channel.readInbound(); System.out.println(read.equals(buf.skipBytes(4).readSlice(3)));//true read.release(); buf.release(); } }