源碼|HDFS之DataNode:啓動過程

掌握Mac編譯Hadoop源碼Hadoop單步debug追源碼後,就能告別人肉調用棧,利用IDE輕鬆愉快的追各類開源框架的源碼啦~java

今天是HDFS中DataNode的第一篇——DataNode啓動過程。node

源碼版本:Apache Hadoop 2.6.0linux

可參考猴子追源碼時的速記打斷點,親自debug一遍。git

在開始以前

總覽

HDFS-2.x與1.x的核心區別:github

  • 爲支持Federation,會爲每一個namespace(或稱nameservice)建立BPOfferService(提供BlockPool服務)
  • 爲支持HA,BPOfferService還會爲一個namespace下的每一個namenode建立BPServiceActor(做爲具體實例與各active、standby的namenode通訊;一個BPOfferService下只有一個active的BPServiceActor)

datanode的啓動過程主要完成如下工做:shell

  • 啓動多種工做線程,主要包括:
    • 通訊:BPServiceActor、IpcServer、DataXceiverServer、LocalDataXceiverServer
    • 監控:DataBlockScanner、DirectoryScanner、JVMPauseMonitor
    • 其餘:InfoServer
  • 向namenode註冊
  • 初始化存儲結構,包括各數據目錄${dfs.data.dir},及數據目錄下各塊池的存儲結構
  • 【可能】數據塊恢復等(暫不討論)

LazyWriter等特性暫不討論。緩存

文章的組織結構

  1. 若是隻涉及單個分支的分析,則放在同一節。
  2. 若是涉及多個分支的分析,則在下一級分多個節,每節討論一個分支。
  3. 多線程的分析同多分支。
  4. 每個分支和線程的組織結構遵循規則1-3。

主流程

datanode的Main Class是DataNode,先找DataNode.main():安全

public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    secureMain(args, null);
  }
  
  ...
  
  public static void secureMain(String args[], SecureResources resources) {
    int errorCode = 0;
    try {
      // 打印啓動信息
      StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
      // 完成建立datanode的主要工做
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null) {
        datanode.join();
      } else {
        errorCode = 1;
      }
    } catch (Throwable e) {
      LOG.fatal("Exception in secureMain", e);
      terminate(1, e);
    } finally {
      LOG.warn("Exiting Datanode");
      terminate(errorCode);
    }
  }
複製代碼

datanode封裝了很是多工做線程,但絕大多數是守護線程,DataNode#join()只須要等待BPServiceActor線程結束,就能夠正常退出(略)。多線程

DataNode.createDataNode():併發

public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException {
    // 完成大部分初始化的工做,並啓動部分工做線程
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // 啓動剩餘工做線程
      dn.runDatanodeDaemon();
    }
    return dn;
  }
複製代碼
  • 在DataNode.instantiateDataNode()執行的過程當中會啓動部分工做線程(見後)
  • DataNode#runDatanodeDaemon()啓動剩餘的DataXceiverServer、localDataXceiverServer、IpcServer等:
/** Start a single datanode daemon and wait for it to finish. * If this thread is specifically interrupted, it will stop waiting. */
  public void runDatanodeDaemon() throws IOException {
    // 在DataNode.instantiateDataNode()執行過程當中會調用該方法(見後)
    blockPoolManager.startAll();

    dataXceiverServer.start();
    if (localDataXceiverServer != null) {
      localDataXceiverServer.start();
    }
    ipcServer.start();
    startPlugins(conf);
  }
複製代碼

回到DataNode.instantiateDataNode():

public static DataNode instantiateDataNode(String args [], Configuration conf, SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
    
    ... // 參數檢查等
    
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }
複製代碼

dataLocations維護的是所有${dfs.data.dir},猴子只配置了一個目錄,實際使用中會在將每塊磁盤都掛載爲一塊目錄。

從DataNode.makeInstance()開始建立DataNode:

static DataNode makeInstance(Collection<StorageLocation> dataDirs, Configuration conf, SecureResources resources) throws IOException {
    ...// 檢查數據目錄的權限

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }
  
  ...
  
  DataNode(final Configuration conf,
           final List<StorageLocation> dataDirs,
           final SecureResources resources) throws IOException {
    super(conf);
    ...// 參數設置
    
    try {
      hostName = getHostName(conf);
      LOG.info("Configured hostname is " + hostName);
      startDataNode(conf, dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }
  }
  
  ...
  
  void startDataNode(Configuration conf, List<StorageLocation> dataDirs, SecureResources resources ) throws IOException {
    ...// 參數設置
    
    // 初始化DataStorage
    storage = new DataStorage();
    
    // global DN settings
    // 註冊JMX
    registerMXBean();
    // 初始化DataXceiver(流式通訊),DataNode#runDatanodeDaemon()中啓動
    initDataXceiver(conf);
    // 啓動InfoServer(Web UI)
    startInfoServer(conf);
    // 啓動JVMPauseMonitor(反向監控JVM狀況,可經過JMX查詢)
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
  
    ...// 略
    
    // 初始化IpcServer(RPC通訊),DataNode#runDatanodeDaemon()中啓動
    initIpcServer(conf);

    metrics = DataNodeMetrics.create(conf, getDisplayName());
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    
    // 按照namespace(nameservice)、namenode的二級結構進行初始化
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
    
    ...// 略
  }
複製代碼

BlockPoolManager抽象了datanode提供的數據塊存儲服務。BlockPoolManager按照namespace(nameservice)、namenode二級結構組織,此處按照該二級結構進行初始化。

重點是BlockPoolManager#refreshNamenodes():

void refreshNamenodes(Configuration conf) throws IOException {
    LOG.info("Refresh request received for nameservices: " + conf.get
            (DFSConfigKeys.DFS_NAMESERVICES));

    Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
            .getNNServiceRpcAddressesForCluster(conf);

    synchronized (refreshNamenodesLock) {
      doRefreshNamenodes(newAddressMap);
    }
  }
複製代碼

命名爲刷新,是由於除了初始化過程主動調用,還能夠由namespace經過datanode心跳過程下達刷新命令。

newAddressMap是這樣一個映射:Map<namespace, Map<namenode, InetSocketAddress>>

BlockPoolManager#doRefreshNamenodes():

private void doRefreshNamenodes( Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);

    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
      
      ...// 略
      
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
      
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          // 爲每一個namespace建立對應的BPOfferService
          BPOfferService bpos = createBPOS(addrs);
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // 而後經過startAll啓動全部BPOfferService
      startAll();
    }
    
    ...// 略
  }
複製代碼

addrMap即傳入的newAddressMap。Step 3爲每一個namespace建立對應的BPOfferService(包括每一個namenode對應的BPServiceActor),而後經過BlockPoolManager#startAll()啓動全部BPOfferService(實際是啓動全部 BPServiceActor)。

BlockPoolManager#createBPOS()

BlockPoolManager#createBPOS():

protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
    return new BPOfferService(nnAddrs, dn);
  }
複製代碼

BPOfferService.<init>

BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;

    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }
複製代碼

BPOfferService經過bpServices維護同一個namespace下各namenode對應的BPServiceActor。

BlockPoolManager#startAll()

BlockPoolManager#startAll():

synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }
複製代碼

逐個調用BPOfferService#start(),啓動BPOfferService:

//This must be called only by blockPoolManager
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }
複製代碼

逐個調用BPServiceActor#start(),啓動BPServiceActor:

//This must be called only by BPOfferService
  void start() {
    // 保證BPServiceActor線程只啓動一次
    if ((bpThread != null) && (bpThread.isAlive())) {
      return;
    }
    bpThread = new Thread(this, formatThreadName());
    bpThread.setDaemon(true); // needed for JUnit testing
    bpThread.start();
  }
複製代碼

BPServiceActor#start()的線程安全性由最外層的BlockPoolManager#startAll()(synchronized方法)保證。

在完成datanode的初始化後,DataNode#runDatanodeDaemon()中又調用了一次BlockPoolManager#startAll()。猴子沒明白此次調用的做用,但BlockPoolManager#startAll()的內部邏輯保證其只會被執行一次,沒形成什麼壞影響。

主流程小結

在datanode啓動的主流程中,啓動多種重要的工做線程,包括:

  • 通訊:BPServiceActor、IpcServer、DataXceiverServer、LocalDataXceiverServer
  • 監控:JVMPauseMonitor
  • 其餘:InfoServer

接下來討論BPServiceActor線程,它的主要工做是:

  • 向namonode註冊
  • 啓動DataBlockScanner、DirectoryScanner等工做線程
  • 存儲結構初始化

BPServiceActor線程

在datanode啓動的主流程中,啓動了多種工做線程,包括InfoServer、JVMPauseMonitor、BPServiceActor等。其中,最重要的是BPServiceActor線程,真正表明datanode與namenode通訊的正是BPServiceActor線程。

BPServiceActor#run():

@Override
  public void run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        // init stuff
        try {
          // 與namonode握手,註冊
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          ...// 大部分握手失敗的狀況都須要重試,除非拋出了非IOException異常或datanode關閉
        }
      }

      runningState = RunningState.RUNNING;

      while (shouldRun()) {
        try {
          // BPServiceActor提供的服務
          offerService();
        } catch (Exception ex) {
          ...// 無論拋出任何異常,都持續提供服務(包括心跳、數據塊彙報等),直到datanode關閉
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }
複製代碼

此處說的「通訊」包括與握手、註冊(BPServiceActor#connectToNNAndHandshake)和後期循環提供服務(BPServiceActor#offerService(),本文暫不討論)。

啓動過程當中主要關注BPServiceActor#connectToNNAndHandshake():

private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);

    // 先經過第一次握手得到namespace的信息
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    
    // 而後驗證並初始化該datanode上的BlockPool
    bpos.verifyAndSetNamespaceInfo(nsInfo);
    
    // 最後,經過第二次握手向各namespace註冊本身
    register();
  }
複製代碼

經過兩次握手完成了datanode的註冊,比較簡單,不討論。

重點是BPOfferService#verifyAndSetNamespaceInfo():

/** * Called by the BPServiceActors when they handshake to a NN. * If this is the first NN connection, this sets the namespace info * for this BPOfferService. If it's a connection to a new NN, it * verifies that this namespace matches (eg to prevent a misconfiguration * where a StandbyNode from a different cluster is specified) */
  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
    writeLock();
    try {
      if (this.bpNSInfo == null) {
        // 若是是第一次鏈接namenode(也就必然是第一次鏈接namespace),則初始化blockpool(塊池)
        this.bpNSInfo = nsInfo;
        boolean success = false;

        try {
          // 以BPOfferService爲單位初始化blockpool
          dn.initBlockPool(this);
          success = true;
        } finally {
          if (!success) {
            // 若是一個BPServiceActor線程失敗了,還能夠由同BPOfferService的其餘BPServiceActor線程從新嘗試
            this.bpNSInfo = null;
          }
        }
      } else {
        ...// 若是不是第一次鏈接(刷新),則檢查下信息是否正確便可
      }
    } finally {
      writeUnlock();
    }
  }
複製代碼

儘管是在BPServiceActor線程中,卻試圖以BPOfferService爲單位初始化blockpool(包括內存與磁盤上的存儲結構)。若是初始化成功,萬事大吉,之後同BPOfferService的其餘BPServiceActor線程發現BPOfferService#bpNSInfo != null就再也不初始化;而若是一個BPServiceActor線程初始化blockpool失敗了,還能夠由同BPOfferService的其餘BPServiceActor線程從新嘗試初始化。

DataNode#initBlockPool():

/** * One of the Block Pools has successfully connected to its NN. * This initializes the local storage for that block pool, * checks consistency of the NN's cluster ID, etc. * * If this is the first block pool to register, this also initializes * the datanode-scoped storage. * * @param bpos Block pool offer service * @throws IOException if the NN is inconsistent with the local storage. */
  void initBlockPool(BPOfferService bpos) throws IOException {
    ...// 略

    // 將blockpool註冊到BlockManager
    blockPoolManager.addBlockPool(bpos);
    
    // 初步初始化存儲結構
    initStorage(nsInfo);

    ...// 檢查磁盤損壞

    // 啓動掃描器
    initPeriodicScanners(conf);
    
    // 將blockpool添加到FsDatasetIpml,並繼續初始化存儲結構
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
  }
複製代碼

此時可知,blockpool是按照namespace逐個初始化的。這很必要,由於要支持Federation的話,就必須讓多個namespace既能共用BlockManager提供的數據塊存儲服務,又能獨立啓動、關閉、升級、回滾等。

DataNode#initStorage()

在逐個初始化blockpool以前,先以datanode總體進行初始化。這一階段操做的主要對象是DataStorage、StorageDirectory、FsDatasetImpl、FsVolumeList、FsVolumeImpl等;後面的FsDatasetImpl#addBlockPool操做的主要對象纔會具體到各blockpool。

DataNode#initStorage():

private void initStorage(final NamespaceInfo nsInfo) throws IOException {
    final FsDatasetSpi.Factory<? extends FsDatasetSpi<?>> factory
        = FsDatasetSpi.Factory.getFactory(conf);
    
    if (!factory.isSimulated()) {
      ...// 構造參數
      // 初始化DataStorage(每一個datanode分別只持有一個)。可能會觸發DataStorage級別的狀態裝換,所以,要在DataNode上加鎖
      synchronized (this) {
        storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
      }
      final StorageInfo bpStorage = storage.getBPStorage(bpid);
      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
          + ";nsInfo=" + nsInfo + ";dnuuid=" + storage.getDatanodeUuid());
    }

    ...// 檢查

    // 初始化FsDatasetImpl(同上,每一個datanode分別只持有一個)
    synchronized(this)  {
      if (data == null) {
        data = factory.newInstance(this, storage, conf);
      }
    }
  }
複製代碼

初始化DataStorage:DataStorage#recoverTransitionRead()

DataStorage#recoverTransitionRead():

void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
    // First ensure datanode level format/snapshot/rollback is completed
    recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);

    // Create list of storage directories for the block pool
    Collection<File> bpDataDirs = new ArrayList<File>();
    for(StorageLocation dir : dataDirs) {
      File dnRoot = dir.getFile();
      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
          STORAGE_DIR_CURRENT));
      bpDataDirs.add(bpRoot);
    }

    // 在各${dfs.data.dir}/current下檢查並建立blockpool目錄
    makeBlockPoolDataDir(bpDataDirs, null);
    
    // 建立BlockPoolSliceStorage,並放入映射DataStorage#bpStorageMap:`Map<bpid, BlockPoolSliceStorage>`
    BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
        nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
    addBlockPoolStorage(bpID, bpStorage);
  }
複製代碼

根據Javadoc,BlockPoolSliceStorage管理着該datanode上相同bpid的全部BlockPoolSlice。然而,猴子暫時沒有發現這個類與升級外的操做有關(固然,啓動也多是因爲升級重啓),暫不深刻。

  • BlockPoolSlice詳見後文FsVolumeImpl#addBlockPool。
  • DataStorage#recoverTransitionRead()、BlockPoolSliceStorage#recoverTransitionRead()與數據節點恢復的關係很是大,猴子暫時還沒看懂,之後回來補充。

初始化FsDatasetImpl:FsDatasetFactory#newInstance()

FsDatasetFactory#newInstance():

public FsDatasetImpl newInstance(DataNode datanode, DataStorage storage, Configuration conf) throws IOException {
    return new FsDatasetImpl(datanode, storage, conf);
  }
複製代碼

FsDatasetImpl.<init>()

FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
      ) throws IOException {
    ...// 檢查,設置參數等

    @SuppressWarnings("unchecked")
    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
        ReflectionUtils.newInstance(conf.getClass(
            DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
            RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
    
    ...// 略

    // 每個Storagedirectory都對應一個卷FsVolumeImpl,須要將這些卷添加到FsVolumeList中
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      addVolume(dataLocations, storage.getStorageDir(idx));
    }
    
    ...// 設置lazyWriter、cacheManager等
  }
  
  ...
  
  private void addVolume(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd) throws IOException {
    // 使用`${dfs.data.dir}/current`目錄
    final File dir = sd.getCurrentDir();
    final StorageType storageType =
        getStorageTypeFromLocations(dataLocations, sd.getRoot());

    FsVolumeImpl fsVolume = new FsVolumeImpl(
        this, sd.getStorageUuid(), dir, this.conf, storageType);
    
    ...// 略
    
    volumes.addVolume(fsVolume);
    
    ...// 略
    
    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
  }
複製代碼

初始化DataStorage的過程當中,將各${dfs.data.dir}放入了storage(即DataNode#storage)。對於datanode來講,${dfs.data.dir}/current目錄就是要添加的卷FsVolumeImpl。

FsDatasetImpl#initPeriodicScanners()

FsDatasetImpl#initPeriodicScanners()(名爲初始化,實爲啓動):

private void initPeriodicScanners(Configuration conf) {
    initDataBlockScanner(conf);
    initDirectoryScanner(conf);
  }
複製代碼

初始化並啓動DataBlockScanner、DirectoryScanners

命名爲init或許是考慮到有可能禁用了數據塊和目錄的掃描器,致使通過FsDatasetImpl#initPeriodicScanners方法後,掃描器並無啓動。但仍然給人形成了誤解。

FsDatasetImpl#addBlockPool()

FsDatasetImpl#addBlockPool()操做的主要對象具體到了各blockpool,完成blockpool、current、rbw、tmp等目錄的檢查、恢復或初始化:

public void addBlockPool(String bpid, Configuration conf) throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      // 向全部卷添加blockpool(全部namespace共享全部卷)
      volumes.addBlockPool(bpid, conf);
      // 初始化ReplicaMap中blockpool的映射
      volumeMap.initBlockPool(bpid);
    }
    // 將全部副本加載到FsDatasetImpl#volumeMap中
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }
複製代碼

FsVolumeList#addBlockPool()

FsVolumeList#addBlockPool(),併發向FsVolumeList中的全部卷添加blockpool(全部namespace共享全部卷):

void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
    
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
    // 併發向FsVolumeList中的全部卷添加blockpool(全部namespace共享全部卷)
    for (final FsVolumeImpl v : volumes) {
      Thread t = new Thread() {
        public void run() {
          try {
            ...// 時間統計
            // 向卷FsVolumeImpl添加blockpool
            v.addBlockPool(bpid, conf);
            ...// 時間統計
          } catch (IOException ioe) {
            ...// 異常處理,循環外統一處理
          }
        }
      };
      blockPoolAddingThreads.add(t);
      t.start();
    }
    for (Thread t : blockPoolAddingThreads) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
    ...// 異常處理。若是存在異常,僅拋出掃描捲過程當中的第一個異常
    
    ...// 時間統計
  }
複製代碼

正如FsVolumeList#addBlockPool(),FsVolumeList封裝了不少面向全部卷的操做。

FsVolumeImpl#addBlockPool():

void addBlockPool(String bpid, Configuration conf) throws IOException {
    File bpdir = new File(currentDir, bpid);
    // 建立BlockPoolSlice
    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
    // 維護FsVolumeImpl中bpid到BlockPoolSlice的映射
    bpSlices.put(bpid, bp);
  }
複製代碼

BlockPoolSlice是blockpool在每一個捲上的實際存在形式。全部捲上相同bpid的BlockPoolSlice組合成小blockpool(概念上即爲BlockPoolSliceStorage),再將相關datanode(向同一個namespace彙報的datanode)上相同bpid的小blockpool組合起來,就構成了該namespace的blockpool。

而FsVolumeImpl#bpSlices維護了bpid到BlockPoolSlice的映射。FsVolumeImpl經過該映射獲取bpid對應的BlockPoolSlice,而BlockPoolSlice再反向藉助FsDatasetImpl中的靜態方法完成實際的文件操做(見後續文章中的寫數據塊過程)。

回到BlockPoolSlice.<init>

BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
      Configuration conf) throws IOException {
    this.bpid = bpid;
    this.volume = volume;
    this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
    this.finalizedDir = new File(
        currentDir, DataStorage.STORAGE_DIR_FINALIZED);
    this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
    // 檢查並建立finalized目錄
    if (!this.finalizedDir.exists()) {
      if (!this.finalizedDir.mkdirs()) {
        throw new IOException("Failed to mkdirs " + this.finalizedDir);
      }
    }

    this.deleteDuplicateReplicas = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);

    // 刪除tmp目錄。每次啓動datanode都會刪除tmp目錄(並重建),從新協調數據塊的一致性。
    this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
    if (tmpDir.exists()) {
      FileUtil.fullyDelete(tmpDir);
    }
    
    // 檢查並建立rbw目錄
    this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
    final boolean supportAppends = conf.getBoolean(
        DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
        DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
    // 若是不支持append,那麼同tmp同樣,rbw裏保存的必然是新寫入的數據,能夠在每次啓動datanode時刪除rbw目錄,從新協調
    if (rbwDir.exists() && !supportAppends) {
      FileUtil.fullyDelete(rbwDir);
    } // 若是支持append,待datanode啓動後,有可能繼續append數據,所以不能刪除,等待進一步肯定或恢復
    
    if (!rbwDir.mkdirs()) {
      if (!rbwDir.isDirectory()) {
        throw new IOException("Mkdirs failed to create " + rbwDir.toString());
      }
    }
    
    if (!tmpDir.mkdirs()) {
      if (!tmpDir.isDirectory()) {
        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
      }
    }
    
    // 啓動dfsUsage的監控線程(詳見對hadoop fs shell中df、du區別的總結)
    this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
    this.dfsUsage.start();
    ShutdownHookManager.get().addShutdownHook(
      new Runnable() {
        @Override
        public void run() {
          if (!dfsUsedSaved) {
            saveDfsUsed();
          }
        }
      }, SHUTDOWN_HOOK_PRIORITY);
  }
複製代碼

可知,每一個blockpool目錄下的存儲結構是在構造BlockPoolSlice時初始化的

關於du的做用及優化:

在linux系統上,該線程將按期經過du -sk命令統計各blockpool目錄的佔用狀況,隨着心跳彙報給namenode。

執行linux命令須要從JVM繼承fork出子進程,成本較高(儘管linux使用COW策略避免了對內存空間的徹底copy)。爲了加快datanode啓動速度,此處容許使用以前緩存的dfsUsage值,該值保存在current目錄下的dfsUsed文件中;緩存的dfsUsage會按期持久化到磁盤中;在虛擬機關閉時,也會將當前的dfsUsage值持久化。

ReplicaMap#initBlockPool()

ReplicaMap#initBlockPool(),初始化ReplicaMap中blockpool的映射:

void initBlockPool(String bpid) {
    checkBlockPool(bpid);
    synchronized(mutex) {
      Map<Long, ReplicaInfo> m = map.get(bpid);
      if (m == null) {
        // Add an entry for block pool if it does not exist already
        m = new HashMap<Long, ReplicaInfo>();
        map.put(bpid, m);
      }
    }
  }
複製代碼

FsDatasetImpl#volumeMap(ReplicaMap實例)中維護了bpid到各blockpool在該datanode上的全部副本:Map<bpid, Map<blockid, replicaInfo>>

例行挖坑

在之後的文章中,猴子會陸續整理DataNode章的寫數據塊過程、讀數據塊過程,NameNode章、Client章等。

因爲猴子也是一步步學習,不免有錯漏之處,煩請讀者批評指正;隨着猴子進一步的學習與自檢,也會隨時更新文章,重要修改會註明勘誤。


本文連接:源碼|HDFS之DataNode:啓動過程
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。

相關文章
相關標籤/搜索