首先,咱們回顧一下netty的組件設計:Netty的主要組件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等。java
ChannelHandler充當了處理入站和出站數據的應用程序邏輯的容器。例如,實現ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就能夠接收入站事件和數據,這些數據隨後會被你的應用程序的業務邏輯處理。當你要給鏈接的客戶端發送響應時,也能夠從ChannelInboundHandler沖刷數據。你的業務邏輯一般寫在一個或者多個ChannelInboundHandler中。ChannelOutboundHandler原理同樣,只不過它是用來處理出站數據的。git
ChannelPipeline提供了ChannelHandler鏈的容器。以客戶端應用程序爲例,若是事件的運動方向是從客戶端到服務端的,那麼咱們稱這些事件爲出站的,即客戶端發送給服務端的數據會經過pipeline中的一系列ChannelOutboundHandler,並被這些Handler處理,反之則稱爲入站的。github
當你經過Netty發送或者接受一個消息的時候,就將會發生一次數據轉換。入站消息會被解碼:從字節轉換爲另外一種格式(好比java對象);若是是出站消息,它會被編碼成字節。api
Netty提供了一系列實用的編碼解碼器,他們都實現了ChannelInboundHadnler或者ChannelOutcoundHandler接口。在這些類中,channelRead方法已經被重寫了。以入站爲例,對於每一個從入站Channel讀取的消息,這個方法會被調用。隨後,它將調用由已知解碼器所提供的decode()方法進行解碼,並將已經解碼的字節轉發給ChannelPipeline中的下一個ChannelInboundHandler。網絡
因爲你不可能知道遠程節點是否會一次性發送一個完整的信息,tcp有可能出現粘包拆包的問題,這個類會對入站數據進行緩衝,直到它準備好被處理。tcp
主要api有兩個:ide
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.isReadable()) { // Only call decode() if there is something left in the buffer to decode. // See https://github.com/netty/netty/issues/4386 decodeRemovalReentryProtection(ctx, in, out); } } }
decode方法:oop
必須實現的方法,ByteBuf包含了傳入數據,List用來添加解碼後的消息。對這個方法的調用將會重複進行,直到肯定沒有新的元素被添加到該List,或者該ByteBuf中沒有更多可讀取的字節時爲止。而後若是該List不會空,那麼它的內容將會被傳遞給ChannelPipeline中的下一個ChannelInboundHandler。性能
decodeLast方法:編碼
當Channel的狀態變成非活動時,這個方法將會被調用一次。
最簡單的例子:
public class ToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readInt()); } } }
這個例子,每次入站從ByteBuf中讀取4字節,將其解碼爲一個int,而後將它添加到下一個List中。當沒有更多元素能夠被添加到該List中時,它的內容將會被髮送給下一個ChannelInboundHandler。int在被添加到List中時,會被自動裝箱爲Integer。在調用readInt()方法前必須驗證所輸入的ByteBuf是否具備足夠的數據。
一個實用的例子:
public class MyDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; }
//在讀取前標記readerIndex in.markReaderIndex();
//讀取頭部 int length = in.readInt(); if (in.readableBytes() < length) {
//消息不完整,沒法處理,將readerIndex復位 in.resetReaderIndex(); return; } out.add(in.readBytes(length).toString(CharsetUtil.UTF_8)); } }
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder擴展了ByteToMessageDecoder類,使用這個類,咱們沒必要調用readableBytes()方法。參數S指定了用戶狀態管理的類型,其中Void表明不須要狀態管理。
以上代碼能夠簡化爲:
public class MySimpleDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //傳入的ByteBuf是ReplayingDecoderByteBuf //首先從入站ByteBuf中讀取頭部,獲得消息體長度length,而後讀取length個字節, //並添加到解碼消息的List中 out.add(in.readBytes(in.readInt()).toString(CharsetUtil.UTF_8)); }
如何實現的?
ReplayingDecoder在調用decode方法時,傳入的是一個自定義的ByteBuf實現:
final class ReplayingDecoderByteBuf extends ByteBuf
ReplayingDecoderByteBuf在讀取數據前,會先檢查是否有足夠的字節可用,以readInt()爲例:
final class ReplayingDecoderByteBuf extends ByteBuf { private static final Signal REPLAY = ReplayingDecoder.REPLAY; ...... @Override public int readInt() { checkReadableBytes(4); return buffer.readInt(); } private void checkReadableBytes(int readableBytes) { if (buffer.readableBytes() < readableBytes) { throw REPLAY; } } ...... }
若是字節數量不夠,會拋出一個Error(實際是一個Signal public final class Signal extends Error implements Constant<Signal> ),而後會在上層被捕獲並處理,它會把ByteBuf中的ReadIndex恢復到讀以前的位置,以供下次讀取。當有更多數據可供讀取時,該decode()方法將會被再次調用。最終結果和以前同樣,從ByteBuf中提取的String將會被添加到List中。
雖然ReplayingDecoder使用方便,但它也有一些侷限性:
1. 並非全部的 ByteBuf 操做都被支持,若是調用了一個不被支持的方法,將會拋出一個 UnsupportedOperationException。
2. ReplayingDecoder 在某些狀況下可能稍慢於 ByteToMessageDecoder,例如網絡緩慢而且消息格式複雜時,消息被拆成了多個碎片,因而decode()方法會被屢次調用反覆地解析一個消息。
3. 你須要時刻注意decode()方法在同一個消息上可能被屢次調用.。
一個簡單的echo服務,客戶端在鏈接創建時,向服務端發送消息(兩個1)。服務端須要一次拿到兩個Integer,並作處理。
EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("msg from client: " + msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("sent to server: 11"); ctx.writeAndFlush(1); Thread.sleep(1000); ctx.writeAndFlush(1); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
解碼器
public class MyReplayingDecoder extends ReplayingDecoder<Void> { private final Queue<Integer> values = new LinkedList<>(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { values.add(in.readInt()); values.add(in.readInt()); assert values.size() == 2; out.add(values.poll() + values.poll()); } }
運行程序,就會發現斷言失敗。
咱們經過在decode()方法中打印日誌或者打斷點的方式,能夠看到,decode()方法是被調用了兩次的,分別在服務端兩次接受到消息的時候:
第一次調用時,因爲緩衝區中只有四個字節,在第二句 values.add(in.readInt()) 中拋出了異常REPLAY,在ReplayingDecoder中被捕獲,並復位ReadIndex。此時values.size() = 1。
第二次調用時,從頭開始讀取到兩個Integer並放入values,所以values.size() = 3。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //清空隊列 values.clear(); values.add(in.readInt()); values.add(in.readInt()); assert values.size() == 2; out.add(values.poll() + values.poll()); }
如何提升ReplayingDecoder的性能?如上所說,使用ReplayingDecoder存在對一個消息屢次重複解碼的問題,咱們能夠經過Netty提供的狀態控制來解決這個問題。
首先咱們將消息結構設計爲:header(4個字節,存放消息體長度),body(消息體)
根據消息的結構,咱們定義兩個狀態:
public enum MyDecoderState { /** * 未讀頭部 */ READ_LENGTH, /** * 未讀內容 */ READ_CONTENT; }
EchoClientHandler
public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { System.out.println("sent to server: msg" + i); ctx.writeAndFlush("msg" + i); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
EchoServerHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("msg from client: " + ((ByteBuf) msg).toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
解碼器
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> { private int length; public IntegerHeaderFrameDecoder() { // Set the initial state. super(MyDecoderState.READ_LENGTH); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { switch (state()) { case READ_LENGTH: length = in.readInt(); checkpoint(MyDecoderState.READ_CONTENT); case READ_CONTENT: ByteBuf frame = in.readBytes(length); checkpoint(MyDecoderState.READ_LENGTH); out.add(frame); break; default: throw new Error("Shouldn't reach here."); } } }
編碼器
public class MyEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { byte[] b = msg.getBytes(); int length = b.length; //write length of msg out.writeInt(length); //write msg out.writeBytes(b); } }
當頭部被成功讀取到時,咱們調用 checkpoint(MyDecoderState.READ_CONTENT) 設置狀態爲「未讀消息」,至關於設置一個標誌位,若是在後續讀取時拋出異常,那麼readIndex會被複位到上一次你調用checkpoint()方法的地方。下一次接收到消息,再次調用decode()方法時,就可以從checkpoint處開始讀取,避免了又從頭開始讀。
LineBasedFrameDecoder
這個類在Netty內部也有使用,它使用行尾控制字符(\n或者\r\n)做爲分隔符來解析數據。
DelimiterBasedFrameDecoder
使用自定義的特殊字符做爲消息的分隔符。
HttpObjectDecoder
一個HTTP數據的解碼器。
這些解碼器也很是實用,下次更新關於這些解碼器的原理和詳細使用。
更多詳細內容參見《netty in action》 或者netty源碼的英文註釋。