netty的基本使用

yls 2020/5/23html

有了NIO,爲何還要有netty?

  1. NIO類庫和和API繁雜,須要熟練掌握 Selector,ServerSocketChannel,SocketChannel,ByteBuffer等
  2. 須要具有其它額外的技能,要熟悉java多線程編程,由於NIO涉及到Reactor模式,也要熟悉網絡編程
  3. 開發工做和難度都比較大,例如客戶端面臨 斷連重連、網絡閃斷、半包讀寫、網絡擁塞和異常流的處理等等
  4. JDK NIO的Bug:臭名昭著的Epoll Bug,他會致使selector空輪詢,最終致使cpu 100%,知道jdk 1.7仍舊存在,
    沒有被根本解決

Netty模型

  1. netty抽象出兩組線程池(NioEventLoopGroup),BossGroup和workerGroup,BossGroup專門接收客戶端鏈接,workGroup專門負責網絡讀寫操做
  2. NioEventLoop是BossGroup或workerGroup中不斷循環執行處理任務的線程,每一個NioEventLoop都有一個selector,
    用於監聽綁定在其上的socket網絡通訊
  3. NioEventLoop 內部採用串行化設計,從消息的讀取->解碼->處理->編碼->發送,始終又IO線程 NioEventLoop負責
  • NioEventLoopGroup下包含多個NioEventLoop
  • 每一個NioEventLoop中包含一個selector,一個taskQueue
  • 每一個NioEventLoop的selector能夠註冊監聽多個NioChannel
  • 每一個NioChannel只會綁定在惟一的NioEventLoop
  • 每一個NioChannel都綁定有一個本身的ChannelPipeline

Netty的異步模型

Netty的異步模型的基本介紹

  1. 異步的概念和同步相對,當一個異步過程調用發生後,調用者不能馬上獲得結果,異步調用完成後,經過狀態,通知和回調來通知調用者
  2. Netty中的I/O操做是異步的,包括 bind,write,connect等操做會簡單的返回一個ChannelFuture.
  3. Netty的異步模型是創建在future和callback之上的,調用者經過Future-Listener機制得到結果。

Future-Listener 機制

  1. 當 future對象剛剛建立時,處於未完成狀態,調用者能夠經過返回的 ChannelFuture 來獲取操做執行的狀態,
    註冊監聽函數來執行完成後的操做
  2. 常見操做以下:
方法名稱 含義
isDone 判斷當前操做是否完成,不必定成功,多是拋出異常,程序中斷等
isSuccess 判斷當前操縱是否成功
cause 獲取已完成的當前操做失敗的緣由
isCancelled 判斷當前操做是否被取消
addListener 註冊監聽器
  1. 舉例說明
//啓動服務器,並綁定端口
            ChannelFuture channelFuture = serverBootstrap.bind(6999);
            //給 channelFuture 註冊監聽器,監聽關心的事件
            channelFuture.addListener((future) -> {
                if (future.isSuccess()) {
                    System.out.println("監聽端口成功。。。");
                } else {
                    System.out.println("監聽端口失敗。。。");
                }
            });
  1. 總結:相比傳統阻塞I/O,線程會阻塞,直到操做完成;異步處理不會形成線程阻塞,線程在I/O操做期間能夠執行別的程序,
    在高併發情形下會更穩定更高的吞吐量。

netty的簡單使用

1.建立客戶端

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客戶端須要一個事件循環組就能夠
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            //建立客戶端的啓動對象 bootstrap ,不是 serverBootStrap
            Bootstrap bootstrap = new Bootstrap();
            //設置相關參數
            bootstrap.group(group) //設置線程組
                    .channel(NioSocketChannel.class) //設置客戶端通道的實現數 (反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); //加入本身的處理器
                        }
                    });

            System.out.println("客戶端 ready is ok..");
            //鏈接服務器
            final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6999).sync();
            //對關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

2.客戶端自定義處理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    //當通道就緒就會觸發該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hi~,服務端。。。。", CharsetUtil.UTF_8));
    }

    //當通道有讀取事件會觸發
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服務器回覆的數據: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服務器的地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.建立服務端

/**
 * netty 簡單示例
 * 客戶端像服務端發送一個消息,服務端向客戶端回覆一條消息
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //建立兩個線程組 bossGroup workerGroup,兩個都是無限循環
        //bossGroup 只是處理鏈接請求
        //workerGroup  真正處理客戶端的業務
        // 若不傳參數,bossGroup workerGroup中默認的子線程數(NioEventLoop) = 電腦處理器數 * 2
        // Runtime.getRuntime().availableProcessors() 能夠獲取處處理器數
        // bossGroup只處理請求的鏈接,咱們這裏設置線程數爲1
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //建立服務端的啓動對象,並使用鏈式編程來設置參數
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列的鏈接個數
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設置一直保持活動鏈接狀態
                    .childHandler(new ChannelInitializer<SocketChannel>() {//設置一個通道測試對象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //給pipeline設置通道處理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });//給 workerGroup 的EventLoop對應的管道設置處理器
            //啓動服務器,並綁定端口而且同步
            ChannelFuture channelFuture = serverBootstrap.bind(6999).sync();

            //給 channelFuture 註冊監聽器,監聽關心的事件
//            channelFuture.addListener((future) -> {
//                if (future.isSuccess()) {
//                    System.out.println("監聽端口成功。。。");
//                } else {
//                    System.out.println("監聽端口失敗。。。");
//                }
//            });
            //對關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.服務端自定義處理器

/**
 * 自定義一個handler,須要繼承netty規定好的某個 HandlerAdapter (規範)
 * 這是自定義的handler才能稱之爲handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 讀取客戶端發送的數據
     *
     * @param ctx 上下文對象,含有管道 pipeline,通道 channel, 地址
     * @param msg 客戶端發送的數據,默認爲Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

/*        // 能夠將耗時長的業務添加到 taskQueue中執行
        ctx.channel().eventLoop().execute(() -> {
            //處理業務
        });*/
        System.out.println("服務器當前線程:"+Thread.currentThread().getName());
        //將msg轉爲 ByteBuf
        //ByteBuf是netty提供,不是NIO中的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端的遠程地址:" + ctx.channel().remoteAddress());
    }

    //數據讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //將數據寫入到緩存並刷新
        //通常要對數據進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello~,客戶端", CharsetUtil.UTF_8));
    }

    //處理異常,通常須要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

netty使用WebSocket(長鏈接)

1.服務端

/**
 * 服務端
 * netty 實現 websocket 全雙工通訊
 */
public class websocket {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //建立服務端的啓動對象,並使用鏈式編程來設置參數
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列的鏈接個數
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設置一直保持活動鏈接狀態
                    .handler(new LoggingHandler(LogLevel.INFO)) //給bossGroup 添加一個日誌處理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {//設置一個通道測試對象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //給pipeline設置通道處理器
                            final ChannelPipeline pipeline = ch.pipeline();
                            //由於基於http協議,加入http的編碼和解碼器
                            pipeline.addLast(new HttpServerCodec());
                            //是以塊方式寫,添加ChunkedWriteHandler處理器
                            pipeline.addLast(new ChunkedWriteHandler());
                            //http 數據在傳輸過程當中是分段,當瀏覽器發送大量數據時,就會發送屢次http請求
                            //HttpObjectAggregator能夠將多個段聚合
                            pipeline.addLast(new HttpObjectAggregator(8192));
                            /**
                             *websocket 數據是以幀(frame)的形式傳遞
                             * 能夠看到WebSocketFrame 下面有六個子類
                             * WebSocketServerProtocolHandler 的核心功能是將http協議升級爲ws協議,保持長鏈接
                             */
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                            //自定義handler,處理業務邏輯
                            pipeline.addLast(new MyTextWebSocketFrameHandler());

                        }
                    });//給 workerGroup 的EventLoop對應的管道設置處理器
            //啓動服務器,並綁定端口而且同步
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();

            //給 channelFuture 註冊監聽器,監聽關心的事件
//            channelFuture.addListener((future) -> {
//                if (future.isSuccess()) {
//                    System.out.println("監聽端口成功。。。");
//                } else {
//                    System.out.println("監聽端口失敗。。。");
//                }
//            });
            //對關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2.服務端自定義WebSocket處理器

/**
 * TextWebSocketFrame 表示一個文本幀
 */
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服務器收到消息:"+msg.text());
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器收到:"+msg.text()));
    }

    //客戶端鏈接後,就會觸發該函數
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded 觸發 "+ctx.channel().id().asLongText());
        System.out.println("handlerAdded 觸發 "+ctx.channel().id().asShortText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 觸發了。。");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常發生");
        ctx.close();
    }
}

3.編寫 html 客戶端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>

<textarea id="message" style="width: 300px;height: 300px;"></textarea>
<button onclick="send()">發送</button>
<textarea id="response" style="width: 300px;height: 300px;"></textarea>
<button onclick="document.getElementById('response').value=''">清空</button>

<script>
    let socket
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:7000/hello")

        socket.onmessage = function (e) {
            let text = document.getElementById('response')
            text.value = text.value + "\n" + e.data
        }
        socket.onopen = function (e) {
            let text = document.getElementById('response')
            text.value = "start...."
        }
        socket.onclose = function (e) {
            let text = document.getElementById('response')
            text.value = text + "\n" + "鏈接關閉"
        }
    } else {
        alert("瀏覽器不支持websocket")
    }

    function send() {
        console.log("222")
        if (!socket) {
            console.log("3")

            return
        }
        if (socket.readyState == WebSocket.OPEN) {
            let message = document.getElementById("message").value
            console.log("11")
            console.log(message)
            socket.send(message)
        } else {
            alert("鏈接未開始。。")
        }
    }
</script>
</body>
</html>

netty使用http(短鏈接)

1.服務端

public class TestServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //建立服務端的啓動對象,並使用鏈式編程來設置參數
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列獲得鏈接個數
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動鏈接狀態
                    .childHandler(new TestServerInitializer());//給 workerGroup 的EventLoop對應的管道設置處理器
            //啓動服務器,並綁定端口而且同步
            ChannelFuture channelFuture = serverBootstrap.bind(6991).sync();
            //對關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //向管道加入處理器
        //獲取管道
        ch.pipeline()
                .addLast("myHttpServerCodec",new HttpServerCodec())//添加一個netty提供的處理http的編碼-解碼器
                .addLast(new TestHttpServerHandler());//添加一個自定義的handler
    }
}
/**
 * HttpObject  客戶端和服務端相互通信的數據
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<DecoderResultProvider> {
    //讀取客戶端數據
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DecoderResultProvider msg) throws Exception {
        if (msg instanceof HttpRequest) {
            System.out.println("msg 類型:" + msg.getClass());
            HttpRequest httpRequest = (HttpRequest) msg;
            System.out.println("uri:  " + httpRequest.uri());
            //過濾特定資源
            if ("/favicon.ico".equals(httpRequest.uri())) {
                System.out.println("請求是 /favicon.ico ,不作響應");
                return;
            }
            //回覆消息給瀏覽器 (http協議)
            ByteBuf content = Unpooled.copiedBuffer("hello,我是服務器。。", CharsetUtil.UTF_8);

            //構建一個 http response
            DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK, content);
            defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            defaultFullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            //將構建好的 response 返回
            ctx.writeAndFlush(defaultFullHttpResponse);

        }
    }
}

2.客戶端直接用瀏覽器訪問服務端的 6991 端口

netty實現簡單羣聊系統

1.服務端啓動類

/**
 * netty 實現羣聊
 * 服務端能夠監聽客戶端的上線,離線和轉發消息
 * 客戶端能夠發送消息和接收其它客戶端發送的消息
 */
public class GroupChatServer {

    private int port;

    public GroupChatServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder())//向pipeline加入解碼器
                                    .addLast(new StringEncoder())//想pipeline加入編碼器
                                    .addLast("myServerHandler", new ServerHandler());
                        }
                    });
            final ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new GroupChatServer(7000).run();
    }
}

2.服務端自定義處理器

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    // 定義一個 channel 組,管理全部的channel
    //GlobalEventExecutor.INSTANCE 全局的事件執行器,是一個單例
    // 必須用 static 修飾,表示類變量
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 表示鏈接創建,一旦鏈接,第一個被執行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //將該客戶端加入聊天通知給其它客戶端
        String dateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String s = String.format("客戶端 %s 加入聊天 %s", ctx.channel().remoteAddress(), dateTime);
        //該方法會自動遍歷全部channel,併發送消息,不須要本身遍歷
//        channelGroup.writeAndFlush(s);
        channelGroup.writeAndFlush(s);
        //將該客戶端的channel 加入channel 組
        channelGroup.add(ctx.channel());
    }

    //斷開鏈接,handlerRemoved 執行後,會自動將當前channel從channelGroup移除
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        final Channel channel = ctx.channel();
        String dateTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String s = String.format("客戶端 %s 離開了 %s", ctx.channel().remoteAddress(), dateTime);
        //該方法會自動遍歷全部channel,併發送消息,不須要本身遍歷
        channelGroup.writeAndFlush(s);
    }

    //表示channel處於活動狀態
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上線了。。");
    }

    //表示channel處於非活動狀態
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "離線了。。");
    }

    //讀取數據
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("客戶端 " + channel.remoteAddress() + " 發送了消息:" + msg);
            } else {
                ch.writeAndFlush("本身發送了消息:" + msg);
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

3.客戶端啓動類

public class GroupChatClient {
    private String ip;
    private int port;

    public GroupChatClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void run() {
        NioEventLoopGroup group = new NioEventLoopGroup(1);

        try {
            final Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder())
                                    .addLast(new StringEncoder())
                                    .addLast(new ClientHandler());
                        }
                    });
            final ChannelFuture sync = bootstrap.connect(ip, port).sync();
            final Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                final String s = scanner.nextLine();
                sync.channel().writeAndFlush(s);
            }
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        new GroupChatClient("127.0.0.1",7000).run();
    }
}

4.客戶端自定義處理器

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}
  1. netty實現簡單的RPC調用框架
  2. 代碼託管地址
相關文章
相關標籤/搜索