基於netty框架的Socket傳輸

1、Netty框架介紹

什麼是netty?先看下百度百科的解釋:java

        Netty是由 JBOSS提供的一個 java開源框架。Netty提供異步的、 事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的 網絡服務器和客戶端程序。
也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶, 服務端應用。Netty至關簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。
       「快速」和「簡單」並不用產生維護性或性能上的問題。Netty 是一個吸取了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各類二進制,文本協議,並通過至關精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性

爲何好多大公司都在使用netty框架?主要是基於netty框架的如下幾個特色決定的:編程

1)健壯性,2)功能齊全,3)可定製,4)擴展性bootstrap

2、框架優勢

       傳統的RPC性能差,主要是因爲客戶端和遠程調用採用了同步阻塞IO線程,當客戶端的併發壓力增大後,同步阻塞會因爲頻繁的等待致使I/O線程堵塞,線程沒法高效的工做,IO處理能力天然會下降。影響性能的三個因素:第一,IO模型,IO模型在必定程度上決定了框架的性能。第2、協議,如:HTTP、TCP/IP等,協議選擇的不一樣,性能模型也不一樣,一般狀況下,內部私有協議的性能比較優,這是因爲內部設計決定的。第3、線程,數據報文的接收、讀取、編碼、解碼等,線程模型的不一樣,性能也不一樣。相比於傳統的RPC框架,netty的優勢主要體如今如下幾個方面:服務器

  1. API使用簡單,封裝很是完善,開發門檻低
  2. 功能上強大,預置了多種編碼解碼功能,多種協議支持
  3. 定製能力強,能夠對ChannelHandler對通訊框架靈活擴展
  4. 性能高,Reactor線程模型調度,ChannelFuture-Listener,經過Listener機制主動推送結果
  5. 版本成熟穩定,社區活躍,版本更新快,出現的Bug會被很快的修復,同時,有心功能的加入,經歷了大規模的商業應用考驗,質量的到了充分的驗證。已經普遍應用到互聯網、大數據、企業應用、電信軟件、網絡遊戲等熱門行業,他能夠知足不一樣的商業標準。

3、Netty架構分析

         Netty是一個基於三層網絡架構模型的框架,三層網絡架構分析包括調度層、鏈條傳遞層以及業務邏輯層。網絡

  1. Reactor通訊調度層,是一個模型,

NIO線程池組件{多線程

        監聽網絡讀寫鏈接架構

        業務調度處理併發

NIO,AIO,配合NIO通道NioSocketChannel組件框架

}異步

        Netty經過內部select巡查機制,可以實現IO多路複用,經過把多個IO阻塞複用到同一個select的阻塞上,從而可以使系統即便在單線程的狀況下,也可以同時處理多個請求。這樣就使得netty實現了IO多路複用的優點,與傳統多線程相比,大大減小了系統的開銷,由於系統沒必要建立新的線程和銷燬線程了,減小了系統的維護難度,節省了資源。

ByteBuffer池化支持,不用手動切換標誌位,實現零拷貝。傳統的Socket讀寫,基本是使用堆內存進行,即jvm事先會把堆內存拷貝到內存中,而後再寫入Socket,而netty採用的是DIRECT BUFFERS,不須要通過jvm內存拷貝,在堆外內存直接進行Socket讀寫,這樣就少了一次緩衝區的內存拷貝,從而實現零拷貝。

       2.Pipleline職責鏈條傳遞

       攔截處理向前向後事件,外部傳入的消息包對象,有POJO信息抽象,上層也只須要處理邏輯,相似SpringIOC處理BeanDefince。不一樣的Handler節點的功能也不一樣,一般狀況下須要編碼解碼等,它能夠完成外部協議到內部POJO對象的轉化,這樣上層只須要關注業務邏輯,不須要知道底層的協議和線程模型,從而實現解耦。

      3.構建邏輯業務處理單元

       底層的協議隔離,上層處理邏輯框架並不須要關心底層協議是什麼。Netty框架的分層設計使得開發人員不須要關注協議框架的實現,只須要關注服務層的業務邏輯開發便可,實現了簡單化。

       以前有個項目是基於傳統Socket和線程池的技術實現的,可是在高併發的時候發現併發能力不足,壓測的時候發現TPS達不到理想值,因此通過考慮,決定使用netty框架來解決此問題。一樣,netty框架也分爲客戶端和服務端,通過整理,先寫一個demo初探netty框架,下面是代碼的實現過程。

首先是服務端,服務端包含兩個方面,第1、服務端Server的主要做用就是經過輔助引導程序,設置NIO的鏈接方式處理客戶端請求,經過綁定特定端口、設定解碼方式以及監聽來實現整個線程的處理請求;第2、服務端Handler須要繼承ChannelInboundHandlerAdapter類,handler類的主要做用是讀取客戶端數據,處理業務,拋出異常,響應客戶端請求。代碼以下:

服務端Server:

public class Server {

private static Log logger = LogFactory.getLog(Server.class);

private int port;

public Server(int port) {

        super();

        this.port = port;

}

public  void start(){

        ServerBootstrap b = new ServerBootstrap();//引導輔助程序

        EventLoopGroup group = new NioEventLoopGroup();//經過nio方式來接收鏈接和處理請求

        try {

               b.group(group);

               b.channel(NioServerSocketChannel.class);//設置nio類型的channnel

               b.localAddress(new InetSocketAddress(port));//設置監聽端口

               //b.option(ChannelOption.SO_BACKLOG, 2048);

               b.childHandler(new ChannelInitializer<SocketChannel>() {//有鏈接到達時會建立一個channel

                      @Override

                      protected void initChannel(SocketChannel ch) throws Exception {

                             //註冊handler

                             ch.pipeline().addLast(new ByteArrayDecoder());

                             ch.pipeline().addLast(new ByteArrayEncoder());

                             ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));

                             ch.pipeline().addLast(new ServerHandler());

                            

                      }

               });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true);

               ChannelFuture f = b.bind().sync();//配置完成,開始綁定server,經過調用sync同步方法阻塞直到綁定成功

               logger.info(Server.class.getName()+"開始監聽:"+f.channel().localAddress());

               f.channel().closeFuture().sync();//應用程序會一直等待直到channel關閉

        } catch (Exception e) {

               e.printStackTrace();

        } finally {

               try {

                      //關閉EventLoopGroup,釋放掉全部資源包括建立的線程

                      group.shutdownGracefully().sync();

               } catch (InterruptedException e) {

                      e.printStackTrace();

               }

        }

}

}
View Code

服務端Handler 

public class ServerHandler extends ChannelInboundHandlerAdapter {

private static Log logger=LogFactory.getLog(ServerHandler.class);

@Override

public void channelActive(ChannelHandlerContext ctx){

        logger.info(ctx.channel().localAddress().toString()+"通道活躍....");

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        logger.error(ctx.channel().localAddress().toString()+"通道不活躍....");

}

/**

 *

 * 讀取客戶端傳過來的消息

 */

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //業務處理類

        logger.info("開始業務處理....");

        new SocketController(ctx,msg).run();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        //出現異常,關閉連

        logger.error("服務端出現異常:"+cause.getMessage(),cause);

        ctx.close();

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        logger.info("服務端完成請求!");

        ctx.flush();

}

}
View Code

客戶端代碼

    客戶端主要是用來向服務端發送數據,一樣包含兩個方面,第1、Client主要經過設定端口和IP和服務器創建鏈接,進行數據包的編碼;第2、ClientHandler 須要繼承 SimpleChannelInboundHandler<ByteBuf>類,針對不一樣的傳輸方式,繼承不一樣的類,handler類一樣處理業務請求,響應服務端的請求。代碼以下:

客戶端Client:

public class Client {
    private static Log logger=LogFactory.getLog(Client.class);
    private String host;
    private int port;
    public Client(String host, int port) {
        super();
        this.host = host;
        this.port = port;
    }
    public void connect(){
        EventLoopGroup workGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.group(workGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                logger.info("客戶端觸發鏈接......");
                ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new ClientHandler());
            }
        });
        //客戶端開始鏈接
        try {
            logger.info("鏈接到服務器......");
            ChannelFuture future=bootstrap.connect(host,port).sync();
            //等待鏈接關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            workGroup.shutdownGracefully();
        }
    }
}
View Code

客戶端Handler:

public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static Log logger=LogFactory.getLog(ClientHandler.class);
    /**
     * 向服務端發送消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info(ctx.channel().localAddress().toString()+"客戶點活躍...");
        //向服務端寫字符串
        logger.info("客戶端鏈接服務端,開始發送數據.....");
        String string ="hello server!";
        System.out.println("發送數據爲:"+string);
        ByteBuf buf=ctx.alloc().buffer(4*string.length());
        buf.writeBytes(string.getBytes());
        ctx.writeAndFlush(buf);
        logger.info("發送完畢...");
    }
    
    /**
     * 讀取服務端返回來的消息
     */
    @Override
    protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception {
        logger.info("開始接受服務端數據");
        byte[] b=new byte[in.readableBytes()];
        in.readBytes(b);
        String string=new String(b);
        logger.info("服務端發送的數據爲:"+string);
        in.release();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.info("客戶端異常:"+cause.getMessage(),cause);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        logger.info("客戶端完成請求....");
        ctx.flush();    
    }
}
View Code

服務端啓動:

public class ServerMain {

private static Log logger=LogFactory.getLog(ServerMain.class);

private static Server server =new Server(55550);

public static void main(String[] args) {

        logger.info("服務端啓動.......");

        server.start();

}

}
View Code 

客戶端啓動類:

public class Test {

private static Client client = new Client("127.0.0.1", 55550);

public static void main(String[] args) throws UnknownHostException, IOException {

        client.connect();

}

}
View Code

測試結果:

服務端:

 

客戶端:

總結:

       以上只是一個netty框架初探的小Demo,學習使用netty框架的開始,這裏面涉及到了不少的技術以及很是多的組件,好比:Channels、Callbacks、Futures、Events和handlers等等,須要進一步的學習,另外,消息的編碼解碼、粘包、拆包的方式方法、消息格式的轉換以及報文格式大小限制都須要進一步的研究學習。

相關文章
相關標籤/搜索