Netty 4 經常使用 handler 分析

這裏咱們就要分析下netty4 中 hanlder從註冊到執行用戶消息事件的流程。java

handler的註冊

在server端或是client端,都須要註冊handler,才能使用。經過以下方式爲channel設置相應的handler。linux

channel.pipeline().addLast(new RpcDecoder(MomResponse.class));

或者
 ChannelPipeline cp = socketChannel.pipeline();
 cp.addLast(new RpcEncoder(RpcRequest.class));等

 通常是要經過channel獲取pipeline,由於Channel的構造函數中會 new DefaultChannelPipeline(this);git

而這個pipeline內部又維護了一個 雙向鏈表,github

public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

而在addLast的過程當中,以下 ,會將handler 加到內部鏈表的尾部。可是在add以前 ,會將其封裝到一個DefaultChannelHandlerContext中,而這個context就 作爲鏈表中的一個節點。經過鏈表表實現每一個handler的順序執行。 數組

//DefaultChannelPipeline
    @Override
    public ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

    @Override
    public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
        synchronized (this) {
            checkDuplicateName(name);

            AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
            addLast0(name, newCtx);
        }

        return this;
    }

    private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
        checkMultiplicity(newCtx);

        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;

        name2ctx.put(name, newCtx);

        callHandlerAdded(newCtx);
    }

handler 的執行  

netty中使用nio  中 selector也就是linux的epoll系統調用,來實現IO的多路複用。緩存

channel監聽 SelectionKey.OP_READ | SelectionKey.OP_ACCEPT 事件,然後會調用channel中構造的內部類 nioUafe 的read方法 。unsafe.read();  然後經過channel中pipeline串聯整個msg的消息處理。核心是Context中的 fireChannelRead,由於每一個handler都封裝到一個Context中,經過以下的方法安全

//DefaultChannelHandlerContext
  @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        final AbstractChannelHandlerContext next = findContextInbound();//找到鏈表中下一個處理讀的handler
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(msg);
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelRead(msg);
                }
            });
        }
        return this;
    }

    private void invokeChannelRead(Object msg) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg); //執行handler的邏輯。
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }

能夠完成整個handler鏈的執行。app

而在handler的執行流中,通常在中間的handler中,在執行channelRead的業務邏輯中,會將 context自身傳到方法裏,並會經過調用該context的 fireChannelRead 將處理後的msg  經過 查找下一個handler(由於context是一個雙向鏈表),找到相應的handler(In or  Out)中的業務邏輯,直到最後一個hanlder不在調用 ctx.fireChannelRead 而整個handler鏈能夠分爲幾類,對於In 也就是收到消息的處理handlers來講,主要是分隔handler,Decoderhandler,業務邏輯handler。這三大類,是將收到的字節符按照設定的協議,執行完結束。socket

經常使用handle 分析 

除了用戶定義的業務邏輯的handler以外,netty也爲咱們提供了不少十分有用的handler。咱們下面是以in類型的爲主進行介紹,out邏輯。經常使用的有ByteToMessageDecoder 、SimpleChannelInboundHandler、ChannelInboundHandlerAdapter、SslHandler、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等,這些handler之間有繼承的關係,在使用中咱們能夠直接用,有些也能夠經過 繼承 來擴展達到咱們的業務功能。從基類開始介紹tcp

1.  ChannelInboundHandlerAdapter  

相對來講比較底層的handler,能夠直接繼承,一般用在處理handler的register,unregister等事件,最爲核心的就是繼承  channelRead,經過以前的handler對msg的處理,直接能夠轉換爲java類,執行業務。

2. SimpleChannelInboundHandler

繼承自 ChannelInboundHandlerAdapter  ,爲咱們作了一次轉換,將msg先轉換爲java類,而咱們能夠經過繼承,直接調用channelRead0,參數就是轉換好的java類。使用十分簡單。前提是前面的handler要先解碼。一般放在最後一個handler。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg); // 實現業務邏輯
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

 

3. ByteToMessageDecoder

這個也是比較重要的handler,用戶解碼的基類handler,從名字也可猜出,其核心將接收的byte轉換爲用戶定義的mssage, 用戶須要 實現 decode方法,完成具體的轉換。

以下是其channelRead源碼

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null; //上一次是否有累積的字節
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()
                            || cumulation.refCnt() > 1) {
                        // Expand cumulation (by replace it) when either there is not more room in the buffer
                        // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                        // duplicate().retain().
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/2327
                        // - https://github.com/netty/netty/issues/1764
                        expandCumulation(ctx, data.readableBytes());//剩餘的空間不足寫下本次的數據,擴充累積區,
                    }
                    cumulation.writeBytes(data);//將本次讀取的data附加到上一次剩餘的數據中。
                    data.release(); 
                }
                callDecode(ctx, cumulation, out); //解碼過程
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release(); //當累積區已經不可讀了,釋放。
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;

                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));  //執行下一個handler。
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

主要完成兩個事件

    1)解碼(byte 轉成object)

過程相對簡單,經過定義了一個 cumulation 的累積緩存區,用以保存本次沒有處理完的buf,並等待下一個tcp包的到來。一塊兒傳遞到decode方法解決執行,如此反覆。解決了粘包的問題。不過注意一點,這個handler是非線程安全的,一個channle對應一個該handler。因此一般咱們在加入到piepelie中都是從新new的。

而對於轉換的邏輯來講,就須要根據邏輯,轉換成相應的對象了。經過callDecode,將從新組合後的cumulation,進行解碼。將解碼後的信息加到out中,該方法會經過循環,每次解碼後的out大小與解碼前大小是否一致,以此來決定是否結束本次解碼過程。由於一次callDecode可能會 攜帶多個msg。

    2) 下一個handler 

將轉換後的信息傳遞到下一個handler,    經過ctx.fireChannelRead。上面已經分析handler執行鏈的過程。

4.   DelimiterBasedFrameDecoder

繼承自ByteToMessageDecoder,只需完成解碼的工做,一樣從名字看出,起到分隔的做用,就是將收到的字節以特殊的字符分隔。通常要指定最大長度,以及分隔符。超過最大長度拋異常。

通常用在以下,以換行符結尾,轉化爲string。

5. FixedLengthFrameDecoder 

同上,解碼成固定長度的字節。

protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readSlice(frameLength).retain();
        }
    }

handler組合

接下來咱們給出幾組經常使用的handler組合

 1) 定義協議,decode爲java對象,

channel.pipeline().addLast(new RpcDecoder(Response.class));
channel.pipeline().addLast(new RpcEncoder(Request.class));
channel.pipeline().addLast(handle);

這種在Decoder中須要用戶去實現協議,最簡單的過程以下,信息頭部指定有效字節數,先讀取頭部長度。然後在讀取相應長度的字節,反序列化。而複雜的可能設定magic、type、length等。

@Override
	public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		int HEAD_LENGTH=4;
        if (in.readableBytes() < HEAD_LENGTH) {
            return;
        }
        in.markReaderIndex();               
        int dataLength = in.readInt();       
        if (dataLength < 0) { 				 
            ctx.close();
        }
        if (in.readableBytes() < dataLength) { 
            in.resetReaderIndex();
            return;
        }
        byte[] body = new byte[dataLength]; 
        in.readBytes(body);  				      
        Object obj=SerializeUtils.Deserialize(body);//經過定義序列化工具將字節數組轉換爲指定類的實例
        out.add(obj);
	}

2) 以換行符結尾分隔,轉化爲string。

// 以("\n")爲結尾分割的 解碼器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
         // 字符串解碼 和 編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());

         // 本身的邏輯Handler
pipeline.addLast("handler", new SelfHandler());

 

 

 

 

http://blog.csdn.net/langzi7758521/article/details/52712159

相關文章
相關標籤/搜索