用netty實現一個rpc

        之前在作項目的時候接觸到了rpc,感受頗有意思,可是那個框架用了rabbitmq來進行channel的管理。正好前幾天看到了netty,一個高效的JAVA NIO框架,因此萌生了想本身寫一個rpc框架的想法。java

RPC原理簡介

        RPC(Remote Procedure Call)指遠程過程調用。它的做用大概能夠這麼描述一下:B 程序員 程序想要調用 A 程序的某個函數,可是因爲 A 與 B 是兩個獨立的項目,B 不可能直接調用 A 中的任何一個類裏的任何一個函數。這時 RPC 就能起到它的做用了。         爲了完成 B 程序的需求, A 程序 對 B 程序進行規定,若是 B 想要調用 A 的方法,須要給 A 一個規定的數據格式,而後 A 在本地執行完 B 所想要使用的函數後將結果 封裝成一個規定好的數據格式後發送給 B。這樣 B 就達到了不拷貝 A 的代碼的狀況下完成其所須要的業務功能。 程序員

rpc調用流程
rpc調用流程

RPC具體實現

通訊框架的選擇

        一個rpc底層應該支持io/nio,這種實現方法大體有兩種,一是經過代碼徹底有本身實現,可是這種方法對技術要求比較高,並且容易出現隱藏的bug,另外一種就是利用現有的開源框架,Netty 是個不錯的選擇,它是一個利用 Java 的高級網絡的能力,隱藏其背後的複雜性而提供一個易於使用的 API 的客戶端/服務器框架。它能大大的簡化咱們的開發流程,使得代碼更加牢靠。在此次的RPC中,咱們使用 Netty 來做爲鏈接客戶端與服務端的橋樑。redis

數據的序列化與反序列化

        一個好的rpc不該該受到語言的限制,因此client端到server端的數據交換格式應該有一個良好的定義,好比json、xml。如今這方面成熟的框架有不少好比Thrift、Protobuf等等。咱們不用本身去定義以及實現一個交換格式,這些成熟的框架都是久經考驗的。在本例中因爲是抱着學習的目的,本人採用java自帶的序列化與反序列化方法。json

服務的註冊與發現

        client的端想要調用服務端的某個方法,須要得知這個方法的某些信息,而如今問題就來了,得知這個信息的時候是由寫 A 程序的人去直接告訴 B 程序的人,仍是由 B 程序主動去發現 A 的服務。很明顯,第一種方法很不牢靠,如果採用這種方法的話,A服務的每次改動都要通知到 B ,B也要每次根據 A服務的改變,而重寫本身的代碼。相比之下,第二種方法更顯得可行。         其實實現服務的註冊與發現的方法也有不少,好比zookeeper,redis等等。大體原理就是,A 服務將本身暴露出的方法信息存在zookeeper或者redis上,每次更改由A主動通知或由 B 去zookeeper或redis上自動去拉取最新的信息。對於zookeeper來講存儲方法信息的是一個個固定的節點,對於reids來講就是一個key值。用zookeeper還解決了在分佈式的部署方案下,某個服務down機的問題。由於zookeeper與生俱來的容災能力(好比leader選舉),能夠確保服務註冊表的高可用性。在本例中,我並未實現服務的註冊於發現。服務器

client與server

        client端與server端有各自須要處理的發送格式與接受格式。對於client端來講須要封裝好本身所要請求的方法信息發送給server端,並等待server端返回的結果。server端則是接收client的請求數據,處理完成後返回給client端結果數據。         其實RPC在調用的時候應該讓調用者像調用本地服務通常的去完成業務邏輯。這種實如今java中就應該用代理來實現。網絡

關鍵代碼

數據交換格式定義

client請求格式 利用java自帶的序列化方法要繼承Serializable方法而且要實現無參構造方法。app

package com.example.nettyrpcfirst.netty.entity;

import java.io.Serializable;

/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Request implements Serializable {
    private static final long serialVersionUID  = -1L;
    private String clientId;
    private String className;
    private String methodName;
    private Class[] paramterTypes;
    private Object[] parameters;


    public Request(String clientId, String className, String methodName, Class[] paramterTypes, Object[] parameters) {
        this.clientId = clientId;
        this.className = className;
        this.methodName = methodName;
        this.paramterTypes = paramterTypes;
        this.parameters = parameters;
    }

    public Request() {
    }
//getter and setter
}

複製代碼

server 響應數據格式 具體要求與client端相同框架

package com.example.nettyrpcfirst.netty.entity;

import java.io.Serializable;

/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Response implements Serializable {
    private static final long serialVersionUID = -1L;
    private String clientId;
    private Throwable err;
    private Object result;


    public Response() {
    }
// getter and setter
}

複製代碼

注意 clientId字段的設置是爲了保證返回的數據是本身想要的。dom

server 實現

netty不熟悉的能夠去官網寫寫幾個例子socket

package com.example.nettyrpcfirst.netty.server;



/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class ServerHandler extends SimpleChannelInboundHandler {
    Logger logger = LoggerFactory.getLogger(ServerHandler.class);

    private final Map<String, Object> services;

    public ServerHandler(Map<String, Object> services) {
        this.services = services;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("{} has created a channel",ctx.channel().remoteAddress());
    }


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
       Runnable r = () ->{
           Request request = (Request) o;
           Response response = new Response();
           response.setClientId(request.getClientId());
           try {
               Object service = services.get(request.getClassName());
               FastClass serviceFastClass = FastClass.create(service.getClass());
               FastMethod serviceFastMethod = serviceFastClass.getMethod(request.getMethodName(), request.getParamterTypes());
               Object result = serviceFastMethod.invoke(service, request.getParameters());
               response.setResult(result);
           }catch (Exception e){
               response.setErr(e);
           }
           channelHandlerContext.writeAndFlush(response).addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture channelFuture) throws Exception {
                   logger.info("send response for request: "+request.getClientId());
               }
           });
       };
       Server.submit(r);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        logger.error("rpc server err occur" + cause.getMessage()+" | "+ctx.channel().remoteAddress());
        ctx.close();
    }
}

複製代碼

最主要的就是channelRead0方法,這裏定義了在接收到客戶端的數據後如何去調用本地方法,具體是用cglib代理完成。 server具體代碼

package com.example.nettyrpcfirst.netty.server;



/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
@Configuration
public class Server implements BeanNameAware, BeanFactoryAware, ApplicationContextAware,InitializingBean {
    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Server.class);

    private Map<String,Object> services = new ConcurrentHashMap<>();

    private static ExecutorService threadPoolExecutor;

    public Server(){
    }

    /** * 啓動netty server * @throws Exception */
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("afterPropertiesSet");
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new ServerHandler(services));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = b.bind(8080).sync();
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
            logger.error("an error occur ----------->"+e.getMessage());
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** * 經過掃描全部帶有@RPCServer註解的類進行註冊 * @param applicationContext * @throws BeansException */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            logger.info("setApplicationContext");
            Map<String,Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RPCServer.class);
            if(!serviceBeanMap.isEmpty()){
                for (Object service :serviceBeanMap.values()){
                    String interfaceName = service.getClass().getAnnotation(RPCServer.class).value().getName();
                    logger.info("RPCService: {}" , interfaceName);
                    this.services.put(interfaceName,service);
                }
            }
    }
    public static void submit(Runnable task){
        if(threadPoolExecutor == null){
            synchronized (RPCServer.class){
                if(threadPoolExecutor == null){
                    threadPoolExecutor = Executors.newFixedThreadPool(16);
                }
            }
        }
        threadPoolExecutor.submit(task);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        logger.info("setBeanFactory()");
    }

    @Override
    public void setBeanName(String s) {
        logger.info("setBeanName() {}", s);
    }
}

複製代碼

RPCServer 自定義註解

package com.example.nettyrpcfirst.netty.annoations;


/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RPCServer {
    Class<?> value();
}

複製代碼

client實現

client代理類

package com.example.nettyrpcfirst.netty.client;


/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class ClientProxy {
    @SuppressWarnings("unchecked")
    public <T> T create(Class<?> interfaceClass){

        return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Request request = new Request();
                        request.setClientId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParamterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        Client client = new Client("127.0.0.1",8080);
                        Response response = client.send(request);
                        if(response.getErr()!=null){
                            throw response.getErr();
                        }else{
                            return response.getResult();
                        }
                    }
                });
    }
}

複製代碼

client 與server 鏈接發送數據並等待數據返回

package com.example.nettyrpcfirst.netty.client;



/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Client extends SimpleChannelInboundHandler<Response> {
    private static org.slf4j.Logger logger = LoggerFactory.getLogger(Client.class);

    private Response response;
    private final static Object obj = new Object();
    private String host;
    private int port;
    ChannelFuture future;
    public Client(String host,int port){
        this.host = host;
        this.port = port;
    }

    /** * 接收到消息後喚醒線程 * @param channelHandlerContext * @param response * @throws Exception */

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
        this.response = response;
        synchronized (obj){
            obj.notifyAll();
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("client caught exception", cause);
        ctx.close();
    }

    /** * 鏈接server端channel,發送完數據後鎖定線程,等待數據返回 * @param request * @return * @throws Exception */
    public Response send(Request request) throws Exception{
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(Client.this);
                        }
                    })
                    .option(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = b.connect(host,port).sync();
            future.channel().writeAndFlush(request).sync();
            System.out.println("2 "+Thread.currentThread().getName());
            synchronized (obj){
                System.out.println("1111111111111111");
                obj.wait();
            }
            if(response != null){
                System.out.println("3333333333333");
            }
            return response;
        }finally {
            if(future!=null){
                future.channel().closeFuture().sync();
            }
            eventLoopGroup.shutdownGracefully();
        }
    }
}

複製代碼

測試

package com.example.nettyrpcfirst.netty.client;

import com.example.nettyrpcfirst.netty.entity.TestService;

/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class TestMain {
    public static void main(String[] args){
        TestService testService = new ClientProxy().create(TestService.class);
        String result = testService.play();
        System.out.println("收到消息 ------------> "+result);
    }
}

複製代碼

不出意外的話 控制檯會成功打印

相關文章
相關標籤/搜索