目標:介紹遠程調用中跟hessian協議相關的設計和實現,介紹dubbo-rpc-hessian的源碼。html
本文講解可能是dubbo集成的第二種協議,hessian協議,Hessian 是 Caucho 開源的一個 RPC 框架,其通信效率高於 WebService 和 Java 自帶的序列化。dubbo集成hessian所提供的hessian協議相關介紹能夠參考官方文檔,我就再也不贅述。java
文檔地址:dubbo.apache.org/zh-cn/docs/…git
該類繼承了HessianURLConnectionFactory類,是dubbo,用於建立與服務器的鏈接的內部工廠,重寫了父類中open方法。github
public class DubboHessianURLConnectionFactory extends HessianURLConnectionFactory {
/** * 打開與HTTP服務器的新鏈接或循環鏈接 * @param url * @return * @throws IOException */
@Override
public HessianConnection open(URL url) throws IOException {
// 得到一個鏈接
HessianConnection connection = super.open(url);
// 得到上下文
RpcContext context = RpcContext.getContext();
for (String key : context.getAttachments().keySet()) {
// 在http協議頭裏面加入dubbo中附加值,key爲 header+key value爲附加值的value
connection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
}
return connection;
}
}
複製代碼
在hessian上加入dubbo本身所須要的附加值,放到協議頭裏面進行發送。apache
該類是基於HttpClient封裝來實現HessianConnection接口,其中邏輯比較簡單。服務器
public class HttpClientConnection implements HessianConnection {
/** * http客戶端對象 */
private final HttpClient httpClient;
/** * 字節輸出流 */
private final ByteArrayOutputStream output;
/** * http post請求對象 */
private final HttpPost request;
/** * http 響應對象 */
private volatile HttpResponse response;
public HttpClientConnection(HttpClient httpClient, URL url) {
this.httpClient = httpClient;
this.output = new ByteArrayOutputStream();
this.request = new HttpPost(url.toString());
}
/** * 增長協議頭 * @param key * @param value */
@Override
public void addHeader(String key, String value) {
request.addHeader(new BasicHeader(key, value));
}
@Override
public OutputStream getOutputStream() throws IOException {
return output;
}
/** * 發送請求 * @throws IOException */
@Override
public void sendRequest() throws IOException {
request.setEntity(new ByteArrayEntity(output.toByteArray()));
this.response = httpClient.execute(request);
}
/** * 得到請求後的狀態碼 * @return */
@Override
public int getStatusCode() {
return response == null || response.getStatusLine() == null ? 0 : response.getStatusLine().getStatusCode();
}
@Override
public String getStatusMessage() {
return response == null || response.getStatusLine() == null ? null : response.getStatusLine().getReasonPhrase();
}
@Override
public String getContentEncoding() {
return (response == null || response.getEntity() == null || response.getEntity().getContentEncoding() == null) ? null : response.getEntity().getContentEncoding().getValue();
}
@Override
public InputStream getInputStream() throws IOException {
return response == null || response.getEntity() == null ? null : response.getEntity().getContent();
}
@Override
public void close() throws IOException {
HttpPost request = this.request;
if (request != null) {
request.abort();
}
}
@Override
public void destroy() throws IOException {
}
複製代碼
該類實現了HessianConnectionFactory接口,是建立HttpClientConnection的工廠類。該類的實現跟DubboHessianURLConnectionFactory類相似,可是DubboHessianURLConnectionFactory是標準的Hessian接口調用會採用的工廠類,而HttpClientConnectionFactory是Dubbo 的 Hessian 協議調用。固然Dubbo 的 Hessian 協議也是基於http的。網絡
public class HttpClientConnectionFactory implements HessianConnectionFactory {
/** * httpClient對象 */
private final HttpClient httpClient = new DefaultHttpClient();
@Override
public void setHessianProxyFactory(HessianProxyFactory factory) {
// 設置鏈接超時時間
HttpConnectionParams.setConnectionTimeout(httpClient.getParams(), (int) factory.getConnectTimeout());
// 設置讀取數據時阻塞鏈路的超時時間
HttpConnectionParams.setSoTimeout(httpClient.getParams(), (int) factory.getReadTimeout());
}
@Override
public HessianConnection open(URL url) throws IOException {
// 建立一個HttpClientConnection實例
HttpClientConnection httpClientConnection = new HttpClientConnection(httpClient, url);
// 得到上下文,用來得到附加值
RpcContext context = RpcContext.getContext();
// 遍歷附加值,放入到協議頭裏面
for (String key : context.getAttachments().keySet()) {
httpClientConnection.addHeader(Constants.DEFAULT_EXCHANGER + key, context.getAttachment(key));
}
return httpClientConnection;
}
}
複製代碼
實現了兩個方法,第一個方法是給http鏈接設置兩個參數配置,第二個方法是建立一個鏈接。框架
該類繼承了AbstractProxyProtocol類,是hessian協議的實現類。其中實現類基於hessian協議的服務引用、服務暴露等方法。ide
/** * http服務器集合 * key爲ip:port */
private final Map<String, HttpServer> serverMap = new ConcurrentHashMap<String, HttpServer>();
/** * HessianSkeleto 集合 * key爲服務名 */
private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>();
/** * HttpBinder對象,默認是jetty實現 */
private HttpBinder httpBinder;
複製代碼
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
// 得到ip地址
String addr = getAddr(url);
// 得到http服務器對象
HttpServer server = serverMap.get(addr);
// 若是爲空,則從新建立一個server,而後放入集合
if (server == null) {
server = httpBinder.bind(url, new HessianHandler());
serverMap.put(addr, server);
}
// 得到服務path
final String path = url.getAbsolutePath();
// 建立Hessian服務端對象
final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
// 加入集合
skeletonMap.put(path, skeleton);
// 得到通用的path
final String genericPath = path + "/" + Constants.GENERIC_KEY;
// 加入集合
skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));
// 返回一個線程
return new Runnable() {
@Override
public void run() {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
}
};
}
複製代碼
該方法是服務暴露的主要邏輯實現。源碼分析
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
// 得到泛化的參數
String generic = url.getParameter(Constants.GENERIC_KEY);
// 是不是泛化調用
boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
// 若是是泛化調用。則設置泛化的path和附加值
if (isGeneric) {
RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic);
url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
}
// 建立代理工廠
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
// 是不是Hessian2的請求 默認爲否
boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST);
// 設置是否應使用Hessian協議的版本2來解析請求
hessianProxyFactory.setHessian2Request(isHessian2Request);
// 是否應爲遠程調用啓用重載方法,默認爲否
boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY, Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD);
// 設置是否應爲遠程調用啓用重載方法。
hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
// 得到client實現方式,默認爲jdk
String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
// 用http來建立
hessianProxyFactory.setConnectionFactory(new HttpClientConnectionFactory());
} else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
// 拋出不支持的協議異常
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
} else {
// 建立一個HessianConnectionFactory對象
HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
// 設置代理工廠
factory.setHessianProxyFactory(hessianProxyFactory);
// 設置工廠
hessianProxyFactory.setConnectionFactory(factory);
}
// 得到超時時間
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 設置超時時間
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
// 建立代理
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}
複製代碼
該方法是服務引用的主要邏輯實現,根據客戶端配置,來選擇標準 Hessian 接口調用仍是Dubbo 的 Hessian 協議調用。
@Override
protected int getErrorCode(Throwable e) {
// 若是屬於HessianConnectionException異常
if (e instanceof HessianConnectionException) {
if (e.getCause() != null) {
Class<?> cls = e.getCause().getClass();
// 若是屬於超時異常,則返回超時異常
if (SocketTimeoutException.class.equals(cls)) {
return RpcException.TIMEOUT_EXCEPTION;
}
}
// 不然返回網絡異常
return RpcException.NETWORK_EXCEPTION;
} else if (e instanceof HessianMethodSerializationException) {
// 序列化異常
return RpcException.SERIALIZATION_EXCEPTION;
}
return super.getErrorCode(e);
}
複製代碼
該方法是針對異常的處理。
private class HessianHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// 得到請求的uri
String uri = request.getRequestURI();
// 得到對應的HessianSkeleton對象
HessianSkeleton skeleton = skeletonMap.get(uri);
// 若是若是不是post方法
if (!request.getMethod().equalsIgnoreCase("POST")) {
// 返回狀態設置爲500
response.setStatus(500);
} else {
// 設置遠程地址
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
// 得到請求頭內容
Enumeration<String> enumeration = request.getHeaderNames();
// 遍歷請求頭內容
while (enumeration.hasMoreElements()) {
String key = enumeration.nextElement();
// 若是key開頭是deader,則把附加值取出來放入上下文
if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),
request.getHeader(key));
}
}
try {
// 執行下一個
skeleton.invoke(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
複製代碼
該內部類是Hessian的處理器,用來處理請求中的協議頭內容。
該部分相關的源碼解析地址:github.com/CrazyHZM/in…
該文章講解了遠程調用中關於hessian協議的部分,內容比較簡單,能夠參考着官方文檔瞭解一下。接下來我將開始對rpc模塊關於Http協議部分進行講解。