HDFS periodically checks whether any blocks are missing replicas and, if so, triggers re-replication until each block reaches the configured replication factor:
<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>
This is implemented by the ReplicationMonitor thread started inside BlockManager:
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager
/**
 * Periodically calls computeReplicationWork().
 */
private class ReplicationMonitor implements Runnable {
  @Override
  public void run() {
    while (namesystem.isRunning()) {
      try {
        // Process replication work only when active NN is out of safe mode.
        if (namesystem.isPopulatingReplQueues()) {
          computeDatanodeWork();
          processPendingReplications();
        }
        Thread.sleep(replicationRecheckInterval);
      } catch (Throwable t) {
        ...
Note: the sleep interval replicationRecheckInterval is read from the config key dfs.namenode.replication.interval, which defaults to 3, i.e. the thread scans for work every 3 seconds.
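Note that the configured value is in seconds, while Thread.sleep() takes milliseconds, so the value is converted once at startup. A minimal sketch of that conversion (class and method names here are illustrative, not the exact HDFS fields):

```java
public class RecheckInterval {
    // dfs.namenode.replication.interval is given in seconds; the monitor's
    // Thread.sleep() takes milliseconds, hence the one-time conversion.
    static long replicationRecheckIntervalMs(int configuredSeconds) {
        return configuredSeconds * 1000L;
    }

    public static void main(String[] args) {
        // the default of 3 means the monitor wakes up every 3000 ms
        System.out.println(replicationRecheckIntervalMs(3));
    }
}
```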
/**
 * Compute block replication and block invalidation work that can be scheduled
 * on data-nodes. The datanode will be informed of this work at the next
 * heartbeat.
 *
 * @return number of blocks scheduled for replication or removal.
 */
int computeDatanodeWork() {
  // Blocks should not be replicated or removed if in safe mode.
  // It's OK to check safe mode here w/o holding lock, in the worst
  // case extra replications will be scheduled, and these will get
  // fixed up later.
  if (namesystem.isInSafeMode()) {
    return 0;
  }

  final int numlive = heartbeatManager.getLiveDatanodeCount();
  final int blocksToProcess = numlive * this.blocksReplWorkMultiplier;
  final int nodesToProcess = (int) Math.ceil(numlive
      * this.blocksInvalidateWorkPct);

  int workFound = this.computeReplicationWork(blocksToProcess);
  ...
Note: the multiplier blocksReplWorkMultiplier is read from dfs.namenode.replication.work.multiplier.per.iteration, which defaults to 2, i.e. each iteration processes at most (number of live datanodes) × 2 blocks.
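To make the sizing concrete, here is a small standalone sketch (not HDFS code) of the two computations above, using the default multiplier of 2 and assuming the companion default dfs.namenode.invalidate.work.pct.per.iteration = 0.32 for the invalidation percentage:

```java
public class WorkSizing {
    // Assumed defaults:
    //   dfs.namenode.replication.work.multiplier.per.iteration = 2
    //   dfs.namenode.invalidate.work.pct.per.iteration        = 0.32
    static final int BLOCKS_REPL_WORK_MULTIPLIER = 2;
    static final float BLOCKS_INVALIDATE_WORK_PCT = 0.32f;

    // Upper bound on blocks scanned for replication per iteration.
    static int blocksToProcess(int liveDatanodes) {
        return liveDatanodes * BLOCKS_REPL_WORK_MULTIPLIER;
    }

    // Number of datanodes given block-invalidation work per iteration.
    static int nodesToProcess(int liveDatanodes) {
        return (int) Math.ceil(liveDatanodes * BLOCKS_INVALIDATE_WORK_PCT);
    }

    public static void main(String[] args) {
        // With 100 live datanodes: up to 200 blocks scanned for replication,
        // and 32 nodes picked for invalidation work, per 3-second iteration.
        System.out.println(blocksToProcess(100));
        System.out.println(nodesToProcess(100));
    }
}
```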
/**
 * Scan blocks in {@link #neededReplications} and assign replication
 * work to data-nodes they belong to.
 *
 * The number of process blocks equals either twice the number of live
 * data-nodes or the number of under-replicated blocks whichever is less.
 *
 * @return number of blocks scheduled for replication during this iteration.
 */
int computeReplicationWork(int blocksToProcess) {
  List<List<Block>> blocksToReplicate = null;
  namesystem.writeLock();
  try {
    // Choose the blocks to be replicated
    blocksToReplicate = neededReplications
        .chooseUnderReplicatedBlocks(blocksToProcess);
  } finally {
    namesystem.writeUnlock();
  }
  return computeReplicationWorkForBlocks(blocksToReplicate);
}

int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
  ...
  // Add block to the to be replicated list
  rw.srcNode.addBlockToBeReplicated(block, targets);
  scheduledWork++;
  ...
Note: each chosen block is added, via addBlockToBeReplicated, to the pending-replication queue of the source datanode that holds it;
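The per-datanode queue that addBlockToBeReplicated feeds, and that the heartbeat later drains, can be sketched as a simplified standalone class (the names mirror HDFS, but this is an illustrative model, not the real implementation):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch: the ReplicationMonitor enqueues (block, targets) pairs for a
// datanode; each heartbeat then drains at most maxTransfers of them.
public class DatanodeQueueSketch {
    static class BlockTargetPair {
        final String block;          // block to copy
        final List<String> targets;  // datanodes to copy it to
        BlockTargetPair(String block, List<String> targets) {
            this.block = block;
            this.targets = targets;
        }
    }

    private final Queue<BlockTargetPair> replicateBlocks = new ArrayDeque<>();

    // Called by the ReplicationMonitor when scheduling work for this node.
    void addBlockToBeReplicated(String block, List<String> targets) {
        replicateBlocks.add(new BlockTargetPair(block, targets));
    }

    // Called while handling a heartbeat: hand out at most maxTransfers
    // pending pairs, or null if nothing is queued.
    List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
        List<BlockTargetPair> pending = new ArrayList<>();
        while (pending.size() < maxTransfers && !replicateBlocks.isEmpty()) {
            pending.add(replicateBlocks.poll());
        }
        return pending.isEmpty() ? null : pending;
    }
}
```

The important design point is that the NameNode never pushes work to datanodes directly: work accumulates in this queue and piggybacks on the next heartbeat reply.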
Next, look at the DatanodeManager code:
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount,
    int maxTransfers, int failedVolumes) throws IOException {
  ...
  final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
  // check pending replication
  List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
      maxTransfers);
  if (pendingList != null) {
    cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
        pendingList));
  }
  ...
Note: when DatanodeManager handles a heartbeat, it sends the pending replication work to that source datanode as a DNA_TRANSFER command. Here maxTransfer is computed as
final int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress;
getMaxReplicationStreams is read from dfs.namenode.replication.max-streams, which defaults to 2, i.e. a datanode replicates at most 2 blocks concurrently.
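A tiny sketch of that budget calculation, assuming the default of 2 streams: the datanode reports its in-progress transfers (xmitsInProgress) in the heartbeat, and the NameNode only hands it enough new work to fill the remaining slots.

```java
public class MaxTransferSketch {
    // Assumed default: dfs.namenode.replication.max-streams = 2
    static final int MAX_REPLICATION_STREAMS = 2;

    // Room left for new replication work on a datanode, given the number
    // of transfers it reported as already running in its heartbeat.
    static int maxTransfer(int xmitsInProgress) {
        return MAX_REPLICATION_STREAMS - xmitsInProgress;
    }

    public static void main(String[] args) {
        System.out.println(maxTransfer(0)); // idle node: accepts 2 new blocks
        System.out.println(maxTransfer(2)); // saturated: no new work this round
    }
}
```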