此博客所述項目代碼已在github開源,歡迎你們一塊兒貢獻!
點此進入:Cool-RPCjava
最近一次寫博客仍是17年末,謝謝你們持久以來的關注
本篇博文將會教你們如何從0到1,搭建一個簡單、高效且拓展性強的rpc框架.node
相信你們都或多或少使用過RPC框架,好比阿里的Dubbo、谷歌的grpc、Facebook的Thrift等等git
那麼究竟什麼是rpc?
rpc翻譯成中文叫作遠程過程調用,通俗易懂點:將單應用架構成分佈式系統架構後,多個系統間數據怎麼交互,這就是rpc的職責.github
從服務的角度來看,rpc分爲服務提供者(provider)和服務消費者(consumer)兩大類,中間會有一些共用java接口,叫作開放api接口
也就是說,接口服務實現類所處的地方叫作provider,接口服務調用類所處的地方叫consumerredis
由於處於分佈式環境中,那consumer調用provider時,如何知道對方服務器的IP和開放端口呢?
這時須要一個組件叫作註冊中心,consumer經過服務名後,去註冊中心上查找該服務的IP+Port,拿到地址數據後,再去請求該地址的服務bootstrap
如圖:api
此項目基於傳輸層(TCP/IP協議)進行通信,傳輸層框架使用netty編寫,github上會有mina版本
提供多套序列化框架,默認使用Protostuff序列化,可配置使用java序列化等
註冊中心默認zookeeper,可配置使用redis(只要有節點數據存儲和消息通知功能的組件便可)服務器
consumer經過java動態代理的方式使用執行遠程調用
將所要執行的類名,方法,參數等通知provider,以後provider拿着數據調用本地實現類,將處理後獲得的結果通知給consumer架構
廢話了那麼多,開始上乾貨,建議你們從github克隆完整代碼,本篇博文只講重點代碼框架
註冊中心以api接口名爲key,IP+Port爲value,將數據持久化,以供消費者查詢調用
以zookeeper爲例:
爲了更靈活地實現服務註冊者和發現者,這裏添加一個註冊中心適配器
public abstract class ServiceCenterAdapter implements ServiceCenter{ String host; int port = 0; String passWord; ServiceCenterAdapter(){} ServiceCenterAdapter(String host){ this.host = host; } ServiceCenterAdapter(String host, int port) { this.host = host; this.port = port; } @Override public String discover(String serviceName) { return null; } @Override public void register(String serviceName, String serviceAddress) {} @Override public void setHost(String host){ this.host = host; }; @Override public void setPort(int port){ this.port = port; }; @Override public void setPassWord(String passWord){ this.passWord = passWord; }; //獲取 IP:端口 @Override public String getAddress(){ if ("".equals(host) || host == null || port == 0){ throw new RuntimeException("the zookeeper host or port error"); } return host+":"+String.valueOf(port); }; }
zookeeper的服務註冊(provider使用):
在實際項目中,須要構造此類,並注入相應的IP和端口,最後以bean的形式注入到IOC容器中
public class ZooKeeperServiceRegistry extends ServiceCenterAdapter { private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceRegistry.class); private ZkClient zkClient; { this.port = 2181; zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT); log.info("connect zookeeper"); } public ZooKeeperServiceRegistry(String zkHost) { super(zkHost); } public ZooKeeperServiceRegistry(String zkHost, int zkPort) { super(zkHost, zkPort); } // 註冊服務 serviceName=接口名 serviceAddress=IP+Port @Override public void register(String serviceName, String serviceAddress) { // create cool node permanent String registryPath = CoolConstant.ZK_REGISTRY_PATH; if (!zkClient.exists(registryPath)) { zkClient.createPersistent(registryPath); log.info("create registry node: {}", registryPath); } // create service node permanent String servicePath = registryPath + "/" + serviceName; if (!zkClient.exists(servicePath)) { zkClient.createPersistent(servicePath); log.info("create service node: {}", servicePath); } // create service address node temp String addressPath = servicePath + "/address-"; String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress); log.info("create address node: {}", addressNode); } }
zookeeper的服務發現者(consumer使用):
同上,也須要配置相應的IP和端口,並以bean注入到項目ioc容器中
public class ZooKeeperServiceDiscovery extends ServiceCenterAdapter { private static final Logger log = LoggerFactory.getLogger(ZooKeeperServiceDiscovery.class); { super.port = 2181; } public ZooKeeperServiceDiscovery(){}; public ZooKeeperServiceDiscovery(String zkHost){ super(zkHost); } public ZooKeeperServiceDiscovery(String zkHost, int zkPort){ super(zkHost, zkPort); } // 服務發現 name=api接口名 @Override public String discover(String name) { ZkClient zkClient = new ZkClient(getAddress(), CoolConstant.ZK_SESSION_TIMEOUT, CoolConstant.ZK_CONNECTION_TIMEOUT); log.debug("connect zookeeper"); try { String servicePath = CoolConstant.ZK_REGISTRY_PATH + "/" + name; if (!zkClient.exists(servicePath)) { throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath)); } List<String> addressList = zkClient.getChildren(servicePath); if (addressList.size() == 0) { throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath)); } String address; int size = addressList.size(); if (size == 1) { address = addressList.get(0); log.debug("get only address node: {}", address); } else { address = addressList.get(ThreadLocalRandom.current().nextInt(size)); log.debug("get random address node: {}", address); } String addressPath = servicePath + "/" + address; return zkClient.readData(addressPath); } finally { zkClient.close(); } } }
此篇博文的TCP數據(包括編解碼器、處理器)所有以netty編寫
服務端的netty引導類:
public class CoolRpcServer implements ApplicationContextAware { private static Logger log = LoggerFactory.getLogger(CoolRpcServer.class); private Channel channel; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap bootstrap; private HandlerInitializer handlerInitializer; private ServiceCenter serviceRegistry; private String serviceIP; private int port; public static Map<String, Object> servicesMap ; { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); bootstrap = new ServerBootstrap(); handlerInitializer = new HandlerInitializer(); servicesMap = new HashMap<>(16); } public CoolRpcServer(ServiceCenter serviceRegistry, String serviceIP, int port){ this.serviceRegistry = serviceRegistry; this.serviceIP = serviceIP; this.port = port; } /** * start and init tcp server if ioc contain is booting */ @SuppressWarnings("unchecked") public void initServer() throws InterruptedException { bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(handlerInitializer); bootstrap.option(ChannelOption.SO_BACKLOG, 128); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // the most send bytes ( 256KB ) bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 256); // the most receive bytes ( 2048KB ) bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 1024 * 2); channel = bootstrap.bind(serviceIP,port).sync().channel(); if (servicesMap != null && servicesMap.size() > 0){ for (String beanName: servicesMap.keySet()){ serviceRegistry.register(beanName, serviceIP + ":" + String.valueOf(port)); log.info("register service name = {}", beanName); } } log.info("TCP server started successfully, port:{}", port); channel.closeFuture().sync(); } /** * close ioc contain and stop tcp server */ public void stopServer(){ if (channel != null && channel.isActive()) { channel.close(); } if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } log.info("TCP server stopped successfully, port: {}", port); } /** * scan Annotation of CoolService */ @Override public void setApplicationContext(ApplicationContext ctx) throws BeansException { Map<String, Object> beans = ctx.getBeansWithAnnotation(CoolService.class); if (beans != null && beans.size()>0){ for (Object bean : beans.values()){ String name = bean.getClass().getAnnotation(CoolService.class).value().getName(); servicesMap.put(name, bean); } } } }
此項目的開放api接口實現類須要用@CoolService
註解標識,服務端容器啓動時,會掃描全部帶有此註解的實現類,並注入到註冊中心
服務端處理器(netty handler):
@ChannelHandler.Sharable public class CoolServerHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(CoolServerHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CoolResponse response = new CoolResponse(); CoolRequest request = (CoolRequest) msg; try { Object result = invoke(request); response.setRequestID(request.getRequestID()); response.setResult(result); } catch (Throwable error) { response.setError(error); } ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private Object invoke(CoolRequest request) throws Throwable{ if (request == null){ throw new Throwable("cool rpc request not found"); } String className = request.getClassName(); String methodName = request.getMethodName(); Object[] parameters = request.getParameters(); Object service = CoolRpcServer.servicesMap.get(className); if (service == null){ throw new Throwable("cool rpc service not exist"); } Class<?> serviceClass = service.getClass(); Class<?>[] parameterTypes = request.getParameterTypes(); FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); return fastMethod.invoke(service, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("server caught exception", cause); ctx.close(); } }
將客戶端傳輸過來的請求數據(類名,方法,參數)在本地以cglib的方式反射調用
調用成功後,將處理完畢的結果編碼返回給客戶端,而且關閉TCP鏈接
consumer只有api接口,並無其實現類,因此咱們能夠用java動態代理的方式去自定義方法實現,代理的方法實現即是創建TCP握手鍊接,有provider來執行方法,將獲得的結果返回給代理類,由此形成一種單憑接口就能調用實現類方法的假象
第一步: 使用java動態代理new出代理對象
public class CoolProxy { private static Logger log = LoggerFactory.getLogger(CoolProxy.class); private ServiceCenter serviceDiscovery; public CoolProxy(ServiceCenter serviceDiscovery){ this.serviceDiscovery = serviceDiscovery; } @SuppressWarnings("unchecked") public <T> T getInstance(Class<T> cls){ return (T)Proxy.newProxyInstance(cls.getClassLoader(), new Class<?>[]{cls}, (proxy, method, args) -> { CoolRequest request = new CoolRequest(); request.setRequestID(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setParameterTypes(method.getParameterTypes()); String[] addr = serviceDiscovery.discover(cls.getName()).split(":",2); CoolRpcClient client = new CoolRpcClient(addr[0], Integer.parseInt(addr[1])); CoolResponse response = client.send(request); if (response.getError()!=null){ throw response.getError(); } else { return response.getResult(); } }); } }
第二步: 在代理方法中,使用遠程過程調用(rpc)
客戶端引導類:
public class CoolRpcClient { private static Logger log = LoggerFactory.getLogger(CoolRpcClient.class); private CountDownLatch countDownLatch; private EventLoopGroup group; private Bootstrap bootstrap; private CoolResponse response; private String serviceIP; private int port; { response = new CoolResponse(); countDownLatch = new CountDownLatch(1); group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); } public CoolRpcClient(String serviceIP, int port){ this.serviceIP = serviceIP; this.port = port; } public CoolResponse send(CoolRequest request){ try { bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new CoolRpcDecoder(CoolResponse.class)) .addLast(new CoolRpcEncoder(CoolRequest.class)) .addLast(new CoolClientHandler(countDownLatch, response)); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); Channel channel = bootstrap.connect(serviceIP, port).sync().channel(); channel.writeAndFlush(request).sync(); countDownLatch.await(); channel.closeFuture().sync(); return response; } catch (Exception e){ e.printStackTrace(); return null; } finally { group.shutdownGracefully(); } } }
客戶端處理器(handler):
@ChannelHandler.Sharable public class CoolClientHandler extends ChannelInboundHandlerAdapter { private static Logger log = LoggerFactory.getLogger(CoolClientHandler.class); private CountDownLatch latch; private CoolResponse response; public CoolClientHandler(CountDownLatch latch, CoolResponse response){ this.latch = latch; this.response = response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CoolResponse enResponse = (CoolResponse) msg; this.response.sync(enResponse); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { latch.countDown(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("api caught exception", cause); ctx.close(); } }
最後使用CountDownLatch同步通知調用者,rpc調用完畢
以上即是Cool-Rpc的簡單講解,若有更好的想法請聯繫我
熱烈歡迎你們一塊兒維護此項目Cool-RPC