基於Netty實現簡易RPC框架

前言

如今網上有不少關於使用Netty來構建RPC框架的例子,爲何我這裏還要寫一篇文章進行論述呢,我很清楚我可能沒有寫得他們那麼好。之因此還要寫,有兩點緣由:html

  • 一是由於學過Netty以後,還須要去不斷實踐才能更好的把握Netty的用法,顯然,基於Netty實現RPC框架是一個很好的作法;
  • 二是由於目前市面上有不少RPC框架,好比Dubbo,這些框架通信底層都是Netty,因此說經過這個例子,也能夠更好的去體驗RPC的設計。

下面我將從如下幾點闡述如何基於Netty實現簡易的RPC框架:java

  • RPC是什麼?
  • 實現RPC框架須要關注哪些方面 ?
  • 使用Netty如何實現?

<!--more-->git

RPC是什麼?

RPC,遠程過程調用,能夠作到像本地調用同樣調用遠程服務,是一種進程間的通訊方式,概念想必你們都很清楚,在看到58沈劍寫的RPC文章 以後,意識到其實咱們能夠換一種思考方式去理解RPC,也就是從本地調用出發,進而去推導RPC調用github

rpc_58

1. 本地函數調用

本地函數是咱們常常碰到的,好比下面示例:spring

public String sayHello(String name) {
    return "hello, " + name;
}

咱們只須要傳入一個參數,調用sayHello方法就能夠獲得一個輸出,也就是輸入參數——>方法體——>輸出,入參、出參以及方法體都在同一個進程空間中,這就是本地函數調用編程

2. Socket通訊

那有沒有辦法實現不一樣進程之間通訊呢?調用方在進程A,須要調用方法A,可是方法A在進程B中json

rpc_2

最容易想到的方式就是使用Socket通訊,使用Socket能夠完成跨進程調用,咱們須要約定一個進程通訊協議,來進行傳參,調用函數,出參。寫過Socket應該都知道,Socket是比較原始的方式,咱們須要更多的去關注一些細節問題,好比參數和函數須要轉換成字節流進行網絡傳輸,也就是序列化操做,而後出參時須要反序列化;使用socket進行底層通信,代碼編程也比較容易出錯。bootstrap

若是一個調用方須要關注這麼多問題,那無疑是個災難,因此有沒有什麼簡單方法,讓咱們的調用方不須要關注細節問題,讓調用方像調用本地函數同樣,只要傳入參數,調用方法,而後坐等返回結果就能夠了呢?數組

3. RPC框架

RPC框架就是用來解決上面的問題的,它可以讓調用方像調用本地函數同樣調用遠程服務,底層通信細節對調用方是透明的,將各類複雜性都給屏蔽掉,給予調用方極致體驗。promise

rpc_3

RPC調用須要關注哪些方面

前面就已經說到RPC框架,讓調用方像調用本地函數同樣調用遠程服務,那麼如何作到這一點呢?

在使用的時候,調用方是直接調用本地函數,傳入相應參數,其餘細節它不用管,至於通信細節交給RPC框架來實現。實際上RPC框架採用代理類的方式,具體來講是動態代理的方式,在運行時動態建立新的類,也就是代理類,在該類中實現通信的細節問題,好比參數序列化

固然不光是序列化,咱們還須要約定一個雙方通訊的協議格式,規定好協議格式,好比請求參數的數據類型,請求的參數,請求的方法名等,這樣根據格式進行序列化後進行網絡傳輸,而後服務端收到請求對象後按照指定格式進行解碼,這樣服務端才知道具體該調用哪一個方法,傳入什麼樣的參數。

剛纔又提到網絡傳輸,RPC框架重要的一環也就是網絡傳輸,服務是部署在不一樣主機上的,如何高效的進行網絡傳輸,儘可能不丟包,保證數據完整無誤的快速傳遞出去?實際上,就是利用咱們今天的主角——Netty,Netty是一個高性能的網絡通信框架,它足以勝任咱們的任務。

前面說了這麼多,再次總結下一個RPC框架須要重點關注哪幾個點:

  • 代理 (動態代理)
  • 通信協議
  • 序列化
  • 網絡傳輸

固然一個優秀的RPC框架須要關注的不止上面幾點,只不過本篇文章旨在作一個簡易的RPC框架,理解了上面關鍵的幾點就夠了

rpc_4

基於Netty實現RPC框架

終於到了本文的重頭戲了,咱們將根據實現RPC須要關注的幾個要點(代理、序列化、協議、編解碼),使用Netty進行逐一實現

1. Protocol(協議)

首先咱們須要肯定通訊雙方的協議格式,請求對象和響應對象

請求對象:

@Data
@ToString
public class RpcRequest {
    /**
     * 請求對象的ID
     */
    private String requestId;
    /**
     * 類名
     */
    private String className;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 參數類型
     */
    private Class<?>[] parameterTypes;
    /**
     * 入參
     */
    private Object[] parameters;
}
  • 請求對象的ID是客戶端用來驗證服務器請求和響應是否匹配

響應對象:

@Data
public class RpcResponse {
    /**
     * 響應ID
     */
    private String requestId;
    /**
     * 錯誤信息
     */
    private String error;
    /**
     * 返回的結果
     */
    private Object result;
}

2. 序列化

市面上序列化協議不少,好比jdk自帶的,Google的protobuf,kyro、Hessian等,只要不選擇jdk自帶的序列化方法,(由於其性能太差,序列化後產生的碼流太大),其餘方式其實均可以,這裏爲了方便起見,選用JSON做爲序列化協議,使用fastjson做爲JSON框架

爲了後續擴展方便,先定義序列化接口:

public interface Serializer {
    /**
     * java對象轉換爲二進制
     *
     * @param object
     * @return
     */
    byte[] serialize(Object object) throws IOException;

    /**
     * 二進制轉換成java對象
     *
     * @param clazz
     * @param bytes
     * @param <T>
     * @return
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException;
}

由於咱們採用JSON的方式,因此定義JSONSerializer的實現類:

public class JSONSerializer implements Serializer{

    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}

若是後續要使用其餘序列化方式,能夠自行實現序列化接口

3. 編解碼器

約定好協議格式和序列化方式以後,咱們還須要編解碼器,編碼器將請求對象轉換爲適合於傳輸的格式(通常來講是字節流),而對應的解碼器是將網絡字節流轉換回應用程序的消息格式。

編碼器實現:

public class RpcEncoder extends MessageToByteEncoder {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }


    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
        if (clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
        }
    }
}

解碼器實現:

public class RpcDecoder extends ByteToMessageDecoder {
    private Class<?> clazz;
    private Serializer serializer;

    public RpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        //由於以前編碼的時候寫入一個Int型,4個字節來表示長度
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        //標記當前讀的位置
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        //將byteBuf中的數據讀入data字節數組
        byteBuf.readBytes(data);
        Object obj = serializer.deserialize(clazz, data);
        list.add(obj);
    }
}

4. Netty 客戶端

下面來看看Netty客戶端是如何實現的,也就是如何使用Netty開啓客戶端。

實際上,熟悉Netty的朋友應該都知道,咱們須要注意如下幾點:

  • 編寫啓動方法,指定傳輸使用Channel
  • 指定ChannelHandler,對網絡傳輸中的數據進行讀寫處理
  • 添加編解碼器
  • 添加失敗重試機制
  • 添加發送請求消息的方法

下面來看具體的實現代碼:

@Slf4j
public class NettyClient {
    private EventLoopGroup eventLoopGroup;
    private Channel channel;
    private ClientHandler clientHandler;
    private String host;
    private Integer port;
    private static final int MAX_RETRY = 5;
    public NettyClient(String host, Integer port) {
        this.host = host;
        this.port = port;
    }
    public void connect() {
        clientHandler = new ClientHandler();
        eventLoopGroup = new NioEventLoopGroup();
        //啓動類
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                //指定傳輸使用的Channel
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加編碼器
                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        //添加解碼器
                        pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
                        //請求處理類
                        pipeline.addLast(clientHandler);
                    }
                });
        connect(bootstrap, host, port, MAX_RETRY);
    }

    /**
     * 失敗重連機制,參考Netty入門實戰掘金小冊
     *
     * @param bootstrap
     * @param host
     * @param port
     * @param retry
     */
    private void connect(Bootstrap bootstrap, String host, int port, int retry) {
        ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("鏈接服務端成功");
            } else if (retry == 0) {
                log.error("重試次數已用完,放棄鏈接");
            } else {
                //第幾回重連:
                int order = (MAX_RETRY - retry) + 1;
                //本次重連的間隔
                int delay = 1 << order;
                log.error("{} : 鏈接失敗,第 {} 重連....", new Date(), order);
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
        channel = channelFuture.channel();
    }

    /**
     * 發送消息
     *
     * @param request
     * @return
     */
    public RpcResponse send(final RpcRequest request) {
        try {
            channel.writeAndFlush(request).await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return clientHandler.getRpcResponse(request.getRequestId());
    }
    @PreDestroy
    public void close() {
        eventLoopGroup.shutdownGracefully();
        channel.closeFuture().syncUninterruptibly();
    }
}

咱們對於數據的處理重點在於ClientHandler類上,它繼承了ChannelDuplexHandler類,能夠對出站和入站的數據進行處理

public class ClientHandler extends ChannelDuplexHandler {
    /**
     * 使用Map維護請求對象ID與響應結果Future的映射關係
     */
    private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RpcResponse) {
            //獲取響應對象
            RpcResponse response = (RpcResponse) msg;
            DefaultFuture defaultFuture =
            futureMap.get(response.getRequestId());
            //將結果寫入DefaultFuture
            defaultFuture.setResponse(response);
        }
        super.channelRead(ctx,msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof RpcRequest) {
            RpcRequest request = (RpcRequest) msg;
            //發送請求對象以前,先把請求ID保存下來,並構建一個與響應Future的映射關係
            futureMap.putIfAbsent(request.getRequestId(), new DefaultFuture());
        }
        super.write(ctx, msg, promise);
    }

    /**
     * 獲取響應結果
     *
     * @param requsetId
     * @return
     */
    public RpcResponse getRpcResponse(String requsetId) {
        try {
            DefaultFuture future = futureMap.get(requsetId);
            return future.getRpcResponse(5000);
        } finally {
            //獲取成功之後,從map中移除
            futureMap.remove(requsetId);
        }
    }
}
參考文章: https://xilidou.com/2018/09/2...

從上面實現能夠看出,咱們定義了一個Map,維護請求ID與響應結果的映射關係,目的是爲了客戶端用來驗證服務端響應是否與請求相匹配,由於Netty的channel可能被多個線程使用,當結果返回時,你不知道是從哪一個線程返回的,因此須要一個映射關係。

而咱們的結果是封裝在DefaultFuture中的,由於Netty是異步框架,全部的返回都是基於Future和Callback機制的,咱們這裏自定義Future來實現客戶端"異步調用"

public class DefaultFuture {
    private RpcResponse rpcResponse;
    private volatile boolean isSucceed = false;
    private final Object object = new Object();

    public RpcResponse getRpcResponse(int timeout) {
        synchronized (object) {
            while (!isSucceed) {
                try {
                    object.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return rpcResponse;
        }
    }

    public void setResponse(RpcResponse response) {
        if (isSucceed) {
            return;
        }
        synchronized (object) {
            this.rpcResponse = response;
            this.isSucceed = true;
            object.notify();
        }
    }
}
  • 實際上用了wait和notify機制,同時使用一個boolean變量作輔助

5. Netty服務端

Netty服務端的實現跟客戶端的實現差很少,只不過要注意的是,當對請求進行解碼事後,須要經過代理的方式調用本地函數。下面是Server端代碼:

public class NettyServer implements InitializingBean {
    private EventLoopGroup boss = null;
    private EventLoopGroup worker = null;
    @Autowired
    private ServerHandler serverHandler;
    @Override
    public void afterPropertiesSet() throws Exception {
        //此處使用了zookeeper作註冊中心,本文不涉及,可忽略
        ServiceRegistry registry = new ZkServiceRegistry("127.0.0.1:2181");
        start(registry);
    }

    public void start(ServiceRegistry registry) throws Exception {
        //負責處理客戶端鏈接的線程池
        boss = new NioEventLoopGroup();
        //負責處理讀寫操做的線程池
        worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加解碼器
                        pipeline.addLast(new RpcEncoder(RpcResponse.class, new JSONSerializer()));
                        //添加編碼器
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));
                        //添加請求處理器
                        pipeline.addLast(serverHandler);

                    }
                });
        bind(serverBootstrap, 8888);
    }

    /**
     * 若是端口綁定失敗,端口數+1,從新綁定
     *
     * @param serverBootstrap
     * @param port
     */
    public void bind(final ServerBootstrap serverBootstrap,int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                log.info("端口[ {} ] 綁定成功",port);
            } else {
                log.error("端口[ {} ] 綁定失敗", port);
                bind(serverBootstrap, port + 1);
            }
        });
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        worker.shutdownGracefully().sync();
        log.info("關閉Netty");
    }
}

下面是處理讀寫操做的Handler類:

@Component
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> implements ApplicationContextAware {
    private ApplicationContext applicationContext;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) {
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(msg.getRequestId());
        try {
            Object handler = handler(msg);
            log.info("獲取返回結果: {} ", handler);
            rpcResponse.setResult(handler);
        } catch (Throwable throwable) {
            rpcResponse.setError(throwable.toString());
            throwable.printStackTrace();
        }
        ctx.writeAndFlush(rpcResponse);
    }

    /**
     * 服務端使用代理處理請求
     *
     * @param request
     * @return
     */
    private Object handler(RpcRequest request) throws ClassNotFoundException, InvocationTargetException {
        //使用Class.forName進行加載Class文件
        Class<?> clazz = Class.forName(request.getClassName());
        Object serviceBean = applicationContext.getBean(clazz);
        log.info("serviceBean: {}",serviceBean);
        Class<?> serviceClass = serviceBean.getClass();
        log.info("serverClass:{}",serviceClass);
        String methodName = request.getMethodName();

        Class<?>[] parameterTypes = request.getParameterTypes();
        Object[] parameters = request.getParameters();

        //使用CGLIB Reflect
        FastClass fastClass = FastClass.create(serviceClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        log.info("開始調用CGLIB動態代理執行服務端方法...");
        return fastMethod.invoke(serviceBean, parameters);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

6. 客戶端代理

客戶端使用Java動態代理,在代理類中實現通訊細節,衆所衆知,Java動態代理須要實現InvocationHandler接口

@Slf4j
public class RpcClientDynamicProxy<T> implements InvocationHandler {
    private Class<T> clazz;
    public RpcClientDynamicProxy(Class<T> clazz) throws Exception {
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        String requestId = UUID.randomUUID().toString();

        String className = method.getDeclaringClass().getName();
        String methodName = method.getName();

        Class<?>[] parameterTypes = method.getParameterTypes();

        request.setRequestId(requestId);
        request.setClassName(className);
        request.setMethodName(methodName);
        request.setParameterTypes(parameterTypes);
        request.setParameters(args);
        log.info("請求內容: {}",request);
        
        //開啓Netty 客戶端,直連
        NettyClient nettyClient = new NettyClient("127.0.0.1", 8888);
        log.info("開始鏈接服務端:{}",new Date());
        nettyClient.connect();
        RpcResponse send = nettyClient.send(request);
        log.info("請求調用返回結果:{}", send.getResult());
        return send.getResult();
    }
}
  • 在invoke方法中封裝請求對象,構建NettyClient對象,並開啓客戶端,發送請求消息

代理工廠類以下:

public class ProxyFactory {
    public static <T> T create(Class<T> interfaceClass) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass}, new RpcClientDynamicProxy<T>(interfaceClass));
    }
}
  • 經過Proxy.newProxyInstance建立接口的代理類

7. RPC遠程調用測試

API:

public interface HelloService {
    String hello(String name);
}
  • 準備一個測試API接口

客戶端:

@SpringBootApplication
@Slf4j
public class ClientApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(ClientApplication.class, args);
        HelloService helloService = ProxyFactory.create(HelloService.class);
        log.info("響應結果「: {}",helloService.hello("pjmike"));
    }
}
  • 客戶端調用接口的方法

服務端:

//服務端實現
@Service
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String name) {
        return "hello, " + name;
    }
}

運行結果:

rpc_5

小結

以上咱們基於Netty實現了一個非非很是簡陋的RPC框架,比起成熟的RPC框架來講相差甚遠,甚至說基本的註冊中心都沒有實現,可是經過本次實踐,能夠說我對於RPC的理解更深了,瞭解了一個RPC框架到底須要關注哪些方面,將來當咱們使用成熟的RPC框架時,好比Dubbo,可以作到心中有數,能明白其底層不過也是使用Netty做爲基礎通信框架。日後,若是更深刻翻看開源RPC框架源碼時,也相對比較容易

項目地址: https://github.com/pjmike/spr...

參考資料 & 鳴謝

相關文章
相關標籤/搜索