先看看這個吧, Hadoop 中 RPC 機制簡介, java
Hadoop 中 RPC 機制的實現都在 org.apache.hadoop.ipc 這個包裏, 下面都將圍繞這個包解讀 Hadoop RPC 機制apache
1. RPC.getServer(Object instance, String bindAddress, int port, Configuration conf), 在Hadoop 1. 0中, 是這樣建立一個 RPC.Server 對象的, 數組
那麼, 在 Hadoop 2. 0中, 在 Server 端如何建立一個 RPC.Server 對象呢? 代碼以下:
public class RPCServer { public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { // 設置4個必需參數: // setBindAddress("") : Server端的IP地址 // setPort(1234) : 端口 // setProtocol(LoginProtocol.class) : setRPC協議接口的class對象 // setInstance(new LoginProtocolImpl()) : RPC協議接口的實現類的實例 RPC.Server server = new RPC.Builder(new Configuration()) .setBindAddress("").setPort(1234) .setProtocol(LoginProtocol.class) .setInstance(new LoginProtocolImpl()).build(); server.start(); } }
好的, 繼續跟蹤源碼, 在 RPC 的內部類 Builder 中, 有一個 builder() 方法, 這應該是工廠模式oop
只要知道 RPC.Builder.builder() 這個方法的目的是構造一個 RPC.Server 實例對象
RPC$Builder.build() 方法源碼以下ui
/** * Build the RPC Server. */ public Server build() throws IOException, HadoopIllegalArgumentException { ... // getProtocolEngine() 獲取一個RPC協議接口的引擎對象 WritableRPCEngine // WritableRPCEngine.getServer() 經過WritableRPCEngine獲取RPC.Server實例對象 return getProtocolEngine(this.protocol, this.conf).getServer( this.protocol, this.instance, this.bindAddress, this.port, this.numHandlers, this.numReaders, this.queueSizePerHandler, this.verbose, this.conf, this.secretManager, this.portRangeConfig); } }
RPC$Builder.build() 方法最終會調用 WritableRpcEngine.getServer(Class<?>, Object, String, int, int, int, int, boolean, Configuration, SecretManager<TokenIdentifier>, String) 方法, 獲取一個 RPC.Server實例對象, this
WritableRpcEngine.getServer() 源碼以下:spa
/* * Construct a server for a protocol implementation instance listening on a port and address. */ @Override public RPC.Server getServer(Class<?> protocolClass, Object protocolImpl, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException { // 建立一個RPC服務端代理對象server // protocolClass: 被代理RPC協議接口( LoginProtocol ) // protocolImpl: 代理代理RPC協議接口的實現類( LoginProtocolImp ) // conf: 配置信息 // port: RPC服務端的監聽端口 // numHandlers: RPC服務端Handler線程的數目 // ... return new Server(protocolClass, protocolImpl, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, portRangeConfig); }
好的, 如今已經構造了一個 RPC.Server 的實例對象 server , 監聽 Server 端的 "1234" 端口, Client 端只要調用代理RPC 代理對象( proxy )的方法( login() ), Server 端就會監聽到這個方法調用, 並調用 Server 端RPC協議接口( LoginProtocol ) 的實現方法 LoginProtocolImp.login()
如今啓動 Server 端, server.start(), 靜候 Client 端的RPC請求,
如今來看看 Client 端吧!
2. Client 端獲取 RPC 代理對象, LoginProtocol proxy = RPC.getProxy()
public class LoginClient { public static void main(String[] args) throws IOException { // getProxy()參數: // LoginProtocol.class : RPC協議接口的class對象 // 1L : RPC協議接口的版本信息(versionID) // new InetSocketAddress("", 1234) : Server端的IP地址及端口 // conf : Configuration實例 LoginProtocol proxy = RPC.getProxy(LoginProtocol.class, 1L, new InetSocketAddress("", 1234), new Configuration()); String result = proxy.login("rpc", "xxx"); System.out.println(result); } }
Client 端如何獲取一個RPC代理對象呢? 源碼以下:
RPC.getProxy(Class<T>, long, InetSocketAddress, Configuration)方法以下:
/** * Construct a client-side proxy object with the default SocketFactory * @param <T> * @param protocol * @param clientVersion * @param addr * @param conf * @return a proxy instance * @throws IOException */ public static <T> T getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); }
RPC.getProxy(Class<T>, long, InetSocketAddress, Configuration) 方法最終會調用
WritableRpcEngine.getProxy(Class<T>, long, InetSocketAddress, UserGroupInformation, Configuration, SocketFactory, int, RetryPolicy) 方法, 源碼以下:
/** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. * @param <T> */ @Override public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException { ... // 構造Client端RPC代理對象( proxy ) // protocol.getClassLoader() : RPC協議接口LoginProtocol的類加載器 // new Class[] { protocol } : RPC協議接口LoginProtocol的接口對象 // new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout) : InvocationHandler的實例對象 T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); return new ProtocolProxy<T>(protocol, proxy, true); }
使用 Proxy 類的靜態方法 getProxy() 構造 Client 端 RPC 代理對象( proxy ),
如何構造? Proxy.newProxyInstance() 方法須要3個參數:
1). RPC 協議接口 LoginProtocol 的類加載器,
2). RPC 協議接口 LoginProtocol 的 Class 對象,
3). WritableRpcEngine.Invoker 實例對象, WritableRpcEngine.Invoker 實現了 InvocationHandler 接口
3. 調用 proxy.login() 方法, 如今已經構造了一個 Client 端 RPC 代理對象( proxy ), 如今調用 RPC 代理對象( proxy )的 login() 方法, Client 端會發生什麼呢?
先來看一下 WritableRpcEngine.Invoker 這個類
private static class Invoker implements RpcInvocationHandler { private Client.ConnectionId remoteId; // 鏈接標識符 private Client client; // RPC客戶端, 最重要的成員變量 private boolean isClosed = false; ... @Override public Object invoke(Object proxy, Method method, Object[] args) { // 最重要的方法 } }
WritableRpcEngine.Invoker 的構造方法:
WritableRpcEngine$Invoker.invoke(Object, Method, Object[]) 方法以下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ... // 調用RPC代理對象proxy的login()方法, Client端最終的方法調用在這裏 ObjectWritable value = (ObjectWritable) client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId); ... return value.get(); }
先來看一下WritableRpcEngine.Invocation這個類, 在 Hadoop 1.0中是 RPC.Invocation
/** A method invocation, including the method name and its parameters */ private static class Invocation implements Writable, Configurable { private String methodName; // RPC代理對象調用的方法名 private Class<?>[] parameterClasses; // 方法的參數列表的Class對象數組 private Object[] parameters; // 方法的參數列表 ... private long clientVersion; // RPC協議接口的VersionID private int clientMethodsHash; // private String declaringClassProtocolName;
WritableRpcEngine.Invocation 的構造方法:
WritableRpcEngine$Invocation.<init>(Method, Object[])
public Invocation(Method method, Object[] parameters) { this.methodName = method.getName(); // 獲取RPC代理對象調用的方法名 this.parameterClasses = method.getParameterTypes(); // 獲取方法的參數列表的Class對象數組 this.parameters = parameters; // 獲取方法的參數列表 this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass()); // 獲取RPC協議接口的VersionID ... }
好的, 如今 Invocation 對象建立完成
再回去來看看 Client.call(RPC$RpcKind, Writable, Client$ConnectionId, int) 方法
Client.call(RPC$RpcKind, Writable, Client$ConnectionId, int) 這個方法, 首先根據輸入參數 param( Invocation實例對象 )構造一個 Client.Call 實例對象. 再經過 getConnection() 方法獲取 RPC 鏈接 connection, 再經過 connection.sendRpcReques(call) 方法把 RPC請求發送出去.
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass) throws IOException { final Call call = createCall(rpcKind, rpcRequest); // 建立一個Client.Call實例對象 Connection connection = getConnection(remoteId, call, serviceClass); // 獲取Client.Connection實例 try { connection.sendRpcRequest(call); // 經過connection發送RPC請求 } catch (Exception e) { ... } boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待調用完成的返回結果 } catch (InterruptedException ie) { interrupted = true; // 遠程調用被打斷 } } if (interrupted) { Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { // 遠程調用異常返回, 拋出異常給本地調用者 call.error.fillInStackTrace(); throw call.error; } else { // 本地處理出現異常 InetSocketAddress address = connection.getRemoteAddress(); throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(), 0, call.error); } } else { return call.getRpcResponse(); // 遠程調用正常結束, 返回結果 } } }
RPC 請求發送出去後, Client 端開始等待( call.wait() ) Server 端發送回來的應答, Client.Call.wait() 方法必然有對應的 Client.Call.notify() 方法, 在 Client.Call.callComplete() 方法中調用 notify() 方法
問題又來了, Client.Call.callComplete() 方法什麼時候被調用?
因此 Client.Call.callComplete() 方法最終是被 Client.Connection.receiveRpcResponse() 所調用, 如方法名receiveRpcResponse, 確定是在 Client 端接收到了 Server 端的應答時被調用.
4. Server 端在 Client.call(RPC$RpcKind, Writable, Client$ConnectionId, int) 方法中調用connection.sendRpcRequest(call) 後,
Server 端接收這個來自 Client 端的RPC請求後, 如何處理, 並調用具體的方法, 最後再向 Client 端發送調用響應呢?
篇幅有限, Server 端的源碼分析, 請聽下文分解, Hadoop 中 RPC 機制詳解之 Server 端
調用 proxy.login() 方法後再到這裏, Client 端的處理, 其實只比動態代理稍微複雜: Client 端 RPC 代理對象proxy 的方法調用, 被 InvocationHandler 實例對象的 invoke() 方法所捕獲, RPC 請求被打包成 Invocation 實例對象, 發送到 Server 端, Client 端等待 Server 端的響應