yls 2020/5/23html
方法名稱 | 含義 |
---|---|
isDone | 判斷當前操做是否完成,不必定成功,多是拋出異常,程序中斷等 |
isSuccess | 判斷當前操縱是否成功 |
cause | 獲取已完成的當前操做失敗的緣由 |
isCancelled | 判斷當前操做是否被取消 |
addListener | 註冊監聽器 |
//啓動服務器,並綁定端口 ChannelFuture channelFuture = serverBootstrap.bind(6999); //給 channelFuture 註冊監聽器,監聽關心的事件 channelFuture.addListener((future) -> { if (future.isSuccess()) { System.out.println("監聽端口成功。。。"); } else { System.out.println("監聽端口失敗。。。"); } });
public class NettyClient { public static void main(String[] args) throws InterruptedException { //客戶端須要一個事件循環組就能夠 NioEventLoopGroup group = new NioEventLoopGroup(); try { //建立客戶端的啓動對象 bootstrap ,不是 serverBootStrap Bootstrap bootstrap = new Bootstrap(); //設置相關參數 bootstrap.group(group) //設置線程組 .channel(NioSocketChannel.class) //設置客戶端通道的實現數 (反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); //加入本身的處理器 } }); System.out.println("客戶端 ready is ok.."); //鏈接服務器 final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6999).sync(); //對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
public class NettyClientHandler extends ChannelInboundHandlerAdapter { //當通道就緒就會觸發該方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hi~,服務端。。。。", CharsetUtil.UTF_8)); } //當通道有讀取事件會觸發 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服務器回覆的數據: " + buf.toString(CharsetUtil.UTF_8)); System.out.println("服務器的地址: " + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
/** * netty 簡單示例 * 客戶端像服務端發送一個消息,服務端向客戶端回覆一條消息 */ public class NettyServer { public static void main(String[] args) throws InterruptedException { //建立兩個線程組 bossGroup workerGroup,兩個都是無限循環 //bossGroup 只是處理鏈接請求 //workerGroup 真正處理客戶端的業務 // 若不傳參數,bossGroup workerGroup中默認的子線程數(NioEventLoop) = 電腦處理器數 * 2 // Runtime.getRuntime().availableProcessors() 能夠獲取處處理器數 // bossGroup只處理請求的鏈接,咱們這裏設置線程數爲1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //建立服務端的啓動對象,並使用鏈式編程來設置參數 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組 .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現 .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列的鏈接個數 .childOption(ChannelOption.SO_KEEPALIVE, true) //設置一直保持活動鏈接狀態 .childHandler(new ChannelInitializer<SocketChannel>() {//設置一個通道測試對象 @Override protected void initChannel(SocketChannel ch) throws Exception { //給pipeline設置通道處理器 ch.pipeline().addLast(new NettyServerHandler()); } });//給 workerGroup 的EventLoop對應的管道設置處理器 //啓動服務器,並綁定端口而且同步 ChannelFuture channelFuture = serverBootstrap.bind(6999).sync(); //給 channelFuture 註冊監聽器,監聽關心的事件 // channelFuture.addListener((future) -> { // if (future.isSuccess()) { // System.out.println("監聽端口成功。。。"); // } else { // System.out.println("監聽端口失敗。。。"); // } // }); //對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/** * 自定義一個handler,須要繼承netty規定好的某個 HandlerAdapter (規範) * 這是自定義的handler才能稱之爲handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 讀取客戶端發送的數據 * * @param ctx 上下文對象,含有管道 pipeline,通道 channel, 地址 * @param msg 客戶端發送的數據,默認爲Object * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { /* // 能夠將耗時長的業務添加到 taskQueue中執行 ctx.channel().eventLoop().execute(() -> { //處理業務 });*/ System.out.println("服務器當前線程:"+Thread.currentThread().getName()); //將msg轉爲 ByteBuf //ByteBuf是netty提供,不是NIO中的 ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端的遠程地址:" + ctx.channel().remoteAddress()); } //數據讀取完畢 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //將數據寫入到緩存並刷新 //通常要對數據進行編碼 ctx.writeAndFlush(Unpooled.copiedBuffer("hello~,客戶端", CharsetUtil.UTF_8)); } //處理異常,通常須要關閉通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
/** * 服務端 * netty 實現 websocket 全雙工通訊 */ public class websocket { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //建立服務端的啓動對象,並使用鏈式編程來設置參數 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組 .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現 .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列的鏈接個數 .childOption(ChannelOption.SO_KEEPALIVE, true) //設置一直保持活動鏈接狀態 .handler(new LoggingHandler(LogLevel.INFO)) //給bossGroup 添加一個日誌處理器 .childHandler(new ChannelInitializer<SocketChannel>() {//設置一個通道測試對象 @Override protected void initChannel(SocketChannel ch) throws Exception { //給pipeline設置通道處理器 final ChannelPipeline pipeline = ch.pipeline(); //由於基於http協議,加入http的編碼和解碼器 pipeline.addLast(new HttpServerCodec()); //是以塊方式寫,添加ChunkedWriteHandler處理器 pipeline.addLast(new ChunkedWriteHandler()); //http 數據在傳輸過程當中是分段,當瀏覽器發送大量數據時,就會發送屢次http請求 //HttpObjectAggregator能夠將多個段聚合 pipeline.addLast(new HttpObjectAggregator(8192)); /** *websocket 數據是以幀(frame)的形式傳遞 * 能夠看到WebSocketFrame 下面有六個子類 * WebSocketServerProtocolHandler 的核心功能是將http協議升級爲ws協議,保持長鏈接 */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); //自定義handler,處理業務邏輯 pipeline.addLast(new MyTextWebSocketFrameHandler()); } });//給 workerGroup 的EventLoop對應的管道設置處理器 //啓動服務器,並綁定端口而且同步 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); //給 channelFuture 註冊監聽器,監聽關心的事件 // channelFuture.addListener((future) -> { // if (future.isSuccess()) { // System.out.println("監聽端口成功。。。"); // } else { // System.out.println("監聽端口失敗。。。"); // } // }); //對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/** * TextWebSocketFrame 表示一個文本幀 */ public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服務器收到消息:"+msg.text()); ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器收到:"+msg.text())); } //客戶端鏈接後,就會觸發該函數 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded 觸發 "+ctx.channel().id().asLongText()); System.out.println("handlerAdded 觸發 "+ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 觸發了。。"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常發生"); ctx.close(); } }
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <textarea id="message" style="width: 300px;height: 300px;"></textarea> <button onclick="send()">發送</button> <textarea id="response" style="width: 300px;height: 300px;"></textarea> <button onclick="document.getElementById('response').value=''">清空</button> <script> let socket if (window.WebSocket) { socket = new WebSocket("ws://localhost:7000/hello") socket.onmessage = function (e) { let text = document.getElementById('response') text.value = text.value + "\n" + e.data } socket.onopen = function (e) { let text = document.getElementById('response') text.value = "start...." } socket.onclose = function (e) { let text = document.getElementById('response') text.value = text + "\n" + "鏈接關閉" } } else { alert("瀏覽器不支持websocket") } function send() { console.log("222") if (!socket) { console.log("3") return } if (socket.readyState == WebSocket.OPEN) { let message = document.getElementById("message").value console.log("11") console.log(message) socket.send(message) } else { alert("鏈接未開始。。") } } </script> </body> </html>
public class TestServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //建立服務端的啓動對象,並使用鏈式編程來設置參數 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組 .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現 .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列獲得鏈接個數 .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動鏈接狀態 .childHandler(new TestServerInitializer());//給 workerGroup 的EventLoop對應的管道設置處理器 //啓動服務器,並綁定端口而且同步 ChannelFuture channelFuture = serverBootstrap.bind(6991).sync(); //對關閉通道進行監聽 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class TestServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { //向管道加入處理器 //獲取管道 ch.pipeline() .addLast("myHttpServerCodec",new HttpServerCodec())//添加一個netty提供的處理http的編碼-解碼器 .addLast(new TestHttpServerHandler());//添加一個自定義的handler } }
/** * HttpObject 客戶端和服務端相互通信的數據 */ public class TestHttpServerHandler extends SimpleChannelInboundHandler<DecoderResultProvider> { //讀取客戶端數據 @Override protected void channelRead0(ChannelHandlerContext ctx, DecoderResultProvider msg) throws Exception { if (msg instanceof HttpRequest) { System.out.println("msg 類型:" + msg.getClass()); HttpRequest httpRequest = (HttpRequest) msg; System.out.println("uri: " + httpRequest.uri()); //過濾特定資源 if ("/favicon.ico".equals(httpRequest.uri())) { System.out.println("請求是 /favicon.ico ,不作響應"); return; } //回覆消息給瀏覽器 (http協議) ByteBuf content = Unpooled.copiedBuffer("hello,我是服務器。。", CharsetUtil.UTF_8); //構建一個 http response DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK, content); defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //將構建好的 response 返回 ctx.writeAndFlush(defaultFullHttpResponse); } } }
/** * netty 實現羣聊 * 服務端能夠監聽客戶端的上線,離線和轉發消息 * 客戶端能夠發送消息和接收其它客戶端發送的消息 */ public class GroupChatServer { private int port; public GroupChatServer(int port) { this.port = port; } public void run() throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder())//向pipeline加入解碼器 .addLast(new StringEncoder())//想pipeline加入編碼器 .addLast("myServerHandler", new ServerHandler()); } }); final ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GroupChatServer(7000).run(); } }
public class ServerHandler extends SimpleChannelInboundHandler<String> { // 定義一個 channel 組,管理全部的channel //GlobalEventExecutor.INSTANCE 全局的事件執行器,是一個單例 // 必須用 static 修飾,表示類變量 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 表示鏈接創建,一旦鏈接,第一個被執行 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //將該客戶端加入聊天通知給其它客戶端 String dateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String s = String.format("客戶端 %s 加入聊天 %s", ctx.channel().remoteAddress(), dateTime); //該方法會自動遍歷全部channel,併發送消息,不須要本身遍歷 // channelGroup.writeAndFlush(s); channelGroup.writeAndFlush(s); //將該客戶端的channel 加入channel 組 channelGroup.add(ctx.channel()); } //斷開鏈接,handlerRemoved 執行後,會自動將當前channel從channelGroup移除 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { final Channel channel = ctx.channel(); String dateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); String s = String.format("客戶端 %s 離開了 %s", ctx.channel().remoteAddress(), dateTime); //該方法會自動遍歷全部channel,併發送消息,不須要本身遍歷 channelGroup.writeAndFlush(s); } //表示channel處於活動狀態 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 上線了。。"); } //表示channel處於非活動狀態 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + "離線了。。"); } //讀取數據 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { final Channel channel = ctx.channel(); channelGroup.forEach(ch -> { if (channel != ch) { ch.writeAndFlush("客戶端 " + channel.remoteAddress() + " 發送了消息:" + msg); } else { ch.writeAndFlush("本身發送了消息:" + msg); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class GroupChatClient { private String ip; private int port; public GroupChatClient(String ip, int port) { this.ip = ip; this.port = port; } public void run() { NioEventLoopGroup group = new NioEventLoopGroup(1); try { final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new ClientHandler()); } }); final ChannelFuture sync = bootstrap.connect(ip, port).sync(); final Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ final String s = scanner.nextLine(); sync.channel().writeAndFlush(s); } sync.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) { new GroupChatClient("127.0.0.1",7000).run(); } }
public class ClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }