Hadoop基於Protocol Buffer的RPC實現代碼分析-Server端--轉載

 原文地址:http://yanbohappy.sinaapp.com/?p=110 java

  最新版本的Hadoop代碼中已經默認了Protocol buffer(如下簡稱PB,http://code.google.com/p/protobuf/)做爲RPC的默認實現,原來的WritableRpcEngine已經被淘汰了。來自cloudera的Aaron T. Myers在郵件中這樣說的「since PB can provide support for evolving protocols in a compatible fashion.」node

首先要明白PB是什麼,PB是Google開源的一種輕便高效的結構化數據存儲格式,能夠用於結構化數據序列化/反序列化,很適合作數據存儲或 RPC 數據交換格式。它可用於通信協議、數據存儲等領域的語言無關、平臺無關、可擴展的序列化結構數據格式。目前提供了 C++、Java、Python 三種語言的 API。簡單理解就是某個進程把一些結構化數據經過網絡通訊的形式傳遞給另一個進程(典型應用就是RPC);或者某個進程要把某些結構化數據持久化存儲到磁盤上(這個有點相似於在Mongodb中的BSON格式)。對於存儲的這個例子來講,使用PB和XML,JSON相比的缺點就是存儲在磁盤上的數據用戶是沒法理解的,除非用PB反序列化以後才行,這個有點相似於IDL。優勢就是序列化/反序列化速度快,網絡或者磁盤IO傳輸的數據少,這個在Data-Intensive Scalable Computing中是很是重要的。linux

Hadoop使用PB做爲RPC實現的另一個緣由是PB的語言、平臺無關性。在mailing list裏據說過社區的人有這樣的考慮:就是如今每一個MapReduce task都是在一個JVM虛擬機上運行的(即便是Streaming的模式,MR任務的數據流也是經過JVM與NN或者DN進行RPC交換的),JVM最嚴重的問題就是內存,例如OOM。我看社區裏有人討論說若是用PB這樣的RPC實現,那麼每一個MR task均可以直接與NN或者DN進行RPC交換了,這樣就能夠用C/C++來實現每個MR task了。百度作的HCE(https://issues.apache.org/jira/browse/MAPREDUCE-1270)和這種思路有點相似,可是因爲當時的Hadoop RPC通訊仍是經過WritableRpcEngine來實現的,因此MR task仍是沒有擺脫經過本地的JVM代理與NN或者DN通訊的束縛,由於Child JVM Process仍是存在的,仍是由它來設置運行時環境和RPC交互。apache

關於PB的原理和實現,請你們參考http://code.google.com/p/protobuf/或者http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608,本文再也不贅述。服務器

下面來看看Hadoop代碼中的RPC是如何實現的。RPC就是一臺機器上的某個進程要調用另一臺機器上的某個進程的方法,中間通訊傳輸的就是相似於「方法名、參數一、參數2……」這樣的信息,是結構化的。同時通訊除了這些RPC實體之外,還要有header等。網絡

咱們要定義一種PB實現的RPC傳輸格式,首先要定義相應的.proto文件,在Hadoop common工程裏,這些文件放在D:\Hadoop-trunk\hadoop-common-project\hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程裏這些文件放在D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯腳本會調用相應的protoc二進制程序來編譯這些以.proto結尾的文件,生成相應的.java文件。數據結構

以D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下的ClientNamenodeProtocol.proto爲例說明。文件最開始定義了一些參數:多線程

option java_package = "org.apache.hadoop.hdfs.protocol.proto";

option java_outer_classname = "ClientNamenodeProtocolProtos";

option java_generic_services = true;

option java_generate_equals_and_hash = true;

這個表示這個.proto文件通過protoc編譯以後會生成org.apache.hadoop.hdfs.protocol.proto這個包下面的ClientNamenodeProtocolProtos.java類文件,那麼在Hadoop源碼裏就能夠調用這個類裏的方法了。app

這個文件的主體主要是兩種數據類型message和rpc,仔細看下這個文件就知道了,message就是這個ClientNamenodeProtocol協議中傳輸的結構體,rpc就是調用的方法。那麼這兩種類型在通過編譯以後會生成什麼呢?框架

編譯以後,在Hadoop-trunk/hadoop-hdfs-project/hadoop-hdfs/target/generated-sources/java/org/apache/hadoop/hdfs/protocol/proto目錄裏生成了ClientNamenodeProtocolProtos.java文件,裏面把message都包裝成了類,而把rpc都包裝成了方法。這個文件是由PB編譯器自動生成的,因此不能修改。

有了這些java類以後,咱們就能夠看看在Server端是怎麼實現RPC的了。首先仍是NameNode初始化的流程,會調用到rpcServer = createRpcServer(conf)來建立RPC server。下面看看NameNodeRpcServer的構造函數裏都作了哪些工做:

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    this.nn = nn;
    this.namesystem = nn.getNamesystem();
    this.metrics = NameNode.getNameNodeMetrics();

    int handlerCount =
      conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
                  DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
    //設置ProtolEngine,目前只支持PB協議。表示接收到的RPC協議若是是ClientNamenodeProtocolPB,
    //那麼處理這個RPC協議的引擎是ProtobufRpcEngine
    RPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);
    //聲明一個ClientNamenodeProtocolServerSideTranslatorPB,
    //這個類負責把Server接收到的PB格式對象的數據,拼裝成NameNode內村中的數據類型,
    //調用NameNodeRpcServer類中相應的邏輯,而後再把執行結果拼裝成PB格式。
    ClientNamenodeProtocolServerSideTranslatorPB
    clientProtocolServerTranslator =
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

    DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
        new DatanodeProtocolServerSideTranslatorPB(this);
    BlockingService dnProtoPbService = DatanodeProtocolService
        .newReflectiveBlockingService(dnProtoPbTranslator);

    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
        new NamenodeProtocolServerSideTranslatorPB(this);
      BlockingService NNPbService = NamenodeProtocolService
          .newReflectiveBlockingService(namenodeProtocolXlator);

    RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
        new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
    BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
        .newReflectiveBlockingService(refreshAuthPolicyXlator);

    RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
        new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
        .newReflectiveBlockingService(refreshUserMappingXlator);

    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
        new GetUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService getUserMappingService = GetUserMappingsProtocolService
        .newReflectiveBlockingService(getUserMappingXlator);

    HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
        new HAServiceProtocolServerSideTranslatorPB(this);
    BlockingService haPbService = HAServiceProtocolService
        .newReflectiveBlockingService(haServiceProtocolXlator);

    WritableRpcEngine.ensureInitialized();

    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      // Add all the RPC protocols that the namenode implements
      this.serviceRpcServer =
          RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
              ClientNamenodeProtocolPB.class, clientNNPbService,
          dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
          serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
          refreshAuthService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
          refreshUserMappingService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
          getUserMappingService, serviceRpcServer);

      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
    } else {
      serviceRpcServer = null;
      serviceRPCAddress = null;
    }
    // Add all the RPC protocols that the namenode implements
    this.clientRpcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
        clientNNPbService, socAddr.getHostName(),
            socAddr.getPort(), handlerCount, false, conf,
            namesystem.getDelegationTokenSecretManager());
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, clientRpcServer);

    // set service-level authorization security policy
    if (serviceAuthEnabled =
          conf.getBoolean(
            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
      this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      if (this.serviceRpcServer != null) {
        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
    }

    // The rpc-server port can be ephemeral... ensure we have the correct info
    this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
    nn.setRpcServerAddress(conf, clientRpcAddress);

    this.minimumDataNodeVersion = conf.get(
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
  }

ClientNamenodeProtocol是protoc編譯生成的ClientNamenodeProtocolProtos類中的inner class。

public static com.google.protobuf.BlockingService
       newReflectiveBlockingService(final BlockingInterface impl) {
	……
       }

這個方法也是由protoc編譯器自動生成的。這個方法會返回一個com.google.protobuf.BlockingService類型的對象,這種類型的對象定義了RPC的各類服務,後面會講。

this.clientRpcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
        clientNNPbService, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf,
        namesystem.getDelegationTokenSecretManager());

這個RPC.getServer()函數生成一個Server對象,負責接收網絡鏈接,讀取數據,調用處理數據函數,返回結果。這個Server對象裏有Listener, Handler, Responder內部類,分別開啓多個線程負責監聽、讀取、處理和返回結果。前兩個參數表示若是RPC發送過來的是ClientNamenodeProtocolPB協議,那麼負責處理這個協議的服務(com.google.protobuf.BlockingService類型的對象)就是clientNNPbService。

這個RPC.getServer()會通過層層調用,由於如今默認的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會調用到下面這個函數,在這生成了一個Server對象,就是用於接收client端RPC請求,處理,回覆的Server。這個Server對象是一個純粹的網絡服務的Server,在RPC中起到基礎網絡IO服務的做用。

public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
      String bindAddress, int port, int numHandlers, int numReaders,
      int queueSizePerHandler, boolean verbose, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      String portRangeConfig)
      throws IOException {
    return new Server(protocol, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }

如今該用到的東西都生成好了,就要看看client端來了一個RPC請求以後,Server端是怎麼處理的呢?

Server裏的Reader線程也是基於Selector的異步IO模式,每次Select選出一個SelectionKey以後,會調用SelectionKey.attachment()把這個SelectionKey所attach的Connection對象獲取,而後執行對應的readAndProcess()方法,把這個SelectionKey所對應的管道上的網絡IO數據讀入緩衝區。readAndProcess()方法會層層調用到Server.processData()方法,在這個方法內部,會把剛纔從網絡IO中讀取的數據反序列化成對象rpcRequest對象。rpcRequest對象的類型是繼承自Writable類型的子類的對象,也就是說能夠序列化/反序列化的類。這裏rpcRequest對象裏包含的RPC請求的內容對象是由.proto文件中Message生成的類,也就是說PB框架自動編譯出來的類,後面能夠經過調用這個類的get方法獲取RPC中真正傳輸的數據。以後把生成的rpcRequest對象放到一個Call對象裏面,再把Call對象放到隊列Server.callQueue裏面。至此網絡服務器的Reader線程作的工做就OK了。

下面看看Handler線程是怎麼處理的。Handler線程默認有10個,因此處理邏輯是多線程的。每一個Handler線程會從剛纔提到的callQueue中取一個Call對象,而後調用Server.call()方法執行這個Call對象中蘊含的RPC請求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最後這個call()函數裏面真正執行嘍。。。。重點看這個函數,首先校驗這個請求發過來的數據是否是合理的。而後就是獲取實現這個協議的服務。實現協議的服務在初始化的時候已經註冊過了,就是前面說的那個com.google.protobuf.BlockingService類型的對象,例如:

BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

這個就是實現Client和NameNode之間的ClientNamenodeProtocol協議的服務。固然還有dnProtoPbService, NNPbService, refreshAuthService, refreshUserMappingService, haPbService等等這些不一樣的服務。

這個Service獲取了以後,經過調用這句代碼

result = service.callBlockingMethod(methodDescriptor, null, param);

就會執行這個RPC請求的邏輯。

再往深刻執行就要涉及到google protocol buffer內部的東西了,這個service對象會把相應的方法調用轉移到一個繼承自BlockingInterface接口的實現類上。Service的真正實現類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個函數的參數。

BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

這個初始化過程當中的參數,也就是service.callBlockingMethod()真正調用的是clientProtocolServerTranslator中對應的方法。這一點能夠經過由protoc自動編譯生成的代碼中看出:

public static com.google.protobuf.BlockingService
        newReflectiveBlockingService(final BlockingInterface impl) {
      return new com.google.protobuf.BlockingService() {
        public final com.google.protobuf.Descriptors.ServiceDescriptor
            getDescriptorForType() {
          return getDescriptor();
        }

        public final com.google.protobuf.Message callBlockingMethod(
            com.google.protobuf.Descriptors.MethodDescriptor method,
            com.google.protobuf.RpcController controller,
            com.google.protobuf.Message request)
            throws com.google.protobuf.ServiceException {
          if (method.getService() != getDescriptor()) {
            throw new java.lang.IllegalArgumentException(
              "Service.callBlockingMethod() given method descriptor for " +
              "wrong service type.");
          }
          switch(method.getIndex()) {
            case 0:
              return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);
            case 1:
              return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);
            case 2:
              return impl.create(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto)request);
            case 3:
              return impl.append(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto)request);
            ……
}
……
}

上面就是proto編譯生成的ClientNamenodeProtocolProtos.java文件,從中能夠看出對callBlockingMethod()方法的調用都是轉移到BlockingInterface impl上面了。

而後咱們看看clientProtocolServerTranslator是怎麼進一步執行的。下面以getBlockLocations()函數爲例說明:

public GetBlockLocationsResponseProto getBlockLocations(
      RpcController controller, GetBlockLocationsRequestProto req)
      throws ServiceException {
    try {
      //下面這個server是由NameNodeRpcServer類生成的對象,定義了HDFS元數據操做邏輯。
      LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
          req.getLength());
      //因爲server返回的是NameNode內存中的數據結構,要把這個結果經過RPC傳回client端,
      //那麼咱們須要利用PB框架提供的對應Message的Builder類,把內存中的數據結構經過這個接口序列化。
      Builder builder = GetBlockLocationsResponseProto
          .newBuilder();
      if (b != null) {
        builder.setLocations(PBHelper.convert(b)).build();
      }
      return builder.build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

至此,Hadoop的RPC流程Server端已經分析結束,不過這個是正確執行的流程。若是中間拋出了異常呢?仍是以上面這個getBlockLocations()函數爲例,若是元數據操做邏輯NameNodeRpcServer裏面拋出IOException,那麼它都會把它封裝成ServiceException,而後一路傳遞給client端。在client端,會經過ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來。

相關文章
相關標籤/搜索