本文主要是對勇哥的simpleRpc進行了簡單的剖析,用來學習rpc,加深對rpc的理解!java
源碼地址:http://git.oschina.net/huangyong/rpcnode
勇哥博客:https://my.oschina.net/huangyong/blog/361751git
rpc(Remote Procedure Call)主要是將遠程服務調用包裝成爲本地服務調用,使用起來比較直觀。之前,項目中有使用過將定時任務模塊剝離出來,單獨部署,定時任務模塊使用rmi調用主系統的服務,使用起來和rpc應該是相似。spring
首先是down下源碼,部署在本地IDE,具體過程參見gitee介紹吧!bootstrap
下面是eclipse的工程截圖:api
先試用下:服務器
1:啓動本地zk,使用默認配置(工程中的默認配置就是ZK的默認配置,基本不用改)app
2:啓動rpc-sample-server工程的RpcBootstrap.javaeclipse
啓動日誌:ide
start server connect zookeeper create address node: /registry/com.xxx.rpc.sample.api.HelloService-sample.hello2/address-0000000003 register service: com.xxx.rpc.sample.api.HelloService-sample.hello2 => 127.0.0.1:8000 create address node: /registry/com.xxx.rpc.sample.api.HelloService/address-0000000003 register service: com.xxx.rpc.sample.api.HelloService => 127.0.0.1:8000 server started on port 8000
3:啓動rpc-sample-client工程的HelloClient.java
connect zookeeper get only address node: address-0000000003 discover service: com.xxx.rpc.sample.api.HelloService => 127.0.0.1:8000 time: 364ms Hello! World connect zookeeper get only address node: address-0000000003 discover service: com.xxx.rpc.sample.api.HelloService-sample.hello2 => 127.0.0.1:8000 time: 19ms 你好! 世界
試用成功!
下面來進行分析
服務端啓動時序圖:
1:啓動main方法,spring開始初始化
2:初始化zk註冊類,鏈接zk,返回zkClient
3:初始化server,掃描application,將被@RpcService標註的bean提取並存儲
4:啓動netty服務端,並將rpc服務類及對應的服務地址註冊到zk中
5:獲取請求並利用反射調用請求處理類,返回結果
流程很清晰,反推一下:
要提供服務,必需要啓動一個服務端,這裏使用netty實現,有了服務端就要考慮對外提供什麼服務,這裏的服務被統一註冊到了zk,因此只須要去zk裏面查詢便可,也就意味着必需要有一個zk連接的過程,有了連接,就須要註冊服務了,那麼就須要辨識哪些服務是須要被註冊的,這裏是經過@RpcService標註的,同時也是spring的bean,那麼當獲取到請求後,利用反射調用真正的服務提供類,處理完畢以後返回結果,這樣就基本實現了上述的流程。
簡單看下源碼:
啓動spring
new ClassPathXmlApplicationContext("spring.xml");
spring.xml配置
<context:component-scan base-package="com.xxx.rpc.sample.server"/> <context:property-placeholder location="classpath:rpc.properties"/> <bean id="serviceRegistry" class="com.xxx.rpc.registry.zookeeper.ZooKeeperServiceRegistry"> <constructor-arg name="zkAddress" value="${rpc.registry_address}"/> </bean> <bean id="rpcServer" class="com.xxx.rpc.server.RpcServer"> <constructor-arg name="serviceAddress" value="${rpc.service_address}"/> <constructor-arg name="serviceRegistry" ref="serviceRegistry"/> </bean>
看到首先是初始化zk註冊類ZooKeeperServiceRegistry
基本就是建立zk連接:
public ZooKeeperServiceRegistry(String zkAddress) { // 建立 ZooKeeper 客戶端 zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT); LOGGER.debug("connect zookeeper"); }
緊接着就初始化RpcSserver類,它依賴zk註冊類以及提供rpc服務的服務器address
這裏的rpc.properties:
rpc.service_address=127.0.0.1:8000
rpc.registry_address=127.0.0.1:2181
RpcServer類實現了ApplicationContextAware, InitializingBean接口
在重寫setApplicationContext時,實現了服務類辨識及存儲:
// 掃描帶有 RpcService 註解的類並初始化 handlerMap 對象 Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); if (MapUtils.isNotEmpty(serviceBeanMap)) { for (Object serviceBean : serviceBeanMap.values()) { RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class); String serviceName = rpcService.value().getName(); String serviceVersion = rpcService.version(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } handlerMap.put(serviceName, serviceBean); } }
在重寫afterPropertiesSet時實現了Netty服務的啓動:
@Override public void afterPropertiesSet() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立並初始化 Netty 服務端 Bootstrap 對象 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解碼 RPC 請求 pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 編碼 RPC 響應 pipeline.addLast(new RpcServerHandler(handlerMap)); // 處理 RPC 請求 } }); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 獲取 RPC 服務器的 IP 地址與端口號 String[] addressArray = StringUtil.split(serviceAddress, ":"); String ip = addressArray[0]; int port = Integer.parseInt(addressArray[1]); // 啓動 RPC 服務器 ChannelFuture future = bootstrap.bind(ip, port).sync(); // 註冊 RPC 服務地址 if (serviceRegistry != null) { for (String interfaceName : handlerMap.keySet()) { serviceRegistry.register(interfaceName, serviceAddress); LOGGER.debug("register service: {} => {}", interfaceName, serviceAddress); } } LOGGER.debug("server started on port {}", port); // 關閉 RPC 服務器 future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
標準的啓動Netty服務端,同時使用依賴的zk註冊類去zk中註冊服務,上一步已經獲得了rpc服務類,提供服務類的地址也配置在文件中,那麼就能夠開始註冊了
@Override public void register(String serviceName, String serviceAddress) { // 建立 registry 節點(持久) String registryPath = Constant.ZK_REGISTRY_PATH; if (!zkClient.exists(registryPath)) { zkClient.createPersistent(registryPath); LOGGER.debug("create registry node: {}", registryPath); } // 建立 service 節點(持久) String servicePath = registryPath + "/" + serviceName; if (!zkClient.exists(servicePath)) { zkClient.createPersistent(servicePath); LOGGER.debug("create service node: {}", servicePath); } // 建立 address 節點(臨時) String addressPath = servicePath + "/address-"; String addressNode = zkClient.createEphemeralSequential(addressPath, serviceAddress); LOGGER.debug("create address node: {}", addressNode); }
註冊完成後,zk信息以下:
能夠看到已經註冊了兩個類了,這兩個類就位於rpc-sample-server工程中
那麼如何處理請求呢?因爲啓動的是Netty服務端,那麼請求到來時,確定是netty獲取到,在啓動服務端時,配置了一個RpcServerHandler,請求的反射處理就是在這裏進行的:
@Override public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception { // 建立並初始化 RPC 響應對象 RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { Object result = handle(request); response.setResult(result); } catch (Exception e) { LOGGER.error("handle result failure", e); response.setException(e); } // 寫入 RPC 響應對象並自動關閉鏈接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private Object handle(RpcRequest request) throws Exception { // 獲取服務對象 String serviceName = request.getInterfaceName(); String serviceVersion = request.getServiceVersion(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } Object serviceBean = handlerMap.get(serviceName); if (serviceBean == null) { throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName)); } // 獲取反射調用所需的參數 Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // 執行反射調用 // Method method = serviceClass.getMethod(methodName, parameterTypes); // method.setAccessible(true); // return method.invoke(serviceBean, parameters); // 使用 CGLib 執行反射調用 FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters); }
這裏的請求流程基本是Netty的標準處理流程,在處理時,從請求中獲取到指定的請求參數,再去handlerMap中匹配對應的服務類bean,而後利用反射調用此bean對應的處理方法,並獲得返回結果
這樣一來,服務端的基本功能就完畢了!
看看主要的涉及面有哪些:
利用zk作服務註冊
利用@RpcService標識遠程服務類
利用Netty實現服務請求接收
利用反射實現真實服務調用
利用Protostuff實現序列化
這些應該是rpc基礎的涉及點吧,代碼量很少,理解起來相對容易,看來,完成rpc功能相對簡單,可是考慮其餘的因素就複雜了,慢慢學習理解吧!