Netty 系列(三)Netty 入門

Netty 系列(三)Netty 入門

Netty 是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。更多請參考:Netty GithubNetty中文入門html

圖3-1 Netty架構

1、得到 Netty

能夠經過Maven安裝Netty。查看Netty之HelloWorld快速入門,更多APIjava

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

2、Netty 服務端開發

如今讓咱們從服務端的處理器的實現開始,處理器是由 Netty 生成用來處理 I/O 事件的。git

public class ServerHandler  extends ChannelHandlerAdapter { // (1)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  // (2)
        //1. 接收客戶端的請求數據
        ByteBuf buf = (ByteBuf)msg;
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        String request = new String(data, "utf-8");
        System.out.println("收到 client 請求數據:" + request);

        //2. 返回響應數據,ctx.write()後自動釋放msg
        ChannelFuture f = ctx.writeAndFlush(Unpooled.copiedBuffer("netty".getBytes())); // (3)
        //2.1 寫完成後會自動關閉 client,不然與 client 創建長鏈接
        f.addListener(ChannelFutureListener.CLOSE); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // (5)
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DisCardServerHandler 繼承自 ChannelHandlerAdapter,這個類實現了ChannelHandler接口,ChannelHandler提供了許多事件處理的接口方法,而後你能夠覆蓋這些方法。如今僅僅只須要繼承ChannelHandlerAdapter類而不是你本身去實現接口方法。github

  2. 這裏咱們覆蓋了chanelRead()事件處理方法。每當從客戶端收到新的數據時,這個方法會在收到消息時被調用,這個例子中,收到的消息的類型是ByteBufapi

  3. ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放。ctx.write()後自動釋放 msg,不然,channelRead()方法就須要像下面的這段代碼同樣來手動釋放 msg:服務器

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            // ((ByteBuf) msg).release();
            ReferenceCountUtil.release(msg);
        }
    }
  4. 寫完成後程序不會自動關閉與 client 的鏈接,你須要手動綁定 ChannelFuture 的監聽事件,寫完成後纔會關閉鏈接,ChannelFutureListener.CLOSE 的實現以下:網絡

    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
  5. exceptionCaught()事件處理方法是當出現Throwable對象纔會被調用,即當Netty因爲IO錯誤或者處理器在處理事件時拋出的異常時。在大部分狀況下,捕獲的異常應該被記錄下來而且把關聯的channel給關閉掉。然而這個方法的處理方式會在遇到不一樣異常的狀況下有不一樣的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。多線程

到目前爲止一切都還比較順利,接下來我拉須要編寫一個 main() 方法來啓動服務端的 ServerHandler。架構

public class Server {
    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //1. 第一個線程組是用於接收Client端鏈接
            bossGroup = new NioEventLoopGroup();
            //2. 第二個線程組是用於處理實現的業務操做
            workerGroup = new NioEventLoopGroup();

            //3. ServerBootstrap 是一個啓動NIO服務的輔助啓動類
            ServerBootstrap b = new ServerBootstrap(); // (2)
            //3.1 將兩個工做線程組加進來
            b.group(bossGroup, workerGroup)
                    //3.2 指定使用NioServerSocketChannel這種類型的通道
                    .channel(NioServerSocketChannel.class) // (3)
                    //3.3 使用childHandler來綁定具體的事件處理器
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })
                    //3.4 設置TCP緩衝區大小,默認128,通常不用改
                    .option(ChannelOption.SO_BACKLOG, 128) // (5)
                    //3.5 設置發送緩衝區大小
                    .option(ChannelOption.SO_SNDBUF, 32 * 1034)
                    //3.6 設置接收緩衝區大小 
                    .option(ChannelOption.SO_RCVBUF, 32 * 1034)
                    //3.7 KEEPALIVE
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            //4. 綁定端口
            ChannelFuture f = b.bind(port).sync(); // (7)

            //5. 監聽通道關閉  <=>  阻塞程序,否則Server直接執行完成後關閉,client就不可能鏈接上了
            //Thread.sleep(Integer.MAX_VALUE);
            f.channel().closeFuture().sync();
        } finally {
            //6. 修優雅退出,釋放線程池資源
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8765;
        }
        new Server(port).run();
    }
}
  1. NioEventLoopGroup 是用來處理I/O操做的多線程事件循環器,Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。在這個例子中咱們實現了一個服務端的應用,所以會有2個NioEventLoopGroup會被使用。第一個常常被叫作‘boss’,用來接收進來的鏈接。第二個常常被叫作‘worker’,用來處理已經被接收的鏈接,一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。如何知道多少個線程已經被使用,如何映射到已經建立的Channels上都須要依賴於EventLoopGroup的實現,而且能夠經過構造函數來配置他們的關係。框架

  2. ServerBootstrap 是一個啓動NIO服務的輔助啓動類。你能夠在這個服務中直接使用Channel,可是這會是一個複雜的處理過程,在不少狀況下你並不須要這樣作。

  3. 這裏咱們指定使用NioServerSocketChannel類來舉例說明一個新的Channel如何接收進來的鏈接。

  4. 這裏的事件處理類常常會被用來處理一個最近的已經接收的Channel。ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的Channel。

  5. 你能夠設置這裏指定的通道實現的配置參數。咱們正在寫一個TCP/IP的服務端,所以咱們被容許設置socket的參數選項好比tcpNoDelay和keepAlive。請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此能夠對ChannelOptions的有一個大概的認識。

經過以步驟,一個服務端就搭建好了。

3、客戶端開發

public class Client { 
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workgroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workgroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });
            
            //發起異步鏈接操做
            ChannelFuture f = b.connect("127.0.0.1", 8080).sync();

            //向服務器發送數據 buf
            f.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));

            //等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
        }  finally {
            //優雅退出,釋放 NIO 線程組
            workgroup.shutdownGracefully();
        }
    }
}

客戶端業務處理ClientHandler

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        try {
            //讀取buf中的數據
            ByteBuf buf = (ByteBuf) msg;
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);

            System.out.println(new String(data));
        } finally {
            //釋放 (ByteBuf) msg
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        e.printStackTrace();
        ctx.close();
    }
}

總結

Netty自定義協議 https://my.oschina.net/OutOfMemory/blog/290180


天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索