目標:介紹redis協議的設計和實現,介紹dubbo-rpc-redis的源碼。
dubbo支持的redis協議是基於Redis的,Redis 是一個高效的 KV 存儲服務器,跟memcached協議實現差很少,在dubbo中也沒有涉及到關於redis協議的服務暴露,只有服務引用,由於在訪問服務器時,Redis客戶端能夠在服務器上存儲也能夠獲取。java
該類繼承了AbstractProtocol類,是redis協議實現的核心。git
/** * 默認端口號 */ public static final int DEFAULT_PORT = 6379;
@Override public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException { // 不支持redis協議的服務暴露,拋出異常 throw new UnsupportedOperationException("Unsupported export redis service. url: " + invoker.getUrl()); }
能夠看到不支持服務暴露。github
@Override public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException { try { // 實例化對象池 GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 若是 testOnBorrow 被設置,pool 會在 borrowObject 返回對象以前使用 PoolableObjectFactory的 validateObject 來驗證這個對象是否有效 // 要是對象沒經過驗證,這個對象會被丟棄,而後從新選擇一個新的對象。 config.setTestOnBorrow(url.getParameter("test.on.borrow", true)); // 若是 testOnReturn 被設置, pool 會在 returnObject 的時候經過 PoolableObjectFactory 的validateObject 方法驗證對象 // 若是對象沒經過驗證,對象會被丟棄,不會被放到池中。 config.setTestOnReturn(url.getParameter("test.on.return", false)); // 指定空閒對象是否應該使用 PoolableObjectFactory 的 validateObject 校驗,若是校驗失敗,這個對象會從對象池中被清除。 // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0) 的時候纔會生效。 config.setTestWhileIdle(url.getParameter("test.while.idle", false)); if (url.getParameter("max.idle", 0) > 0) // 控制一個pool最多有多少個狀態爲空閒的jedis實例。 config.setMaxIdle(url.getParameter("max.idle", 0)); if (url.getParameter("min.idle", 0) > 0) // 控制一個pool最少有多少個狀態爲空閒的jedis實例。 config.setMinIdle(url.getParameter("min.idle", 0)); if (url.getParameter("max.active", 0) > 0) // 控制一個pool最多有多少個jedis實例。 config.setMaxTotal(url.getParameter("max.active", 0)); if (url.getParameter("max.total", 0) > 0) config.setMaxTotal(url.getParameter("max.total", 0)); if (url.getParameter("max.wait", 0) > 0) //表示當引入一個jedis實例時,最大的等待時間,若是超過等待時間,則直接拋出JedisConnectionException; config.setMaxWaitMillis(url.getParameter("max.wait", 0)); if (url.getParameter("num.tests.per.eviction.run", 0) > 0) // 設置驅逐線程每次檢測對象的數量。這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。 config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0)); if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) // 指定驅逐線程的休眠時間。若是這個值不是正數( >0),不會有驅逐線程運行。 config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0)); if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) // 指定最小的空閒驅逐的時間間隔(空閒超過指定的時間的對象,會被清除掉)。 // 這個設置僅在 timeBetweenEvictionRunsMillis 被設置成正值( >0)的時候纔會生效。 config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0)); // 建立redis鏈接池 final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT), url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(), url.getParameter("db.index", 0)); // 得到值的過時時間 final int expiry = url.getParameter("expiry", 0); // 得到get命令 final String get = url.getParameter("get", "get"); // 得到set命令 final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); // 得到delete命令 final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); return new AbstractInvoker<T>(type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { Jedis resource = null; try { resource = jedisPool.getResource(); // 若是是get命令 if (get.equals(invocation.getMethodName())) { // get 命令必須只有一個參數 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 得到值 byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes()); if (value == null) { return new RpcResult(); } // 反序列化 ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value)); return new RpcResult(oin.readObject()); } else if (set.equals(invocation.getMethodName())) { // 若是是set命令,參數長度必須是2 if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes(); ByteArrayOutputStream output = new ByteArrayOutputStream(); // 對須要存入對值進行序列化 ObjectOutput value = getSerialization(url).serialize(url, output); value.writeObject(invocation.getArguments()[1]); // 存入值 resource.set(key, output.toByteArray()); // 設置該key過時時間,不能大於1000s if (expiry > 1000) { resource.expire(key, expiry / 1000); } return new RpcResult(); } else if (delete.equals(invocation.getMethodName())) { // 若是是刪除命令,則參數長度必須是1 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 刪除該值 resource.del(String.valueOf(invocation.getArguments()[0]).getBytes()); return new RpcResult(); } else { // 不然拋出該操做不支持的異常 throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service."); } } catch (Throwable t) { RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t); if (t instanceof TimeoutException || t instanceof SocketTimeoutException) { // 拋出超時異常 re.setCode(RpcException.TIMEOUT_EXCEPTION); } else if (t instanceof JedisConnectionException || t instanceof IOException) { // 拋出網絡異常 re.setCode(RpcException.NETWORK_EXCEPTION); } else if (t instanceof JedisDataException) { // 拋出序列化異常 re.setCode(RpcException.SERIALIZATION_EXCEPTION); } throw re; } finally { if (resource != null) { try { jedisPool.returnResource(resource); } catch (Throwable t) { logger.warn("returnResource error: " + t.getMessage(), t); } } } } @Override public void destroy() { super.destroy(); try { // 關閉鏈接池 jedisPool.destroy(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }; } catch (Throwable t) { throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); } }
能夠看到首先是對鏈接池的配置賦值,而後建立鏈接池後,根據redis的get、set、delete命令來進行相關操做。redis
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了遠程調用中關於redis協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模塊關於rest協議部分進行講解。服務器