Netty與網絡編程

Netty什麼?

Netty項目是一個提供異步事件驅動網絡應用框架和快速開發可維護的高性能高擴展性服務端和客戶端協議工具集的成果。
換句話說,Netty是一個NIO客戶端服務端框架,它使得快速而簡單的開發像服務端客戶端協議的網絡應用成爲了可能。它它極大的簡化並流線化了如TCP和UDP套接字服務器開發的網絡編程。
「快速且簡便」不意味着目標應用將容忍維護性和性能上的問題。Netty在吸收了大量協議實現(如FTP,SMTP,HTTP以及各類二進制,基於文本的傳統協議)的經驗上進行了精心的設計。由此,Netty成功找到了一個無需折衷妥協而讓開發、性能、穩定性和靈活性相互協調的方法。html

一、回顧

a) http協議是咱們接觸最多的,定義的API都是基於Http協議的,Http協議屬於應用層協議。傳輸層協議,TCP / UDP協議,TCP是經過三次握手,保障通訊的可信,也就是咱們常說的長鏈接。Netty是對TCP協議的封裝。java

     

b) JAVA BIO / NIO / AIO linux

Java1.4版本引入NIO概念,實現了對「多路複用IO」的支持,Java1.7版本引入AIO概念。AIO是最晚提出的,理應是更先進的技術,可是並無大規模的在商業領域應用。Unix提供了五種參考網絡模型,在linux領域,並無異步IO網絡模型的成熟方案(linux發行版內核不支持AIO,須要本身安裝擴展包)。JAVA的新版本的AIO,仍是用採用多路複用IO實現。在Windows平臺經過IOCP協議實現,可參考 IOCP淺析編程

二、Netty 編程基礎

如今從最簡單的Echo程序入手,逐步深刻地分享利用Netty如何編程。windows

Netty Server Exempleapi

EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
 
try {
   ServerBootstrap b = new ServerBootstrap();
   b.group(group, workGroup)
         .channel(NioServerSocketChannel.class)
         .localAddress(new InetSocketAddress(port))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new EchoServerHandler());
            }
         });
 
   ChannelFuture f = b.bind().sync();
   System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
   f.channel().closeFuture().sync();
} finally {
   group.shutdownGracefully().sync();
}

Netty Server Handler Exemple數組

public class EchoServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
   @Override
   public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
      ByteBuf in = (ByteBuf) msg;
      System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
      ctx.write(Unpooled.copiedBuffer("Response from server. You have input \"" + in.toString(CharsetUtil.UTF_8) + "\"!", CharsetUtil.UTF_8));
      ctx.flush();
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      cause.printStackTrace();
      ctx.close();
   }
 

Netty Client Exemple服務器

EventLoopGroup group = new NioEventLoopGroup();
 
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .remoteAddress(new InetSocketAddress(host, port))
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch)
             throws Exception {
             ch.pipeline().addLast(
                     new EchoClientHandler());
         }
     });
 
    ChannelFuture f = b.connect().sync();
    if (f.channel().isActive()) {
        f.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Casper!", CharsetUtil.UTF_8));
    }
 
    Thread.sleep(1000);
 
} finally {
    group.shutdownGracefully().sync();
}

Netty Client Handler Exemple網絡

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
   @Override
   public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
      System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      cause.printStackTrace();
      ctx.close();
   }
 
}

三、BootStrap

一個Netty程序開始於Bootstrap類,Bootstrap類是Netty提供的一個能夠經過簡單配置來設置或"引導"程序的一個很重要的類。啓動Sever端,須要初始化ServerBootStrap。定義兩個EventLoopGroup,分別處理鏈接請求和socket數據傳輸。Netty巧妙的把接收請求和處理請求,都抽象成ChannelHanndler處理,模式上更加統一。經過閱讀代碼,接收請求的處理以下:接到鏈接請求後,設置其初始化參數,而後註冊到childGroup處理。數據結構

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = this.childGroup;
    final ChannelHandler currentChildHandler = this.childHandler;
 
    p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                }
            });
        }
    }});
}
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
    }
 
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel)msg;
        child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
        try {
             this.childGroup.register(child).addListener(new ChannelFutureListener() {
         });
         } catch (Throwable var8) {
         }
    }
}

四、EventLoopGroup

SingleThreadEventLoopGroup  一個線程處理全部的Channel

ThreadPerChannelLoopGroup  每一個線程處理一個channel

MultiThreadEventLoopGroup 經過線程組處理channel

       NIOEventLoopGroup

       EpollEventLoopGroup       根據Selecter的不一樣實現,不一樣的處理策略。NIOEventLoopGroup,默認採用sellect方式,參考JAVA NIO實現。EpollEventLoopGroup只能在linux平臺使用,更高效。

五、編程模式

宏觀上,ChannelPipline串聯起全部的ChannelHandler,在特定的時間節點,調用Handler的函數,完成處理流程。處理流程以下圖所示:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
 
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }
 
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }
 
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
 
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }
 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }
 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
 
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelPipline:內部包含一個ChannelHandlerContext的鏈表數據結構,每次從header item開始,調用context.invoke函數,開始處理流程。

ChannelHandlerContext:在Channelhandler中,調用context的fire函數;fire函數在pipline中查找須要執行的下一個context,調用context.invoke函數;調用channelhandler的函數。

ChannelHandler:channelChandler分爲inbound和outbond,inbound處理接收消息,outbound處理傳出。如上所示,channelInboundHandler中定義了一系列的函數,通常狀況下,編寫Netty程序,只須要在ChannelHandler中處理業務邏輯,編程模型至關簡單。

七、編碼與解碼

代碼實例中,channelHandler中讀到的是ByteBuf對象,其實就是byte數組,在程序內部,把byte數組轉換成了字符串。

問題一、半包問題,沒有讀到完整的數據,就進行了轉換,致使錯誤。

問題二、編解碼與業務柔和在一塊兒,設計上不完美。

Netty提供了編碼器,解碼器以及編解碼器。

public class RpcChannelHandler extends SimpleChannelInboundHandler<RpcResponse> implements RpcChannel {
 
    // 讀取數據,讀的是對象
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
        String requestId = rpcResponse.getRequestId();
        if (rpcFutureMap.containsKey(requestId)) {
            RpcFuture rpcFuture = rpcFutureMap.get(requestId);
            rpcFutureMap.remove(requestId);
            rpcFuture.finish(rpcResponse);
        }
    }
 
    @Override
    public RpcFuture call(RpcRequest request, RpcCallback callback) {
        this.channel.writeAndFlush(request); // 寫數據,寫的也是對象
    }
}
 

編碼器

public class RpcEncoder extends MessageToByteEncoder {
 
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

解碼器

public class RpcDecoder extends ByteToMessageDecoder {
 
 
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
 
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
 
        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}

處理流程以下:

編碼解碼器類結構

編碼器是一個ChannelHandler,放在pipline中,做爲處理請求消息的一個環節。處理傳入參數的是Decoder,負責把二進制的數據,轉換成程序可識別的數據結構,其實現了InboundChannelHandler接口;通常狀況下,decoder要放在pip的header位置,即addFrist。處理傳出參數的是Encoder,負責把程序內部的數據結構,轉換成可在網絡傳輸的二進制數據;通常狀況下,encoder須要放在pipline的最後處理。

分別實現Encoder和Decoder,多是代碼放在兩個類裏實現;Netty提供了Codec,在一個類裏實現編碼和解碼。

Netty對協議的支持,都是經過提供編碼解碼器實現的。例如:http協議

       HttpRequestEncoder,將HttpRequest或HttpContent編碼成ByteBuf
       HttpRequestDecoder,將ByteBuf解碼成HttpRequest和HttpContent
       HttpResponseEncoder,將HttpResponse或HttpContent編碼成ByteBuf
       HttpResponseDecoder,將ByteBuf解碼成HttpResponse和HttpContent

八、Netty 爲何高效?

1)、線程模型

  • Reactor單線程模型;
    • 做爲NIO服務端,接收客戶端的TCP鏈接;
    • 做爲NIO客戶端,向服務端發起TCP鏈接;
    • 讀取通訊對端的請求或者應答消息;
    • 向通訊對端發送消息請求或者應答消息。
  • Reactor多線程模型;
    • 有專門一個NIO線程-Acceptor線程用於監聽服務端,接收客戶端的TCP鏈接請求;
    • 網絡IO操做-讀、寫等由一個NIO線程池負責,線程池能夠採用標準的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
    • 1個NIO線程能夠同時處理N條鏈路,可是1個鏈路只對應1個NIO線程,防止發生併發操做問題。
  • 主從Reactor多線程模型。
    • 爲了解決1個服務端監聽線程沒法有效處理全部客戶端鏈接的性能不足問題,有一組NIO線程處理服務氣短的監聽。

      如上的幾種模式,在ServerBootStrap.goup初始化時設置,咱們的例子,採用的是主從Reactor多線程模型。

 

2)、Zero Copy

  • Netty的接收和發送ByteBuffer採用DIRECT BUFFERS,使用堆外直接內存進行Socket讀寫,不須要進行字節緩衝區的二次拷貝。若是使用傳統的堆內存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內存Buffer拷貝一份到直接內存中,而後才寫入Socket中。相比於堆外直接內存,消息在發送過程當中多了一次緩衝區的內存拷貝。
    ByteBufAllocator 經過ioBuffer分配堆外內存:
    public ByteBuf ioBuffer() {
    return PlatformDependent.hasUnsafe() ? this.directBuffer(256) : this.heapBuffer(256);
    }
      
  • Netty提供了組合Buffer對象,能夠聚合多個ByteBuffer對象,用戶能夠像操做一個Buffer那樣方便的對組合Buffer進行操做,避免了傳統經過內存拷貝的方式將幾個小Buffer合併成一個大的Buffer。
    private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
    ......
    wasAdded = this.components.add(c);
    ......
    return var11;
    }
  • Netty的文件傳輸採用了transferTo方法,它能夠直接將文件緩衝區的數據發送到目標Channel,避免了傳統經過循環write方式致使的內存拷貝問題。linux零拷貝,sendfile https://blog.csdn.net/caianye/article/details/7576198
    private native long transferTo0(FileDescriptor var1, long var2, long var4, FileDescriptor var6);

3)、Select VS Epoll

        多路複用IO有多種實現方式,其中,select/poll是全部操做系統都支持的方式,提出時間較早。JAVA NIO默認實現是用的Select,windows操做系統只支持Select。Epoll提出較晚,在linux系統提供,是目前比較成熟穩定的方案。

 

  • select/poll 函數監視的文件描述符分3類,分別是writefds(可寫)、readfds(可讀)、和exceptfds(異常)。調用後select函數會阻塞,直到有描述符就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,若是當即返回設爲null便可),函數返回。當select函數返回後,能夠經過遍歷fdset,來找到就緒的描述符。
    select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

     
  • epoll是在2.6內核中提出的,是以前的select和poll的加強版本。首先,epoll使用一組函數來完成,而不是單獨的一個函數;其次,epoll把用戶關心的文件描述符上的事件放在內核裏的一個事件表中,無須向select和poll那樣每次調用都要重複傳入文件描述符集合事件集。
    int epoll_create(int size);  // 該函數返回的文件描述符指定要訪問的內核事件表,是其餘全部epoll系統調用的句柄,
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  //告訴操做系統須要關注哪些文件描述符
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); // 等待事件發生

九、Netty能作什麼?

1)、Rpc框架,表明 Dubbo

       Http VS TCP

      

  • 採用Http協議,增長了http協議的解碼編碼過程
  • http協議自己無狀態的,鏈接的複用很差;(可經過keep-alive解決一部分鏈接複用的問題)

      Tips: 

2)、服務代理,表明 NRedis-Proxy 參考: http://www.javashuo.com/article/p-aimzncum-hx.html

NRedis-Proxy 是一個Redis中間件服務,第一個Java 版本開源Redis中間件,無須修改業務應用程序任何代碼與配置,與業務解耦;以Spring爲基礎開發自定義標籤,讓它可配置化,使其更加容易上手;以netty 做爲通訊傳輸工具,讓它具備高性能,高併發,可分佈式擴展部署等特色,單片性能損耗約5%左右。

      這裏寫圖片描述

3)、中間件Vert.x  參考 https://vertx.io/

clipse Vert.x is event driven and non blocking. This means your app can handle a lot of concurrency using a small number of kernel threads. Vert.x lets your app scale with minimal hardware.

參考文章

http://ifeve.com/netty-mina-in-depth-1/ Netty-Mina深刻學習與對比(一)
http://ifeve.com/netty-mina-in-depth-2/ Netty-Mina深刻學習與對比(二)
http://www.52im.net/thread-304-1-1.html 詳解快的打車架構設計及技術實踐
http://www.52im.net/thread-306-1-1.html Java新一代網絡編程模型AIO原理及Linux系統AIO介紹
https://www.ibm.com/developerworks/cn/java/j-lo-comet/ Servlet 3.0 實戰:異步 Servlet 與 Comet 風格應用程序
https://www.jianshu.com/p/fbe0430959e8 利用Vertx構建簡單的API 服務、分佈式服務
http://www.52im.net/thread-400-1-1.html NIO框架詳解:Netty的高性能之道
相關文章
相關標籤/搜索