可先看下網友的解析:http://fire-balrog.iteye.com/blog/812281 node
如下爲個人解析: 函數
當一個Block經由全部的DataNode寫完後,就須要告訴namenode能夠執行reportWrittenBlock函數了。 this
下面就來解析下這個函數的意思! spa
==================================== orm
/** blog
* The client can report in a set written blocks that it wrote. element
* These blocks are reported via the client instead of the datanode rem
* to prevent weird heartbeat race conditions. get
*/ it
public void reportWrittenBlock(LocatedBlock lb) throws IOException {
Block b = lb.getBlock();//獲取完成的這個Block信息
DatanodeInfo targets[] = lb.getLocations();//獲取節點信息
for (int i = 0; i < targets.length; i++) {
namesystem.blockReceived(b, targets[i].getName());//對於每一個DataNode來講,都要調用一次此函數
}
}
C1:2014-12-19 18:26:00 C2:2014-12-19 18:59:00 C3:2014-12-19 19:03:00
=========================
那麼,接下來就是理解 namesystem.blockReceived(b, targets[i].getName());了。
/**
* The given node is reporting that it received a certain block.
*/
public synchronized void blockReceived(Block block, UTF8 name) {
DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);//獲取對應的datanode
if (node == null) {//爲空可不行
throw new IllegalArgumentException("Unexpected exception. Got blockReceived message from node " + name + ", but there is no info for " + name);
}
//
// Modify the blocks->datanode map
//
addStoredBlock(block, node);//下面兩行是來執行block和node的一個映射。
//
// Supplement node's blockreport
//
node.addBlock(block);//同上
}
C1:2014-12-19 19:11:00 C2:2014-12-19 19:11:00 C3:2014-12-19 19:12:00
===============那麼接下來還有2個函數須要攻破,分別是addStoredBlock和node.addBlock(block);
後面一個函數很是簡單,不細講,因此就剩下最後一個函數了!
addStoredBlock(block, node);的執行過程如下:
synchronized void addStoredBlock(Block block, DatanodeInfo node) {
TreeSet containingNodes = (TreeSet) blocksMap.get(block);//獲取當前block已經存在的datanode信息
if (containingNodes == null) {//這裏保證確定存在datanode集合,不保證必定有節點在內
containingNodes = new TreeSet();
blocksMap.put(block, containingNodes);
}
if (! containingNodes.contains(node)) {//根據須要決定是否加入此datanode信息
containingNodes.add(node);
} else {
LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);
}
//接下來的邏輯是肯定是否須要從新備份
synchronized (neededReplications) {//鎖定neededReplications
if (dir.isValidBlock(block)) {//不懂這一句
if (containingNodes.size() >= this.desiredReplication) {//若是已經超過最大備份個數
neededReplications.remove(block);//刪除此block
pendingReplications.remove(block);//刪除此block
} else if (containingNodes.size() < this.desiredReplication) {
if (! neededReplications.contains(block)) {
neededReplications.add(block);//不然表示須要從新備份,這代碼寫的真夠差的。。。
}
}
//
// Find how many of the containing nodes are "extra", if any.
// If there are any extras, call chooseExcessReplicates() to
// mark them in the excessReplicateMap.
//
//也有可能一個block存儲的datanode節點數太多了,一樣要刪除這些block
Vector nonExcess = new Vector();//構造一個空的Vector
for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
DatanodeInfo cur = (DatanodeInfo) it.next();//對於當前節點來講
TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());//取到當前節點的多餘塊信息
if (excessBlocks == null || ! excessBlocks.contains(block)) {//若是以前沒有標誌在這個節點的多餘塊信息裏
nonExcess.add(cur);//則代表當前節點存儲了這個block
}
}
if (nonExcess.size() > this.maxReplication) {//若是超過了最大備份數
chooseExcessReplicates(nonExcess, block, this.maxReplication);//選擇若干來消除塊
}
}
}
}
void chooseExcessReplicates(Vector nonExcess, Block b, int maxReps) {
while (nonExcess.size() - maxReps > 0) {//若是還有須要
int chosenNode = r.nextInt(nonExcess.size());//隨機選擇一個節點
DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
nonExcess.removeElementAt(chosenNode);//獲取這個節點
TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
if (excessBlocks == null) {
excessBlocks = new TreeSet();
excessReplicateMap.put(cur.getName(), excessBlocks);
}
excessBlocks.add(b);//加入此block到excessReplicateMap
//
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());
if (invalidateSet == null) {
invalidateSet = new Vector();
recentInvalidateSets.put(cur.getName(), invalidateSet);
}
invalidateSet.add(b);//一樣的,更新recentInvalidateSets,沒啥好解釋的
}
}