目標:介紹memcached協議的設計和實現,介紹dubbo-rpc-memcached的源碼。java
dubbo實現memcached協議是基於Memcached,Memcached 是一個高效的 KV 緩存服務器,在dubbo中沒有涉及到關於memcached協議的服務暴露,只有服務引用,由於在訪問Memcached服務器時,Memcached客戶端能夠在服務器上存儲也能夠獲取。git
該類繼承AbstractProtocol,是memcached協議實現的核心。github
/** * 默認端口號 */
public static final int DEFAULT_PORT = 11211;
複製代碼
@Override
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
// 不支持memcached服務暴露
throw new UnsupportedOperationException("Unsupported export memcached service. url: " + invoker.getUrl());
}
複製代碼
能夠看到,服務暴露方法直接拋出異常。redis
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
try {
// 得到地址
String address = url.getAddress();
// 得到備用地址
String backup = url.getParameter(Constants.BACKUP_KEY);
// 把備用地址拼接上
if (backup != null && backup.length() > 0) {
address += "," + backup;
}
// 建立Memcached客戶端構造器
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses(address));
// 建立客戶端
final MemcachedClient memcachedClient = builder.build();
// 到期時間參數配置
final int expiry = url.getParameter("expiry", 0);
// 得到值命令
final String get = url.getParameter("get", "get");
// 添加值命令根據類型來取決是put仍是set
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
// 刪除值命令
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
// 若是是獲取方法名的值
if (get.equals(invocation.getMethodName())) {
// 若是參數長度不等於1,則拋出異常
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 不然調用get方法來獲取
return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0])));
} else if (set.equals(invocation.getMethodName())) {
// 若是參數長度不爲2,則拋出異常
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 不管任何現有值如何,都在緩存中設置一個對象
memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]);
return new RpcResult();
} else if (delete.equals(invocation.getMethodName())) {
// 刪除操做只有一個參數,若是參數長度不等於1,則拋出異常
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 刪除某個值
memcachedClient.delete(String.valueOf(invocation.getArguments()[0]));
return new RpcResult();
} else {
// 不支持的操做
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service.");
}
} catch (Throwable t) {
RpcException re = new RpcException("Failed to invoke memcached service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
re.setCode(RpcException.TIMEOUT_EXCEPTION);
} else if (t instanceof MemcachedException || t instanceof IOException) {
re.setCode(RpcException.NETWORK_EXCEPTION);
}
throw re;
}
}
@Override
public void destroy() {
super.destroy();
try {
// 關閉客戶端
memcachedClient.shutdown();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
} catch (Throwable t) {
throw new RpcException("Failed to refer memcached service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
}
}
複製代碼
該方法是服務引用方法,基於MemcachedClient的get、set、delete方法來對應Memcached的get、set、delete命令進行對值的操做。緩存
該部分相關的源碼解析地址:github.com/CrazyHZM/in…服務器
該文章講解了遠程調用中關於memcached協議實現的部分,邏輯比較簡單。接下來我將開始對rpc模塊關於redis協議部分進行講解。ide