Hadoop源碼學習筆記之NameNode啓動場景流程四:rpc server初始化及啓動

老規矩,仍是分三步走,分別爲源碼調用分析、僞代碼核心梳理、調用關係圖解。node

1、源碼調用分析apache

  根據上篇的梳理,直接從initialize()方法着手。源碼以下,部分代碼的功能以及說明,已經在註釋闡述了。安全

protected void initialize(Configuration conf) throws IOException { // 能夠經過找到下面變量名的映射,在hdfs-default.xml中找到對應的配置
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) { String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY); if (intervals != null) { conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals); } } ...... // 核心代碼:啓動HttpServer
  if (NamenodeRole.NAMENODE == role) { startHttpServer(conf); } this.spanReceiverHost = SpanReceiverHost.getInstance(conf); // 核心代碼:FSNamesystem初始化
 loadNamesystem(conf); // 核心代碼:建立一個rpc server實例
  rpcServer = createRpcServer(conf); ...... // 核心代碼:啓動一些服務組件,包括rpc server等
 startCommonServices(conf); }

  這段代碼涉及到rpc server初始化及啓動的核心,有兩處:ide

  第一處是rpcServer = createRpcServer(conf); 這個createRpcServer()的功能就是建立了一個rpc server的實例。以下:函數

protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException { return new NameNodeRpcServer(conf, this); }

  咱們繼續進去NameNodeRpcServer類的構造方法中看一看,到底裏面作了哪些事情:oop

public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { this.nn = nn; this.namesystem = nn.getNamesystem(); this.metrics = NameNode.getNameNodeMetrics(); int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, DFS_NAMENODE_HANDLER_COUNT_DEFAULT); RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); 
// ----------1------------
// ---- 下面一堆都是實例化各類協議和服務的對象,全部的服務都是BlockingService接口的實現
// client和namenode之間進行通訊須要調用的接口,包括:建立目錄、管理block、設置權限等一些操做 ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this); BlockingService clientNNPbService = ClientNamenodeProtocol. newReflectiveBlockingService(clientProtocolServerTranslator); // datanode和namenode之間進行通訊調用的接口,包括:datanode註冊、heartbeatReport、blockReport等接口 DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = new DatanodeProtocolServerSideTranslatorPB(this); BlockingService dnProtoPbService = DatanodeProtocolService .newReflectiveBlockingService(dnProtoPbTranslator); // 不一樣的namenode之間進行通訊須要調用的接口 NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = new NamenodeProtocolServerSideTranslatorPB(this); BlockingService NNPbService = NamenodeProtocolService .newReflectiveBlockingService(namenodeProtocolXlator); ......
// ---- 以上都是初始化rpc server關鍵的部分
// 確保供寫數據的rpc服務引擎已經初始化,若是沒有初始化,
// 則在此方法中調用registerProtocolEngine()從而將WritableRpcEngine引擎加入到內存中的引擎map WritableRpcEngine.ensureInitialized();

......
if (serviceRpcAddr != null) { ......
// -----------2----------
    // 實例化一個監聽datanode請求的rpc server
this.serviceRpcServer = new RPC.Builder(conf) .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build();     // 將前面實例化的各類協議service添加到這個監聽datanode請求的rpc server // Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, serviceRpcServer); ...... ......
} else { ...... } ...... // -----------3------------
// 實例化一個監聽客戶端請求的rpc server
this.clientRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService).setBindAddress(bindHost) .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
   // 將前面實例化的各類協議service添加到這個監聽客戶端請求的RPC server
// Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer); ......
......
}

  進入到NameNodeRpcServer類的構造方法中,能夠看到除了上面幾行的一些初始化賦值以外,下面的代碼好長啊。其實並不複雜,ui

  根據代碼的邏輯劃分,主要有三部分:this

    第一部分,實例化各類通訊協議和服務對象,好比:負責建立目錄、管理block、設置權限等操做的客戶端同NameNode通訊的spa

      協議服務——ClientNameNodeProtocolServerSideTranslatorPB、負責datanode的啓動時向NameNode註冊、發送心跳報告、線程

      block信息報告等操做的datanode同NameNode通訊的協議服務——DatanodeProtocolServerSideTranslatorPB。篇幅有限,

      後面還有不一樣NameNode之間通訊協議服務、HA高可靠、用戶權限管理等不在一一詳細說明。

    第二部分,實例化了一個監聽datanode請求的rpc server,而且將第一部分實例化的各類ProtocolService同此rpc server進行綁定,

      用於處理rpc server監聽到的來自datanode的各類rpc請求。

    第三部分,實例化了一個監聽客戶端請求的rpc server,並將第一部分實例化的各類ProtocolService同此rpc server進行綁定,用於

      處理監聽到的來自客戶端的rpc請求。

  至此,rpc server啓動以前相關的準備工做已經完畢,接下來就要開始啓動rpc server了。繼續回到本篇剛開始的initialize()方法中的最後一處核心,

  startCommonServices(); 內容以下:  

private void startCommonServices(Configuration conf) throws IOException {
// 核心代碼: namesystem.startCommonServices(conf, haContext); registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) { startHttpServer(conf); httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); } // 啓動rpcServer rpcServer.start(); ...... }

  在此方法中,直接就啓動了rpcServer。

  可是在第一行的startCommonServices()方法也是核心之一,裏面有很重要的一些服務,好比磁盤檢查、安全模式判斷等,後續篇幅會跟進。

2、僞代碼調用流程梳理

  NameNode.main() // 入口函數
    |——createNameNode(); // 經過new NameNode()進行實例化
      |——initialize(); // 方法進行初始化操做
        |——startHttpServer(); // 啓動HttpServer
        |——loadNamesystem(); // 加載元數據
        |——createRpcServer(); // 建立rpc server實例
          |——new NameNodeRpcServer();
            |——service1 // 各類通訊協議service
            |——service2 // 各類通訊協議service
            |——service.... // 各類通訊協議service
            |——serviceRpcServer = new RPC.builder(); // 實例化一個監聽datanode請求的rpc server
            |——serviceRpcServer.add(service1...); // 將各類service添加到serviceRpcServer
            |——clientRpcServer = new RPC.builder(); // 實例化一個監聽客戶端請求的rpc server
            |——clientRpcServer.add(service2...); // 將各類service添加到clientRpcServer
        |——startCommonServices();
          |——namesystem.startCommonServices(); // 啓動一些磁盤檢查、安全模式等一些後臺服務及線程
          |——rpcServer.start(); // 啓動rpcServer
    |——join()

3、rpc server初始化及啓動流程圖解

相關文章
相關標籤/搜索