netty

netty

概念: Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty至關簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。java

新特性

  • 處理大容量數據流更簡單
  • 處理協議編碼和單元測試更簡單
  • I/O超時和idle狀態檢測
  • 應用程序的關閉更簡單,更安全
  • 更可靠的OutOfMemoryError預防

性能

  • 更好的吞吐量,更低的延遲
  • 更少的資源消耗
  • 最小化沒必要要的內存拷貝

具體使用見代碼及註釋

Helloword版
服務端這邊綁定了兩個端口,能夠根據業務區別對待如端口1是作A業務,端2作B業務.編程

public class Server {
    public static void main(String[] args) throws InterruptedException {
        //1.建立兩個線程組 (只有服務器端須要 )
        //一個線程組專門用來管理接收客戶端的請求鏈接的
        //一個線程組進行網絡通訊(讀寫)
        EventLoopGroup receiveGroup = new NioEventLoopGroup();
        EventLoopGroup dealGroup = new NioEventLoopGroup();
        //建立輔助工具類,用於設置服務器通道的一系列配置
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(receiveGroup, dealGroup)//綁定兩個線程組
        .channel(NioServerSocketChannel.class)   //指定NIO的模式
        .option(ChannelOption.SO_BACKLOG, 1024)     //設置tcp緩衝區
        .option(ChannelOption.SO_SNDBUF, 32*1024) //設置發送緩衝區大小
        .option(ChannelOption.SO_RCVBUF, 32*1024) //設置接收緩衝大小
        .option(ChannelOption.SO_KEEPALIVE, true)  //保持鏈接
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //3 在這裏配置具體數據接收方法的處理
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        //4 進行綁定 
                ChannelFuture cf1 = serverBootstrap.bind(8765).sync();
                ChannelFuture cf2 = serverBootstrap.bind(8764).sync();
                //5 等待關閉
                cf1.channel().closeFuture().sync();
                cf2.channel().closeFuture().sync();
                receiveGroup.shutdownGracefully();
                dealGroup.shutdownGracefully();
     }
}

服務端處理器:安全

public class ServerHandler extends ChannelHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server channel active... ");
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "gbk");
            System.out.println("Server :" + body );
            String response = "進行返回給客戶端的響應:" + body ;
            //注意使用了writeAndFlush的話就能夠不釋放ReferenceCountUtil.release(msg); 不然須要釋放ByteBuf容器的數據。
            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
            //.addListener(ChannelFutureListener.CLOSE);//監聽,內容傳輸完畢後就關閉管道
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("讀完了");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
            throws Exception {
        ctx.close();
    }


}

客戶端:服務器

public class Client {
public static void main(String[] args) throws Exception{
        
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
        ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
        //發送消息
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:777".getBytes()));
        Thread.sleep(1000);
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:666".getBytes()));
        cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:888".getBytes()));
        Thread.sleep(2000);
        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:888".getBytes()));
        cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:666".getBytes()));
        
        cf1.channel().closeFuture().sync();
        cf2.channel().closeFuture().sync();
        group.shutdownGracefully();
        
        
        
    }
}

客戶端處理器:網絡

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端的channelActive()方法");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            
            String body = new String(req, "gbk");
            System.out.println("Client :" + body );
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

TCP拆包粘包問題

TCP是個「流」協議,所謂流,就是沒有界限的一串數據。你們能夠想一想河裏的流水,是連成一片的,其間並無分界線。TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分,因此在業務上認爲,一個完整的包可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。
通俗意義來講多是三個數據如'A','B','C' 但通過TCP協議流式傳輸後成了'AB','C'兩個數據了,這種就是粘包了數據包之間粘一塊兒了。那麼拆包的話有三種方式。app

  • 設置每一個數據包的大小如200個字節,若是某個數據包不足200個字節可能會出現丟包的狀況,即該數據包未從一個端到另外一個端,此時須要用空格或者既定的符號補充.
  • 在數據包之間使用一些字符進行分割如$號之類的,解析的時候先處理掉分隔符再拿到各個數據包就行了。(通常用的比較多)
  • 細粒化數據包分爲頭和尾(將消息分爲消息頭和消息尾)
  • 其餘

兩根水管(服務器與客戶端)須要相互流通水(數據),那麼須要一個轉接頭(套接字)鏈接,水流式沒法區分一段段的數據,一種方式在流通的過程當中設置些標誌性物品如記號筆勾一下(分隔符),另外一種方式則是設定每一段都是多少容量的水來區分.框架

使用分隔符解決TCP粘包

能夠理解管道流裏流的都是ByteBuffer類型的數據,那麼使用分隔符(非ByteBuffer類型)的話可能就意味着一個轉碼與解碼的過程。
服務端:異步

public class Server {
    public static void main(String[] args) throws Exception{
        //1 建立2個線程,一個是負責接收客戶端的鏈接。一個是負責進行數據傳輸的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        //2 建立服務器輔助類
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //設置特殊分隔符  解決TCP拆包黏包問題,
                ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                //設置字符串形式的解碼
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        //4 綁定鏈接
        ChannelFuture cf = b.bind(8765).sync();
        
        //等待服務器監聽端口關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
        
    }
}

服務端處理器:socket

public class ServerHandler extends ChannelHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" server channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String request = (String)msg;
        System.out.println("Server channelRead:" + request);
        String response = "服務器響應:" + msg + "$";
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
        System.out.println("exceptionCaught");
        ctx.close();
    }

}

客戶端:tcp

public class Client {
public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //
                ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("數據A$".getBytes()));
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("數據B$".getBytes()));
        
        
        //等待客戶端端口關閉
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        
    }
}

客戶端處理器:

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel active... ");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String response = (String)msg;
            System.out.println("Client: " + response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught");
        ctx.close();
    }

}

設置長度大小解決TCP拆包黏包問題

服務端:

public class Server {

    public static void main(String[] args) throws Exception{
        //1 建立2個線程,一個是負責接收客戶端的鏈接。一個是負責進行數據傳輸的
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        
        //2 建立服務器輔助類
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .option(ChannelOption.SO_SNDBUF, 32*1024)
         .option(ChannelOption.SO_RCVBUF, 32*1024)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                //設置定長字符串接收
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                //設置字符串形式的解碼
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ServerHandler());
            }
        });
        
        //4 綁定鏈接
        ChannelFuture cf = b.bind(8765).sync();
        
        //等待服務器監聽端口關閉
        cf.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
        
    }
    
}

客戶端:

public class Client {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup group = new NioEventLoopGroup();
        
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new ClientHandler());
            }
        });
        
        ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
        
        cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes()));
        cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes()));
        
        //等待客戶端端口關閉
        cf.channel().closeFuture().sync();
        group.shutdownGracefully();
        
    }
}

服務端與客戶端的處理器參照上例以字符串分割的.

新手上路,多多關注...
相關文章
相關標籤/搜索