Dubbo 淺讀

最近公司一直在用阿里的開源框架Dubbo,正好上一篇文章也是講到了RPC的概念,Dubbo聽過的兄弟都知道在業界好評仍是很高的,不光是設計優雅,文檔也很齊全,此次就簡單的分享下LZ的解讀成果,固然本文章只是淺層次的,着重分析的是Dubbo核心層如何去高效的執行調用遠程RPC服務的。html

這裏要簡單跟兄弟們區分下概念,最多見最具表明性也是比較簡單的HTTP協議(短鏈接)與Socket編程(長鏈接)的區別,這裏再也不多講前者,此次主要最涉及到後者。java

知識點儲備前提:編程

 JAVA 動態代理(網上不少,這裏推薦比較全面的文章:http://www.cnblogs.com/flyoung2008/archive/2013/08/11/3251148.html)bootstrap

JAVA NIO(http://www.cnblogs.com/flyoung2008/p/3251826.html)服務器

這裏LZ依然推薦的是帶着問題去學習,依舊是Scrum三要素:服務端怎麼暴露服務,怎麼引用服務,二者怎麼實現通信。app

1:服務端怎麼暴露服務框架

ServiceConfig類裏export方法有一段socket

        if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            doExport();
        }

看上面代碼,若是沒有設置延遲暴露服務,直接運行doExport();ide

一路跟蹤代碼,校驗HOST,類型轉換等跳過,最終看到下面代碼。學習

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);

光看着幾行依舊不太明瞭,首先關注Invoker類,最終又會看到URL類。

關注下URL類會看到一些鏈接須要的字段信息,大概猜想即爲鏈接所用的信息實體類。

繼續往下看proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

能夠看到此方法根據參數,返回一個Invoker實體。再看protocol.export(invoker);

往下追會看到以下:

     try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

再往下追,看到

return getTransporter().bind(url, handler);

發現Transporter有三個實現類:GrizzlyTransporter,MinaTransporter,NettyTransporter 。

看到這些名字也許腦海裏會有點概念,哦~這裏就開始用Grizzly,Mina,Netty這裏開源框架去實現通信啦~

可是仍是不是很完全,不要緊,接着第二個問題

2:怎麼引用服務?

ReferenceConfig類init();方法裏有段

ref = createProxy(map);咱們從這能夠看得出有Proxy字樣,這就能夠肯定咱們的方向沒有走錯,繼續往下看。

咱們又能夠看到下面代碼:

if (urls.size() == 1) {
    invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
        invokers.add(refprotocol.refer(interfaceClass, url));
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            registryURL = url; // 用了最後一個registry url
        }
    }
    if (registryURL != null) { // 有 註冊中心協議的URL
        // 對有註冊中心的Cluster 只用 AvailableCluster
        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
        invoker = cluster.join(new StaticDirectory(u, invokers));
    }  else { // 不是 註冊中心的URL
        invoker = cluster.join(new StaticDirectory(invokers));
    }
}

這裏的代碼回顧以前的暴露服務時的Invoker有沒有熟悉?是的,這裏就是根據URL來獲得Invoker實體,

最後有個 getProxy(invoker, interfaces);

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

看到這裏就看熟悉了啦~再看InvokerInvocationHandler類,實現InvocationHandler接口,

if (method.getDeclaringClass() == Object.class) {
    return method.invoke(invoker, args);
}

其實最終仍是認出了它的本質。

那麼問題來了......

3:二者怎麼實現通信

回到剛剛暴露服務那裏,Transporter有三個實現類GrizzlyTransporter,MinaTransporter,NettyTransporter 。

咱們以NettyTransporter爲例:

 

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

 

咱們跟蹤NettyServer看到doOpen()裏:

channel = bootstrap.bind(getBindAddress());

這個channel也就是Netty裏的channel,終於交給Netty啦。

咱們再跟蹤NettyClient的時候貌似不是簡單的鏈接,日後跟蹤發現

 

cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));

 

這一段就能夠看到利用ExecutorService去分發鏈接。

那麼可能看到以上分析仍是有點暈乎乎,沒有連續感有木有?

是的,其實光分析以上的也不會有太直觀的感覺,LZ根據這些原理寫了個最直觀最簡單的Demo:

 這是簡化過的核心代理層:

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Created by luyichen718 on 14-12-31.
 */
public class RpcDemo {

    /**
     * 暴露服務
     *
     * @param service 服務實現
     * @param port    服務端口
     * @throws Exception
     */
    public static void export(final Object service, int port) throws Exception {
        try {
            ServerSocket server = new ServerSocket(port);
            //該處採用不間斷阻塞式監聽端口
            //TODO:此處能夠用NIO來優化實現
            while (true){
                final Socket socket = server.accept();
                try {
                    try {
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            //也能夠加服務註冊方式管理服務
                            String methodName = input.readUTF();
                            Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                            Object[] arguments = (Object[]) input.readObject();
                            ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                            //TODO:先後能夠作監控
                            try {
                                Method method = service.getClass().getMethod(methodName, parameterTypes);
                                Object result = method.invoke(service, arguments);
                                output.writeObject(result);
                            } catch (Throwable t) {
                                output.writeObject(t);
                            } finally {
                                output.close();
                            }
                        } finally {
                            input.close();
                        }
                    } finally {
                        socket.close();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 引用服務
     *
     * @param <T>            接口泛型
     * @param interfaceClass 接口類型
     * @param host           服務器主機名
     * @param port           服務器端口
     * @return 遠程服務
     * @throws Exception
     */
    @SuppressWarnings("unchecked")
    public static <T> T reference(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                Socket socket = new Socket(host, port);
                try {
                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                    try {
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(arguments);
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            Object result = input.readObject();
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        } finally {
                            input.close();
                        }
                    } finally {
                        output.close();
                    }
                } finally {
                    socket.close();
                }
            }
        });
    }
}

服務生產者:

    public static void main(String[] args) throws Exception {
        BizService service = new BizServiceImpl();
        RpcDemo.export(service, 6543);
    }

服務消費者:

    public static void main(String[] args) throws Exception {
        BizService service = RpcDemo.reference(BizService.class, "127.0.0.1", 6543);
        service.hello();
    }

該處的BizService就是一個最簡單的業務接口,裏面能夠實現具體的業務,接口通常單獨生產jar包給消費方使用。

可能對於通常讀者也是最有價值的了吧,以上暈乎乎的能夠先看Demo而後再下載了源碼看看分析^_^

這個Demo也算是對通常長鏈接的最簡易實現,固然Dubbo加了不少擴展及監控,還運用了不少Decorator模式,接口設計也很優雅,光看官網的一些接口圖都暈乎乎了,因此仍是要靜下心來慢慢研讀。

此處也算是對長鏈接RPC的實現作了一個小結,歡迎各兄弟交流吐槽~

相關文章
相關標籤/搜索