這裏咱們就要分析下netty4 中 hanlder從註冊到執行用戶消息事件的流程。java
在server端或是client端,都須要註冊handler,才能使用。經過以下方式爲channel設置相應的handler。linux
channel.pipeline().addLast(new RpcDecoder(MomResponse.class)); 或者 ChannelPipeline cp = socketChannel.pipeline(); cp.addLast(new RpcEncoder(RpcRequest.class));等
通常是要經過channel獲取pipeline,由於Channel的構造函數中會 new DefaultChannelPipeline(this);git
而這個pipeline內部又維護了一個 雙向鏈表,github
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
而在addLast的過程當中,以下 ,會將handler 加到內部鏈表的尾部。可是在add以前 ,會將其封裝到一個DefaultChannelHandlerContext中,而這個context就 作爲鏈表中的一個節點。經過鏈表表實現每一個handler的順序執行。 數組
//DefaultChannelPipeline @Override public ChannelPipeline addLast(String name, ChannelHandler handler) { return addLast(null, name, handler); } @Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; } private void addLast0(final String name, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; name2ctx.put(name, newCtx); callHandlerAdded(newCtx); }
netty中使用nio 中 selector也就是linux的epoll系統調用,來實現IO的多路複用。緩存
channel監聽 SelectionKey.OP_READ | SelectionKey.OP_ACCEPT 事件,然後會調用channel中構造的內部類 nioUafe 的read方法 。unsafe.read(); 然後經過channel中pipeline串聯整個msg的消息處理。核心是Context中的 fireChannelRead,由於每一個handler都封裝到一個Context中,經過以下的方法安全
//DefaultChannelHandlerContext @Override public ChannelHandlerContext fireChannelRead(final Object msg) { if (msg == null) { throw new NullPointerException("msg"); } final AbstractChannelHandlerContext next = findContextInbound();//找到鏈表中下一個處理讀的handler EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this; } private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); //執行handler的邏輯。 } catch (Throwable t) { notifyHandlerException(t); } }
能夠完成整個handler鏈的執行。app
而在handler的執行流中,通常在中間的handler中,在執行channelRead的業務邏輯中,會將 context自身傳到方法裏,並會經過調用該context的 fireChannelRead 將處理後的msg 經過 查找下一個handler(由於context是一個雙向鏈表),找到相應的handler(In or Out)中的業務邏輯,直到最後一個hanlder不在調用 ctx.fireChannelRead 而整個handler鏈能夠分爲幾類,對於In 也就是收到消息的處理handlers來講,主要是分隔handler,Decoderhandler,業務邏輯handler。這三大類,是將收到的字節符按照設定的協議,執行完結束。socket
除了用戶定義的業務邏輯的handler以外,netty也爲咱們提供了不少十分有用的handler。咱們下面是以in類型的爲主進行介紹,out邏輯。經常使用的有ByteToMessageDecoder 、SimpleChannelInboundHandler、ChannelInboundHandlerAdapter、SslHandler、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等,這些handler之間有繼承的關係,在使用中咱們能夠直接用,有些也能夠經過 繼承 來擴展達到咱們的業務功能。從基類開始介紹tcp
相對來講比較底層的handler,能夠直接繼承,一般用在處理handler的register,unregister等事件,最爲核心的就是繼承 channelRead,經過以前的handler對msg的處理,直接能夠轉換爲java類,執行業務。
繼承自 ChannelInboundHandlerAdapter ,爲咱們作了一次轉換,將msg先轉換爲java類,而咱們能夠經過繼承,直接調用channelRead0,參數就是轉換好的java類。使用十分簡單。前提是前面的handler要先解碼。一般放在最後一個handler。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); // 實現業務邏輯 } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
這個也是比較重要的handler,用戶解碼的基類handler,從名字也可猜出,其核心將接收的byte轉換爲用戶定義的mssage, 用戶須要 實現 decode方法,完成具體的轉換。
以下是其channelRead源碼
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; //上一次是否有累積的字節 if (first) { cumulation = data; } else { if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes() || cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 expandCumulation(ctx, data.readableBytes());//剩餘的空間不足寫下本次的數據,擴充累積區, } cumulation.writeBytes(data);//將本次讀取的data附加到上一次剩餘的數據中。 data.release(); } callDecode(ctx, cumulation, out); //解碼過程 } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); //當累積區已經不可讀了,釋放。 cumulation = null; } int size = out.size(); decodeWasNull = size == 0; for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); //執行下一個handler。 } out.recycle(); } } else { ctx.fireChannelRead(msg); } }
主要完成兩個事件
1)解碼(byte 轉成object)
過程相對簡單,經過定義了一個 cumulation 的累積緩存區,用以保存本次沒有處理完的buf,並等待下一個tcp包的到來。一塊兒傳遞到decode方法解決執行,如此反覆。解決了粘包的問題。不過注意一點,這個handler是非線程安全的,一個channle對應一個該handler。因此一般咱們在加入到piepelie中都是從新new的。
而對於轉換的邏輯來講,就須要根據邏輯,轉換成相應的對象了。經過callDecode,將從新組合後的cumulation,進行解碼。將解碼後的信息加到out中,該方法會經過循環,每次解碼後的out大小與解碼前大小是否一致,以此來決定是否結束本次解碼過程。由於一次callDecode可能會 攜帶多個msg。
2) 下一個handler
將轉換後的信息傳遞到下一個handler, 經過ctx.fireChannelRead。上面已經分析handler執行鏈的過程。
4. DelimiterBasedFrameDecoder
繼承自ByteToMessageDecoder,只需完成解碼的工做,一樣從名字看出,起到分隔的做用,就是將收到的字節以特殊的字符分隔。通常要指定最大長度,以及分隔符。超過最大長度拋異常。
通常用在以下,以換行符結尾,轉化爲string。
5. FixedLengthFrameDecoder
同上,解碼成固定長度的字節。
protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readSlice(frameLength).retain(); } }
接下來咱們給出幾組經常使用的handler組合
1) 定義協議,decode爲java對象,
channel.pipeline().addLast(new RpcDecoder(Response.class)); channel.pipeline().addLast(new RpcEncoder(Request.class)); channel.pipeline().addLast(handle);
這種在Decoder中須要用戶去實現協議,最簡單的過程以下,信息頭部指定有效字節數,先讀取頭部長度。然後在讀取相應長度的字節,反序列化。而複雜的可能設定magic、type、length等。
@Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int HEAD_LENGTH=4; if (in.readableBytes() < HEAD_LENGTH) { return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] body = new byte[dataLength]; in.readBytes(body); Object obj=SerializeUtils.Deserialize(body);//經過定義序列化工具將字節數組轉換爲指定類的實例 out.add(obj); }
2) 以換行符結尾分隔,轉化爲string。
// 以("\n")爲結尾分割的 解碼器 pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // 字符串解碼 和 編碼 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); // 本身的邏輯Handler pipeline.addLast("handler", new SelfHandler());
http://blog.csdn.net/langzi7758521/article/details/52712159