Hadoop之RPC

       Hadoop的RPC主要是經過Java的動態代理(Dynamic Proxy)與反射(Reflect)實現,代理類是由java.lang.reflect.Proxy類在運行期時根據接口,採用Java反射功能動態生成的,而且結合java.lang.reflect.InvocationHandler來處理客戶端的請求,當用戶調用這個動態生成的實現類時,其實是調用了InvocationHandler實現類的invoke方法。RPC源代碼在org.apache.hadoop.ipc下,有如下幾個主要類: 
    Client: 客戶端,鏈接服務器、傳遞函數名和相應的參數、等待結果;
    Server:服務器端,主要接受Client的請求、執行相應的函數、返回結果;
    VersionedProtocol:通訊雙方所遵循契約的父接口;
    RPC:RPC通訊機制,主要是爲通訊的服務方提供代理。

  1.通訊雙方遵循的契約

    要經過RPC服務進行通訊,服務的提供方必須實現某個接口,而這個便可是VersionedProtocol的子類,諸如:
InterTrackerProtocol,它是TaskTracker與JobTracker進行通訊所遵循的契約,JobTracker是一個Server,它必須實現這個接口;
JobSubmissionProtocol,它是JobTracker與JobClient通信所遵循的契約,JobClient利用契約中的方法能夠提交做業去執行, 而且獲得當前系統的狀態;
DatanodeProtocol,利用此契約,DataNode能夠向NameNode彙報本身的塊狀態以及負載狀況。
InterDatanodeProtocol,DataNode之間利用此契約能夠更新數據塊。
其它的接口在此再也不一一贅述。

    2.Hadoop中RPC通訊原理 

  咱們經過TaskTracker與JobTracker的通訊來剖析其通訊過程,JobTracker的代理是經過下面的方法獲得的,
 this.jobClient = (InterTrackerProtocol) 
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });
View Code

  它是經過調用RPC類中的靜態方法waitForProxy()方法而獲得了InterTrackerProtocol的一個代理,藉助於這個代理對象,TaskTracker就能夠與JobTracker進行通訊了。java

  VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
View Code

  跟蹤Hadoop的源代碼,咱們能夠發現PRC.waitForProxy()最終是調用的Proxy.newProxyInstance()來建立一個代理對象,第一個參數是類加載器(代理類在運行的過程當中動態生成),第二個參數是要實現的代理類的接口,第三個參數是InvokercationHandler接口的子類,最終調用的也就是InvokercationHandler實現類的的invoker()方法。node

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
.....

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }

      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
    
....
  }
View Code

  咱們能夠看到,InvocationHandler的實現類Invoker中主要包含兩個成員變量即remoteId(惟一標識RPC的服務器端)、Client(經過工廠模式獲得的客戶端),invoke()方法中最重要的就是下面的語句:apache

ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);

  其中call方法的第一個參數封裝調用方法和參數並實現Writable接口的對象,以便於在分佈式環境中傳輸,第二個參數勿需多言,它就用於惟一標識RPC Server,也就是與指定的Server進行通訊。call方法的核心代碼以下:服務器

  public Writable call(Writable param, ConnectionId remoteId)  throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);//請看下面的說明
    connection.sendParam(call);                 // 將參數封裝成一個call對象發送給Server
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // 等待Server發送的內容
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
...
        return call.value;
  }
View Code

  其中居然出現了一個Call對象,咱們看到此方法返回的結果是call對象的一個成員變量,也就是說Call封裝了Client的請求以及Server的響應,synchronized的使用會同步Client的請求以及Server的響應。通Connection對象的sendParam方法能夠將請求發送給Server,那麼Connection又是什麼呢?分佈式

   private Connection getConnection(ConnectionId remoteId,Call call) throws IOException, InterruptedException {
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call));
    
...
    connection.setupIOstreams();
    return connection;
  }
View Code

  其實Connection是擴展Thread而獲得的一個線程,最終把全部的connection對象都放入到一個Hashtable中,同一個ConnectionId的Connection能夠複用,下降了建立線程的開銷。connection.setupIOstreams()用於在真正的創建鏈接,並將RPC的header寫入到輸出流中,經過start方法啓動線程,其核心代碼以下所示:ide

  public void run() {
      while (waitForWork()) {//等到能夠讀響應時返回true
        receiveResponse();
}   

  receiveResponse方法主要是從輸入流反序列化出value,並將其封裝在call對象中,這樣client端就獲得了server的響應,核心代碼以下:函數

private void receiveResponse() {
          try {
        int id = in.readInt();                  // 讀取鏈接id,以便從calls中取出相應的call對象
        Call call = calls.get(id);
        int state = in.readInt();     // 讀取輸入流的狀態
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          call.setValue(value);
          calls.remove(id);
        } 
...
    }
View Code

才疏學淺,錯誤之處在所不免,懇請各位予以指正。。 oop

相關文章
相關標籤/搜索