Dubbo源碼解析(二十八)遠程調用——memcached協議

遠程調用——memcached協議

目標:介紹memcached協議的設計和實現,介紹dubbo-rpc-memcached的源碼。java

前言

dubbo實現memcached協議是基於Memcached,Memcached 是一個高效的 KV 緩存服務器,在dubbo中沒有涉及到關於memcached協議的服務暴露,只有服務引用,由於在訪問Memcached服務器時,Memcached客戶端能夠在服務器上存儲也能夠獲取。git

源碼分析

(一)MemcachedProtocol

該類繼承AbstractProtocol,是memcached協議實現的核心。github

1.屬性

/** * 默認端口號 */
public static final int DEFAULT_PORT = 11211;
複製代碼

2.export

@Override
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
    // 不支持memcached服務暴露
    throw new UnsupportedOperationException("Unsupported export memcached service. url: " + invoker.getUrl());
}
複製代碼

能夠看到,服務暴露方法直接拋出異常。redis

3.refer

@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
    try {
        // 得到地址
        String address = url.getAddress();
        // 得到備用地址
        String backup = url.getParameter(Constants.BACKUP_KEY);
        // 把備用地址拼接上
        if (backup != null && backup.length() > 0) {
            address += "," + backup;
        }
        // 建立Memcached客戶端構造器
        MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address));
        // 建立客戶端
        final MemcachedClient memcachedClient = builder.build();
        // 到期時間參數配置
        final int expiry = url.getParameter("expiry", 0);
        // 得到值命令
        final String get = url.getParameter("get", "get");
        // 添加值命令根據類型來取決是put仍是set
        final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
        // 刪除值命令
        final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
        return new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                try {
                    // 若是是獲取方法名的值
                    if (get.equals(invocation.getMethodName())) {
                        // 若是參數長度不等於1,則拋出異常
                        if (invocation.getArguments().length != 1) {
                            throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                        }
                        // 不然調用get方法來獲取
                        return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0])));
                    } else if (set.equals(invocation.getMethodName())) {
                        // 若是參數長度不爲2,則拋出異常
                        if (invocation.getArguments().length != 2) {
                            throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                        }
                        // 不管任何現有值如何,都在緩存中設置一個對象
                        memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]);
                        return new RpcResult();
                    } else if (delete.equals(invocation.getMethodName())) {
                        // 刪除操做只有一個參數,若是參數長度不等於1,則拋出異常
                        if (invocation.getArguments().length != 1) {
                            throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                        }
                        // 刪除某個值
                        memcachedClient.delete(String.valueOf(invocation.getArguments()[0]));
                        return new RpcResult();
                    } else {
                        // 不支持的操做
                        throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service.");
                    }
                } catch (Throwable t) {
                    RpcException re = new RpcException("Failed to invoke memcached service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
                    if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
                        re.setCode(RpcException.TIMEOUT_EXCEPTION);
                    } else if (t instanceof MemcachedException || t instanceof IOException) {
                        re.setCode(RpcException.NETWORK_EXCEPTION);
                    }
                    throw re;
                }
            }

            @Override
            public void destroy() {
                super.destroy();
                try {
                    // 關閉客戶端
                    memcachedClient.shutdown();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    } catch (Throwable t) {
        throw new RpcException("Failed to refer memcached service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
    }
}
複製代碼

該方法是服務引用方法,基於MemcachedClient的get、set、delete方法來對應Memcached的get、set、delete命令進行對值的操做。緩存

後記

該部分相關的源碼解析地址:github.com/CrazyHZM/in…服務器

該文章講解了遠程調用中關於memcached協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模塊關於redis協議部分進行講解。ide

相關文章
相關標籤/搜索