使用Netty實現一下簡單RPC

前言

最近在作一個本身的小項目,這個小項目分爲客戶端,路由和服務端,服務端和客戶端之間經過Netty來通訊,而路由主要用來作負載均衡,用戶上線下線的操做,客戶端和路由之間原本是採用HTTP來通訊,後來一想既然都用到netty了,那就乾脆基於netty作一個RPC來實現客戶端和路由的通訊吧。java

RPC的概念

RPC,全稱是Remote Procedure Call,從字面意思上也容易理解,一般寫代碼,好比本地寫一個類A,裏面一個方法B,咱們要調用A類的B方法就很容易,這叫本地方法調用。而現在,如今不少服務都是在不一樣的服務器上,若是仍是這樣的場景,咱們要調用另一臺服務器上的A類的B方法,這時候就能夠用RPC,這使得咱們調用A類的B方法就如同調用本地方法同樣。git

RPC的實現

能夠先簡單羅列一下,實現上面的想法須要哪些東西github

  • 網絡通訊 netty

這必然是須要的,由於這裏服務在兩臺服務器上,這裏使用netty來做爲兩端之間的通訊框架。web

  • 解碼編碼 Jackson

爲了方便通訊,使用Json來做爲統一的編碼格式,使用spring默認的Jackson來實現json和對象之間的轉化。算法

  • 服務註冊與發現 Nacos

服務端是多實例部署,那天然須要一個註冊中心,客戶端選擇的時候能夠經過必定的負載均衡算法來選擇一個服務實例進行通訊。這裏使用阿里的nacos。spring

  • 心跳機制

客戶端和服務端創建鏈接以後每隔必定的時間上報心跳,這裏使用netty自帶的心跳機制來實現。數據庫

  • 反射和動態代理

客戶端是沒有調用的接口的具體實現的,因此在服務端須要從請求報文裏獲得請求的接口,參數,方法名等信息,經過反射來獲得具體的類和方法,再經過動態代理來實現具體的接口方法的調用。json

  • 項目腳手架 Springboot
  • ......

固然,這裏若是要作細緻還有不少東西須要考慮,包括熔斷,限流,緩存,這些以後再考慮,這裏先完成咱們最初的需求。接下來一個個把這些都實現。爲此花了一個簡單的圖來方便理解這裏面有啥須要實現的模塊。bootstrap

avatar

公共模塊

  • 實體類

這裏介紹兩個主要的實體,一個是請求類數組

public class RpcRequest {
    /**
     * 請求id,惟一,有雪花算法生成
     * */
    @Getter
    @Setter
    private Long id;

    /**
     * 請求類型:正常請求0,心跳:1
     * */
    @Getter
    @Setter
    private RequestTypeEnum type;

    /**
     * 類名稱
     * */
    @Getter
    @Setter
    private String className;

    /**
     * 指定運行的方法名稱
     * */
    @Getter
    @Setter
    private String methodName;

    /**
     * 參數類型
     * */
    @Getter
    @Setter
    private Class<?>[] paramTypes;

    /**
     * 參數值
     * */
    @Getter
    @Setter
    private Object[] params;

}

另外一個是回覆類

public class RpcResponse {

    /**
     * 請求的ID
     * */
    @Getter
    @Setter
    private Long reqId;

    /**
     * 返回碼
     * */
    @Getter
    @Setter
    private RespCodeEnum code;

    /**
     * 發生錯誤時的錯誤信息
     * */
    @Getter
    @Setter
    private String errorMsg;

    /**
     * 正常返回下返回信息
     * */
    @Getter
    @Setter
    private Object data;

}
  • 工具類

工具類包括對對象和Json字符串/字節數組進行相互轉化的JsonUtil,以及一個用來生成請求ID的算法,算法採用的是Twitter開源的雪花算法,代碼是從網上借鑑來的。Json是用的Jackson作的解析。

public class JsonUtil{

    private static final ObjectMapper mapper = new ObjectMapper();

    /**
     * 對象轉Json字符串
     * */
    public static String parseToJsonStr(Object object) {
        String result = null;
        try {
            result = mapper.writeValueAsString(object);
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
        return result;
    }

    /**
     * 字節數組轉Json字符串
     * */
    public static byte[] parseToJsonBytes(Object object) {
        byte[] result = null;
        try {
            result = mapper.writeValueAsBytes(object);
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Json字符串轉爲對象
     * */
    public static <T> T parseToObject(String json,Class<T> clazz){
        T t = null;
        try {
            t = mapper.readValue(json,clazz);
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }
        return t;
    }


    /**
     * 字節數組轉爲對象
     * */
    public static <T> T parseToObject(byte[] jsonBytes,Class<T> clazz){
        T t = null;
        try {
            t = mapper.readValue(jsonBytes,clazz);
        }catch (JsonProcessingException e){
            e.printStackTrace();
        }catch (IOException e){
            e.printStackTrace();
        }
        return t;
    }

    /**
     * 獲取ObjectMapper
     * */
    public static ObjectMapper getMapper() {
        return mapper;
    }
}

服務註冊和發現

Nacos是阿里的開源的一款中間件,既有服務註冊發現,也有配置中心的做用,在java工程裏這兩個依賴也是獨立的。使用方面,跟着官網文檔走就能夠了。

  • 服務註冊

先須要把Nacos的地址,服務名稱,namespace這些配置好。

spring:
  application:
    name: rpc_producer
server:
  port: 7001

rpc:
  heartbeat: 600

app:
  ip: 127.0.0.1
  port: 8001

nacos:
  discovery:
    server-addr: 127.0.0.1:8848
    namespace: 35375ce2-f421-431f-bd2e-89677440dc9f
    register:
      group-name: netty_rpc_provider

接下來就是註冊了,註冊一個實例須要服務名,服務所在的IP和端口。

@EnableDiscoveryClient
@SpringBootApplication
@Slf4j
public class RpcProducerApplication implements CommandLineRunner {

    @NacosInjected
    private NamingService namingService;

    @Value("${spring.application.name}")
    private String appName;

    @Value("${app.port}")
    private Integer serverPort;

    @Value("${app.ip}")
    private String serverIp;

    public static void main(String[] args) {
        SpringApplication.run(RpcProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        log.info(String.format("Register service name:%s,port:%s",appName,serverPort));
        namingService.registerInstance(appName,serverIp,serverPort);
    }
}
  • 服務發現

    服務發現相比於服務註冊多了一些步驟,由於服務端是多實例的,因此須要獲取當前服務的全部實例,而後採用必定的負載均衡算法來選擇其中一個實例。這裏本項目暫且使用輪詢算法來作負載均衡。

    • 獲取Nacos中的全部實例,而後將Channel的列表交給專門的鏈接管理器來維護。ConnectionManager後續在負載均衡的時候用到。
    @Component
    public class RpcServiceDiscovery {
    
        private static final Logger logger = LoggerFactory.getLogger(RpcServiceDiscovery.class);
    
        @NacosInjected
        private NamingService namingService;
    
        @Value("${rpc.provider}")
        private String providerRegisterName;
    
        @Autowired
        private ConnectionManager connectionManager;
    
        private volatile List<ServerAddress> serverList = new ArrayList<>();
    
        @PostConstruct
        public void init(){
            List<Instance> instanceList;
            try {
                instanceList = namingService.getAllInstances(providerRegisterName);
                for (Instance ins : instanceList){
                    ServerAddress addr = new ServerAddress();
                    addr.setIp(ins.getIp());
                    addr.setPort(ins.getPort());
                    serverList.add(addr);
                }
                updateConnection();
            } catch (NacosException e) {
                logger.info(String.format("Nacos 獲取所有實例異常 %s",e));
            }
        }
    
        private void updateConnection(){
            connectionManager.updateConnection(serverList);
        }
    }

    ServerAddress是自定義類,結構比較簡單,就是IP和端口。

    public class ServerAddress {
    
        @Getter
        @Setter
        private String ip;
    
        @Getter
        @Setter
        private Integer port;
        
    }
    • 由於要實現簡單的負載均衡,因此在客戶端發送請求的時候須要從全部的Channel中選出一個來進行通訊。下面的ConnectionManager的chooseOneAvailableChannnel就是這個做用。
@Component
public class ConnectionManager {

    private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);

    private AtomicInteger channelIndex = new AtomicInteger(0);

    private CopyOnWriteArrayList<Channel> channelList = new CopyOnWriteArrayList<>();

    private Map<SocketChannel,Channel> channelMap = new ConcurrentHashMap<>();

    @Autowired
    private NettyClient client;

    /**
     * 選擇一個可用Channel
     * */
    public Channel chooseOneVariableChannel(){
        if(channelList.size()>0){
            int size = channelList.size();
            //輪詢算法
            int index = (channelIndex.getAndAdd(1)+size)%size;
            return channelList.get(index);
        }else{
            return null;
        }
    }

    /**
     * 更新鏈接列表
     * */
    public synchronized void updateConnection(List<ServerAddress> serverList)
    {
        if (serverList==null||serverList.size()==0){
            logger.info("沒有可用的服務");
            for (Channel ch : channelList){
                SocketAddress remoteServerAddr = ch.remoteAddress();
                Channel channel = channelMap.get(remoteServerAddr);
                channel.close();
            }
            channelMap.clear();
            channelList.clear();
            return;
        }
        //去重
        HashSet<SocketAddress> serverNodeList = new HashSet<>();
        for(ServerAddress sa : serverList){
            serverNodeList.add(new InetSocketAddress(sa.getIp(),sa.getPort()));
        }

        for (SocketAddress addr : serverNodeList){
            Channel channel = channelMap.get(addr);
            if(channel!=null&&channel.isOpen()){
                logger.info("服務{}已經存在,不須要從新鏈接",addr);
            }
            //Channel沒打開的狀況下,從新鏈接
            connectToServer(addr);
        }
        //移除無效節點
        for (Channel ch : channelList){
            SocketAddress addr = ch.remoteAddress();
            if(!serverNodeList.contains(addr)){
                logger.info("服務{}無效,自動移除",addr);
                Channel channel = channelMap.get(addr);
                if (channel!=null){
                    channel.close();
                }
                channelList.remove(channel);
                channelMap.remove(addr);
            }
        }
    }
    /**
     * 鏈接到服務器
     * */
    private void connectToServer(SocketAddress addr){
        try {
            Channel channel = client.connect(addr);
            channelList.add(channel);
            logger.info("成功鏈接到服務器{}",addr);
        }catch (Exception e){
            logger.info("未能鏈接到服務器{}",addr);
        }
    }
    /**
     * 移除鏈接
     * */
    public void removeConnection(Channel channel){
        logger.info("Channel:{}已經被移除",channel.remoteAddress());
        SocketAddress address = channel.remoteAddress();
        channelList.remove(channel);
        channelMap.remove(address);
    }
}

解碼編碼

這裏採用的自定義的消息格式是json類型,客戶端發送一個請求到服務端,也就是將RpcRequest對象轉爲json對象,而後經netty到達客戶端,服務端作出反應以後,將RpcResponse對象以一樣的方式發送到客戶端。這裏用的是Springboot自帶的Jackson。但這裏值得注意的是,這兩個端的編碼解碼器並非相同的實現。

  • 客戶端解碼和編碼
//解碼
public class JsonDecoder extends LengthFieldBasedFrameDecoder {

    private static final Logger logger = LoggerFactory.getLogger(JsonDecoder.class);

    public JsonDecoder(){
        super(65535,0,4,0,4);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf decode = (ByteBuf) super.decode(ctx, in);
        if (decode==null){
            return null;
        }
        int data_len = decode.readableBytes();
        byte[] bytes = new byte[data_len];
        decode.readBytes(bytes);
        logger.info("JsonDecoder 解碼:{}",new String(bytes));
        Object ret = JsonUtil.parseToObject(bytes,RpcResponse.class);
        return ret;
    }
}
//編碼
public class JsonEncoder extends MessageToMessageEncoder {

    private static final Logger logger = LoggerFactory.getLogger(JsonEncoder.class);
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
        ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer();
        byte[] bytes = JsonUtil.parseToJsonBytes(o);
        logger.info("JsonEncoder 編碼:{}",new String(bytes));
        buf.writeInt(bytes.length);
        buf.writeBytes(bytes);
        list.add(buf);
    }
}
  • 服務端解碼和編碼
//解碼
public class JsonDecoder extends LengthFieldBasedFrameDecoder {

    private static final Logger logger = LoggerFactory.getLogger(JsonDecoder.class);

    public JsonDecoder(){
        super(65535,0,4,0,4);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf decode = (ByteBuf) super.decode(ctx, in);
        if (decode==null){
            return null;
        }
        int data_len = decode.readableBytes();
        byte[] bytes = new byte[data_len];
        decode.readBytes(bytes);
        Object ret = JsonUtil.parseToObject(bytes,RpcRequest.class);
        logger.info("JsonDecoder 解碼 : {}",new String(bytes));
        return ret;
    }
}
//編碼
public class JsonEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
        ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer();
        byte[] bytes = JsonUtil.parseToJsonBytes(o);
        buf.writeInt(bytes.length);
        buf.writeBytes(bytes);
        list.add(buf);
    }
}

心跳機制

服務端須要對客戶端進行心跳檢測,約定好必定的心跳上報時間,若是客戶端在這個時間內沒有上報心跳,那麼服務端將與此客戶端之間的Channel關閉。這裏由於是用的netty作的通訊,因此心跳檢測起來並不麻煩。只要在ChannelPipeline裏面加入一個IdleStateHandler就好了。

  • 添加IdleStateHandler進行心跳配置
@Component
@Slf4j
public class NettyClient {

    private EventLoopGroup eventExecutors = new NioEventLoopGroup();

    private Bootstrap bootstrap = new Bootstrap();

    @Value("${rpc.heartbeat}")
    private int heartBeatTime;

    @Autowired
    private NettyClientHandler handler;

    @Autowired
    private ConnectionManager manager;

    public NettyClient(){
        bootstrap.group(eventExecutors)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY,true)
                .option(ChannelOption.SO_KEEPALIVE,true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new IdleStateHandler(0,0,heartBeatTime))
                                .addLast(new JsonEncoder())
                                .addLast(new JsonDecoder())
                                .addLast("handler",handler);
                    }
                });

    }
	.....
}

同時還有一個須要注意的地方就是實際上這個項目中的請求,也就是RpcRequest是分爲兩種類別的,一種是心跳,一種是Rpc請求,因此上報心跳的時候須要設置RpcRequest對象的種類。本項目中使用一個枚舉類來對這兩種請求進行分類。

public enum RequestTypeEnum {
    /**
     * 正常請求
     * */
    NORMAL,
    /**
     * 心跳請求
     * */
    HEART_BEAT
}
  • 上報心跳

    Netty中鏈接關閉,開啓,讀寫這些都是基於事件驅動的,因此這裏上報心跳須要實現ChannelInboundHandlerAdapter,至於這裏頭的netty的出站入站細節就很少說了,心跳的上報只須要重寫userEventTriggered就行。

/**
     * 心跳上報
     * */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx,Object event){

        try {
            if (event instanceof IdleStateEvent){
                IdleStateEvent evt = (IdleStateEvent)event;
                if (evt.state()==IdleState.ALL_IDLE){
                    RpcRequest request = new RpcRequest();
                    request.setId(IDUtil.getRpcRequestId());
                    request.setType(RequestTypeEnum.HEART_BEAT);
                    request.setMethodName("heartBeat");
                    ctx.channel().writeAndFlush(request);
                    logger.info("客戶端{} s發送心跳",heartBeatTime);
                }
            }else{
                super.userEventTriggered(ctx,event);
            }
        }catch (Exception e){
            logger.error("心跳異常 %s ",e);
        }
    }
  • 接收心跳
/**
     * 檢查心跳
     * */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx,Object event) throws Exception{
        if(event instanceof IdleStateEvent){
            IdleStateEvent idleStateEvent = (IdleStateEvent)event;
            if(idleStateEvent.state()==IdleState.ALL_IDLE){
                logger.info("客戶端{}心跳未上報,鏈接關閉",ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        }else{
            super.userEventTriggered(ctx,event);
        }
    }

反射,動態代理

爲啥要用到反射呢,由於服務端從客戶端發來的請求中能夠獲得請求的接口,方法,方法的參數類型和參數值,那麼問題來了,這些東西知道了,怎麼才能夠調用呢,其實用笨辦法,窮舉不就完事了,寫上若干個case,可是這樣終究是不優雅的,並且一旦項目大了,接口多了,窮舉天然就不行了。那麼這個時候反射就派上用場了。服務端在接收到請求的時候,咱們能夠經過反射拿到請求對應的服務端接口的類,方法,這樣咱們就能夠利用反射拿到一個Method對象,經過Method對象的invoke方法就能夠實現對這個接口的這個方法的調用呢。

/**
     * 處理請求
     * */
    private Object handleRequest(RpcRequest req) throws Exception{
        String className = req.getClassName();
        Object serverInstance = serviceDictionary.get(className);

        if(serverInstance!=null){
            Class<?> serviceClass = serverInstance.getClass();
            String methodName = req.getMethodName();
            Class<?>[] paramTypes = req.getParamTypes();
            Object[] params = req.getParams();
            //獲取方法
            Method method = serviceClass.getMethod(methodName,paramTypes);
            method.setAccessible(true);
            return method.invoke(serverInstance,getParamValues(paramTypes,params));
        }else{
            logger.info("沒有找到實例:{},方法名:{}",className,req.getMethodName());
            throw new Exception("沒有找到合適的RPC實例");
        }
    }
        /**
     * 獲取方法參數的值
     * */
    public Object[] getParamValues(Class<?>[] paramTypes,Object[] params){
        if(params==null||paramTypes==null){
            return params;
        }
        Object[] retValues = new Object[params.length];

        for(int i = 0;i < params.length;i++){
            retValues[i] = JsonUtil.parseToObject(String.valueOf(params[i]),paramTypes[i]);
        }
        return retValues;
    }

那客戶端爲啥又要用動態代理呢,首先須要明確一點,客戶端沒有咱們請求的接口的實現,也就是說,咱們客戶端只申明瞭一個接口罷了,真正的實如今客戶端,好比這裏我有一個UserService的接口,裏面有一個getUserById的方法。

public interface UserService {
    User getUserById(Integer userId);
}

這裏咱們用通常寫web的方式來進行調用,也就是須要在Controller層來調用。好比:

@Controller
public class RpcController {

    private static final Logger logger = LoggerFactory.getLogger(RpcController.class);

    @Autowired
    private UserService userService;

    @RequestMapping("/getUserById")
    @ResponseBody
    public String getUserById(@RequestParam("id")Integer id){
        User user = userService.getUserById(id);
        String jsonStr = JsonUtil.parseToJsonStr(user);
        logger.info("getUserById {}",jsonStr);
        return jsonStr;
    }
}

咱們知道,若是這個UserService沒有實現類,這個地方確定是跑不起來的,可是這裏咱們又必須這樣子寫啊,由於須要作到RPC中的「就像調用本地方法同樣」啊,那怎麼辦呢,不要忘了這裏有Spring,Spring裏面不止只有@Autowired這種來初始化一個bean,否則人家MyBatis爲啥只要定義一個接口加一個@Mapper的註解,也不用寫代碼就能實現對數據庫增刪改查呢,咱們這裏就借鑑Mybatis這種思想來對客戶端的接口進行實例化,但注意不是進行實現,實現怎麼搞呢,固然就是用動態代理啦,在動態代理內部發送咱們的請求,而後拿到返回。不就是至關於,咱們在客戶端,像調用本地方法同樣,調用了服務端的方法麼。廢話很少說,看代碼吧。

  • 包掃描

Spring要去初始化bean,須要掃描接口路徑下的接口,對某一個路徑下的類進行掃描,Spring也提供了一些接口。

public class RpcScanner extends ClassPathBeanDefinitionScanner {

    private RpcFactoryBean<?> rpcFactoryBean = new RpcFactoryBean<>();

    private static final Logger logger = LoggerFactory.getLogger(RpcScanner.class);
    /**
     * 註解類
     * */
    @Setter
    private Class<? extends Annotation> annotationClass;

    public RpcScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    @Override
    public Set<BeanDefinitionHolder> doScan(String... packages){
        Set<BeanDefinitionHolder> beanDefineHolders = super.doScan(packages);

        if(beanDefineHolders.isEmpty()){
            logger.warn("No proper Rpc mapper found in such paths : {}",Arrays.asList(packages));
        }else{
            postProcessBeanDefinitions(beanDefineHolders);
        }
        return beanDefineHolders;
    }

    /**
     * 註冊過濾器
     * */
    public void registerFilters(){
        boolean acceptAllInterfaces = true;
        //若是事先設置了註解類,那麼就只對這個註解類不設置過濾
        if(this.annotationClass!=null){
            addIncludeFilter(new AnnotationTypeFilter(this.annotationClass));
            acceptAllInterfaces = false;
        }
        //沒有設置註解類,那麼默認將basePackage下的類都進行掃描
        if(acceptAllInterfaces){
            addIncludeFilter(new TypeFilter() {
                @Override
                public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                    return true;
                }
            });
        }
        addExcludeFilter(new TypeFilter() {
            @Override
            public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                String className = metadataReader.getClassMetadata().getClassName();
                return className.endsWith("package-info");
            }
        });
    }

    /**
     * 配置自定義BeanDefinition的屬性
     */
    private void postProcessBeanDefinitions(Set<BeanDefinitionHolder> holders){
        GenericBeanDefinition definition;
        for (BeanDefinitionHolder holder : holders){
            definition = (GenericBeanDefinition)holder.getBeanDefinition();
            //添加FactoryBean帶參構造函數的參數值
            definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
            definition.setBeanClass(this.rpcFactoryBean.getClass());
            //設置注入模式
            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            logger.info("BeanDefinitionHolder:{}",holder);
        }

    }

    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition definition){
        return definition.getMetadata().isInterface()&&definition.getMetadata().isIndependent();
    }
}

這裏的doScan的方法,就是在掃描某個路徑的時候調用的,在掃描以後,須要對BeanDefinition進行一下設置,設置構造方法傳入的參數,這裏須要在對應的FactoryBean裏添加對應的構造方法,設置BeanClass和注入模式,這裏要講還能夠講一大堆,建議看不懂的能夠了解一下FactoryBean的原理和用法。

public class RpcFactoryBean<T> implements FactoryBean<T> {

    private Class<T> rpcInterface;

    private static final Logger logger = LoggerFactory.getLogger(RpcFactoryBean.class);

    @Autowired
    private RpcFactory<T> factory;

    public RpcFactoryBean(){
    }

    public RpcFactoryBean(Class<T> rpcInterface){
        this.rpcInterface = rpcInterface;
    }

    /**
     * 返回對象實例
     * */
    @Override
    public T getObject() throws Exception {
        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(),new Class[]{rpcInterface},factory);
    }

    /**
     * Bean的類型
     * */
    @Override
    public Class<?> getObjectType() {
        return this.rpcInterface;
    }

    /**
     * 是不是單例的
     * */
    @Override
    public boolean isSingleton(){
        return true;
    }
 }

最後就是客戶端調用服務端的本質所在了,瞭解過動態代理的人確定知道InvocationHandler這個接口。

@Component
public class RpcFactory<T> implements InvocationHandler {

    @Autowired
    private NettyClient client;

    private static final Logger logger = LoggerFactory.getLogger(RpcFactory.class);
    /**
     * 發送請求的地方
     * */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setType(RequestTypeEnum.NORMAL);
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParams(args);
        request.setParamTypes(method.getParameterTypes());
        request.setId(IDUtil.getRpcRequestId());

        //向服務端發送請求
        Object result = client.send(request);

        Class<?> returnType = method.getReturnType();

        ObjectMapper mapper = new ObjectMapper();

        RpcResponse response = mapper.readValue(String.valueOf(result),RpcResponse.class);

        if(response.getCode()==RespCodeEnum.ERROR){
            throw new Exception(response.getErrorMsg());
        }
        String respData = mapper.writeValueAsString(response.getData());
        if(returnType.isPrimitive()||String.class.isAssignableFrom(returnType)){
            return respData;
        }else if(Collection.class.isAssignableFrom(returnType)){
            CollectionType collectionType = mapper.getTypeFactory().constructCollectionType(Collection.class,Object.class);
            return mapper.readValue(respData,collectionType);
        }else if (Map.class.isAssignableFrom(returnType)){
            MapType mapType = mapper.getTypeFactory().constructMapType(Map.class,Object.class,Object.class);
            return mapper.readValue(respData,mapType);
        }else{
            Object data = response.getData();
            return mapper.readValue(respData,returnType);
        }
    }
}

這裏應該要對Response的返回值作不一樣類型的校驗和處理,篇幅有限,我以後再加上吧。另外上面還有一個問題,就是RpcFactoryBean這個類中的@Autowired註解,究竟有沒有做用,這段代碼我是參考了網上的,具體請看:

RPC基本原理以及如何用Netty來實現RPC

後來我通過一番查閱和本身debug,這個是起到了做用的,雖然這個類裏沒有@Component的註解,而且初始化是new出來的,不會交給spring來管理,那麼這個類是從哪裏初始化的呢,答案其實仍是加載順序的問題,RpcFactory這個類因爲加了@Component註解,因此這個bean在Spring容器中是存在的,而而且是單例的,RpcFactoryBean是在何時加載的呢,是在Controller層中的對UserService進行注入的時候,這個時候RpcFactory已經有實例了,而且因爲是單例,那天然用@Autowired獲取到的是同一個實例。

試驗一下看效果,仍是以Web請求來觸發RPC請求,請求比較簡單,就是根據一個用戶id獲取用戶信息。(http://localhost:6001/getUserById?id=1)

avatar

RPC客戶端的請求日誌

avatar

RPC服務端的日誌

avatar

總結

雖然這個RPC功能很簡陋,可是實現花了我挺多時間,主要仍是在查資料上,由於Spring涉及到源碼層面和FactoryBean這些沒了解過。可是還好磕磕絆絆也實現了出來。但說到底仍是有不少缺陷,好比負載均衡算法還能夠擴展幾個,好比限流操做,熔斷操做,還有功能上,這裏我默認服務端和客戶端兩個的包名是在同樣的,可是現實中是可能不同的,這樣就會出現找不到實現類的出錯的狀況,不過這也讓我有動力去重寫這個demo了,最後我把個人代碼實現傳到了個人GitHub,工做之餘我會盡力去完善的。

相關文章
相關標籤/搜索