dubbo源碼解析(二十六)遠程調用——http協議

遠程調用——http協議

目標:介紹遠程調用中跟http協議相關的設計和實現,介紹dubbo-rpc-http的源碼。

前言

基於HTTP表單的遠程調用協議,採用 Spring 的HttpInvoker實現,關於http協議就不用多說了吧。java

源碼分析

(一)HttpRemoteInvocation

該類繼承了RemoteInvocation類,是在RemoteInvocation上增長了泛化調用的參數設置,以及增長了dubbo自己須要的附加值設置。git

public class HttpRemoteInvocation extends RemoteInvocation {

    private static final long serialVersionUID = 1L;
    /**
     * dubbo的附加值名稱
     */
    private static final String dubboAttachmentsAttrName = "dubbo.attachments";

    public HttpRemoteInvocation(MethodInvocation methodInvocation) {
        super(methodInvocation);
        // 把附加值加入到會話域的屬性裏面
        addAttribute(dubboAttachmentsAttrName, new HashMap<String, String>(RpcContext.getContext().getAttachments()));
    }

    @Override
    public Object invoke(Object targetObject) throws NoSuchMethodException, IllegalAccessException,
            InvocationTargetException {
        // 得到上下文
        RpcContext context = RpcContext.getContext();
        // 得到附加值
        context.setAttachments((Map<String, String>) getAttribute(dubboAttachmentsAttrName));

        // 泛化標誌
        String generic = (String) getAttribute(Constants.GENERIC_KEY);
        // 若是不爲空,則設置泛化標誌
        if (StringUtils.isNotEmpty(generic)) {
            context.setAttachment(Constants.GENERIC_KEY, generic);
        }
        try {
            // 調用下一個調用鏈
            return super.invoke(targetObject);
        } finally {
            context.setAttachments(null);

        }
    }
}

(二)HttpProtocol

該類是http實現的核心,跟我在《dubbo源碼解析(二十五)遠程調用——hessian協議》中講到的HessianProtocol實現有不少地方類似。github

1.屬性

/**
 * 默認的端口號
 */
public static final int DEFAULT_PORT = 80;

/**
 * http服務器集合
 */
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();

/**
 * Spring HttpInvokerServiceExporter 集合
 */
private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>();

/**
 * HttpBinder對象
 */
private HttpBinder httpBinder;

2.doExport

@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
    // 得到ip地址
    String addr = getAddr(url);
    // 得到http服務器
    HttpServer server = serverMap.get(addr);
    // 若是服務器爲空,則從新建立服務器,而且加入到集合
    if (server == null) {
        server = httpBinder.bind(url, new InternalHandler());
        serverMap.put(addr, server);
    }

    // 得到服務path
    final String path = url.getAbsolutePath();

    // 加入集合
    skeletonMap.put(path, createExporter(impl, type));

    // 通用path
    final String genericPath = path + "/" + Constants.GENERIC_KEY;

    // 添加泛化的服務調用
    skeletonMap.put(genericPath, createExporter(impl, GenericService.class));
    return new Runnable() {
        @Override
        public void run() {
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        }
    };
}

該方法是暴露服務等邏輯,由於dubbo實現http協議採用了Spring 的HttpInvoker實現,因此調用了createExporter方法來建立建立HttpInvokerServiceExporter。spring

3.createExporter

private <T> HttpInvokerServiceExporter createExporter(T impl, Class<?> type) {
    // 建立HttpInvokerServiceExporter
    final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter();
    // 設置要訪問的服務的接口
    httpServiceExporter.setServiceInterface(type);
    // 設置服務實現
    httpServiceExporter.setService(impl);
    try {
        // 在BeanFactory設置了全部提供的bean屬性,初始化bean的時候執行,能夠針對某個具體的bean進行配
        httpServiceExporter.afterPropertiesSet();
    } catch (Exception e) {
        throw new RpcException(e.getMessage(), e);
    }
    return httpServiceExporter;
}

該方法是建立一個spring 的HttpInvokerServiceExporter。服務器

4.doRefer

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
    // 得到泛化配置
    final String generic = url.getParameter(Constants.GENERIC_KEY);
    // 是否爲泛化調用
    final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);

    // 建立HttpInvokerProxyFactoryBean
    final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean();
    // 設置RemoteInvocation的工廠類
    httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() {
        /**
         * 爲給定的AOP方法調用建立一個新的RemoteInvocation對象。
         * @param methodInvocation
         * @return
         */
        @Override
        public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
            // 新建一個HttpRemoteInvocation
            RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation);
            // 若是是泛化調用
            if (isGeneric) {
                // 設置標誌
                invocation.addAttribute(Constants.GENERIC_KEY, generic);
            }
            return invocation;
        }
    });
    // 得到identity message
    String key = url.toIdentityString();
    // 若是是泛化調用
    if (isGeneric) {
        key = key + "/" + Constants.GENERIC_KEY;
    }
    // 設置服務url
    httpProxyFactoryBean.setServiceUrl(key);
    // 設置服務接口
    httpProxyFactoryBean.setServiceInterface(serviceType);
    // 得到客戶端參數
    String client = url.getParameter(Constants.CLIENT_KEY);
    if (client == null || client.length() == 0 || "simple".equals(client)) {
        // 建立SimpleHttpInvokerRequestExecutor鏈接池 使用的是JDK HttpClient
        SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() {
            @Override
            protected void prepareConnection(HttpURLConnection con,
                                             int contentLength) throws IOException {
                super.prepareConnection(con, contentLength);
                // 設置讀取超時時間
                con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
                // 設置鏈接超時時間
                con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
            }
        };
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else if ("commons".equals(client)) {
        // 建立 HttpComponentsHttpInvokerRequestExecutor鏈接池 使用的是Apache HttpClient
        HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor();
        // 設置讀取超時時間
        httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
        // 設置鏈接超時時間
        httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
        httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor);
    } else {
        throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons");
    }
    httpProxyFactoryBean.afterPropertiesSet();
    // 返回HttpInvokerProxyFactoryBean對象
    return (T) httpProxyFactoryBean.getObject();
}

該方法是服務引用的方法,其中根據url配置simple仍是commons來選擇建立鏈接池的方式。其中的區別就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。網絡

5.getErrorCode

@Override
protected int getErrorCode(Throwable e) {
    if (e instanceof RemoteAccessException) {
        e = e.getCause();
    }
    if (e != null) {
        Class<?> cls = e.getClass();
        if (SocketTimeoutException.class.equals(cls)) {
            // 返回超時異常
            return RpcException.TIMEOUT_EXCEPTION;
        } else if (IOException.class.isAssignableFrom(cls)) {
            // 返回網絡異常
            return RpcException.NETWORK_EXCEPTION;
        } else if (ClassNotFoundException.class.isAssignableFrom(cls)) {
            // 返回序列化異常
            return RpcException.SERIALIZATION_EXCEPTION;
        }
    }
    return super.getErrorCode(e);
}

該方法是處理異常狀況,設置錯誤碼。jvm

6.InternalHandler

private class InternalHandler implements HttpHandler {

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        // 得到請求uri
        String uri = request.getRequestURI();
        // 得到服務暴露者HttpInvokerServiceExporter對象
        HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
        // 若是不是post,則返回碼設置500
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            response.setStatus(500);
        } else {
            // 遠程地址放到上下文
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
            try {
                // 調用下一個調用
                skeleton.handleRequest(request, response);
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }

}

該內部類實現了HttpHandler,作了設置遠程地址的邏輯。ide

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了遠程調用中關於http協議的部分,內容比較簡單,能夠參考着官方文檔瞭解一下。接下來我將開始對rpc模塊關於injvm本地調用部分進行講解。源碼分析

相關文章
相關標籤/搜索