使用Netty構建Rpc中間件(一)

Rpc中間件是目前互聯網企業用的最多的中間件,是實現分佈式系統最基礎的中間件系統。在國內,用的最多的就是Dubbo以及Thrift。在國外,包括grpc,以及Finagle。Rpc的原理大同小異,都是利用TCP/IP協議將要本地要調用的類,方法,參數按照某種協議傳輸到遠程主機,遠程主機執行完畢之後再返回到本地主機。
固然其真正的實現很是複雜,涉及到IO,網絡,多線程以及整個框架的架構設計。那麼接下來的幾篇文章(包括這篇文章)就來實現一個簡單的基本的RPC框架,NIO框架使用的是Netty,緣由你懂得。javascript

定義一個簡單的服務

假設有一個叫作IDemoService的簡單服務。這個服務放在了contract包中間:java

public interface IDemoService {
    public int sum(int a,int b);

}複製代碼

本地機器只有接口,實現是在遠端實現的。
那麼在本地調用的時候確定是經過代理走網絡發送到遠程主機。若是使用靜態代理,那麼每一個接口都必須實現一個代理類,因此通常來講沒有哪一個RPC框架使用靜態代理,都是使用動態代理:git

public class JDKDynamicService<T> implements InvocationHandler {

    private Class<T> clazz;

    private RpcClient client = new RpcClient("127.0.0.1", 6666);

    public void setClass(Class<T> clazz) {
        this.clazz = clazz;

    }

    @SuppressWarnings("unchecked")
    public T get() {

        return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[] { this.clazz }, this);

    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        return client.sendCommand(clazz, method, args);

    }

}複製代碼

使用動態代理,調用被代理類的每個方法都會調用invoke方法。在invoke方法內部,調用RpcClient來傳輸協議和返回結果。github

構建一個簡單的傳輸協議

本地主機要調用遠程接口,確定要告訴遠程主機調用哪一個類的哪一個接口,參數是什麼。這裏就簡單的定一個傳輸的協議類:spring

public class MethodInvoker implements Serializable {
    private static final long serialVersionUID = 6644047311439601478L;
    private Class clazz;
    private String method;
    private Object[] args;
    public MethodInvoker(String method, Object[] args, Class clazz) {
        super();
        this.method = method;
        this.args = args;
        this.clazz = clazz;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public Class getClazz() {
        return clazz;
    }
    public void setClazz(Class clazz) {
        this.clazz = clazz;
    }

}複製代碼

這個類就不具體分析了,緣由你懂得,繼承Serializable接口是爲了使用JAVA自帶的序列化協議。bootstrap

使用RPCClient實際發送數據

public class RpcClient {
    private String host;

    private int port;

    public RpcClient(String host, int port) {
        super();
        this.host = host;
        this.port = port;
    }

    public Object sendCommand(Class clazz, Method method, Object[] args) {

        MethodInvoker invoker = new MethodInvoker(method.getName(), args, clazz);
        final ClientHandler clientHandler = new ClientHandler(invoker);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ObjectDecoder(1024 * 1024,
                            ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(clientHandler);

                }
            });

            // Start the client.
            ChannelFuture f = b.connect(new InetSocketAddress(host, port)).sync();
            // Wait until the connection is closed. 當一個任務完成的時候會繼續執行。
        f.channel().closeFuture().sync();
            return clientHandler.getResponse();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
        return null;

    }

}複製代碼

ClientHandler 用來向服務端發送數據:

public class ClientHandler extends ChannelInboundHandlerAdapter {
    private Object response;
    private MethodInvoker methodInvoker;

    public ClientHandler(MethodInvoker methodInvoker) {

        this.methodInvoker = methodInvoker;

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ctx.writeAndFlush(this.methodInvoker); // 發送到下一個Handler。input處理完,就輸出,而後output。
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("channelRead:"+msg);
        response = msg;
        ctx.close();
    }

    public Object getResponse() {
        return response;
    }

}複製代碼

值得注意的是,被傳輸對象的編碼和解碼使用了Netty自帶的編碼與解碼器。此外,必定要調用ctx.close()方法來關閉這個連接。網絡

server端業務的實現

假設server端實現了IDemoService,而且使用Spring來管理bean對象:多線程

public class DemoServiceImpl implements IDemoService {

    public int sum(int a, int b) {
        return a+b;
    }

}複製代碼
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="demoService" class="com.slowlizard.rpc.server.business.DemoServiceImpl"></bean> </beans>複製代碼

server端接受傳過來的數據

public class ServerHandler extends ChannelInboundHandlerAdapter {
    private static ApplicationContext springApplicationContext;
    static {
        springApplicationContext = new ClassPathXmlApplicationContext("context.xml");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server come here:channelRead");
        MethodInvoker methodInvoker = (MethodInvoker) msg;
         Object service = springApplicationContext.getBean(methodInvoker.getClazz());
        Method[] methods = service.getClass().getDeclaredMethods();
        for (Method method : methods) {
            if (method.getName().equals(methodInvoker.getMethod())) {
                Object result = method.invoke(service, methodInvoker.getArgs());
                ctx.writeAndFlush(result);
            }

        }

    }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("server come here:channelActive");
}
}複製代碼

啓動server

public class Server {
    private static int port = 6666;
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0) throws Exception {
                            arg0.pipeline().addLast(new ObjectDecoder(1024*1024,
                                    ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );
                            arg0.pipeline().addLast(new ObjectEncoder());
                            arg0.pipeline().addLast(new ServerHandler());
                        }
                    }

                    ).option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true); // 保持長鏈接狀態
            // 綁定端口,開始接收進來的鏈接
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();// 子線程開始監聽
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}複製代碼

測試

public class App {
    public static void main(String[] args) {

        JDKDynamicService<IDemoService> proxy = new JDKDynamicService<IDemoService>();
        proxy.setClass(IDemoService.class);
        IDemoService service = proxy.get();
        System.out.println("result" + service.sum(1, 2));
    }
}複製代碼

很快,咱們就是實現了一個簡單的RPC遠程調用。可是它只是一個原理性的示範,離真正的RPC框架還很是遠。架構

首先,它的性能怎麼樣?框架

RpcClient每次發送協議到服務端,都會創建一個新的鏈接,可否優化它?
Server端能更快的查找到Bean並更快執行嗎?

客戶端可否與Spring集成?

Server端 Netty傳輸可否與Spring分離?

在下一章,咱們將着手優化這個「框架」,來解決目前遇到的問題。

差點忘了附上Github地址:
github.com/slowlizard/…

相關文章
相關標籤/搜索