目標:介紹遠程調用中跟hessian協議相關的設計和實現,介紹dubbo-rpc-hessian的源碼。
本文講解可能是dubbo集成的第二種協議,hessian協議,Hessian 是 Caucho 開源的一個 RPC 框架,其通信效率高於 WebService 和 Java 自帶的序列化。dubbo集成hessian所提供的hessian協議相關介紹能夠參考官方文檔,我就再也不贅述。html
文檔地址: http://dubbo.apache.org/zh-cn...
該類繼承了HessianURLConnectionFactory類,是dubbo,用於建立與服務器的鏈接的內部工廠,重寫了父類中open方法。java
public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory { /** * 打開與HTTP服務器的新鏈接或循環鏈接 * @param url * @return * @throws IOException */ @Override public HessianConnection open(URL url) throws IOException { // 得到一個鏈接 HessianConnection connection = super.open(url); // 得到上下文 RpcContext context = RpcContext.getContext(); for (String key : context.getAttachments().keySet()) { // 在http協議頭裏面加入dubbo中附加值,key爲 header+key value爲附加值的value connection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key)); } return connection; } }
在hessian上加入dubbo本身所須要的附加值,放到協議頭裏面進行發送。git
該類是基於HttpClient封裝來實現HessianConnection接口,其中邏輯比較簡單。github
public class HttpClientConnection implements HessianConnection { /** * http客戶端對象 */ private final HttpClient httpClient; /** * 字節輸出流 */ private final ByteArrayOutputStream output; /** * http post請求對象 */ private final HttpPost request; /** * http 響應對象 */ private volatile HttpResponse response; public HttpClientConnection(HttpClient httpClient, URL url) { this.httpClient = httpClient; this.output = new ByteArrayOutputStream(); this.request = new HttpPost(url.toString()); } /** * 增長協議頭 * @param key * @param value */ @Override public void addHeader(String key, String value) { request.addHeader(new BasicHeader(key, value)); } @Override public OutputStream getOutputStream() throws IOException { return output; } /** * 發送請求 * @throws IOException */ @Override public void sendRequest() throws IOException { request.setEntity(new ByteArrayEntity(output.toByteArray())); this.response = httpClient.execute(request); } /** * 得到請求後的狀態碼 * @return */ @Override public int getStatusCode() { return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode(); } @Override public String getStatusMessage() { return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase(); } @Override public String getContentEncoding() { return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue(); } @Override public InputStream getInputStream() throws IOException { return response == null || response.getEntity() == null ? null : response.getEntity().getContent(); } @Override public void close() throws IOException { HttpPost request = this.request; if (request != null) { request.abort(); } } @Override public void destroy() throws IOException { }
該類實現了HessianConnectionFactory接口,是建立HttpClientConnection的工廠類。該類的實現跟DubboHessianURLConnectionFactory類相似,可是DubboHessianURLConnectionFactory是標準的Hessian接口調用會採用的工廠類,而HttpClientConnectionFactory是Dubbo 的 Hessian 協議調用。固然Dubbo 的 Hessian 協議也是基於http的。apache
public class HttpClientConnectionFactory implements HessianConnectionFactory { /** * httpClient對象 */ private final HttpClient httpClient = new DefaultHttpClient(); @Override public void setHessianProxyFactory(HessianProxyFactory factory) { // 設置鏈接超時時間 HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout()); // 設置讀取數據時阻塞鏈路的超時時間 HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout()); } @Override public HessianConnection open(URL url) throws IOException { // 建立一個HttpClientConnection實例 HttpClientConnection httpClientConnection = new HttpClientConnection(httpClient, url); // 得到上下文,用來得到附加值 RpcContext context = RpcContext.getContext(); // 遍歷附加值,放入到協議頭裏面 for (String key : context.getAttachments().keySet()) { httpClientConnection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key)); } return httpClientConnection; } }
實現了兩個方法,第一個方法是給http鏈接設置兩個參數配置,第二個方法是建立一個鏈接。服務器
該類繼承了AbstractProxyProtocol類,是hessian協議的實現類。其中實現類基於hessian協議的服務引用、服務暴露等方法。網絡
/** * http服務器集合 * key爲ip:port */ private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>(); /** * HessianSkeleto 集合 * key爲服務名 */ private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>(); /** * HttpBinder對象,默認是jetty實現 */ private HttpBinder httpBinder;
@Override protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException { // 得到ip地址 String addr = getAddr(url); // 得到http服務器對象 HttpServer server = serverMap.get(addr); // 若是爲空,則從新建立一個server,而後放入集合 if (server == null) { server = httpBinder.bind(url, new HessianHandler()); serverMap.put(addr, server); } // 得到服務path final String path = url.getAbsolutePath(); // 建立Hessian服務端對象 final HessianSkeleton skeleton = new HessianSkeleton(impl, type); // 加入集合 skeletonMap.put(path, skeleton); // 得到通用的path final String genericPath = path + "/" + Constants.GENERIC_KEY; // 加入集合 skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class)); // 返回一個線程 return new Runnable() { @Override public void run() { skeletonMap.remove(path); skeletonMap.remove(genericPath); } }; }
該方法是服務暴露的主要邏輯實現。框架
@Override @SuppressWarnings("unchecked") protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException { // 得到泛化的參數 String generic = url.getParameter(Constants.GENERIC_KEY); // 是不是泛化調用 boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class); // 若是是泛化調用。則設置泛化的path和附加值 if (isGeneric) { RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic); url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY); } // 建立代理工廠 HessianProxyFactory hessianProxyFactory = new HessianProxyFactory(); // 是不是Hessian2的請求 默認爲否 boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST); // 設置是否應使用Hessian協議的版本2來解析請求 hessianProxyFactory.setHessian2Request(isHessian2Request); // 是否應爲遠程調用啓用重載方法,默認爲否 boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY, Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD); // 設置是否應爲遠程調用啓用重載方法。 hessianProxyFactory.setOverloadEnabled(isOverloadEnabled); // 得到client實現方式,默認爲jdk String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT); if ("httpclient".equals(client)) { // 用http來建立 hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory()); } else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) { // 拋出不支持的協議異常 throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!"); } else { // 建立一個HessianConnectionFactory對象 HessianConnectionFactory factory = new DubboHessianURLConnectionFactory(); // 設置代理工廠 factory.setHessianProxyFactory(hessianProxyFactory); // 設置工廠 hessianProxyFactory.setConnectionFactory(factory); } // 得到超時時間 int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 設置超時時間 hessianProxyFactory.setConnectTimeout(timeout); hessianProxyFactory.setReadTimeout(timeout); // 建立代理 return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader()); }
該方法是服務引用的主要邏輯實現,根據客戶端配置,來選擇標準 Hessian 接口調用仍是Dubbo 的 Hessian 協議調用。ide
@Override protected int getErrorCode(Throwable e) { // 若是屬於HessianConnectionException異常 if (e instanceof HessianConnectionException) { if (e.getCause() != null) { Class<?> cls = e.getCause().getClass(); // 若是屬於超時異常,則返回超時異常 if (SocketTimeoutException.class.equals(cls)) { return RpcException.TIMEOUT_EXCEPTION; } } // 不然返回網絡異常 return RpcException.NETWORK_EXCEPTION; } else if (e instanceof HessianMethodSerializationException) { // 序列化異常 return RpcException.SERIALIZATION_EXCEPTION; } return super.getErrorCode(e); }
該方法是針對異常的處理。源碼分析
private class HessianHandler implements HttpHandler { @Override public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { // 得到請求的uri String uri = request.getRequestURI(); // 得到對應的HessianSkeleton對象 HessianSkeleton skeleton = skeletonMap.get(uri); // 若是若是不是post方法 if (!request.getMethod().equalsIgnoreCase("POST")) { // 返回狀態設置爲500 response.setStatus(500); } else { // 設置遠程地址 RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort()); // 得到請求頭內容 Enumeration<String> enumeration = request.getHeaderNames(); // 遍歷請求頭內容 while (enumeration.hasMoreElements()) { String key = enumeration.nextElement(); // 若是key開頭是deader,則把附加值取出來放入上下文 if (key.startsWith(Constants.DEFAULT_EXCHANGER)) { RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()), request.getHeader(key)); } } try { // 執行下一個 skeleton.invoke(request.getInputStream(), response.getOutputStream()); } catch (Throwable e) { throw new ServletException(e); } } } }
該內部類是Hessian的處理器,用來處理請求中的協議頭內容。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了遠程調用中關於hessian協議的部分,內容比較簡單,能夠參考着官方文檔瞭解一下。接下來我將開始對rpc模塊關於hessian協議部分進行講解。