Motan源碼閱讀--設計一個RPC

節點

在RPC中服務提供者和服務消費者均可以抽象成一個節點,節點包含了建立,銷燬,節點描述信息,和節點連接,節點狀態等。ide

public interface Node {

    void init();

    void destroy();

    boolean isAvailable();

    String desc();

    URL getUrl();

上面只是一個節點的通用屬性描述,按照DDD的說法他只是包含了本身的原始能力。this

在社會中每一個對象被賦予不一樣的社會使命,RPC中一個消費者節點的社會使命是發起請求獲取響應:代理

public interface Caller<T> extends Node {

    Class<T> getInterface();

    Response call(Request request);
}
public interface Provider<T> extends Caller<T> {

    Class<T> getInterface();

    Method lookupMethod(String methodName, String methodDesc);

    T getImpl();
}

...

上下文

一個rpc的請求,咱們一般須要上下文,將一些rpc請求信息進行傳遞。 通常經過threadLocal傳遞上下文:code

private static final ThreadLocal<RpcContext> localContext = new ThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };

這樣在一個request過來是咱們能夠將請求放入上下文:對象

public static RpcContext init(Request request) {
        RpcContext context = new RpcContext();
        if (request != null) {
            context.setRequest(request);
            context.setClientRequestId(request.getAttachments().get(URLParamType.requestIdFromClient.getName()));
        }
        localContext.set(context);
        return context;
    }

一樣,當請求結束後,咱們須要銷燬上下文信息:rem

public static void destroy() {
        localContext.remove();
    }

當方法執行異常,或者連接trace時咱們能夠從上下文拿到requestId放到咱們的trace系統保存起來:rpc

RpcContext.getContext().getRequestId()

請求狀態

咱們在請求過程當中須要對整個請求數據進行採集,好比建立Filter進行請求採集:get

private static ConcurrentHashMap<String, StatInfo> serviceStat = new ConcurrentHashMap<String, RpcStats.StatInfo>();

public class ActiveLimitFilter implements Filter {
    @Override
    public Response filter(Caller<?> caller, Request request) {
        RpcStats.beforeCall(caller.getUrl(), request);
		try{
			RpcStats.afterCall(caller.getUrl, request, true, time);
		}
    }
}

響應狀態

在請求和響應之間狀態通知能夠採用Object.Wait和Object.Notify實現。 並經過命令模式喚醒監聽者。string

private void notifyListeners() {
        if (listeners != null) {
            for (FutureListener listener : listeners) {
                notifyListener(listener);
            }
        }
    }

在值獲取時,咱們採用一個死循環進行請求結果返回,若是請求超時,則拋出異常,而這個超時時間,則是經過服務配置實現的。it

public Object getValue() {
        synchronized (lock) {
            if (!isDoing()) {
                return getValueOrThrowable();
            }

            if (timeout <= 0) {
                try {
                    lock.wait();
                } catch (Exception e) {
                    cancel(new MotanServiceException(this.getClass().getName() + " getValue InterruptedException : "
                            + MotanFrameworkUtil.toString(request) + " cost=" + (System.currentTimeMillis() - createTime), e));
                }

                return getValueOrThrowable();
            } else {
                long waitTime = timeout - (System.currentTimeMillis() - createTime);

                if (waitTime > 0) {
                    for (; ; ) {
                        try {
                            lock.wait(waitTime);
                        } catch (InterruptedException e) {
                        }

                        if (!isDoing()) {
                            break;
                        } else {
                            waitTime = timeout - (System.currentTimeMillis() - createTime);
                            if (waitTime <= 0) {
                                break;
                            }
                        }
                    }
                }

                if (isDoing()) {
                    timeoutSoCancel();
                }
            }
            return getValueOrThrowable();
        }
    }

獲取request的目標方法

咱們在進行rpc請求時,在Request參數中會傳遞經過代理訪問的目標類,咱們在AbstractProvider聲明一個抽象方法,在具體實現類進行實現:

protected abstract Response invoke(Request request);

經過反射進行方法調用,invoke方法以下:

@Override
    public Response invoke(Request request) {
        DefaultResponse response = new DefaultResponse();

        Method method = lookupMethod(request.getMethodName(), request.getParamtersDesc());

        if (method == null) {
            MotanServiceException exception =
                    new MotanServiceException("Service method not exist: " + request.getInterfaceName() + "." + request.getMethodName()
                            + "(" + request.getParamtersDesc() + ")", MotanErrorMsgConstant.SERVICE_UNFOUND);

            response.setException(exception);
            return response;
        }

        try {
            Object value = method.invoke(proxyImpl, request.getArguments());
            response.setValue(value);
        } catch (Exception e) {
		
		}
		
        // 傳遞rpc版本和attachment信息方便不一樣rpc版本的codec使用。
        response.setRpcProtocolVersion(request.getRpcProtocolVersion());
        response.setAttachments(request.getAttachments());
        return response;
    }

方法及描述放在map中:

protected Map<String, Method> methodMap = new HashMap<String, Method>();

獲取方法:

public Method lookupMethod(String methodName, String methodDesc) {
        Method method = null;
        String fullMethodName = ReflectUtil.getMethodDesc(methodName, methodDesc);
        method = methodMap.get(fullMethodName);
        if (method == null && StringUtils.isBlank(methodDesc)) {
            method = methodMap.get(methodName);
            if (method == null) {
                method = methodMap.get(methodName.substring(0, 1).toLowerCase() + methodName.substring(1));
            }
        }

        return method;
    }
相關文章
相關標籤/搜索