目標:介紹memcached協議的設計和實現,介紹dubbo-rpc-memcached的源碼。
dubbo實現memcached協議是基於Memcached,Memcached 是一個高效的 KV 緩存服務器,在dubbo中沒有涉及到關於memcached協議的服務暴露,只有服務引用,由於在訪問Memcached服務器時,Memcached客戶端能夠在服務器上存儲也能夠獲取。java
該類繼承AbstractProtocol,是memcached協議實現的核心。git
/** * 默認端口號 */ public static final int DEFAULT_PORT = 11211;
@Override public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException { // 不支持memcached服務暴露 throw new UnsupportedOperationException("Unsupported export memcached service. url: " + invoker.getUrl()); }
能夠看到,服務暴露方法直接拋出異常。github
@Override public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException { try { // 得到地址 String address = url.getAddress(); // 得到備用地址 String backup = url.getParameter(Constants.BACKUP_KEY); // 把備用地址拼接上 if (backup != null && backup.length() > 0) { address += "," + backup; } // 建立Memcached客戶端構造器 MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address)); // 建立客戶端 final MemcachedClient memcachedClient = builder.build(); // 到期時間參數配置 final int expiry = url.getParameter("expiry", 0); // 得到值命令 final String get = url.getParameter("get", "get"); // 添加值命令根據類型來取決是put仍是set final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set"); // 刪除值命令 final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete"); return new AbstractInvoker<T>(type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { // 若是是獲取方法名的值 if (get.equals(invocation.getMethodName())) { // 若是參數長度不等於1,則拋出異常 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 不然調用get方法來獲取 return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0]))); } else if (set.equals(invocation.getMethodName())) { // 若是參數長度不爲2,則拋出異常 if (invocation.getArguments().length != 2) { throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 不管任何現有值如何,都在緩存中設置一個對象 memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]); return new RpcResult(); } else if (delete.equals(invocation.getMethodName())) { // 刪除操做只有一個參數,若是參數長度不等於1,則拋出異常 if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } // 刪除某個值 memcachedClient.delete(String.valueOf(invocation.getArguments()[0])); return new RpcResult(); } else { // 不支持的操做 throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service."); } } catch (Throwable t) { RpcException re = new RpcException("Failed to invoke memcached service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t); if (t instanceof TimeoutException || t instanceof SocketTimeoutException) { re.setCode(RpcException.TIMEOUT_EXCEPTION); } else if (t instanceof MemcachedException || t instanceof IOException) { re.setCode(RpcException.NETWORK_EXCEPTION); } throw re; } } @Override public void destroy() { super.destroy(); try { // 關閉客戶端 memcachedClient.shutdown(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } }; } catch (Throwable t) { throw new RpcException("Failed to refer memcached service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t); } }
該方法是服務引用方法,基於MemcachedClient的get、set、delete方法來對應Memcached的get、set、delete命令進行對值的操做。redis
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了遠程調用中關於memcached協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模塊關於redis協議部分進行講解。緩存