The balance process moves blocks from datanodes whose storage utilization is above the cluster average to datanodes whose utilization is below the cluster average, until the balancing criterion is met.
In order of priority, sources are paired with targets as follows:
over-utilized ----> under-utilized
over-utilized ----> below-average
above-average ----> under-utilized
1. over-utilized: utilization above the average by more than the threshold
2. above-average: utilization above the average, but within the threshold
3. below-average: utilization below the average, but within the threshold
4. under-utilized: utilization below the average by more than the threshold
Balancing criterion: for every datanode taking part in the balance, the difference between its utilization and the overall utilization of all participating datanodes falls within the given threshold.
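For example, with an average utilization of 50% and -threshold 10: a datanode at 65% utilization is over-utilized, one at 55% is above-average, one at 45% is below-average, and one at 35% is under-utilized; balancing is considered finished once every participating datanode sits within 10 percentage points of the average.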
The balance runs in multiple iterations. At the start of each iteration the balancer classifies and pairs all participating datanodes according to the rules above, producing a set of <source, target> pairs; one <source, target> pair means that some blocks on the source datanode will be migrated to the target datanode.
Each <source, target> pair is handled by one thread, whose main responsibilities are:
1. Fetch a batch of blocks at random from the namenode (at most 2 GB per fetch, 20 GB cumulatively), filter them, and store them in the src_block list. One of the conditions that triggers a fetch: the number of not-yet-migrated blocks in the src_block list drops below 5 (a hard-coded value, not configurable).
2. Submit the blocks in the src_block list to a thread pool (pool size: dfs.balancer.moverThreads, default 1000) for migration.
The migration itself is done by multiple threads (thread count: dfs.balancer.moverThreads), each thread moving one block at a time (a rough sketch of this hand-off follows).
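A rough, illustrative sketch of this hand-off, not the actual Dispatcher code: the pool size stands in for dfs.balancer.moverThreads and each block move is reduced to a plain Runnable.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: submit each pending block move to a fixed-size pool,
// the way the balancer hands PendingMove tasks to its mover thread pool.
public class MoverPoolSketch {
  public static void main(String[] args) {
    int moverThreads = 1000;  // stands in for dfs.balancer.moverThreads (default 1000)
    ExecutorService moverPool = Executors.newFixedThreadPool(moverThreads);

    List<Runnable> srcBlocks = List.of();  // simplified stand-in for the src_block list
    for (Runnable move : srcBlocks) {
      moverPool.execute(move);             // each task moves exactly one block
    }
    moverPool.shutdown();
  }
}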
Migration procedure:
step1: pick a block from the src_block list.
step2: among the datanodes that hold replicas of the block, check whether any is on the same rack as the target; if so, choose it as the proxy, otherwise pick one of the replica holders at random as the proxy.
step3: the balancer tells the target to copy the block from the proxy to itself.
step4: if the copy succeeds, the target asks the namenode to delete the block replica on the source and reports back to the balancer.
When picking a block in step1 the target is checked, and when picking a proxy in step2 the candidate datanodes are checked. The checks cover (a simplified sketch follows the list):
1. whether the number of threads currently performing copies on the node exceeds the maximum (dfs.datanode.balance.max.concurrent.moves);
2. whether the node is in its ban period: every failed copy bans the datanode from acting as proxy or target for 10 seconds.
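A minimal sketch of these two checks, with hypothetical class and field names; in the real Dispatcher they happen when a pending move is added to a node (see addTo() in the code below) and the ban window is started by activateDelay() after a failed move.

import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a candidate target/proxy datanode,
// illustrating the two eligibility checks listed above.
class CandidateDatanode {
  private final int maxConcurrentMoves;              // dfs.datanode.balance.max.concurrent.moves
  private final Set<Object> pendingMoves = new HashSet<>();
  private long bannedUntil = 0L;                     // end of the 10-second ban window

  CandidateDatanode(int maxConcurrentMoves) {
    this.maxConcurrentMoves = maxConcurrentMoves;
  }

  // Called after a failed copy: ban this node as proxy/target for delayMs milliseconds.
  synchronized void activateDelay(long delayMs) {
    bannedUntil = System.currentTimeMillis() + delayMs;
  }

  // Try to schedule one more move on this node; returns false if the node is in
  // its ban period or its concurrent-move quota is already used up.
  synchronized boolean tryAddPendingMove(Object move) {
    boolean banned = System.currentTimeMillis() < bannedUntil;        // check 2
    boolean quotaFull = pendingMoves.size() >= maxConcurrentMoves;    // check 1
    if (banned || quotaFull) {
      return false;
    }
    return pendingMoves.add(move);
  }

  synchronized void removePendingMove(Object move) {
    pendingMoves.remove(move);
  }
}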
Core methods and flow:
Balancer.runOneIteration()                   // start one iteration
  Dispatcher.init();                         // select the datanodes that participate in the balance
  Balancer.init();                           // classify them into the four groups
  Balancer.chooseStorageGroups()             // build <source, target> pairs: same rack first, then any
  Dispatcher.dispatchAndCheckContinue()      // dispatch block-move tasks per source
    Dispatcher.dispatchBlockMoves();         // start one thread per source
      Source.dispatchBlocks();
        Source.getBlockList();               // fetch blocks totaling 2 GB from the namenode
        Source.chooseNextMove()              // pick one block at a time and hand it to the thread pool
          PendingMove.chooseBlockAndProxy(); // choose a block and a proxy
            PendingMove.markMovedIfGoodBlock();
              PendingMove.isGoodBlockCandidate();  // is this block a suitable candidate?
              PendingMove.chooseProxySource()      // choose a suitable proxy for the chosen block
Datanode classification:
private long init(List<DatanodeStorageReport> reports) {
  // compute average utilization
  for (DatanodeStorageReport r : reports) {
    policy.accumulateSpaces(r);
  }
  policy.initAvgUtilization();

  // create network topology and classify utilization collections:
  // over-utilized, above-average, below-average and under-utilized.
  long overLoadedBytes = 0L, underLoadedBytes = 0L;
  for (DatanodeStorageReport r : reports) {
    final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
    for (StorageType t : StorageType.getMovableTypes()) {
      // utilization (used storage / total storage) of every participating node
      // (live nodes that are in the include list)
      final Double utilization = policy.getUtilization(r, t);
      if (utilization == null) { // datanode does not have such storage type
        continue;
      }

      final long capacity = getCapacity(r, t);
      final double utilizationDiff = utilization - policy.getAvgUtilization(t);
      // the smaller thresholdDiff is, the closer this datanode is to the ideal utilization
      final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
      final long maxSize2Move = computeMaxSize2Move(capacity,
          getRemaining(r, t), utilizationDiff, threshold);

      final StorageGroup g;
      if (utilizationDiff > 0) {
        final Source s = dn.addSource(t, maxSize2Move, dispatcher);
        if (thresholdDiff <= 0) { // within threshold
          aboveAvgUtilized.add(s);
        } else {
          overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
          overUtilized.add(s);
        }
        g = s;
      } else {
        g = dn.addTarget(t, maxSize2Move);
        if (thresholdDiff <= 0) { // within threshold
          belowAvgUtilized.add(g);
        } else {
          underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
          underUtilized.add(g);
        }
      }
      dispatcher.getStorageGroupMap().put(g);
    }
  }

  logUtilizationCollections();

  Preconditions.checkState(dispatcher.getStorageGroupMap().size()
      == overUtilized.size() + underUtilized.size()
         + aboveAvgUtilized.size() + belowAvgUtilized.size(),
      "Mismatched number of storage groups");

  // return number of bytes to be moved in order to make the cluster balanced
  return Math.max(overLoadedBytes, underLoadedBytes);
}
Choosing a proxy:
private boolean chooseProxySource() {
  final DatanodeInfo targetDN = target.getDatanodeInfo();
  // if source and target are same nodes then no need of proxy
  if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
    return true;
  }
  // if node group is supported, first try add nodes in the same node group
  if (cluster.isNodeGroupAware()) {
    for (StorageGroup loc : block.getLocations()) {
      if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
        return true;
      }
    }
  }
  // check if there is replica which is on the same rack with the target
  // prefer a replica holder on the same rack as the target as the proxy;
  // addTo() also checks whether that datanode already has too many threads
  // sending blocks for balance
  for (StorageGroup loc : block.getLocations()) {
    if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
      return true;
    }
  }
  // find out a non-busy replica
  // if none of the above yields a proxy, pick any non-busy replica holder as the proxy
  for (StorageGroup loc : block.getLocations()) {
    if (addTo(loc)) {
      return true;
    }
  }
  return false;
}
Block migration:
step1: the balancer opens a socket connection to the target and sends a replaceBlock request, asking the target to copy a replica of the block from the proxy so that it can replace the replica on the source.
step2: the target sends a copyBlock request to the proxy and copies the block replica to its local storage. When the copy completes, the target creates a ReceivedDeletedBlockInfo object via notifyNamenodeReceivedBlock and queues it; on the next heartbeat this object tells the namenode to add the new replica on the target to the block map and to delete the corresponding replica on the source.
private void dispatch() {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Start moving " + this);
  }

  Socket sock = new Socket();
  DataOutputStream out = null;
  DataInputStream in = null;
  try {
    // the balancer connects to the target
    sock.connect(
        NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()),
        HdfsServerConstants.READ_TIMEOUT);

    sock.setKeepAlive(true);

    OutputStream unbufOut = sock.getOutputStream();
    InputStream unbufIn = sock.getInputStream();
    ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
    final KeyManager km = nnc.getKeyManager();
    Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
    IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, unbufIn,
        km, accessToken, target.getDatanodeInfo());
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        HdfsConstants.IO_FILE_BUFFER_SIZE));
    in = new DataInputStream(new BufferedInputStream(unbufIn,
        HdfsConstants.IO_FILE_BUFFER_SIZE));

    // ask the target to copy the block replica (replaceBlock)
    sendRequest(out, eb, accessToken);
    receiveResponse(in);
    nnc.getBytesMoved().addAndGet(block.getNumBytes());
    LOG.info("Successfully moved " + this);
  } catch (IOException e) {
    LOG.warn("Failed to move " + this + ": " + e.getMessage());
    target.getDDatanode().setHasFailure();
    // Proxy or target may have some issues, delay before using these nodes
    // further in order to avoid a potential storm of "threads quota
    // exceeded" warnings when the dispatcher gets out of sync with work
    // going on in datanodes.
    // The move failed, probably because the proxy or target is too busy
    // (handling too many blockReplace operations at once), so delay their
    // further participation in the balance.
    proxySource.activateDelay(delayAfterErrors);
    target.getDDatanode().activateDelay(delayAfterErrors);
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
    IOUtils.closeSocket(sock);

    // whether the move succeeded or failed, remove this block from the pending queues
    proxySource.removePendingBlock(this);
    target.getDDatanode().removePendingBlock(this);

    synchronized (this) {
      reset();
    }
    synchronized (Dispatcher.this) {
      Dispatcher.this.notifyAll();
    }
  }
}

private void sendRequest(DataOutputStream out, ExtendedBlock eb,
    Token<BlockTokenIdentifier> accessToken) throws IOException {
  new Sender(out).replaceBlock(eb, target.storageType, accessToken,
      source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
  // the 4th argument is the uuid of the source (which uniquely identifies a datanode
  // on the namenode); it tells the namenode to delete the replica on the source
}
Log on the target after the copy completes:
2019-07-03 10:15:41,319 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Moved BP-691805646-10.116.100.3-1498296722300:blk_2306375028_1234536481 from /10.116.100.149:36907, delHint=7a29645c-cc12-44ce-b9c3-6642a82317c4
Common exceptions:
Target exception:
WARN balancer.Dispatcher: Failed to move blk_10818540065_9759022625 with size=12178 from 10.116.100.126:50010:DISK to 10.116.100.143:50010:DISK through 10.116.102.93:50010: Got error, status message Not able to receive block 10818540065 from /10.116.100.149:57891 because threads quota is exceeded., block move is failed
Proxy exception:
WARN balancer.Dispatcher: Failed to move blk_13904085291_12851796248 with size=412 from 10.116.101.126:50010:DISK to 10.116.101.227:50010:DISK through 10.116.100.51:50010: Got error, status message opReplaceBlock BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 received exception java.io.IOException: Got error, status message Not able to copy block 13904085291 to /10.116.101.227:42066 because threads quota is exceeded., copy block BP-691805646-10.116.100.3-1498296722300:blk_13904085291_12851796248 from /10.116.100.51:50010, block move is failed
threads quota is exceeded means that the number of threads on the datanode (target or proxy) currently doing balance work (blockReplace on the target, blockCopy on the proxy) exceeds the limit (dfs.datanode.balance.max.concurrent.moves), so the block move fails.
If this exception shows up in large numbers, the balance becomes very slow:
Direct cause: block moves fail;
Indirect cause: the target and proxy enter their ban period, which shrinks the pool of usable proxies, so the blocks in src_block are not consumed in time and no new blocks can be fetched.
How to speed up a slow balance (an illustrative configuration example follows the list):
1. Increase dfs.datanode.balance.max.concurrent.moves on the datanodes. This alleviates the threads quota is exceeded errors and speeds up the balance; changing this parameter requires a datanode restart.
2. Increase dfs.datanode.balance.max.concurrent.moves on the balancer as well. The balancer's value should be slightly smaller than the datanodes' value, because the two processes can get out of sync.
3. Increase -threshold. With a larger threshold the most heavily used datanodes are drained first; once their utilization has come down, lower the threshold again.
4. With multiple namespaces, change -policy from the default datanode to blockpool, so that whether a datanode needs balancing is evaluated from the utilization of the block pool of each namespace.
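For reference, a minimal, illustrative sketch of items 1, 3 and 4 above; the values are made up for illustration, and the property goes into hdfs-site.xml on each datanode (taking effect after a restart, as noted above):

<!-- hdfs-site.xml on each datanode -->
<property>
  <name>dfs.datanode.balance.max.concurrent.moves</name>
  <value>50</value>
</property>

# start a balancing round with a relaxed threshold; on a federated
# (multi-namespace) cluster, evaluate utilization per block pool
hdfs balancer -threshold 20 -policy blockpool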