2019-07-19:完成基本RPC通訊!前端
2019-07-22:優化此框架,實現單一長鏈接!java
2019-07-24:繼續優化此框架:一、增長服務提供註解(帶版本號),而後利用Spring框架的在啓動時馬上保存提供服務的實現類。二、優化NettyConfig(區分消費者和提供者配置),由於一個項目可同時做爲服務提供者和服務消費者,因此增長兩個配置來區分是提供服務仍是消費服務,並且,由於若是都是本地啓動着兩個項目,那麼IP一定是同樣的,因此須要區分服務端口和消費端口。否則會有下面事故:先啓動client,再啓動server,可是他們一樣依賴於netty包,因此client也啓動了netty服務,只配置一個相同的端口會致使client的RPC通訊也是通道本身啓動的Netty服務。。。git
2019-07-27:優化此框架:增長註冊中心,使用Zookeeper做爲註冊中心。github
接下來:我會優化Netty方面的,例如增長心跳檢測、業務處理統一使用自定義業務線程池、客戶端或服務端異常斷開處理等,而後會優化一下項目的結構和rpc通訊返回結果等,最後可能會考慮增長Redis做爲註冊中心。等完成全部的這些,就會對整個項目從新寫一篇文章來介紹一下本身的總體思路,固然了,若是有同窗須要的,能夠在下方留言,我能夠提早寫文章,對完成註冊中心及以前的代碼進行詳細介紹,以後再補充其餘新增的功能實現過程!~spring
2019-07-30:已完成所有功能。放上鍊接:完整版RPC通訊框架框架
下面的是2019-07-19寫的文章,因此代碼是沒通過優化的,不過是核心代碼,仍是須要閱讀一下的,須要看完整代碼的請到最下面的github地址,你們可根據標籤拉到對應的代碼,麻煩啦~而後還有,測試方法是HelloController的sayHello方法呢,也能夠本身再搗鼓一些測試一下~ide
前段時間,我花了兩個星期的時間去從新學習Netty,由於以前老是看過一會就沒看了,因此今次下定決心必定要所有看完,而後也思考作了一些的思考題,而且將簡單的控制檯版IM系統作出來了。雖然叫IM系統,可是是很簡陋的,哈哈,只有登陸、單聊、建羣、加羣、退羣、羣聊等簡單的功能。你們能夠到我github上看看:Netty-IM學習
寫完這個IM系統後,我是打算本身寫一個網頁版的,但是考慮到本身前端的技能好像都退化得差很少了,並且時間上可能沒那麼充裕,就不了了之了。而後有一天,忽然想起來以前使用的RPC框架->Dubbo,他的通訊底層就是使用Netty,那麼我就想着要不本身先搞個簡單版試試唄,由於最主要的是學習技能得實踐一番,否則學了好像沒學同樣。。。測試
在開始動手前,本身屢了一下思路,也參考了兩篇文章,決定先作一個簡版的RPC框架,不帶註冊中心的那種。那麼來了老弟,首先咱們看一下整個流程圖是咋樣的:優化
接下來重頭戲來了,下面將會較詳細得說一下流程:
先簡單介紹一下項目結構:
simple-rpc-client:服務消費
simple-rpc-server:服務提供
simple-rpc-encapsulation:消費者和提供者公共接口
simple-rpc-netty:是關於Netty的東西,包括:自定義協議,序列化,通訊實體Packet,各類Handler等等。
客戶端: 一、首先是兩個註解,一個註解是:標識那些接口的調用會進行RPC通訊,即@NettyRPC註解。 另一個註解是:告訴程序哪些包下的類會使用RPC通訊,像@ComponentScan同樣,即@EnableNettyRPC註解。
/** * @author Howinfun */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface NettyRPC { } /** * @author Howinfun * @desc * @date 2019/7/15 */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface EnableNettyRPC { //掃描的包名,若是爲空,則根據啓動類所在的包名掃描 String[] basePackages() default {}; }
二、由於咱們使用@NettyRPC的將是一些接口,若是項目裏頭沒有實現類,那是調用失敗的,那麼咱們能夠經過實現ImportBeanDefinitionRegistrar和自定義FactoryBean和InvocationHandler,利用動態代理使接口有實現,而且能動態注入Bean。ImportBeanDefinitionRegistrar接口能夠詳細說一下,由於這裏是動態注入Bean,怎麼注入規則是能夠自定的,主要是靠ClassPathScanningCandidateComponentProvider這個類,它主要功能是掃描ClassPath下的全部類,而且根據isCandidateComponent方法來判斷哪些類能夠做爲候選人,固然了,isCandidateComponent方法你能夠重寫,而後加上你本身的規則,我這裏是必須是獨立的而且是接口,才能成爲候選人。而後ClassPathScanningCandidateComponentProvider還能添加過濾器,我這裏主要添加的過濾器是註解過濾器,只要帶有@NettyRPC註解的,其餘的都不要。 不過須要注意一點的是:記得在有@Configuration註解的配置類上使用@Import導入實現ImportBeanDefinitionRegistrar的類,否則實現動態注入Bean的做用,這裏咱們在客戶端的啓動類Import便可。
package com.hyf.rpc.netty.client.config; import com.hyf.rpc.netty.anno.EnableNettyRPC; import com.hyf.rpc.netty.anno.NettyRPC; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; import org.springframework.core.type.AnnotationMetadata; import org.springframework.core.type.filter.AnnotationTypeFilter; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * 自定義註冊帶@NettyRPC註解的接口,利用動態代理使接口有實現 * 而後在有@Configuration註解的配置類上使用@Import導入,否則不能注入這些實現@NettyRPC接口的BeanDefinition * @author Howinfun * @date 2019-07-18 */ public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware { private ClassLoader classLoader; @Override public void setBeanClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; } @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { ClassPathScanningCandidateComponentProvider scan = getScanner(); //指定註解,相似於Feign註解,只掃描帶@NettyRPC註解的接口 scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class)); Set<BeanDefinition> candidateComponents = new HashSet<>(); for (String basePackage : getBasePackages(importingClassMetadata)) { candidateComponents.addAll(scan.findCandidateComponents(basePackage)); } candidateComponents.stream().forEach(beanDefinition -> { if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) { if (beanDefinition instanceof AnnotatedBeanDefinition) { AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName()); this.registerNettyRpcClient(registry, annotationMetadata,attributes); } } }); } private void registerNettyRpcClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); // 指定工廠,使用@NettyRPC註解的接口,當代碼中注入時,是從指定工廠獲取,而這裏的工廠返回的是代理 BeanDefinitionBuilder definition = BeanDefinitionBuilder .genericBeanDefinition(NettyClientFactoryBean.class); // @Autowrie:根據類型注入 definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); // 註定type屬性 definition.addPropertyValue("type", className); String name = attributes.get("name") == null ? "" :(String)(attributes.get("name")); // 別名 String alias = name + "NettyRpcClient"; AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); beanDefinition.setPrimary(true); BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias }); // 註冊BeanDefinition BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); } protected ClassPathScanningCandidateComponentProvider getScanner() { return new ClassPathScanningCandidateComponentProvider(false) { @Override protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { // 判斷候選人的條件:必須是獨立的,而後是接口 if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){ return true; } return false; } }; } /** * 獲取指定掃描@NettyRPC註解的包路徑 * @param importingClassMetadata * @return */ protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) { Map<String, Object> attributes = importingClassMetadata .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName()); Set<String> basePackages = new HashSet<>(); // 若是指定的包路徑爲空,則獲取啓動類當前路徑 if (basePackages.isEmpty()) { basePackages.add( ClassUtils.getPackageName(importingClassMetadata.getClassName())); }else{ for (String pkg : (String[]) attributes.get("basePackages")) { if (StringUtils.hasText(pkg)) { basePackages.add(pkg); } } } return basePackages; } }
package com.hyf.rpc.netty.client.config; import lombok.Data; import lombok.EqualsAndHashCode; import org.springframework.beans.factory.FactoryBean; import org.springframework.stereotype.Component; import java.lang.reflect.Proxy; /** * @author Howinfun * @desc * @date 2019/7/15 */ @Data @EqualsAndHashCode(callSuper = false) @Component public class NettyClientFactoryBean implements FactoryBean<Object> { private Class<?> type; @Override public Object getObject() throws Exception { // 這裏的interfaces注意是就是type,由於咱們如今是給接口作代理,千萬別寫type.getInterfaces(),否則啓動會報錯 return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type)); } @Override public Class<?> getObjectType() { return this.type; } }
三、在動態代理的invoke方法裏頭,咱們將啓動Netty的一個客戶端,帶上接口調用的信息,而後等待Netty服務端返回結果結果再返回到前端便可。
package com.hyf.rpc.netty.client.config; import com.hyf.rpc.netty.client.NettyClient; import com.hyf.rpc.netty.packet.RPCRequestPacket; import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; /** * @author Howinfun * @desc * @date 2019/7/15 */ @NoArgsConstructor @Component public class NettyRPCInvocationHandler implements InvocationHandler { private Class<?> type; public NettyRPCInvocationHandler(Class<?> type){ this.type = type; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RPCRequestPacket requestPacket = new RPCRequestPacket(); requestPacket.setClazz(type); requestPacket.setMethodName(method.getName()); requestPacket.setParamTypes(method.getParameterTypes()); requestPacket.setParams(args); Object result = NettyClient.callRPC(requestPacket); return result; } }
有一個坑是:當客戶端接收到服務端的返回結果後,記得關閉通道[ctx.channel().close()],由於在客戶端中RPC調用後是同步等待Channel關閉的,否則不能響應給前端。
服務端:服務端的流程稍微會簡單不少 一、啓動Netty服務端服務,而後接收客戶端的連接請求,解析請求 二、而後根據接口調用信息,利用反射獲取到實現類和對應的方法,最後調用方法獲得結果,而後封裝一下結果就能夠相應給客戶端了。
package com.hyf.rpc.netty.server.handler; import com.hyf.rpc.netty.packet.RPCRequestPacket; import com.hyf.rpc.netty.packet.RPCResponsePacket; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.reflections.Reflections; import java.lang.reflect.Method; import java.util.Set; /** * @author Howinfun * @desc * @date 2019/7/16 */ @ChannelHandler.Sharable public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> { public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler(); private RPCRequestPacketHandler(){} @Override protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception { RPCResponsePacket responsePacket = new RPCResponsePacket(); // 獲取rpc調用信息,利用反射執行方法,返回結果 Class clazz = msg.getClazz(); String methodName = msg.getMethodName(); Object[] params = msg.getParams(); Class[] paramTypes = msg.getParamTypes(); // 掃面路徑下全部元數據 Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl"); Set<Class> subTypes = reflections.getSubTypesOf(clazz); if (subTypes.isEmpty()){ responsePacket.setSuccess(false); responsePacket.setMsg("沒有實現類"); }else if (subTypes.size() > 1){ responsePacket.setSuccess(false); responsePacket.setMsg("多個實現類,沒法判斷執行哪個"); }else{ Class subClass = subTypes.toArray(new Class[1])[0]; Method method = subClass.getMethod(methodName,paramTypes); Object result = method.invoke(subClass.newInstance(),params); responsePacket.setSuccess(true); responsePacket.setResult(result); } ctx.channel().writeAndFlush(responsePacket); } }
三、這裏的反射我推薦一個很好用的框架->Reflections。簡單介紹一下我使用了哪些API,首先是根據路徑掃描反射元數據, 而後根據接口獲取它的全部實現類,而後就能夠獲取實現類的反射信息,獲得方法執行結果了。
若是同窗們對此比較簡陋的代碼還略感興趣,能夠到個人碼雲上看看: