Hadoop中RPC機制詳解之Client端

先看看這個吧, Hadoop 中 RPC 機制簡介java

Hadoop 中 RPC 機制的實現都在 org.apache.hadoop.ipc 這個包裏, 下面都將圍繞這個包解讀 Hadoop RPC 機制apache


1. RPC.getServer(Object instance, String bindAddress, int port, Configuration conf), 在Hadoop 1. 0中, 是這樣建立一個 RPC.Server 對象的, 數組

那麼, 在 Hadoop 2. 0中, 在 Server 端如何建立一個 RPC.Server 對象呢? 代碼以下: 
ide

public class RPCServer {
	public static void main(String[] args)
			throws HadoopIllegalArgumentException, IOException {
		// 設置4個必需參數:
		// setBindAddress("192.168.8.101") : Server端的IP地址
		// setPort(1234) : 端口
		// setProtocol(LoginProtocol.class) : setRPC協議接口的class對象
		// setInstance(new LoginProtocolImpl()) : RPC協議接口的實現類的實例
		RPC.Server server = new RPC.Builder(new Configuration())
				.setBindAddress("192.168.8.101").setPort(1234)
				.setProtocol(LoginProtocol.class)
				.setInstance(new LoginProtocolImpl()).build();
		server.start();
	}
}

好的, 繼續跟蹤源碼, 在 RPC 的內部類 Builder 中, 有一個 builder() 方法, 這應該是工廠模式oop

只要知道 RPC.Builder.builder() 這個方法的目的是構造一個 RPC.Server 實例對象
源碼分析

RPC$Builder.build() 方法源碼以下ui

/**
 * Build the RPC Server.
 */
public Server build() throws IOException, HadoopIllegalArgumentException {
      ...
      // getProtocolEngine() 獲取一個RPC協議接口的引擎對象 WritableRPCEngine
      // WritableRPCEngine.getServer() 經過WritableRPCEngine獲取RPC.Server實例對象
      return getProtocolEngine(this.protocol, this.conf).getServer(
          this.protocol, this.instance, this.bindAddress, this.port,
          this.numHandlers, this.numReaders, this.queueSizePerHandler,
          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
    }
 }

RPC$Builder.build() 方法最終會調用 WritableRpcEngine.getServer(Class<?>, Object, String, int, int, int, int, boolean, Configuration, SecretManager<TokenIdentifier>, String) 方法, 獲取一個 RPC.Server實例對象, this

WritableRpcEngine.getServer() 源碼以下:spa

/* 
 * Construct a server for a protocol implementation instance listening on a port and address.
 */
@Override
public RPC.Server getServer(Class<?> protocolClass,
                      Object protocolImpl, String bindAddress, int port,
                      int numHandlers, int numReaders, int queueSizePerHandler,
                      boolean verbose, Configuration conf,
                      SecretManager<? extends TokenIdentifier> secretManager,
                      String portRangeConfig) 
    throws IOException {
    // 建立一個RPC服務端代理對象server
    // protocolClass: 被代理RPC協議接口( LoginProtocol )
    // protocolImpl: 代理代理RPC協議接口的實現類( LoginProtocolImp )
    // conf: 配置信息
    // port: RPC服務端的監聽端口
    // numHandlers: RPC服務端Handler線程的數目
    // ...
    return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }


好的, 如今已經構造了一個 RPC.Server 的實例對象 server , 監聽 Server 端的 "1234" 端口, Client 端只要調用代理RPC 代理對象( proxy )的方法( login() ), Server 端就會監聽到這個方法調用, 並調用 Server 端RPC協議接口( LoginProtocol ) 的實現方法 LoginProtocolImp.login()
.net

如今啓動 Server 端, server.start(), 靜候 Client 端的RPC請求,

如今來看看 Client 端吧!


2. Client 端獲取 RPC 代理對象, LoginProtocol proxy = RPC.getProxy()

public class LoginClient {
	public static void main(String[] args) throws IOException {
		// getProxy()參數:
		// LoginProtocol.class : RPC協議接口的class對象
		// 1L : RPC協議接口的版本信息(versionID)
		// new InetSocketAddress("192.168.8.101", 1234) : Server端的IP地址及端口
		// conf : Configuration實例
		LoginProtocol proxy = RPC.getProxy(LoginProtocol.class, 1L, new InetSocketAddress("192.168.8.101", 1234),
				new Configuration());
		String result = proxy.login("rpc", "xxx");
		System.out.println(result);
	}
}

Client 端如何獲取一個RPC代理對象呢? 源碼以下:

RPC.getProxy(Class<T>, long, InetSocketAddress, Configuration)方法以下:

/**
  * Construct a client-side proxy object with the default SocketFactory
  * @param <T>
  * @param protocol
  * @param clientVersion
  * @param addr
  * @param conf
  * @return a proxy instance
  * @throws IOException
 */
public static <T> T getProxy(Class<T> protocol,
                                 long clientVersion,
                                 InetSocketAddress addr, Configuration conf)
     throws IOException {
     return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}

RPC.getProxy(Class<T>, long, InetSocketAddress, Configuration) 方法最終會調用   

 WritableRpcEngine.getProxy(Class<T>, long, InetSocketAddress, UserGroupInformation, Configuration, SocketFactory, int, RetryPolicy) 方法, 源碼以下:

/** Construct a client-side proxy object that implements the named protocol,
  * talking to a server at the named address. 
  * @param <T>
  */
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                         InetSocketAddress addr, UserGroupInformation ticket,
                         Configuration conf, SocketFactory factory,
                         int rpcTimeout, RetryPolicy connectionRetryPolicy)
    throws IOException {    
    ...
    // 構造Client端RPC代理對象( proxy )
    // protocol.getClassLoader() : RPC協議接口LoginProtocol的類加載器
    // new Class[] { protocol } : RPC協議接口LoginProtocol的接口對象
    // new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout) : InvocationHandler的實例對象
    T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
        new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
            factory, rpcTimeout));
    return new ProtocolProxy<T>(protocol, proxy, true);
  }

使用 Proxy 類的靜態方法 getProxy() 構造 Client 端 RPC 代理對象( proxy ), 

如何構造? Proxy.newProxyInstance() 方法須要3個參數: 

    1). RPC 協議接口 LoginProtocol 的類加載器

    2). RPC 協議接口 LoginProtocol 的 Class 對象, 

    3). WritableRpcEngine.Invoker 實例對象, WritableRpcEngine.Invoker 實現了 InvocationHandler 接口


3. 調用 proxy.login() 方法, 如今已經構造了一個 Client 端 RPC 代理對象( proxy ), 如今調用 RPC 代理對象( proxy )的 login() 方法, Client 端會發生什麼呢?

調用 Proxy 實例的方法時, 都會被 InvocationHandler 實例對象的 invoke() 方法所捕獲

先來看一下 WritableRpcEngine.Invoker 這個類

private static class Invoker implements RpcInvocationHandler {
    private Client.ConnectionId remoteId;    // 鏈接標識符
    private Client client;    // RPC客戶端, 最重要的成員變量
    private boolean isClosed = false;    
    ...
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {    // 最重要的方法
    }
}

WritableRpcEngine.Invoker 的構造方法: 

WritableRpcEngine$Invoker.invoke(Object, Method, Object[]) 方法以下: 

public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      ...
      // 調用RPC代理對象proxy的login()方法, Client端最終的方法調用在這裏
      ObjectWritable value = (ObjectWritable)
        client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
      ...
      return value.get();
    }

先來看一下WritableRpcEngine.Invocation這個類, 在 Hadoop 1.0中是 RPC.Invocation

/** A method invocation, including the method name and its parameters */
private static class Invocation implements Writable, Configurable {
    private String methodName;    // RPC代理對象調用的方法名
    private Class<?>[] parameterClasses;    // 方法的參數列表的Class對象數組
    private Object[] parameters;    // 方法的參數列表
    ...
    private long clientVersion;    // RPC協議接口的VersionID
    private int clientMethodsHash;    //
    private String declaringClassProtocolName;

WritableRpcEngine.Invocation 的構造方法:

WritableRpcEngine$Invocation.<init>(Method, Object[])

public Invocation(Method method, Object[] parameters) {
      this.methodName = method.getName();    // 獲取RPC代理對象調用的方法名
      this.parameterClasses = method.getParameterTypes();    // 獲取方法的參數列表的Class對象數組
      this.parameters = parameters;    // 獲取方法的參數列表
      this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass()); // 獲取RPC協議接口的VersionID
     ...
}


好的, 如今 Invocation 對象建立完成

再回去來看看 Client.call(RPC$RpcKind, Writable, Client$ConnectionId, int) 方法

Client.call(RPC$RpcKind, Writable, Client$ConnectionId, int) 這個方法, 首先根據輸入參數 param( Invocation實例對象 )構造一個 Client.Call 實例對象. 再經過 getConnection() 方法獲取 RPC 鏈接 connection, 再經過 connection.sendRpcReques(call) 方法把 RPC請求發送出去.

public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass) throws IOException {
    final Call call = createCall(rpcKind, rpcRequest);    // 建立一個Client.Call實例對象
    Connection connection = getConnection(remoteId, call, serviceClass);    // 獲取Client.Connection實例
    try {
      connection.sendRpcRequest(call);    // 經過connection發送RPC請求
    } catch (Exception e) {
      ...
    }

    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();    // 等待調用完成的返回結果
        } catch (InterruptedException ie) {
          interrupted = true;    // 遠程調用被打斷
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {    // 遠程調用異常返回, 拋出異常給本地調用者
          call.error.fillInStackTrace();
          throw call.error;
        } else {    // 本地處理出現異常
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(), address.getPort(), NetUtils.getHostname(),
                  0, call.error);
        }
      } else {
        return call.getRpcResponse();    // 遠程調用正常結束, 返回結果
      }
    }
  }


RPC 請求發送出去後, Client 端開始等待( call.wait() ) Server 端發送回來的應答, Client.Call.wait() 方法必然有對應的 Client.Call.notify() 方法, 在 Client.Call.callComplete() 方法中調用 notify() 方法

問題又來了, Client.Call.callComplete() 方法什麼時候被調用?

因此 Client.Call.callComplete() 方法最終是被 Client.Connection.receiveRpcResponse() 所調用, 如方法名receiveRpcResponse, 確定是在 Client 端接收到了 Server 端的應答時被調用. 


4. Server 端在 Client.call(RPC$RpcKind, Writable, Client$ConnectionId, int) 方法中調用connection.sendRpcRequest(call) 後, 

Server 端接收這個來自 Client 端的RPC請求後, 如何處理, 並調用具體的方法, 最後再向 Client 端發送調用響應呢? 

篇幅有限, Server 端的源碼分析, 請聽下文分解, Hadoop 中 RPC 機制詳解之 Server 端


總結: 

    調用 proxy.login() 方法後再到這裏, Client 端的處理, 其實只比動態代理稍微複雜: Client 端 RPC 代理對象proxy 的方法調用, 被 InvocationHandler 實例對象的 invoke() 方法所捕獲, RPC 請求被打包成 Invocation 實例對象, 發送到 Server 端, Client 端等待 Server 端的響應

相關文章
相關標籤/搜索