Netty框架

 

學習Netty框架,三連問:

  什麼是Netty框架?html

  爲何要用Netty框架?java

  怎麼用Netty框架?spring

 

什麼是Netty框架?

  Netty 是一個廣受歡迎的異步事件驅動的Java開源網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。apache

  Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序。編程

  Netty 是一個基於 NIO 的網絡編程框架,使用 Netty 能夠幫助你快速、簡單的開發出一個網絡應用,至關於簡化和流程化了 NIO 的開發過程。bootstrap

  做爲當前最流行的 NIO 框架,Netty 在互聯網領域、大數據分佈式計算領域、遊戲行業、通訊行業等得到了普遍的應用,知名的 Elasticsearch 、Dubbo 框架內部都採用了 Netty。api

 

爲何要用Netty框架?

由於Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了JDK 原生 NIO 程序的問題。緩存

  JDK 原生 NIO 程序的問題:安全

    JDK 原生也有一套網絡應用程序 API,可是存在一系列問題,主要以下:服務器

      1)NIO 的類庫和 API 繁雜,使用麻煩:你須要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

      2)須要具有其餘的額外技能作鋪墊:例如熟悉 Java 多線程編程,由於 NIO 編程涉及到 Reactor 模式,你必須對多線程和網路編程很是熟悉,才能編寫出高質量的 NIO 程序。

      3)可靠性能力補齊,開發工做量和難度都很是大:例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等等。NIO 編程的特色是功能開發相對容易,可是可靠性能力補齊工做量和難度都很是大。

      4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會致使 Selector 空輪詢,最終致使 CPU 100%。官方聲稱在 JDK 1.6 版本的 update 18 修復了該問題,可是直到 JDK 1.7 版本該問題仍舊存在,只不過該 Bug 發生機率下降了一些而已,它並無被根本解決。

  Netty的主要特色有:

    1)設計優雅:適用於各類傳輸類型的統一 API 阻塞和非阻塞 Socket;基於靈活且可擴展的事件模型,能夠清晰地分離關注點;高度可定製的線程模型 - 單線程,一個或多個線程池;真正的無鏈接數據報套接字支持(自 3.1 起)。

    2)使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其餘依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。

    3)高性能、吞吐量更高:延遲更低;減小資源消耗;最小化沒必要要的內存複製。

    4)安全:完整的 SSL/TLS 和 StartTLS 支持。

    5)社區活躍、不斷更新:社區活躍,版本迭代週期短,發現的 Bug 能夠被及時修復,同時,更多的新功能會被加入。

Netty 常見的使用場景以下:

  1)互聯網行業:在分佈式系統中,各個節點之間須要遠程服務調用,高性能的 RPC 框架必不可少,Netty 做爲異步高性能的通訊框架,每每做爲基礎通訊組件被這些 RPC 框架使用。典型的應用有:阿里分佈式服務框架 Dubbo 的 RPC 框架使用 Dubbo 協議進行節點間通訊,Dubbo 協議默認使用 Netty 做爲基礎通訊組件,用於實現各進程節點之間的內部通訊。

  2)遊戲行業:不管是手遊服務端仍是大型的網絡遊戲,Java 語言獲得了愈來愈普遍的應用。Netty 做爲高性能的基礎通訊組件,它自己提供了 TCP/UDP 和 HTTP 協議棧。

很是方便定製和開發私有協議棧,帳號登陸服務器,地圖服務器之間能夠方便的經過 Netty 進行高性能的通訊。

  3)大數據領域:經典的 Hadoop 的高性能通訊和序列化組件 Avro 的 RPC 框架,默認採用 Netty 進行跨界點通訊,它的 Netty Service 基於 Netty 框架二次封裝實現。

有興趣的讀者能夠了解一下目前有哪些開源項目使用了 Netty的Related Projects

 

怎麼用?(簡單入門)

  可參考學習   netty 官方API: http://netty.io/4.1/api/index.html

 

 配置 pom.xml

1         <dependency>
2             <groupId>io.netty</groupId>
3             <artifactId>netty-all</artifactId>
4             <version>4.1.31.Final</version>
5         </dependency>

 

 配置 ServerConnection.java 

package com.example.demo.net;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.example.demo.handler.ServerMsgHandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ServerConnection {
    
    Logger logger = Logger.getLogger(ServerConnection.class);
    private final int port ;
    private EventLoopGroup bossGroup ;
    private EventLoopGroup workerGroup ;
    
    public ServerConnection(int port) {
        this.port = port ;
    }
    
    /***
     * NioEventLoopGroup 是用來處理I/O操做的多線程事件循環器,
     * Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。 在這個例子中咱們實現了一個服務端的應用,
     * 所以會有2個NioEventLoopGroup會被使用。 第一個常常被叫作‘boss’,用來接收進來的鏈接。
     * 第二個常常被叫作‘worker’,用來處理已經被接收的鏈接, 一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。
     * 如何知道多少個線程已經被使用,如何映射到已經建立的Channels上都須要依賴於EventLoopGroup的實現,
     * 而且能夠經過構造函數來配置他們的關係。
     */
    public void run() {
        System.out.println("啓動服務端Netty鏈接");
        bossGroup = new NioEventLoopGroup() ;
        workerGroup = new NioEventLoopGroup() ;
        /**
         * ServerBootstrap 是一個服務端啓動NIO服務的輔助啓動類 , 能夠在這個服務中直接使用Channel
         */
        ServerBootstrap bootstrap = new ServerBootstrap() ;
        /**
         * 這一步是必須的,若是沒有設置group將會報java.lang.IllegalStateException: group not set異常
         */
        bootstrap = bootstrap.group(bossGroup, workerGroup) ;
        /***
         * ServerSocketChannel以NIO的selector爲基礎進行實現的,用來接收新的鏈接
         * 這裏告訴Channel如何獲取新的鏈接.
         */
        bootstrap = bootstrap.channel(NioServerSocketChannel.class) ;
        /***
         * 綁定端口
         */
        bootstrap = bootstrap.localAddress(new InetSocketAddress(port)) ;
        /***
         * 你能夠設置這裏指定的通道實現的配置參數。 咱們正在寫一個TCP/IP的服務端,
         * 所以咱們被容許設置socket的參數選項好比tcpNoDelay和keepAlive。
         * 請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此能夠對ChannelOptions的有一個大概的認識。
         */
        bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128) ;
        /***
         * option()是提供給NioServerSocketChannel用來接收進來的鏈接。
         * childOption()是提供給由父管道ServerChannel接收到的鏈接,
         * 在這個例子中也是NioServerSocketChannel。
         */
        bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) ;
        /***
         * 這裏的事件處理類常常會被用來處理一個最近的已經接收的Channel。 ChannelInitializer是一個特殊的處理類,
         * 目的是幫助使用者配置一個新的Channel。
         * 也許你想經過增長一些處理類好比NettyServerHandler來配置一個新的Channel
         * 或者其對應的ChannelPipeline來實現你的網絡程序。 當你的程序變的複雜時,可能你會增長更多的處理類到pipline上,
         * 而後提取這些匿名類到最頂層的類上。
         */
        bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline() ;
                    pipeline.addLast("decoder", new StringDecoder()) ;
                    pipeline.addLast("encoder", new StringEncoder()) ;
                    pipeline.addLast("handler", new ServerMsgHandler()) ;
                }
            }) ;
        bootstrap.bind().addListener(new ChannelFutureListener() {            
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if ( future.isSuccess() ) {
                    System.out.println("服務端開始監聽") ;
//                    logger.info("服務端開始監聽") ;
                }else {
                    logger.error("服務端沒法使用監聽端口",future.cause()) ;
                }
            }
        }) ;
    }
    
    public void shutdown() {
        logger.info("關閉 Server 端口");
        bossGroup.shutdownGracefully() ;
        workerGroup.shutdownGracefully() ;
    }

}
View Code

 

配置 ServerMsgHandler.java 

package com.example.demo.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerMsgHandler extends ChannelInboundHandlerAdapter {

    /**
     * 這裏咱們覆蓋了chanelRead()事件處理方法。 每當從客戶端收到新的數據時, 這個方法會在收到消息時被調用,
     * 這個例子中,收到的消息的類型是ByteBuf
     * 
     * @param ctx
     *            通道處理的上下文信息
     * @param msg
     *            接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
        System.out.println("服務端接收的消息:"+msg.toString()) ;
        //向客戶端發送消息
        String str = msg.toString() ;
        if ( "高性能NIO框架——Netty".equals(str) ) {
            ctx.writeAndFlush( "客戶端 , 你好!") ;
        }
//        ctx.writeAndFlush(msg.toString()+"你好!") ;
    }
    
    /***
     * 這個方法會在發生異常時觸發
     * 
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**
         * exceptionCaught() 事件處理方法是當出現 Throwable 對象纔會被調用,即當 Netty 因爲 IO
         * 錯誤或者處理器在處理事件時拋出的異常時。在大部分狀況下,捕獲的異常應該被記錄下來 而且把關聯的 channel
         * 給關閉掉。然而這個方法的處理方式會在遇到不一樣異常的狀況下有不 同的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。
         */
        // 出現異常就關閉
        cause.printStackTrace() ;
        ctx.close() ;
    }
    
}
View Code

 

 

配置 ClientConnection.java

package com.example.demo.net;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.example.demo.handler.ClientMsgHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ClientConnection {

    Logger logger = Logger.getLogger(ClientConnection.class);
    private final int port ;
    private EventLoopGroup bossGroup ;
    private EventLoopGroup workerGroup ;
    
    private Channel channel ;
    
    public ClientConnection(int port) {
        this.port = port ;
    }    
    
    public Channel getChannel() {
        return this.channel ;
    }
    
    /***
     * NioEventLoopGroup 是用來處理I/O操做的多線程事件循環器,
     * Netty提供了許多不一樣的EventLoopGroup的實現用來處理不一樣傳輸協議。 在這個例子中咱們實現了一個服務端的應用,
     * 所以會有2個NioEventLoopGroup會被使用。
     * 第一個常常被叫作‘boss’,用來接收進來的鏈接。
     * 第二個常常被叫作‘worker’,用來處理已經被接收的鏈接, 一旦‘boss’接收到鏈接,就會把鏈接信息註冊到‘worker’上。
     * 如何知道多少個線程已經被使用,如何映射到已經建立的Channels上都須要依賴於EventLoopGroup的實現,
     * 而且能夠經過構造函數來配置他們的關係。
     * @throws InterruptedException 
     */
    public void run() {
        System.out.println("啓動客戶端Netty鏈接");
        bossGroup = new NioEventLoopGroup() ;
        workerGroup = new NioEventLoopGroup() ;
        /**
         * Bootstrap 是客戶端一個啓動NIO服務的輔助啓動類 , 能夠在這個服務中直接使用Channel
         */
        Bootstrap bootstrap = new Bootstrap() ;
//        ServerBootstrap bootstrap = new ServerBootstrap() ;
        /**
         * 這一步是必須的,若是沒有設置group將會報java.lang.IllegalStateException: group not set異常
         */
//        bootstrap.group(bossGroup, workerGroup)
        bootstrap.group(bossGroup)
        /***
         * ServerSocketChannel以NIO的selector爲基礎進行實現的,用來接收新的鏈接
         * 這裏告訴Channel如何獲取新的鏈接.
         */
//            .channel(NioServerSocketChannel.class)
            .channel(NioSocketChannel.class)
            /***
             * 綁定端口,等價於bootstrap.bind("127.0.0.1", port) ,若下面用了,就要把這個註釋掉,否則會報錯
             */
//            .localAddress(new InetSocketAddress(port))
            .remoteAddress("127.0.0.1", port)
            /***
             * 你能夠設置這裏指定的通道實現的配置參數。 咱們正在寫一個TCP/IP的服務端,
             * 所以咱們被容許設置socket的參數選項好比tcpNoDelay和keepAlive。
             * 請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此能夠對ChannelOptions的有一個大概的認識。
             */
//            .option(ChannelOption.SO_BACKLOG, 128)
            /***
             * option()是提供給NioServerSocketChannel用來接收進來的鏈接。
             * childOption()是提供給由父管道ServerChannel接收到的鏈接,
             * 在這個例子中也是NioServerSocketChannel。
             */
//            .childOption(ChannelOption.SO_KEEPALIVE, true)
            /***
             * 這裏的事件處理類常常會被用來處理一個最近的已經接收的Channel。 ChannelInitializer是一個特殊的處理類,
             * 目的是幫助使用者配置一個新的Channel。
             * 也許你想經過增長一些處理類好比NettyServerHandler來配置一個新的Channel
             * 或者其對應的ChannelPipeline來實現你的網絡程序。 當你的程序變的複雜時,可能你會增長更多的處理類到pipline上,
             * 而後提取這些匿名類到最頂層的類上。
             */
//            .childHandler(new ChannelInitializer<SocketChannel>() {
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline() ;
                    pipeline.addLast("decoder", new StringDecoder()) ;
                    pipeline.addLast("encoder", new StringEncoder()) ;
//                    pipeline.addLast("handler", new ClientMsgHandler()) ;
                    pipeline.addLast(new ClientMsgHandler()) ;
                }
            }) ;
/*        bootstrap.bind().addListener(new ChannelFutureListener() {            
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if ( future.isSuccess() ) {
                    System.out.println("客戶端開始監聽") ;
                    logger.info("客戶端開始監聽") ;
                }else {
                    logger.error("客戶端沒法使用監聽端口",future.cause()) ;
                }
            }
        }) ;    */
        
/*        //綁定端口,開始接收進來的鏈接
        ChannelFuture cFuture;
        try {
//            cFuture = bootstrap.connect(host, port).sync();            
//            cFuture = bootstrap.bind("127.0.0.1", port).sync();
            cFuture = bootstrap.bind().sync() ;
            //在這裏拿到這個channel,是爲了 等下 測試消息發送 用的
            channel = cFuture.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        */
                    
        ChannelFuture cf;
        try {
            cf = bootstrap.connect().sync();
            channel = cf.channel();
            channel.writeAndFlush("ClientConnection客戶端已成功啓動!");
            
            cf.addListener(new ChannelFutureListener() {            
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if ( future.isSuccess() ) {
                        System.out.println("客戶端開始監聽") ;
//                        logger.info("客戶端開始監聽") ;
                    }else {
//                        logger.error("客戶端沒法使用監聽端口",future.cause()) ;
                    }
                }
            }) ;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
    public void shutdown() {
        System.out.println("關閉 Client 端口");
        logger.info("關閉 Client 端口");
        bossGroup.shutdownGracefully() ;
        workerGroup.shutdownGracefully() ;
    }
    
}
View Code

 

配置 ClientMsgHandler.java

package com.example.demo.handler;

import org.apache.log4j.Logger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientMsgHandler extends ChannelInboundHandlerAdapter {

    Logger logger = Logger.getLogger(ClientMsgHandler.class) ;
    
    /**
     * 這裏咱們覆蓋了chanelRead()事件處理方法。 每當從客戶端收到新的數據時, 這個方法會在收到消息時被調用,
     * 這個例子中,收到的消息的類型是ByteBuf
     * 
     * @param ctx
     *            通道處理的上下文信息
     * @param msg
     *            接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
//        super.channelRead(ctx, msg);
        System.out.println("客戶端接收的消息:"+msg.toString()) ;
        //向服務端發送消息
        ctx.writeAndFlush("服務端 , 你好!") ;
    }
    
    /***
     * 這個方法會在發生異常時觸發
     * 
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**
         * exceptionCaught() 事件處理方法是當出現 Throwable 對象纔會被調用,即當 Netty 因爲 IO
         * 錯誤或者處理器在處理事件時拋出的異常時。在大部分狀況下,捕獲的異常應該被記錄下來 而且把關聯的 channel
         * 給關閉掉。然而這個方法的處理方式會在遇到不一樣異常的狀況下有不 同的實現,好比你可能想在關閉鏈接以前發送一個錯誤碼的響應消息。
         */
        // 出現異常就關閉
        cause.printStackTrace() ;
        ctx.close() ;
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        super.channelActive(ctx);
//        logger.info("client channel active");
        System.out.println("client channel active");
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel inactive");
        ctx.close() ;
    }
    
}
View Code

 

 

編寫測試類 DemoTest.java 

 1 package com.example.demo.netty;
 2 
 3 import org.junit.runner.RunWith;
 4 import org.springframework.boot.test.context.SpringBootTest;
 5 import org.springframework.test.context.ActiveProfiles;
 6 import org.springframework.test.context.junit4.SpringRunner;
 7 
 8 import com.example.demo.DemoApplicationTests;
 9 import com.example.demo.net.ClientConnection;
10 import com.example.demo.net.ServerConnection;
11 
12 import io.netty.channel.Channel;
13 
14 @RunWith(SpringRunner.class)
15 @SpringBootTest(classes = DemoApplicationTests.class)
16 @ActiveProfiles("test")
17 public class DemoTest {
18 
19     public static void main(String[] args) {
20         int port = 2222 ;
21         Thread serverThread = new Thread( new Runnable() {            
22             @Override
23             public void run() {
24                 new ServerConnection(port).run() ;
25             }
26         } ) ;
27         serverThread.start() ;
28         ClientConnection clientConnection = new ClientConnection(port) ;
29 //        Thread clientThread = new Thread( new Runnable() {            
30 //            @Override
31 //            public void run() {
32 //                new ClientConnection(port).run() ;
33 //            }
34 //        } ) ;
35 //        clientThread.start() ;
36         
37         clientConnection.run();
38         Channel channel = clientConnection.getChannel() ;
39         channel.writeAndFlush("高性能NIO框架——Netty");
40         
41 //        new Thread( ()->{
42 //            new ServerConnection(port) ;
43 //        } ).start() ;        
44 //        new Thread( ()->{
45 //            new ClientConnection(port) ;
46 //        } ).start() ;
47         
48     }
49 }
View Code

 

服務端與客戶端的區別:

  1. 在客戶端只建立了一個NioEventLoopGroup實例,由於客戶端並不須要使用I/O多路複用模型,須要有一個Reactor來接受請求。只須要單純的讀寫數據便可

  2. 在客戶端只須要建立一個Bootstrap對象,它是客戶端輔助啓動類,功能相似於ServerBootstrap。

 

 

共同窗習,共同進步,如有補充,歡迎指出,謝謝!

相關文章
相關標籤/搜索