概念: Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty至關簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。java
Helloword版
服務端這邊綁定了兩個端口,能夠根據業務區別對待如端口1是作A業務,端2作B業務.編程
public class Server { public static void main(String[] args) throws InterruptedException { //1.建立兩個線程組 (只有服務器端須要 ) //一個線程組專門用來管理接收客戶端的請求鏈接的 //一個線程組進行網絡通訊(讀寫) EventLoopGroup receiveGroup = new NioEventLoopGroup(); EventLoopGroup dealGroup = new NioEventLoopGroup(); //建立輔助工具類,用於設置服務器通道的一系列配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(receiveGroup, dealGroup)//綁定兩個線程組 .channel(NioServerSocketChannel.class) //指定NIO的模式 .option(ChannelOption.SO_BACKLOG, 1024) //設置tcp緩衝區 .option(ChannelOption.SO_SNDBUF, 32*1024) //設置發送緩衝區大小 .option(ChannelOption.SO_RCVBUF, 32*1024) //設置接收緩衝大小 .option(ChannelOption.SO_KEEPALIVE, true) //保持鏈接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //3 在這裏配置具體數據接收方法的處理 sc.pipeline().addLast(new ServerHandler()); } }); //4 進行綁定 ChannelFuture cf1 = serverBootstrap.bind(8765).sync(); ChannelFuture cf2 = serverBootstrap.bind(8764).sync(); //5 等待關閉 cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); receiveGroup.shutdownGracefully(); dealGroup.shutdownGracefully(); } }
服務端處理器:安全
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "gbk"); System.out.println("Server :" + body ); String response = "進行返回給客戶端的響應:" + body ; //注意使用了writeAndFlush的話就能夠不釋放ReferenceCountUtil.release(msg); 不然須要釋放ByteBuf容器的數據。 ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); //.addListener(ChannelFutureListener.CLOSE);//監聽,內容傳輸完畢後就關閉管道 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("讀完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
客戶端:服務器
public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //發送消息 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:777".getBytes())); Thread.sleep(1000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:666".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:888".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:888".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:666".getBytes())); cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客戶端處理器:網絡
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端的channelActive()方法"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "gbk"); System.out.println("Client :" + body ); } finally { ReferenceCountUtil.release(msg); } }
TCP是個「流」協議,所謂流,就是沒有界限的一串數據。你們能夠想一想河裏的流水,是連成一片的,其間並無分界線。TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分,因此在業務上認爲,一個完整的包可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。
通俗意義來講多是三個數據如'A','B','C' 但通過TCP協議流式傳輸後成了'AB','C'兩個數據了,這種就是粘包了數據包之間粘一塊兒了。那麼拆包的話有三種方式。app
兩根水管(服務器與客戶端)須要相互流通水(數據),那麼須要一個轉接頭(套接字)鏈接,水流式沒法區分一段段的數據,一種方式在流通的過程當中設置些標誌性物品如記號筆勾一下(分隔符),另外一種方式則是設定每一段都是多少容量的水來區分.框架
能夠理解管道流裏流的都是ByteBuffer類型的數據,那麼使用分隔符(非ByteBuffer類型)的話可能就意味着一個轉碼與解碼的過程。
服務端:異步
public class Server { public static void main(String[] args) throws Exception{ //1 建立2個線程,一個是負責接收客戶端的鏈接。一個是負責進行數據傳輸的 EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 建立服務器輔助類 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設置特殊分隔符 解決TCP拆包黏包問題, ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //設置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 綁定鏈接 ChannelFuture cf = b.bind(8765).sync(); //等待服務器監聽端口關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
服務端處理器:socket
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String request = (String)msg; System.out.println("Server channelRead:" + request); String response = "服務器響應:" + msg + "$"; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { System.out.println("exceptionCaught"); ctx.close(); } }
客戶端:tcp
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("數據A$".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("數據B$".getBytes())); //等待客戶端端口關閉 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客戶端處理器:
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String response = (String)msg; System.out.println("Client: " + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught"); ctx.close(); } }
服務端:
public class Server { public static void main(String[] args) throws Exception{ //1 建立2個線程,一個是負責接收客戶端的鏈接。一個是負責進行數據傳輸的 EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 建立服務器輔助類 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設置定長字符串接收 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //設置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 綁定鏈接 ChannelFuture cf = b.bind(8765).sync(); //等待服務器監聽端口關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
客戶端:
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes())); //等待客戶端端口關閉 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
服務端與客戶端的處理器參照上例以字符串分割的.
新手上路,多多關注...