Hadoop中RPC機制詳解之Server端

Hadoop 中 RPC 機制詳解之 Client 端java


1. Server.Listenerapache

RPC Client 端的 RPC 請求發送到 Server 端後, 首先由 Server.Listener 接收數組

Server.Listener 類繼承自 Thread 類, 監聽了 OP_READ 和 OP_ACCEPT 事件app

Server.Listener 接收 RPC 請求, 在 Server.Listener.doRead() 方法中讀取數據, 在 doRead() 方法中又調用了Server.Connection.readAndProcess() 方法, ide

最後會調用 Server.Connection.processRpcRequest() 方法, 源碼以下:oop

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
      ...
      Writable rpcRequest;
      // 從成員變量dis中反序列化出Client端發送來的RPC請求( WritableRpcEngine.Invocation對象 )
      try { //Read the rpc request
        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
        rpcRequest.readFields(dis);
      } catch (Throwable t) { // includes runtime exception from newInstance
        ...
      }
      // 構造Server端Server.Call實例對象
      Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
              .getClientId().toByteArray());
      // 將Server.Call實例對象放入調用隊列中
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

調用隊列 callQueue 是 Server 的成員變量, Server.Listener 和 Server.Handler 是典型的生產者, 消費者模型, this

Server.Listener( 生產者 )的doRead()方法最終調用Server.Connection.processRpcRequest() 方法, spa

而Server.Handler( 消費者 )處理RPC請求.net


2. Server.Handler 繼承 Thread 類, 其主要工做是處理 callQueue 中的調用, 都在 run() 方法中完成. 在 run() 的主循環中, 每次處理一個從 callQueue 中出隊的請求, Server.call() 是一個抽象方法, 實際是調用了 RPC.Server.call()方法, 最後經過 WritableRPCEngine.call() 方法完成 Server 端方法調用code

/** Handles queued calls . */
  private class Handler extends Thread {
    ...
    @Override
    public void run() {
      ...
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
          ...
          final Call call = callQueue.take();    // 獲取一個RPC調用請求
          ...
          Writable value = null;

          value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
                     @Override
                     public Writable run() throws Exception {
                       // 調用RPC.Server.call()方法
                       // call.rpcKind : RPC調用請求的類型, 通常爲Writable
                       // call.connection.protocolName : RPC協議接口的類名
                       // call.rpcRequest : Invocation實例對象, 包括方法名, 參數列表, 參數列表的Class對象數組
                       // call.timestamp : 調用時間戳
                       return call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);
                     }
                   });
      }
      ...
    }
}

RPC.Server.call() 方法以下:

@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
	Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
	  receiveTime);
}

最後經過 WritableRPCEngine.call() 方法完成 Server 端方法調用, 代碼以下:

@Override
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
	  String protocolName, Writable rpcRequest, long receivedTime)
	  throws IOException, RPC.VersionMismatch {

	Invocation call = (Invocation)rpcRequest;	// 將RPC請求強制轉成WritableRpcEngine.Invocation對象
	...

	long clientVersion = call.getProtocolVersion();
	final String protoName;
	ProtoClassProtoImpl protocolImpl;	// Server端RPC協議接口的實現類的實例對象
	...
	// Invoke the protocol method
    try {
	  ...
	  // 獲取RPC請求中調用的方法對象Method
	  Method method = 
		  protocolImpl.protocolClass.getMethod(call.getMethodName(),
		  call.getParameterClasses());
	  method.setAccessible(true);
	  ...
	  // 在Server端RPC協議接口的實現類的實例對象 protocolImpl 上調用具體的方法
	  Object value = 
		  method.invoke(protocolImpl.protocolImpl, call.getParameters());
	  ...
	  // 調用正常結束, 返回調用結果
	  return new ObjectWritable(method.getReturnType(), value);

	} catch (InvocationTargetException e) {	// 調用出現異常, 用IOException包裝異常, 最後拋出該異常
	  Throwable target = e.getTargetException();
	  if (target instanceof IOException) {
		throw (IOException)target;
	  } else {
		IOException ioe = new IOException(target.toString());
		ioe.setStackTrace(target.getStackTrace());
		throw ioe;
	  }
	} catch (Throwable e) {
	  ...
	}
  }
}

在 WritableRpcEngine.call() 方法中, 傳入的 rpcRequest 會被強制轉換成 WritableRpcEngine.Invocation 類型的對象 call , 並經過 call 這個對象包含的方法名(getMethodName()方法)和參數列表的 Class對象數組(getParameterClasses())獲取 Method 對象, 最終經過 Method 對象的invoke() 方法, 調用實現類的實例對象 protocolImpl 上的方法, 完成 Hadoop 的遠程過程調用


好了, 如今 Server 端的具體方法已經被調用了, 調用結果分兩種狀況:

    1) 調用正常結束, 則將方法的返回值和調用結果封裝成一個 ObjectWritable 類型的對象, 並返回

    2) 調用出現異常, 拋出 IOException 類型的異常


3. Server.Responder

這個類的功能: 發送 Hadoop 遠程過程調用的應答給 Client 端, Server.Responder 類繼承自 Thread 類, 監聽了 OP_WRITE 事件, 即通道可寫.  具體細節寫不下去了


總結: 

    Server.Responder 和 Server.Listener, Server.Handler 一塊兒配合, 完成 Hadoop 中 RPC 的 Server 端處理:

Server.Listener 接收 Client 端的鏈接請求和請求數據; Server.Handler 完成實際的過程調用; Server.Responder 則進行應答發送

相關文章
相關標籤/搜索