目標:介紹遠程調用中跟http協議相關的設計和實現,介紹dubbo-rpc-http的源碼。
基於HTTP表單的遠程調用協議,採用 Spring 的HttpInvoker實現,關於http協議就不用多說了吧。java
該類繼承了RemoteInvocation類,是在RemoteInvocation上增長了泛化調用的參數設置,以及增長了dubbo自己須要的附加值設置。git
public class HttpRemoteInvocation extends RemoteInvocation { private static final long serialVersionUID = 1L; /** * dubbo的附加值名稱 */ private static final String dubboAttachmentsAttrName = "dubbo.attachments"; public HttpRemoteInvocation(MethodInvocation methodInvocation) { super(methodInvocation); // 把附加值加入到會話域的屬性裏面 addAttribute(dubboAttachmentsAttrName, new HashMap<String, String>(RpcContext.getContext().getAttachments())); } @Override public Object invoke(Object targetObject) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { // 得到上下文 RpcContext context = RpcContext.getContext(); // 得到附加值 context.setAttachments((Map<String, String>) getAttribute(dubboAttachmentsAttrName)); // 泛化標誌 String generic = (String) getAttribute(Constants.GENERIC_KEY); // 若是不爲空,則設置泛化標誌 if (StringUtils.isNotEmpty(generic)) { context.setAttachment(Constants.GENERIC_KEY, generic); } try { // 調用下一個調用鏈 return super.invoke(targetObject); } finally { context.setAttachments(null); } } }
該類是http實現的核心,跟我在《dubbo源碼解析(二十五)遠程調用——hessian協議》中講到的HessianProtocol實現有不少地方類似。github
/** * 默認的端口號 */ public static final int DEFAULT_PORT = 80; /** * http服務器集合 */ private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>(); /** * Spring HttpInvokerServiceExporter 集合 */ private final Map<String, HttpInvokerServiceExporter> skeletonMap = new ConcurrentHashMap<String, HttpInvokerServiceExporter>(); /** * HttpBinder對象 */ private HttpBinder httpBinder;
@Override protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException { // 得到ip地址 String addr = getAddr(url); // 得到http服務器 HttpServer server = serverMap.get(addr); // 若是服務器爲空,則從新建立服務器,而且加入到集合 if (server == null) { server = httpBinder.bind(url, new InternalHandler()); serverMap.put(addr, server); } // 得到服務path final String path = url.getAbsolutePath(); // 加入集合 skeletonMap.put(path, createExporter(impl, type)); // 通用path final String genericPath = path + "/" + Constants.GENERIC_KEY; // 添加泛化的服務調用 skeletonMap.put(genericPath, createExporter(impl, GenericService.class)); return new Runnable() { @Override public void run() { skeletonMap.remove(path); skeletonMap.remove(genericPath); } }; }
該方法是暴露服務等邏輯,由於dubbo實現http協議採用了Spring 的HttpInvoker實現,因此調用了createExporter方法來建立建立HttpInvokerServiceExporter。spring
private <T> HttpInvokerServiceExporter createExporter(T impl, Class<?> type) { // 建立HttpInvokerServiceExporter final HttpInvokerServiceExporter httpServiceExporter = new HttpInvokerServiceExporter(); // 設置要訪問的服務的接口 httpServiceExporter.setServiceInterface(type); // 設置服務實現 httpServiceExporter.setService(impl); try { // 在BeanFactory設置了全部提供的bean屬性,初始化bean的時候執行,能夠針對某個具體的bean進行配 httpServiceExporter.afterPropertiesSet(); } catch (Exception e) { throw new RpcException(e.getMessage(), e); } return httpServiceExporter; }
該方法是建立一個spring 的HttpInvokerServiceExporter。服務器
@Override @SuppressWarnings("unchecked") protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException { // 得到泛化配置 final String generic = url.getParameter(Constants.GENERIC_KEY); // 是否爲泛化調用 final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class); // 建立HttpInvokerProxyFactoryBean final HttpInvokerProxyFactoryBean httpProxyFactoryBean = new HttpInvokerProxyFactoryBean(); // 設置RemoteInvocation的工廠類 httpProxyFactoryBean.setRemoteInvocationFactory(new RemoteInvocationFactory() { /** * 爲給定的AOP方法調用建立一個新的RemoteInvocation對象。 * @param methodInvocation * @return */ @Override public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) { // 新建一個HttpRemoteInvocation RemoteInvocation invocation = new HttpRemoteInvocation(methodInvocation); // 若是是泛化調用 if (isGeneric) { // 設置標誌 invocation.addAttribute(Constants.GENERIC_KEY, generic); } return invocation; } }); // 得到identity message String key = url.toIdentityString(); // 若是是泛化調用 if (isGeneric) { key = key + "/" + Constants.GENERIC_KEY; } // 設置服務url httpProxyFactoryBean.setServiceUrl(key); // 設置服務接口 httpProxyFactoryBean.setServiceInterface(serviceType); // 得到客戶端參數 String client = url.getParameter(Constants.CLIENT_KEY); if (client == null || client.length() == 0 || "simple".equals(client)) { // 建立SimpleHttpInvokerRequestExecutor鏈接池 使用的是JDK HttpClient SimpleHttpInvokerRequestExecutor httpInvokerRequestExecutor = new SimpleHttpInvokerRequestExecutor() { @Override protected void prepareConnection(HttpURLConnection con, int contentLength) throws IOException { super.prepareConnection(con, contentLength); // 設置讀取超時時間 con.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); // 設置鏈接超時時間 con.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); } }; httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); } else if ("commons".equals(client)) { // 建立 HttpComponentsHttpInvokerRequestExecutor鏈接池 使用的是Apache HttpClient HttpComponentsHttpInvokerRequestExecutor httpInvokerRequestExecutor = new HttpComponentsHttpInvokerRequestExecutor(); // 設置讀取超時時間 httpInvokerRequestExecutor.setReadTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); // 設置鏈接超時時間 httpInvokerRequestExecutor.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT)); httpProxyFactoryBean.setHttpInvokerRequestExecutor(httpInvokerRequestExecutor); } else { throw new IllegalStateException("Unsupported http protocol client " + client + ", only supported: simple, commons"); } httpProxyFactoryBean.afterPropertiesSet(); // 返回HttpInvokerProxyFactoryBean對象 return (T) httpProxyFactoryBean.getObject(); }
該方法是服務引用的方法,其中根據url配置simple仍是commons來選擇建立鏈接池的方式。其中的區別就是SimpleHttpInvokerRequestExecutor使用的是JDK HttpClient,HttpComponentsHttpInvokerRequestExecutor 使用的是Apache HttpClient。網絡
@Override protected int getErrorCode(Throwable e) { if (e instanceof RemoteAccessException) { e = e.getCause(); } if (e != null) { Class<?> cls = e.getClass(); if (SocketTimeoutException.class.equals(cls)) { // 返回超時異常 return RpcException.TIMEOUT_EXCEPTION; } else if (IOException.class.isAssignableFrom(cls)) { // 返回網絡異常 return RpcException.NETWORK_EXCEPTION; } else if (ClassNotFoundException.class.isAssignableFrom(cls)) { // 返回序列化異常 return RpcException.SERIALIZATION_EXCEPTION; } } return super.getErrorCode(e); }
該方法是處理異常狀況,設置錯誤碼。jvm
private class InternalHandler implements HttpHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { // 得到請求uri String uri = request.getRequestURI(); // 得到服務暴露者HttpInvokerServiceExporter對象 HttpInvokerServiceExporter skeleton = skeletonMap.get(uri); // 若是不是post,則返回碼設置500 if (!request.getMethod().equalsIgnoreCase("POST")) { response.setStatus(500); } else { // 遠程地址放到上下文 RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); try { // 調用下一個調用 skeleton.handleRequest(request, response); } catch (Throwable e) { throw new ServletException(e); } } } }
該內部類實現了HttpHandler,作了設置遠程地址的邏輯。ide
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了遠程調用中關於http協議的部分,內容比較簡單,能夠參考着官方文檔瞭解一下。接下來我將開始對rpc模塊關於injvm本地調用部分進行講解。源碼分析