如今網上有不少關於使用Netty來構建RPC框架的例子,爲何我這裏還要寫一篇文章進行論述呢,我很清楚我可能沒有寫得他們那麼好。之因此還要寫,有兩點緣由:html
下面我將從如下幾點闡述如何基於Netty實現簡易的RPC框架:java
<!--more-->git
RPC,遠程過程調用,能夠作到像本地調用同樣調用遠程服務,是一種進程間的通訊方式,概念想必你們都很清楚,在看到58沈劍寫的RPC文章 以後,意識到其實咱們能夠換一種思考方式去理解RPC,也就是從本地調用出發,進而去推導RPC調用github
本地函數是咱們常常碰到的,好比下面示例:spring
public String sayHello(String name) { return "hello, " + name; }
咱們只須要傳入一個參數,調用sayHello方法就能夠獲得一個輸出,也就是輸入參數——>方法體——>輸出,入參、出參以及方法體都在同一個進程空間中,這就是本地函數調用編程
那有沒有辦法實現不一樣進程之間通訊呢?調用方在進程A,須要調用方法A,可是方法A在進程B中json
最容易想到的方式就是使用Socket通訊,使用Socket能夠完成跨進程調用,咱們須要約定一個進程通訊協議,來進行傳參,調用函數,出參。寫過Socket應該都知道,Socket是比較原始的方式,咱們須要更多的去關注一些細節問題,好比參數和函數須要轉換成字節流進行網絡傳輸,也就是序列化操做,而後出參時須要反序列化;使用socket進行底層通信,代碼編程也比較容易出錯。bootstrap
若是一個調用方須要關注這麼多問題,那無疑是個災難,因此有沒有什麼簡單方法,讓咱們的調用方不須要關注細節問題,讓調用方像調用本地函數同樣,只要傳入參數,調用方法,而後坐等返回結果就能夠了呢?數組
RPC框架就是用來解決上面的問題的,它可以讓調用方像調用本地函數同樣調用遠程服務,底層通信細節對調用方是透明的,將各類複雜性都給屏蔽掉,給予調用方極致體驗。promise
前面就已經說到RPC框架,讓調用方像調用本地函數同樣調用遠程服務,那麼如何作到這一點呢?
在使用的時候,調用方是直接調用本地函數,傳入相應參數,其餘細節它不用管,至於通信細節交給RPC框架來實現。實際上RPC框架採用代理類的方式,具體來講是動態代理的方式,在運行時動態建立新的類,也就是代理類,在該類中實現通信的細節問題,好比參數序列化。
固然不光是序列化,咱們還須要約定一個雙方通訊的協議格式,規定好協議格式,好比請求參數的數據類型,請求的參數,請求的方法名等,這樣根據格式進行序列化後進行網絡傳輸,而後服務端收到請求對象後按照指定格式進行解碼,這樣服務端才知道具體該調用哪一個方法,傳入什麼樣的參數。
剛纔又提到網絡傳輸,RPC框架重要的一環也就是網絡傳輸,服務是部署在不一樣主機上的,如何高效的進行網絡傳輸,儘可能不丟包,保證數據完整無誤的快速傳遞出去?實際上,就是利用咱們今天的主角——Netty,Netty是一個高性能的網絡通信框架,它足以勝任咱們的任務。
前面說了這麼多,再次總結下一個RPC框架須要重點關注哪幾個點:
固然一個優秀的RPC框架須要關注的不止上面幾點,只不過本篇文章旨在作一個簡易的RPC框架,理解了上面關鍵的幾點就夠了
終於到了本文的重頭戲了,咱們將根據實現RPC須要關注的幾個要點(代理、序列化、協議、編解碼),使用Netty進行逐一實現
首先咱們須要肯定通訊雙方的協議格式,請求對象和響應對象
請求對象:
@Data @ToString public class RpcRequest { /** * 請求對象的ID */ private String requestId; /** * 類名 */ private String className; /** * 方法名 */ private String methodName; /** * 參數類型 */ private Class<?>[] parameterTypes; /** * 入參 */ private Object[] parameters; }
響應對象:
@Data public class RpcResponse { /** * 響應ID */ private String requestId; /** * 錯誤信息 */ private String error; /** * 返回的結果 */ private Object result; }
市面上序列化協議不少,好比jdk自帶的,Google的protobuf,kyro、Hessian等,只要不選擇jdk自帶的序列化方法,(由於其性能太差,序列化後產生的碼流太大),其餘方式其實均可以,這裏爲了方便起見,選用JSON做爲序列化協議,使用fastjson做爲JSON框架
爲了後續擴展方便,先定義序列化接口:
public interface Serializer { /** * java對象轉換爲二進制 * * @param object * @return */ byte[] serialize(Object object) throws IOException; /** * 二進制轉換成java對象 * * @param clazz * @param bytes * @param <T> * @return */ <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException; }
由於咱們採用JSON的方式,因此定義JSONSerializer的實現類:
public class JSONSerializer implements Serializer{ @Override public byte[] serialize(Object object) { return JSON.toJSONBytes(object); } @Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { return JSON.parseObject(bytes, clazz); } }
若是後續要使用其餘序列化方式,能夠自行實現序列化接口
約定好協議格式和序列化方式以後,咱們還須要編解碼器,編碼器將請求對象轉換爲適合於傳輸的格式(通常來講是字節流),而對應的解碼器是將網絡字節流轉換回應用程序的消息格式。
編碼器實現:
public class RpcEncoder extends MessageToByteEncoder { private Class<?> clazz; private Serializer serializer; public RpcEncoder(Class<?> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception { if (clazz != null && clazz.isInstance(msg)) { byte[] bytes = serializer.serialize(msg); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes); } } }
解碼器實現:
public class RpcDecoder extends ByteToMessageDecoder { private Class<?> clazz; private Serializer serializer; public RpcDecoder(Class<?> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { //由於以前編碼的時候寫入一個Int型,4個字節來表示長度 if (byteBuf.readableBytes() < 4) { return; } //標記當前讀的位置 byteBuf.markReaderIndex(); int dataLength = byteBuf.readInt(); if (byteBuf.readableBytes() < dataLength) { byteBuf.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; //將byteBuf中的數據讀入data字節數組 byteBuf.readBytes(data); Object obj = serializer.deserialize(clazz, data); list.add(obj); } }
下面來看看Netty客戶端是如何實現的,也就是如何使用Netty開啓客戶端。
實際上,熟悉Netty的朋友應該都知道,咱們須要注意如下幾點:
下面來看具體的實現代碼:
@Slf4j public class NettyClient { private EventLoopGroup eventLoopGroup; private Channel channel; private ClientHandler clientHandler; private String host; private Integer port; private static final int MAX_RETRY = 5; public NettyClient(String host, Integer port) { this.host = host; this.port = port; } public void connect() { clientHandler = new ClientHandler(); eventLoopGroup = new NioEventLoopGroup(); //啓動類 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) //指定傳輸使用的Channel .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //添加編碼器 pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer())); //添加解碼器 pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer())); //請求處理類 pipeline.addLast(clientHandler); } }); connect(bootstrap, host, port, MAX_RETRY); } /** * 失敗重連機制,參考Netty入門實戰掘金小冊 * * @param bootstrap * @param host * @param port * @param retry */ private void connect(Bootstrap bootstrap, String host, int port, int retry) { ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { log.info("鏈接服務端成功"); } else if (retry == 0) { log.error("重試次數已用完,放棄鏈接"); } else { //第幾回重連: int order = (MAX_RETRY - retry) + 1; //本次重連的間隔 int delay = 1 << order; log.error("{} : 鏈接失敗,第 {} 重連....", new Date(), order); bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS); } }); channel = channelFuture.channel(); } /** * 發送消息 * * @param request * @return */ public RpcResponse send(final RpcRequest request) { try { channel.writeAndFlush(request).await(); } catch (InterruptedException e) { e.printStackTrace(); } return clientHandler.getRpcResponse(request.getRequestId()); } @PreDestroy public void close() { eventLoopGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); } }
咱們對於數據的處理重點在於ClientHandler類上,它繼承了ChannelDuplexHandler類,能夠對出站和入站的數據進行處理
public class ClientHandler extends ChannelDuplexHandler { /** * 使用Map維護請求對象ID與響應結果Future的映射關係 */ private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RpcResponse) { //獲取響應對象 RpcResponse response = (RpcResponse) msg; DefaultFuture defaultFuture = futureMap.get(response.getRequestId()); //將結果寫入DefaultFuture defaultFuture.setResponse(response); } super.channelRead(ctx,msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof RpcRequest) { RpcRequest request = (RpcRequest) msg; //發送請求對象以前,先把請求ID保存下來,並構建一個與響應Future的映射關係 futureMap.putIfAbsent(request.getRequestId(), new DefaultFuture()); } super.write(ctx, msg, promise); } /** * 獲取響應結果 * * @param requsetId * @return */ public RpcResponse getRpcResponse(String requsetId) { try { DefaultFuture future = futureMap.get(requsetId); return future.getRpcResponse(5000); } finally { //獲取成功之後,從map中移除 futureMap.remove(requsetId); } } }
參考文章: https://xilidou.com/2018/09/2...
從上面實現能夠看出,咱們定義了一個Map,維護請求ID與響應結果的映射關係,目的是爲了客戶端用來驗證服務端響應是否與請求相匹配,由於Netty的channel可能被多個線程使用,當結果返回時,你不知道是從哪一個線程返回的,因此須要一個映射關係。
而咱們的結果是封裝在DefaultFuture中的,由於Netty是異步框架,全部的返回都是基於Future和Callback機制的,咱們這裏自定義Future來實現客戶端"異步調用"
public class DefaultFuture { private RpcResponse rpcResponse; private volatile boolean isSucceed = false; private final Object object = new Object(); public RpcResponse getRpcResponse(int timeout) { synchronized (object) { while (!isSucceed) { try { object.wait(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } return rpcResponse; } } public void setResponse(RpcResponse response) { if (isSucceed) { return; } synchronized (object) { this.rpcResponse = response; this.isSucceed = true; object.notify(); } } }
Netty服務端的實現跟客戶端的實現差很少,只不過要注意的是,當對請求進行解碼事後,須要經過代理的方式調用本地函數。下面是Server端代碼:
public class NettyServer implements InitializingBean { private EventLoopGroup boss = null; private EventLoopGroup worker = null; @Autowired private ServerHandler serverHandler; @Override public void afterPropertiesSet() throws Exception { //此處使用了zookeeper作註冊中心,本文不涉及,可忽略 ServiceRegistry registry = new ZkServiceRegistry("127.0.0.1:2181"); start(registry); } public void start(ServiceRegistry registry) throws Exception { //負責處理客戶端鏈接的線程池 boss = new NioEventLoopGroup(); //負責處理讀寫操做的線程池 worker = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //添加解碼器 pipeline.addLast(new RpcEncoder(RpcResponse.class, new JSONSerializer())); //添加編碼器 pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer())); //添加請求處理器 pipeline.addLast(serverHandler); } }); bind(serverBootstrap, 8888); } /** * 若是端口綁定失敗,端口數+1,從新綁定 * * @param serverBootstrap * @param port */ public void bind(final ServerBootstrap serverBootstrap,int port) { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { log.info("端口[ {} ] 綁定成功",port); } else { log.error("端口[ {} ] 綁定失敗", port); bind(serverBootstrap, port + 1); } }); } @PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); log.info("關閉Netty"); } }
下面是處理讀寫操做的Handler類:
@Component @Slf4j @ChannelHandler.Sharable public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> implements ApplicationContextAware { private ApplicationContext applicationContext; @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) { RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setRequestId(msg.getRequestId()); try { Object handler = handler(msg); log.info("獲取返回結果: {} ", handler); rpcResponse.setResult(handler); } catch (Throwable throwable) { rpcResponse.setError(throwable.toString()); throwable.printStackTrace(); } ctx.writeAndFlush(rpcResponse); } /** * 服務端使用代理處理請求 * * @param request * @return */ private Object handler(RpcRequest request) throws ClassNotFoundException, InvocationTargetException { //使用Class.forName進行加載Class文件 Class<?> clazz = Class.forName(request.getClassName()); Object serviceBean = applicationContext.getBean(clazz); log.info("serviceBean: {}",serviceBean); Class<?> serviceClass = serviceBean.getClass(); log.info("serverClass:{}",serviceClass); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); //使用CGLIB Reflect FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); log.info("開始調用CGLIB動態代理執行服務端方法..."); return fastMethod.invoke(serviceBean, parameters); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
客戶端使用Java動態代理,在代理類中實現通訊細節,衆所衆知,Java動態代理須要實現InvocationHandler接口
@Slf4j public class RpcClientDynamicProxy<T> implements InvocationHandler { private Class<T> clazz; public RpcClientDynamicProxy(Class<T> clazz) throws Exception { this.clazz = clazz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); String requestId = UUID.randomUUID().toString(); String className = method.getDeclaringClass().getName(); String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); request.setRequestId(requestId); request.setClassName(className); request.setMethodName(methodName); request.setParameterTypes(parameterTypes); request.setParameters(args); log.info("請求內容: {}",request); //開啓Netty 客戶端,直連 NettyClient nettyClient = new NettyClient("127.0.0.1", 8888); log.info("開始鏈接服務端:{}",new Date()); nettyClient.connect(); RpcResponse send = nettyClient.send(request); log.info("請求調用返回結果:{}", send.getResult()); return send.getResult(); } }
代理工廠類以下:
public class ProxyFactory { public static <T> T create(Class<T> interfaceClass) throws Exception { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass}, new RpcClientDynamicProxy<T>(interfaceClass)); } }
API:
public interface HelloService { String hello(String name); }
客戶端:
@SpringBootApplication @Slf4j public class ClientApplication { public static void main(String[] args) throws Exception { SpringApplication.run(ClientApplication.class, args); HelloService helloService = ProxyFactory.create(HelloService.class); log.info("響應結果「: {}",helloService.hello("pjmike")); } }
服務端:
//服務端實現 @Service public class HelloServiceImpl implements HelloService { @Override public String hello(String name) { return "hello, " + name; } }
運行結果:
以上咱們基於Netty實現了一個非非很是簡陋的RPC框架,比起成熟的RPC框架來講相差甚遠,甚至說基本的註冊中心都沒有實現,可是經過本次實踐,能夠說我對於RPC的理解更深了,瞭解了一個RPC框架到底須要關注哪些方面,將來當咱們使用成熟的RPC框架時,好比Dubbo,可以作到心中有數,能明白其底層不過也是使用Netty做爲基礎通信框架。日後,若是更深刻翻看開源RPC框架源碼時,也相對比較容易
項目地址: https://github.com/pjmike/spr...