SpringBoot2+Netty打造通俗簡版RPC通訊框架

2019-07-19:完成基本RPC通訊!前端

2019-07-22:優化此框架,實現單一長鏈接!java

2019-07-24:繼續優化此框架:一、增長服務提供註解(帶版本號),而後利用Spring框架的在啓動時馬上保存提供服務的實現類。二、優化NettyConfig(區分消費者和提供者配置),由於一個項目可同時做爲服務提供者和服務消費者,因此增長兩個配置來區分是提供服務仍是消費服務,並且,由於若是都是本地啓動着兩個項目,那麼IP一定是同樣的,因此須要區分服務端口和消費端口。否則會有下面事故:先啓動client,再啓動server,可是他們一樣依賴於netty包,因此client也啓動了netty服務,只配置一個相同的端口會致使client的RPC通訊也是通道本身啓動的Netty服務。。。git

2019-07-27:優化此框架:增長註冊中心,使用Zookeeper做爲註冊中心。github

接下來:我會優化Netty方面的,例如增長心跳檢測、業務處理統一使用自定義業務線程池、客戶端或服務端異常斷開處理等,而後會優化一下項目的結構和rpc通訊返回結果等,最後可能會考慮增長Redis做爲註冊中心。等完成全部的這些,就會對整個項目從新寫一篇文章來介紹一下本身的總體思路,固然了,若是有同窗須要的,能夠在下方留言,我能夠提早寫文章,對完成註冊中心及以前的代碼進行詳細介紹,以後再補充其餘新增的功能實現過程!~spring

2019-07-30:已完成所有功能。放上鍊接:完整版RPC通訊框架框架

下面的是2019-07-19寫的文章,因此代碼是沒通過優化的,不過是核心代碼,仍是須要閱讀一下的,須要看完整代碼的請到最下面的github地址,你們可根據標籤拉到對應的代碼,麻煩啦~而後還有,測試方法是HelloController的sayHello方法呢,也能夠本身再搗鼓一些測試一下~ide

​​​​

前段時間,我花了兩個星期的時間去從新學習Netty,由於以前老是看過一會就沒看了,因此今次下定決心必定要所有看完,而後也思考作了一些的思考題,而且將簡單的控制檯版IM系統作出來了。雖然叫IM系統,可是是很簡陋的,哈哈,只有登陸、單聊、建羣、加羣、退羣、羣聊等簡單的功能。你們能夠到我github上看看:Netty-IM學習

寫完這個IM系統後,我是打算本身寫一個網頁版的,但是考慮到本身前端的技能好像都退化得差很少了,並且時間上可能沒那麼充裕,就不了了之了。而後有一天,忽然想起來以前使用的RPC框架->Dubbo,他的通訊底層就是使用Netty,那麼我就想着要不本身先搞個簡單版試試唄,由於最主要的是學習技能得實踐一番,否則學了好像沒學同樣。。。測試

在開始動手前,本身屢了一下思路,也參考了兩篇文章,決定先作一個簡版的RPC框架,不帶註冊中心的那種。那麼來了老弟,首先咱們看一下整個流程圖是咋樣的:優化

接下來重頭戲來了,下面將會較詳細得說一下流程:

先簡單介紹一下項目結構:

simple-rpc-client:服務消費

simple-rpc-server:服務提供

simple-rpc-encapsulation:消費者和提供者公共接口

simple-rpc-netty:是關於Netty的東西,包括:自定義協議,序列化,通訊實體Packet,各類Handler等等。

客戶端:         一、首先是兩個註解,一個註解是:標識那些接口的調用會進行RPC通訊,即@NettyRPC註解。         另一個註解是:告訴程序哪些包下的類會使用RPC通訊,像@ComponentScan同樣,即@EnableNettyRPC註解。

/**
 * @author Howinfun
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NettyRPC {
}
/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableNettyRPC {
    //掃描的包名,若是爲空,則根據啓動類所在的包名掃描
    String[] basePackages() default {};
}

        二、由於咱們使用@NettyRPC的將是一些接口,若是項目裏頭沒有實現類,那是調用失敗的,那麼咱們能夠經過實現ImportBeanDefinitionRegistrar和自定義FactoryBean和InvocationHandler,利用動態代理使接口有實現,而且能動態注入Bean。ImportBeanDefinitionRegistrar接口能夠詳細說一下,由於這裏是動態注入Bean,怎麼注入規則是能夠自定的,主要是靠ClassPathScanningCandidateComponentProvider這個類,它主要功能是掃描ClassPath下的全部類,而且根據isCandidateComponent方法來判斷哪些類能夠做爲候選人,固然了,isCandidateComponent方法你能夠重寫,而後加上你本身的規則,我這裏是必須是獨立的而且是接口,才能成爲候選人。而後ClassPathScanningCandidateComponentProvider還能添加過濾器,我這裏主要添加的過濾器是註解過濾器,只要帶有@NettyRPC註解的,其餘的都不要。     不過須要注意一點的是:記得在有@Configuration註解的配置類上使用@Import導入實現ImportBeanDefinitionRegistrar的類,否則實現動態注入Bean的做用,這裏咱們在客戶端的啓動類Import便可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.anno.EnableNettyRPC;
import com.hyf.rpc.netty.anno.NettyRPC;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;


/**
 * 自定義註冊帶@NettyRPC註解的接口,利用動態代理使接口有實現
 * 而後在有@Configuration註解的配置類上使用@Import導入,否則不能注入這些實現@NettyRPC接口的BeanDefinition
 * @author Howinfun
 * @date 2019-07-18
 */
public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {

    private ClassLoader classLoader;

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {

        ClassPathScanningCandidateComponentProvider scan = getScanner();

        //指定註解,相似於Feign註解,只掃描帶@NettyRPC註解的接口
        scan.addIncludeFilter(new AnnotationTypeFilter(NettyRPC.class));

        Set<BeanDefinition> candidateComponents = new HashSet<>();
        for (String basePackage : getBasePackages(importingClassMetadata)) {
            candidateComponents.addAll(scan.findCandidateComponents(basePackage));
        }
        candidateComponents.stream().forEach(beanDefinition -> {
            if (!registry.containsBeanDefinition(beanDefinition.getBeanClassName())) {
                if (beanDefinition instanceof AnnotatedBeanDefinition) {
                    AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;
                    AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
                    Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRPC.class.getCanonicalName());

                    this.registerNettyRpcClient(registry, annotationMetadata,attributes);
                }
            }
        });
    }

    private void registerNettyRpcClient(BeanDefinitionRegistry registry,
                                        AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
        String className = annotationMetadata.getClassName();
        // 指定工廠,使用@NettyRPC註解的接口,當代碼中注入時,是從指定工廠獲取,而這裏的工廠返回的是代理
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
                .genericBeanDefinition(NettyClientFactoryBean.class);
        // @Autowrie:根據類型注入
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
        // 註定type屬性
        definition.addPropertyValue("type", className);
        String name = attributes.get("name") == null ? "" :(String)(attributes.get("name"));
        // 別名
        String alias = name + "NettyRpcClient";
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        beanDefinition.setPrimary(true);
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                new String[] { alias });
        // 註冊BeanDefinition
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }



    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false) {
            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                // 判斷候選人的條件:必須是獨立的,而後是接口
                if (beanDefinition.getMetadata().isIndependent() && beanDefinition.getMetadata().isInterface()){
                    return true;
                }
                return false;
            }
        };
    }

    /**
     * 獲取指定掃描@NettyRPC註解的包路徑
     * @param importingClassMetadata
     * @return
     */
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata
                .getAnnotationAttributes(EnableNettyRPC.class.getCanonicalName());

        Set<String> basePackages = new HashSet<>();
        // 若是指定的包路徑爲空,則獲取啓動類當前路徑
        if (basePackages.isEmpty()) {
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }else{
            for (String pkg : (String[]) attributes.get("basePackages")) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        }
        return basePackages;
    }
}
package com.hyf.rpc.netty.client.config;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.stereotype.Component;

import java.lang.reflect.Proxy;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
public class NettyClientFactoryBean implements FactoryBean<Object> {

    private Class<?> type;

    @Override
    public Object getObject() throws Exception {
        // 這裏的interfaces注意是就是type,由於咱們如今是給接口作代理,千萬別寫type.getInterfaces(),否則啓動會報錯
        return Proxy.newProxyInstance(type.getClassLoader(),new Class[]{type},new NettyRPCInvocationHandler(this.type));
    }

    @Override
    public Class<?> getObjectType() {
        return this.type;
    }
}

        三、在動態代理的invoke方法裏頭,咱們將啓動Netty的一個客戶端,帶上接口調用的信息,而後等待Netty服務端返回結果結果再返回到前端便可。

package com.hyf.rpc.netty.client.config;

import com.hyf.rpc.netty.client.NettyClient;
import com.hyf.rpc.netty.packet.RPCRequestPacket;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/15
 */
@NoArgsConstructor
@Component
public class NettyRPCInvocationHandler implements InvocationHandler {

    private Class<?> type;

    public NettyRPCInvocationHandler(Class<?> type){
        this.type = type;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RPCRequestPacket requestPacket = new RPCRequestPacket();
        requestPacket.setClazz(type);
        requestPacket.setMethodName(method.getName());
        requestPacket.setParamTypes(method.getParameterTypes());
        requestPacket.setParams(args);
        Object result = NettyClient.callRPC(requestPacket);
        return result;
    }
}

    有一個坑是:當客戶端接收到服務端的返回結果後,記得關閉通道[ctx.channel().close()],由於在客戶端中RPC調用後是同步等待Channel關閉的,否則不能響應給前端。

服務端:服務端的流程稍微會簡單不少         一、啓動Netty服務端服務,而後接收客戶端的連接請求,解析請求         二、而後根據接口調用信息,利用反射獲取到實現類和對應的方法,最後調用方法獲得結果,而後封裝一下結果就能夠相應給客戶端了。

package com.hyf.rpc.netty.server.handler;

import com.hyf.rpc.netty.packet.RPCRequestPacket;
import com.hyf.rpc.netty.packet.RPCResponsePacket;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.reflections.Reflections;

import java.lang.reflect.Method;
import java.util.Set;

/**
 * @author Howinfun
 * @desc
 * @date 2019/7/16
 */
@ChannelHandler.Sharable
public class RPCRequestPacketHandler extends SimpleChannelInboundHandler<RPCRequestPacket> {

    public static final RPCRequestPacketHandler INSTANCE = new RPCRequestPacketHandler();
    private RPCRequestPacketHandler(){}

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RPCRequestPacket msg) throws Exception {
        RPCResponsePacket responsePacket = new RPCResponsePacket();
        // 獲取rpc調用信息,利用反射執行方法,返回結果
        Class clazz = msg.getClazz();
        String methodName = msg.getMethodName();
        Object[] params = msg.getParams();
        Class[] paramTypes = msg.getParamTypes();
        // 掃面路徑下全部元數據
        Reflections reflections = new Reflections("com.hyf.rpc.serviceImpl");
        Set<Class> subTypes = reflections.getSubTypesOf(clazz);
        if (subTypes.isEmpty()){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("沒有實現類");
        }else if (subTypes.size() > 1){
            responsePacket.setSuccess(false);
            responsePacket.setMsg("多個實現類,沒法判斷執行哪個");
        }else{
            Class subClass = subTypes.toArray(new Class[1])[0];
            Method method = subClass.getMethod(methodName,paramTypes);
            Object result = method.invoke(subClass.newInstance(),params);
            responsePacket.setSuccess(true);
            responsePacket.setResult(result);
        }
        ctx.channel().writeAndFlush(responsePacket);
    }
}

        三、這裏的反射我推薦一個很好用的框架->Reflections。簡單介紹一下我使用了哪些API,首先是根據路徑掃描反射元數據,     而後根據接口獲取它的全部實現類,而後就能夠獲取實現類的反射信息,獲得方法執行結果了。

若是同窗們對此比較簡陋的代碼還略感興趣,能夠到個人碼雲上看看:Netty-RPC

相關文章
相關標籤/搜索