public class IOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); serverSocket.bind(new InetSocketAddress(8899)); ExecutorService executor = Executors.newCachedThreadPool(); Set<Socket> socketGroup = new HashSet<>(); while (true) { Socket socket = serverSocket.accept(); socketGroup.add(socket); executor.execute(() -> { try ( InputStream in = socket.getInputStream(); InputStreamReader reader = new InputStreamReader(in, "UTF-8"); BufferedReader br = new BufferedReader(reader); ) { String line; while ((line = br.readLine()) != null) { int port = socket.getPort(); System.out.println("from client:{" + port + "}" + line); String finalLine = line; for (Socket client : socketGroup) { if (client == socket) continue; try { OutputStream output = client.getOutputStream(); DataOutputStream out = new DataOutputStream(output); String s = "client{" + port + "}" + finalLine + "\n"; out.write(s.getBytes()); } catch (IOException e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); } }); } } }
public class IOClient { public static void main(String[] args) throws IOException { Socket socket = new Socket(); InetSocketAddress address = new InetSocketAddress("localhost", 8899); socket.connect(address); try ( OutputStream output = socket.getOutputStream(); DataOutputStream out = new DataOutputStream(output); Reader rd = new InputStreamReader(socket.getInputStream()); BufferedReader bufferRd = new BufferedReader(rd); ) { // 子線程監聽輸入併發送 new Thread(() -> { InputStreamReader in = new InputStreamReader(System.in); BufferedReader reader = new BufferedReader(in); while (true) { try { out.write((reader.readLine() + '\n').getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); // 主線程循環監聽接受到的數據並輸出 while (true) { System.out.println(bufferRd.readLine()); } } } }
public class NIOServer { public static void main(String[] args) throws IOException { ServerSocketChannel srvSocketChannel = ServerSocketChannel.open(); srvSocketChannel.configureBlocking(false); ServerSocket socket = srvSocketChannel.socket(); socket.setReuseAddress(true); socket.bind(new InetSocketAddress(8899)); Selector selector = Selector.open(); srvSocketChannel.register(selector, SelectionKey.OP_ACCEPT); Set<SocketChannel> channelGroup = new HashSet<>(); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { SocketChannel client; if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); client = channel.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); channelGroup.add(client); System.out.println(client.getRemoteAddress()); } else if (key.isReadable()) { client = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); client.read(buffer); buffer.flip(); System.out.print(new String(buffer.array())); channelGroup.forEach(channel -> { buffer.rewind(); if (channel != client) { try { int port = client.socket().getPort(); byte[] array = buffer.array(); String s = "client{" + port + "}:" + new String(array); channel.write(ByteBuffer.wrap(s.getBytes())); } catch (IOException e) { e.printStackTrace(); } } }); } keys.remove(key); } } } }
public class NIOClient { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); InetSocketAddress address = new InetSocketAddress("localhost", 8899); socketChannel.connect(address); Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { SocketChannel client; if (key.isConnectable()) { client = (SocketChannel) key.channel(); if (client.isConnectionPending()) { client.finishConnect(); client.register(selector, SelectionKey.OP_READ); new Thread(() -> { InputStreamReader in = new InputStreamReader(System.in); BufferedReader reader = new BufferedReader(in); while (true) { try { String line = reader.readLine() + '\n'; client.write(ByteBuffer.wrap(line.getBytes())); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } else if (key.isReadable()) { client = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); client.read(byteBuffer); byteBuffer.flip(); while (byteBuffer.hasRemaining()) { System.out.print((char)byteBuffer.get()); } } keys.remove(key); } } } }
public class TCPServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss,worker).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerChannelInitializer()); ChannelFuture channelFuture = bootstrap.bind(8899).sync(); channelFuture.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096,Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(UTF_8)); pipeline.addLast(new StringEncoder(UTF_8)); pipeline.addLast(new ServerHandler()); } }
public class ServerHandler extends SimpleChannelInboundHandler<String> { private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); group.forEach(ch -> { ch.writeAndFlush(channel.remoteAddress() + " 上線" + "\n"); }); group.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { group.forEach(ch -> { ch.writeAndFlush(ctx.channel().remoteAddress() + " 下線" + "\n"); }); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); group.forEach(ch -> { if (ch != channel) { ch.writeAndFlush(channel.remoteAddress() + ":" + msg + "\n"); } else { ch.writeAndFlush("本身:" + msg + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class TCPClient { public static void main(String[] args) throws InterruptedException, IOException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ClientChannelInitializer()); Channel channel = bootstrap .connect("localhost", 8899) .sync() .channel(); InputStreamReader in = new InputStreamReader(System.in); BufferedReader reader = new BufferedReader(in); while (true) { channel.writeAndFlush(reader.readLine() + "\n"); } } finally { group.shutdownGracefully(); } } }
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(UTF_8)); pipeline.addLast(new StringEncoder(UTF_8)); pipeline.addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }
注:NIO是Netty的基礎,學好NIO對於Netty的學習有重要做用。java