在RPC中服務提供者和服務消費者均可以抽象成一個節點,節點包含了建立,銷燬,節點描述信息,和節點連接,節點狀態等。ide
public interface Node { void init(); void destroy(); boolean isAvailable(); String desc(); URL getUrl();
上面只是一個節點的通用屬性描述,按照DDD的說法他只是包含了本身的原始能力。this
在社會中每一個對象被賦予不一樣的社會使命,RPC中一個消費者節點的社會使命是發起請求獲取響應:代理
public interface Caller<T> extends Node { Class<T> getInterface(); Response call(Request request); } public interface Provider<T> extends Caller<T> { Class<T> getInterface(); Method lookupMethod(String methodName, String methodDesc); T getImpl(); } ...
一個rpc的請求,咱們一般須要上下文,將一些rpc請求信息進行傳遞。 通常經過threadLocal傳遞上下文:code
private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } };
這樣在一個request過來是咱們能夠將請求放入上下文:對象
public static RpcContext init(Request request) { RpcContext context = new RpcContext(); if (request != null) { context.setRequest(request); context.setClientRequestId(request.getAttachments().get(URLParamType.requestIdFromClient.getName())); } localContext.set(context); return context; }
一樣,當請求結束後,咱們須要銷燬上下文信息:rem
public static void destroy() { localContext.remove(); }
當方法執行異常,或者連接trace時咱們能夠從上下文拿到requestId放到咱們的trace系統保存起來:rpc
RpcContext.getContext().getRequestId()
咱們在請求過程當中須要對整個請求數據進行採集,好比建立Filter進行請求採集:get
private static ConcurrentHashMap<String, StatInfo> serviceStat = new ConcurrentHashMap<String, RpcStats.StatInfo>(); public class ActiveLimitFilter implements Filter { @Override public Response filter(Caller<?> caller, Request request) { RpcStats.beforeCall(caller.getUrl(), request); try{ RpcStats.afterCall(caller.getUrl, request, true, time); } } }
在請求和響應之間狀態通知能夠採用Object.Wait和Object.Notify實現。 並經過命令模式喚醒監聽者。string
private void notifyListeners() { if (listeners != null) { for (FutureListener listener : listeners) { notifyListener(listener); } } }
在值獲取時,咱們採用一個死循環進行請求結果返回,若是請求超時,則拋出異常,而這個超時時間,則是經過服務配置實現的。it
public Object getValue() { synchronized (lock) { if (!isDoing()) { return getValueOrThrowable(); } if (timeout <= 0) { try { lock.wait(); } catch (Exception e) { cancel(new MotanServiceException(this.getClass().getName() + " getValue InterruptedException : " + MotanFrameworkUtil.toString(request) + " cost=" + (System.currentTimeMillis() - createTime), e)); } return getValueOrThrowable(); } else { long waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime > 0) { for (; ; ) { try { lock.wait(waitTime); } catch (InterruptedException e) { } if (!isDoing()) { break; } else { waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime <= 0) { break; } } } } if (isDoing()) { timeoutSoCancel(); } } return getValueOrThrowable(); } }
咱們在進行rpc請求時,在Request參數中會傳遞經過代理訪問的目標類,咱們在AbstractProvider聲明一個抽象方法,在具體實現類進行實現:
protected abstract Response invoke(Request request);
經過反射進行方法調用,invoke方法以下:
@Override public Response invoke(Request request) { DefaultResponse response = new DefaultResponse(); Method method = lookupMethod(request.getMethodName(), request.getParamtersDesc()); if (method == null) { MotanServiceException exception = new MotanServiceException("Service method not exist: " + request.getInterfaceName() + "." + request.getMethodName() + "(" + request.getParamtersDesc() + ")", MotanErrorMsgConstant.SERVICE_UNFOUND); response.setException(exception); return response; } try { Object value = method.invoke(proxyImpl, request.getArguments()); response.setValue(value); } catch (Exception e) { } // 傳遞rpc版本和attachment信息方便不一樣rpc版本的codec使用。 response.setRpcProtocolVersion(request.getRpcProtocolVersion()); response.setAttachments(request.getAttachments()); return response; }
方法及描述放在map中:
protected Map<String, Method> methodMap = new HashMap<String, Method>();
獲取方法:
public Method lookupMethod(String methodName, String methodDesc) { Method method = null; String fullMethodName = ReflectUtil.getMethodDesc(methodName, methodDesc); method = methodMap.get(fullMethodName); if (method == null && StringUtils.isBlank(methodDesc)) { method = methodMap.get(methodName); if (method == null) { method = methodMap.get(methodName.substring(0, 1).toLowerCase() + methodName.substring(1)); } } return method; }