dubbo源碼解析(二十九)遠程調用——redis協議

遠程調用——redis協議

目標:介紹redis協議的設計和實現,介紹dubbo-rpc-redis的源碼。

前言

dubbo支持的redis協議是基於Redis的,Redis 是一個高效的 KV 存儲服務器,跟memcached協議實現差很少,在dubbo中也沒有涉及到關於redis協議的服務暴露,只有服務引用,由於在訪問服務器時,Redis客戶端能夠在服務器上存儲也能夠獲取。java

源碼分析

(一)RedisProtocol

該類繼承了AbstractProtocol類,是redis協議實現的核心。git

1.屬性

/**
 * 默認端口號
 */
public static final int DEFAULT_PORT = 6379;

2.export

@Override
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
    // 不支持redis協議的服務暴露,拋出異常
    throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl());
}

能夠看到不支持服務暴露。github

3.refer

@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
    try {
        // 實例化對象池
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        // 若是 testOnBorrow 被設置,pool 會在 borrowObject 返回對象以前使用 PoolableObjectFactory的 validateObject 來驗證這個對象是否有效
        // 要是對象沒經過驗證,這個對象會被丟棄,而後從新選擇一個新的對象。
        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
        // 若是 testOnReturn 被設置, pool 會在 returnObject 的時候經過 PoolableObjectFactory 的validateObject 方法驗證對象
        // 若是對象沒經過驗證,對象會被丟棄,不會被放到池中。
        config.setTestOnReturn(url.getParameter("test.on.return", false));
        // 指定空閒對象是否應該使用 PoolableObjectFactory 的 validateObject 校驗,若是校驗失敗,這個對象會從對象池中被清除。
        // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0) 的時候纔會生效。
        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
        if (url.getParameter("max.idle", 0) > 0)
            // 控制一個pool最多有多少個狀態爲空閒的jedis實例。
            config.setMaxIdle(url.getParameter("max.idle", 0));
        if (url.getParameter("min.idle", 0) > 0)
            // 控制一個pool最少有多少個狀態爲空閒的jedis實例。
            config.setMinIdle(url.getParameter("min.idle", 0));
        if (url.getParameter("max.active", 0) > 0)
            // 控制一個pool最多有多少個jedis實例。
            config.setMaxTotal(url.getParameter("max.active", 0));
        if (url.getParameter("max.total", 0) > 0)
            config.setMaxTotal(url.getParameter("max.total", 0));
        if (url.getParameter("max.wait", 0) > 0)
            //表示當引入一個jedis實例時,最大的等待時間,若是超過等待時間,則直接拋出JedisConnectionException;
            config.setMaxWaitMillis(url.getParameter("max.wait", 0));
        if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
            // 設置驅逐線程每次檢測對象的數量。這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。
            config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
            // 指定驅逐線程的休眠時間。若是這個值不是正數( >0),不會有驅逐線程運行。
            config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
            // 指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的對象,會被清除掉)。
            // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。
            config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
        // 建立redis鏈接池
        final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
                url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT),
                StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(),
                url.getParameter("db.index", 0));
        // 得到值的過時時間
        final int expiry = url.getParameter("expiry", 0);
        // 得到get命令
        final String get = url.getParameter("get", "get");
        // 得到set命令
        final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
        // 得到delete命令
        final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
        return new AbstractInvoker<T>(type, url) {
            @Override
            protected Result doInvoke(Invocation invocation) throws Throwable {
                Jedis resource = null;
                try {
                    resource = jedisPool.getResource();

                    // 若是是get命令
                    if (get.equals(invocation.getMethodName())) {
                        // get 命令必須只有一個參數
                        if (invocation.getArguments().length != 1) {
                            throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                        }
                        // 得到值
                        byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes());
                        if (value == null) {
                            return new RpcResult();
                        }
                        // 反序列化
                        ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
                        return new RpcResult(oin.readObject());
                    } else if (set.equals(invocation.getMethodName())) {
                        // 若是是set命令,參數長度必須是2
                        if (invocation.getArguments().length != 2) {
                            throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                        }
                        //
                        byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
                        ByteArrayOutputStream output = new ByteArrayOutputStream();
                        // 對須要存入對值進行序列化
                        ObjectOutput value = getSerialization(url).serialize(url, output);
                        value.writeObject(invocation.getArguments()[1]);
                        // 存入值
                        resource.set(key, output.toByteArray());
                        // 設置該key過時時間,不能大於1000s
                        if (expiry > 1000) {
                            resource.expire(key, expiry / 1000);
                        }
                        return new RpcResult();
                    } else if (delete.equals(invocation.getMethodName())) {
                        // 若是是刪除命令,則參數長度必須是1
                        if (invocation.getArguments().length != 1) {
                            throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                        }
                        // 刪除該值
                        resource.del(String.valueOf(invocation.getArguments()[0]).getBytes());
                        return new RpcResult();
                    } else {
                        // 不然拋出該操做不支持的異常
                        throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
                    }
                } catch (Throwable t) {
                    RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
                    if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
                        // 拋出超時異常
                        re.setCode(RpcException.TIMEOUT_EXCEPTION);
                    } else if (t instanceof JedisConnectionException || t instanceof IOException) {
                        // 拋出網絡異常
                        re.setCode(RpcException.NETWORK_EXCEPTION);
                    } else if (t instanceof JedisDataException) {
                        // 拋出序列化異常
                        re.setCode(RpcException.SERIALIZATION_EXCEPTION);
                    }
                    throw re;
                } finally {
                    if (resource != null) {
                        try {
                            jedisPool.returnResource(resource);
                        } catch (Throwable t) {
                            logger.warn("returnResource error: " + t.getMessage(), t);
                        }
                    }
                }
            }

            @Override
            public void destroy() {
                super.destroy();
                try {
                    // 關閉鏈接池
                    jedisPool.destroy();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        };
    } catch (Throwable t) {
        throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
    }
}

能夠看到首先是對鏈接池的配置賦值,而後建立鏈接池後,根據redis的get、set、delete命令來進行相關操做。redis

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了遠程調用中關於redis協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模塊關於rest協議部分進行講解。服務器

相關文章
相關標籤/搜索