Java編寫基於netty的RPC框架

一 簡單概念java

RPC: ( Remote Procedure Call),遠程調用過程,是經過網絡調用遠程計算機的進程中某個方法,從而獲取到想要的數據,過程如同調用本地的方法同樣.git

阻塞IO :當阻塞I/O在調用InputStream.read()方法是阻塞的,一直等到數據到來時才返回,一樣ServerSocket.accept()方法時,也是阻塞,直到有客戶端鏈接才返回,I/O通訊模式以下:github

缺點:當客戶端多時,會建立大量的處理線程,而且爲每個線程分配必定的資源;阻塞可能帶來頻繁切換上下文,這時引入NIOsql

NIO :  jdk1.4引入的(NEW Input/Output),是基於經過和緩存區的I/O方式,(插入一段題外話,學的多忘得也多,以前有認真研究過NIO,後來用到的時候,忘得一乾二淨,因此學習一些東西,常常返回看看),NIO是一種非阻塞的IO模型,經過不斷輪詢IO事件是否就緒,非阻塞是指線程在等待IO的時候,能夠作其餘的任務,同步的核心是Selector,Selector代替線程本省的輪詢IO事件,避免了阻塞同時減小了沒必要要的線程消耗;非阻塞的核心是通道和緩存區,當IO事件的就緒時,能夠將緩存區的數據寫入通道編程

其工做原理:數組

1  由專門的線程來處理全部的IO事件,而且負責轉發緩存

2  事件驅動機制:事件到的時候才觸發,而不是同步監視服務器

3  線程通信:線程之間通信經過wait,notify等方式通信,保證每次上下文切換都是有意義的,減小不必的線程切換網絡

通道 :  是對原I/O包中流的模擬,全部數據必須經過Channel對象,常見的通道FileChannel,SocketChannel,ServerSocketChannel,DatagramChannel(UDP協議向網絡鏈接的兩端讀寫數據)架構

Buffer緩存區 :其實是一個容器,一個連續的數組,任何讀寫的數據都通過Buffer

Netty :是由JBOSS提供的一個java開源框架,是一個高性能,異步事件驅動的NIO框架,基於JAVA NIO提供的API實現,他提供了TCP UDP和文件傳輸的支持,,全部操做都是異步非阻塞的.經過Futrue-Listener機制,本質就是Reactor模式的現實,Selector做爲多路複用器,EventLoop做爲轉發器,並且,netty對NIO中buffer作優化,大大提升了性能

二  Netty 客戶端和服務端的

Netty中Bootstrap和Channel的生命週期

Bootstrap簡介

Bootstarp:引導程序,將ChannelPipeline,ChannelHandler,EventLoop進行總體關聯

Bootstrap具體分爲兩個實現

ServerBootstrap:用於服務端,使用一個ServerChannel接收客戶端的鏈接,並建立對應的子Channel

Bootstrap:用於客戶端,只須要一個單獨的Channel,配置整個Netty程序,串聯起各個組件

兩者的主要區別:

1 ServerBootstrap用於Server端,經過調用bind()綁定一個端口監聽鏈接,Bootstrap用於Client端,須要調用connect()方法來鏈接服務器端,咱們也能夠調用bind()方法接收返回ChannelFuture中Channel

2 客戶端的Bootstrap通常用一個EventLoopGroup,而服務器的ServerBootstrap會用兩個第一個EventLoopGroup專門負責綁定到端口監聽鏈接事件,而第二個EventLoopGroup專門用來到處理每一個接收的鏈接,這樣大大提升了併發量

public class Server {

 

    public static void main(String[] args) throws Exception {

        // 1 建立線兩個事件循環組

        // 一個是用於處理服務器端接收客戶端鏈接的

        // 一個是進行網絡通訊的(網絡讀寫的)

        EventLoopGroup pGroup = new NioEventLoopGroup();

        EventLoopGroup cGroup = new NioEventLoopGroup();

 

        // 2 建立輔助工具類ServerBootstrap,用於服務器通道的一系列配置

        ServerBootstrap b = new ServerBootstrap();

        b.group(pGroup, cGroup) // 綁定倆個線程組

                .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel對應TCP, NioDatagramChannel對應UDP

                .option(ChannelOption.SO_BACKLOG, 1024) // 設置TCP緩衝區

                .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 設置發送緩衝大小

                .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 這是接收緩衝大小

                .option(ChannelOption.SO_KEEPALIVE, true) // 保持鏈接

                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override

                    protected void initChannel(SocketChannel sc) throws Exception {  //SocketChannel創建鏈接後的管道

                        // 3 在這裏配置 通訊數據的處理邏輯, 能夠addLast多個...

                        sc.pipeline().addLast(new ServerHandler());

                    }

                });

 

        // 4 綁定端口, bind返回future(異步), 加上sync阻塞在獲取鏈接處

        ChannelFuture cf1 = b.bind(8765).sync();

        //ChannelFuture cf2 = b.bind(8764).sync();   //能夠綁定多個端口

        // 5 等待關閉, 加上sync阻塞在關閉請求處

        cf1.channel().closeFuture().sync();

        //cf2.channel().closeFuture().sync();

        pGroup.shutdownGracefully();

        cGroup.shutdownGracefully();

    }

}

 

 

public class ServerHandler extends ChannelHandlerAdapter {

 

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("server channel active... ");

    }

 

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            ByteBuf buf = (ByteBuf) msg;

            byte[] req = new byte[buf.readableBytes()];

            buf.readBytes(req);

            String body = new String(req, "utf-8");

            System.out.println("Server :" + body );

            String response = "返回給客戶端的響應:" + body ;

            ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));

            // future完成後觸發監聽器, 此處是寫完即關閉(短鏈接). 所以須要關閉鏈接時, 要經過server端關閉. 直接關閉用方法ctx[.channel()].close()

            //.addListener(ChannelFutureListener.CLOSE);

    }

 

    @Override

    public void channelReadComplete(ChannelHandlerContext ctx)

            throws Exception {

        System.out.println("讀完了");

        ctx.flush();

    }

 

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)

            throws Exception {

        ctx.close();

    }

}

 

 

public class Client {

 

    public static void main(String[] args) throws Exception {

        

        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();

        b.group(group)

        .channel(NioSocketChannel.class)

        .handler(new ChannelInitializer<SocketChannel>() {

            @Override

            protected void initChannel(SocketChannel sc) throws Exception { 

                sc.pipeline().addLast(new ClientHandler());

            }

        });

        

        ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();

        //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();  //可使用多個端口

        //發送消息, Buffer類型. write須要flush才發送, 可用writeFlush代替

        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));

        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));

        Thread.sleep(2000);

        cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));

        //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));

        

        cf1.channel().closeFuture().sync();

        //cf2.channel().closeFuture().sync();

        group.shutdownGracefully();

    }

}

 

 

public class ClientHandler extends ChannelHandlerAdapter{

 

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }

 

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        try {

            ByteBuf buf = (ByteBuf) msg;

            byte[] req = new byte[buf.readableBytes()];

            buf.readBytes(req);

            String body = new String(req, "utf-8");

            System.out.println("Client :" + body );

        } finally {

            // 記得釋放xxxHandler裏面的方法的msg參數: 寫(write)數據, msg引用將被自動釋放不用手動處理; 但只讀數據時,!必須手動釋放引用數

             ReferenceCountUtil.release(msg);

        }

    }

 

    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }

 

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

            throws Exception {

        ctx.close();

    }

}

 

其餘組件:

Handle: 爲了支持各類協議和處理數據的方式,能夠是鏈接,數據接收,異常,數據格式轉換等

ChannelHandler

ChannelInboundHandler :最經常使用的Handler,做用是處理接收數據的事件,來處理咱們的核心業務邏輯。

ChannelInitializer :,當一個連接創建時,咱們須要知道怎麼來接收或者發送數據,固然,咱們有各類各樣的Handler實現來處理它,那麼ChannelInitializer即是用來配置這些Handler,它會提供一個ChannelPipeline,並把Handler加入到ChannelPipeline。

ChannelPipeline :一個Netty應用基於ChannelPipeline機制,這種機制依賴EventLoop和EventLoopGroup,這三個都和事件或者事件處理相關

EventLoop : 爲Channel處理IO操做,一個EventLoop能夠爲多個Channel服務

EventLoopGroup :包含多個EventLoop

Channel :表明一個Socket鏈接

Future :在Netty中全部的IO操做都是異步的,,所以咱們不知道,過來的請求是否被處理了,因此咱們註冊一個監聽,當操做執行成功或者失敗時監聽自動觸發,全部操做都會返回一個ChannelFutrue

ChannelFuture

Netty 是一個非阻塞的,事件驅動的,網絡編程框架,咱們經過一張圖理解一下,Channel,EventLoop以及EventLoopGroup之間的關係

解釋一下,當一個鏈接過來,Netty首先會註冊一個channel,而後EventLoopGroup會分配一個EventLoop綁定到這個channel,在這個channel的整個生命週期過程當中,這個EventLoop一直爲他服務,這個玩意就是一個線程

這下聊一下Netty如何處理數據?

前面有講到,handler數據處理核心,,而ChannelPipeline負責安排Handler的順序和執行,咱們能夠這樣理解,數據在ChannelPipeline中流動,其中ChannelHandler就是一個個閥門,這些數據都會通過每個ChannelHandler而且被他處理,其中ChannelHandler的兩個子類ChannelOutboundHandler和ChannelInboundHandler,根據不一樣的流向,選擇不一樣的Handler

由圖能夠看出,一個數據流進入ChannelPipeline時,一個一個handler挨着執行,各個handler的數據傳遞,這須要調用方法中ChannelHandlerContext來操做,而這個ChannelHandlerContext能夠用來讀寫Netty中的數據流

三 Netty中的業務處理

netty中會有不少Handler.具體哪種Handler還要看繼承是InboundAdapter仍是OutboundAdapter,Netty中提供一系列的Adapter來幫助咱們簡化開發,在ChannelPipeline中的每個handler都負責把Event傳遞個洗下一個handler,有這些adapter,這些工做能夠自動完成,,咱們只需覆蓋咱們真正實現的部分便可,接下來比較經常使用的三種ChannelHandler

Encoders和Decodeers

咱們在網絡傳輸只能傳輸字節流,在發送數據時,把咱們的message轉換成bytes這個過程叫Encode(編碼),相反,接收數據,須要把byte轉換成message,這個過程叫Decode(解碼)

Domain Logic

咱們真正關心的如何處理解碼之後的數據,咱們真正的業務邏輯即是接收處理的數據,Netty提供一個經常使用的基類就是SimpleChannelInboundHandler<T>,其中T就是Handler處理的數據類型,消息到達這個Handler,會自動調用這個Handler中的channelRead0(ChannelHandlerContext,T)方法,T就是傳過來的數據對象

四 基於netty實現的Rpc的例子

這是個人github上項目的位置

https://github.com/developerxiaofeng/rpcByNetty

項目目錄結構以下

歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索