This post analyzes HDFS Lease Recovery and Block Recovery.
Once HDFS supports hflush, data that has been hflushed must remain readable: a restarted datanode cannot simply discard the last block of a file, it has to preserve the hflushed data. To support append, a block that has already been finalized must also be reopened so more data can be added to it. Both features make crash recovery considerably harder; before hflush/append, HDFS only had to delete all replicas of the last block of an unclosed file.
In the HDFS design, the lease ensures that a file is written by only one client at a time. Before writing to or appending to a file, a client must obtain the file's lease from the namenode; while the client is writing, a background thread keeps renewing the lease, extending its exclusive write access. A lease actually has two limits: a soft limit (60s by default) and a hard limit (1 hour by default). They differ as follows (a small sketch of the two timeouts follows the list):
Before the lease soft limit expires, the client has exclusive access to the file; no other client can take away its right to write the file.
After the lease soft limit expires, any other client may reclaim the lease, acquire it for itself, and obtain exclusive write access to the file.
After the lease hard limit expires, the namenode forcibly closes the file and revokes the lease.
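The two timeouts can be pictured with a minimal, self-contained sketch (this is not the real LeaseManager code; the constants simply mirror the defaults quoted above):

// Simplified sketch of the two lease timeouts (not the actual LeaseManager code).
class LeaseSketch {
  static final long SOFT_LIMIT_MS = 60 * 1000L;        // another client may reclaim after this
  static final long HARD_LIMIT_MS = 60 * 60 * 1000L;   // namenode force-closes after this

  private long lastRenewalMs;

  void renew() {                         // called periodically by the writer's background thread
    lastRenewalMs = System.currentTimeMillis();
  }

  boolean expiredSoftLimit() {           // other clients may trigger lease recovery
    return System.currentTimeMillis() - lastRenewalMs > SOFT_LIMIT_MS;
  }

  boolean expiredHardLimit() {           // the namenode's lease monitor force-recovers the lease
    return System.currentTimeMillis() - lastRenewalMs > HARD_LIMIT_MS;
  }
}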
Now consider a client that crashes while writing a file. Before the lease soft limit expires, no other client can write the file; once the soft limit has expired, another client may. Before writing, it first checks whether the file is still open; if so, lease recovery and block recovery kick in. Their goal is to bring all replicas of the file's last block to a consistent state, because the replicas of a block are written through a pipeline and it is perfectly normal for them to diverge.
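Besides the implicit path through append described below, a client can also ask for lease recovery explicitly through DistributedFileSystem#recoverLease. A minimal sketch, assuming an HDFS file system and a hypothetical file path passed on the command line:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverLeaseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    Path path = new Path(args[0]);            // path of the unclosed file
    // recoverLease() returns true once the namenode has closed the file;
    // lease/block recovery is asynchronous, so poll until it completes.
    boolean closed = dfs.recoverLease(path);
    while (!closed) {
      Thread.sleep(1000);                     // arbitrary polling interval
      closed = dfs.recoverLease(path);
    }
  }
}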
This post considers the scenario where a client crashes while writing and another client then appends to the same file.
A client appends to a file with code like the following:
FileSystem fs = FileSystem.get(configuration);
FSDataOutputStream out = fs.append(path);
out.write(data);   // data is a byte[]
On the namenode side, the main logic of append is handled in FSNamesystem's appendFileInternal, which internally calls
// Opening an existing file for write - may need to recover lease.
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
to check whether lease recovery needs to be performed on the file first. Let's look at this function in detail.
private void recoverLeaseInternal(INodeFile fileInode, String src, String holder,
    String clientMachine, boolean force) throws IOException {
  // holder is the client name issuing this append
  assert hasWriteLock();
  if (fileInode != null && fileInode.isUnderConstruction()) {
    //
    // If the file is under construction , then it must be in our
    // leases. Find the appropriate lease record.
    //
    Lease lease = leaseManager.getLease(holder);
    //
    // We found the lease for this file. And surprisingly the original
    // holder is trying to recreate this file. This should never occur.
    //
    if (!force && lease != null) {
      Lease leaseFile = leaseManager.getLeaseByPath(src);
      if ((leaseFile != null && leaseFile.equals(lease)) ||
          lease.getHolder().equals(holder)) {
        throw new AlreadyBeingCreatedException(
            "failed to create file " + src + " for " + holder +
            " for client " + clientMachine +
            " because current leaseholder is trying to recreate file.");
      }
    }
    //
    // Find the original holder.
    //
    FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
    String clientName = uc.getClientName();
    lease = leaseManager.getLease(clientName);
    if (lease == null) {
      throw new AlreadyBeingCreatedException(
          "failed to create file " + src + " for " + holder +
          " for client " + clientMachine +
          " because pendingCreates is non-null but no leases found.");
    }
    if (force) {
      // close now: no need to wait for soft lease expiration and
      // close only the file src
      LOG.info("recoverLease: " + lease + ", src=" + src +
          " from client " + clientName);
      internalReleaseLease(lease, src, holder);
    } else {
      assert lease.getHolder().equals(clientName) :
          "Current lease holder " + lease.getHolder() +
          " does not match file creator " + clientName;
      //
      // If the original holder has not renewed in the last SOFTLIMIT
      // period, then start lease recovery.
      //
      if (lease.expiredSoftLimit()) {
        LOG.info("startFile: recover " + lease + ", src=" + src
            + " client " + clientName);
        boolean isClosed = internalReleaseLease(lease, src, null);
        if (!isClosed)
          throw new RecoveryInProgressException(
              "Failed to close file " + src +
              ". Lease recovery is in progress. Try again later.");
      } else {
        final BlockInfo lastBlock = fileInode.getLastBlock();
        if (lastBlock != null
            && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
          throw new RecoveryInProgressException("Recovery in progress, file ["
              + src + "], " + "lease owner [" + lease.getHolder() + "]");
        } else {
          throw new AlreadyBeingCreatedException("Failed to create file ["
              + src + "] for [" + holder + "] for client [" + clientMachine
              + "], because this file is already being created by ["
              + clientName + "] on [" + uc.getClientMachine() + "]");
        }
      }
    }
  }
}
1. The file's INode is checked first: if it is in the under construction state, the file has not been closed, and it is very likely that lease recovery and block recovery are needed to bring the replicas of its last block into agreement.
2. The lease held by the calling client is fetched from the lease manager (holder is the client name issuing this append). If it is not null, the client still holds some lease; we then check whether that lease covers the file being appended. If it does, the current client still holds this file's lease and the append fails, because append requires the file to be closed. If the lease does not cover this file, the client does not currently hold the file's lease and we continue.
3. The INode then tells us the previous lease holder of the file, i.e. the crashed client in our scenario. Its lease is looked up in the lease manager and checked for soft-limit expiry. If it has expired, the following is called:
boolean isClosed = internalReleaseLease(lease, src, null);
This function decides whether we really need to enter the block recovery phase, which requires datanode participation. Its main logic is as follows (a condensed sketch appears after the list):
3.1. If every block of the file is already in the COMPLETE state, no block recovery is needed and the file is simply closed: its lease is removed from the lease manager, the INode is marked complete, and a close-file record is written to the edit log.
3.2. If the last block is in the COMMITTED state, look at the file's last two blocks: if both the second-to-last and the last block satisfy the minimum replication requirement (1 by default), the file is closed; otherwise an exception is thrown back to the client.
3.3. If the last block is UNDER_CONSTRUCTION or UNDER_RECOVERY and no datanode has reported a replica for it, the client most likely crashed before the pipeline was even established. In that case the last block is simply removed from the INode and the file is closed.
3.4. Otherwise, enter the block recovery phase.
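A condensed, self-contained paraphrase of these four cases (plain booleans and an enum stand in for the real INodeFile/BlockInfo checks; this is not the actual FSNamesystem code):

import java.io.IOException;

// Paraphrase of internalReleaseLease's four cases (3.1-3.4).
class ReleaseLeaseSketch {
  enum LastBlockState { COMPLETE, COMMITTED, UNDER_CONSTRUCTION, UNDER_RECOVERY }

  /** Returns true if the file can be closed now, false if block recovery was started. */
  static boolean internalReleaseLease(boolean allBlocksComplete,
                                      LastBlockState lastBlock,
                                      boolean lastTwoBlocksMeetMinReplication,
                                      int reportedReplicasOfLastBlock) throws IOException {
    if (allBlocksComplete) {                          // 3.1: drop the lease, mark the INode
      return true;                                    //      complete, log a close-file record
    }
    if (lastBlock == LastBlockState.COMMITTED) {      // 3.2
      if (lastTwoBlocksMeetMinReplication) {
        return true;                                  // close the file
      }
      throw new IOException("committed block does not meet the minimum replication yet");
    }
    if (reportedReplicasOfLastBlock == 0) {           // 3.3: pipeline never formed;
      return true;                                    //      drop the last block, then close
    }
    // 3.4: mark the block UNDER_RECOVERY, pick a primary datanode, start block recovery
    return false;                                     // file stays open until recovery completes
  }
}

The boolean return value mirrors isClosed in the snippet above: false means recovery is still in progress and the caller must retry later.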
The block's state is set to UNDER_RECOVERY, one of the datanodes holding a replica is chosen as the primary datanode, and the block is added to that datanode's list of blocks to recover. When the namenode later processes that datanode's periodic heartbeat, it ships all of the datanode's pending recovery blocks back in the heartbeat response in the form of a BlockRecoveryCommand. The code:
// DatanodeManager#handleHeartbeat
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo
    .getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
  BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
      blocks.length);
  for (BlockInfoUnderConstruction b : blocks) {
    final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
    // Skip stale nodes during recovery - not heart beated for some time (30s by default).
    final List<DatanodeStorageInfo> recoveryLocations =
        new ArrayList<DatanodeStorageInfo>(storages.length);
    for (int i = 0; i < storages.length; i++) {
      if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
        recoveryLocations.add(storages[i]);
      }
    }
    // If we only get 1 replica after eliminating stale nodes, then choose all
    // replicas for recovery and let the primary data node handle failures.
    if (recoveryLocations.size() > 1) {
      if (recoveryLocations.size() != storages.length) {
        LOG.info("Skipped stale nodes for recovery : " +
            (storages.length - recoveryLocations.size()));
      }
      brCommand.add(new RecoveringBlock(
          new ExtendedBlock(blockPoolId, b),
          DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
          b.getBlockRecoveryId()));
    } else {
      // If too many replicas are stale, then choose all replicas to participate
      // in block recovery.
      brCommand.add(new RecoveringBlock(
          new ExtendedBlock(blockPoolId, b),
          DatanodeStorageInfo.toDatanodeInfos(storages),
          b.getBlockRecoveryId()));
    }
  }
  return new DatanodeCommand[] { brCommand };
}
Now for the DataNode side.
On the DataNode, BPServiceActor handles the heartbeat response: in offerService() it pulls every DatanodeCommand out of the response and processes it. processCommandFromActive() checks the command type; DNA_RECOVERBLOCK means a block recovery command, which is handed to DataNode's recoverBlocks.
case DatanodeProtocol.DNA_RECOVERBLOCK:
  String who = "NameNode at " + actor.getNNSocketAddress();
  dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
  break;
dn.recoverBlocks starts a dedicated background thread for this work. For every block that needs to be recovered:
the thread takes the replica locations from the block, opens connections to the datanodes holding the other replicas (the inter-datanode interface is defined in InterDatanodeProtocol), and calls initReplicaRecovery(rBlock) on each datanode, including itself. Each DataNode ultimately delegates to FsDatasetImpl's initReplicaRecovery to initialize the replica that needs recovery on that node. A simplified sketch of this per-block loop is shown below, followed by initReplicaRecovery itself.
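A minimal paraphrase of that per-block loop on the primary datanode (proxyTo() and syncBlock() are simplified stand-ins for the real proxy creation and sync logic, not actual APIs; imports are omitted since this is a fragment):

// Paraphrase of the per-block recovery loop run on the primary datanode.
void recoverOneBlock(RecoveringBlock rBlock) throws IOException {
  ExtendedBlock block = rBlock.getBlock();
  long recoveryId = rBlock.getNewGenerationStamp();   // the new generation stamp to apply
  List<ReplicaRecoveryInfo> syncList = new ArrayList<ReplicaRecoveryInfo>();
  for (DatanodeInfo id : rBlock.getLocations()) {
    // InterDatanodeProtocol call; for the local node this is a plain method call.
    InterDatanodeProtocol datanode = proxyTo(id);
    ReplicaRecoveryInfo info = datanode.initReplicaRecovery(rBlock);
    // Only replicas that exist, contain data, and carry an old enough
    // generation stamp take part in the sync.
    if (info != null && info.getNumBytes() > 0
        && info.getGenerationStamp() >= block.getGenerationStamp()) {
      syncList.add(info);
    }
  }
  // Agree on a common length, apply recoveryId, then report the result to the namenode.
  syncBlock(rBlock, syncList);
}

FsDatasetImpl's initReplicaRecovery, which the loop above ends up calling on every replica, looks like this: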
static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
    Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
  final ReplicaInfo replica = map.get(bpid, block.getBlockId());
  LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
      + ", replica=" + replica);

  //check replica
  if (replica == null) {
    return null;
  }

  //stop writer if there is any
  if (replica instanceof ReplicaInPipeline) {
    final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
    rip.stopWriter(xceiverStopTimeout);

    //check replica bytes on disk.
    if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
          + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
    }

    //check the replica's files
    checkReplicaFiles(rip);
  }

  //check generation stamp
  if (replica.getGenerationStamp() < block.getGenerationStamp()) {
    throw new IOException(
        "replica.getGenerationStamp() < block.getGenerationStamp(), block="
        + block + ", replica=" + replica);
  }

  //check recovery id
  if (replica.getGenerationStamp() >= recoveryId) {
    throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
        + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
        + ", block=" + block + ", replica=" + replica);
  }

  //check RUR
  final ReplicaUnderRecovery rur;
  if (replica.getState() == ReplicaState.RUR) {
    rur = (ReplicaUnderRecovery)replica;
    if (rur.getRecoveryID() >= recoveryId) {
      throw new RecoveryInProgressException(
          "rur.getRecoveryID() >= recoveryId = " + recoveryId
          + ", block=" + block + ", rur=" + rur);
    }
    final long oldRecoveryID = rur.getRecoveryID();
    rur.setRecoveryID(recoveryId);
    LOG.info("initReplicaRecovery: update recovery id for " + block
        + " from " + oldRecoveryID + " to " + recoveryId);
  } else {
    rur = new ReplicaUnderRecovery(replica, recoveryId);
    map.add(bpid, rur);
    LOG.info("initReplicaRecovery: changing replica state for "
        + block + " from " + replica.getState()
        + " to " + rur.getState());
  }
  return rur.createInfo();
}
First, the replica's state is checked. If the replica is currently being written, replica.stopWriter() stops the writer thread by interrupting it (when writing through a pipeline, the datanode stores the writer thread's handle in the replica at creation time); this shows how high the priority of block recovery is. A few sanity checks follow, such as whether the replica's data file and meta file exist on disk. Next the generation stamps are checked: the generation stamp recorded by the namenode must not be larger than the replica's actual one, and the recovery id must be larger than the replica's generation stamp. Finally a ReplicaUnderRecovery is created and put into the replica map. If the replica is already under recovery, the recovery ids are compared: if the current block recovery's id is larger, it preempts the earlier one.
Next, the information about the three replicas (including their pre-recovery states) is collected into a list, and the sync step begins. Sync decides on the recovered block based on the original states of the replicas; a simplified sketch of the rule follows.
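The original post breaks off at this point, but the core of the sync step can still be sketched. A simplified paraphrase of how the recovered block length is chosen from the replicas' original states (it collapses the per-case decision table into the general rule and is not the exact DataNode#syncBlock code; imports are omitted):

// Paraphrase of the length selection in the sync step. The "best" state is the
// most advanced one among the surviving replicas: FINALIZED beats RBW beats RWR.
long chooseRecoveredLength(List<ReplicaRecoveryInfo> syncList) {
  ReplicaState bestState = ReplicaState.RWR;
  for (ReplicaRecoveryInfo r : syncList) {
    if (r.getOriginalReplicaState().getValue() < bestState.getValue()) {
      bestState = r.getOriginalReplicaState();
    }
  }
  long length = -1;
  switch (bestState) {
  case FINALIZED:
    // A finalized replica is authoritative: finalized replicas with the same
    // generation stamp must already agree on the length.
    for (ReplicaRecoveryInfo r : syncList) {
      if (r.getOriginalReplicaState() == ReplicaState.FINALIZED) {
        length = r.getNumBytes();
      }
    }
    break;
  case RBW:
  case RWR:
    // Otherwise only replicas in the best state participate, truncated to the
    // minimum length among them, since only that prefix is guaranteed to have
    // been hflushed everywhere.
    length = Long.MAX_VALUE;
    for (ReplicaRecoveryInfo r : syncList) {
      if (r.getOriginalReplicaState() == bestState) {
        length = Math.min(length, r.getNumBytes());
      }
    }
    break;
  default:
    throw new AssertionError("unexpected replica state " + bestState);
  }
  return length;
}

The participating replicas are then brought to this length under the new generation stamp (the recovery id), and the primary datanode reports the agreed length and generation stamp back to the namenode via commitBlockSynchronization, which lets the namenode finally close the file.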
(The HDFS source quoted in this post is from hadoop-hdfs-2.4.1.jar.)