import java.nio.ByteBuffer; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Server { public static void main(String[] args) throws Exception{ //1 建立2個線程,一個是負責接收客戶端的鏈接。一個是負責進行數據傳輸的 EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 建立服務器輔助類 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設置特殊分隔符 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //設置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 綁定鏈接 ChannelFuture cf = b.bind(8765).sync(); //等待服務器監聽端口關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes())); //等待客戶端端口關閉 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String request = (String)msg; System.out.println("Server :" + msg); String response = "服務器響應:" + msg + "$_"; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String response = (String)msg; System.out.println("Client: " + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public Server() { } public static void main(String[] args) throws Exception { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)b.group(pGroup, cGroup).channel(NioServerSocketChannel.class)).option(ChannelOption.SO_BACKLOG, 1024)).handler(new LoggingHandler(LogLevel.INFO))).childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingDecoder()}); sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingEncoder()}); sc.pipeline().addLast(new ChannelHandler[]{new ReadTimeoutHandler(5)}); sc.pipeline().addLast(new ChannelHandler[]{new ServerHandler()}); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; public class Client { private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; public static Client getInstance() { return Client.SingletonHolder.instance; } private Client() { this.group = new NioEventLoopGroup(); this.b = new Bootstrap(); ((Bootstrap)((Bootstrap)((Bootstrap)this.b.group(this.group)).channel(NioSocketChannel.class)).handler(new LoggingHandler(LogLevel.INFO))).handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingDecoder()}); sc.pipeline().addLast(new ChannelHandler[]{MarshallingCodeCFactory.buildMarshallingEncoder()}); sc.pipeline().addLast(new ChannelHandler[]{new ReadTimeoutHandler(5)}); sc.pipeline().addLast(new ChannelHandler[]{new ClientHandler()}); } }); } public void connect() { try { this.cf = this.b.connect("127.0.0.1", 8765).sync(); System.out.println("遠程服務器已經鏈接, 能夠進行數據交換.."); } catch (Exception var2) { var2.printStackTrace(); } } public ChannelFuture getChannelFuture() { if (this.cf == null) { this.connect(); } if (!this.cf.channel().isActive()) { this.connect(); } return this.cf; } public static void main(String[] args) throws Exception { final Client c = getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; ++i) { Request request = new Request(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("數據信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4L); } cf.channel().closeFuture().sync(); (new Thread(new Runnable() { public void run() { try { System.out.println("進入子線程..."); ChannelFuture cf = c.getChannelFuture(); System.out.println(cf.channel().isActive()); System.out.println(cf.channel().isOpen()); Request request = new Request(); request.setId("4"); request.setName("pro4"); request.setRequestMessage("數據信息4"); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子線程結束."); } catch (InterruptedException var3) { var3.printStackTrace(); } } })).start(); System.out.println("斷開鏈接,主線程結束.."); } private static class SingletonHolder { static final Client instance = new Client((Client)null); private SingletonHolder() { } } }