HDFS中的addBlock函數

 

public LocatedBlock addBlock(String src, String clientMachine) throws IOException { node

        int retries = 5;//設定重試次數爲5 數組

        Object results[] = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine)); 安全

//獲取新的一個Block和DataNode節點信息 app

        while (results != null && results[0] == null && retries > 0) { 函數

            try {//發生錯誤,且不超過錯誤次數,就從新獲取 this

                Thread.sleep(100); spa

            } catch (InterruptedException ie) { get

            } it

            results = namesystem.getAdditionalBlock(new UTF8(src), new UTF8(clientMachine)); io

            retries--;

        }

//不管如何,都結束了!

        if (results == null) {//沒找到?返回錯誤

            throw new IOException("Cannot obtain additional block for file " + src);

        } else if (results[0] == null) {

            return null;//沒有Block?返回null

        } else {

            Block b = (Block) results[0];

            DatanodeInfo targets[] = (DatanodeInfo[]) results[1];

            return new LocatedBlock(b, targets);//這個簡單,返回Block和DatanodeInfo

        }

    }

=====================

C1:OKAY  c2:OKAY   c3:OKAY

=====================

 

下面開始講解

 public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {

---

public synchronized Object[] getAdditionalBlock(UTF8 src, UTF8 clientMachine) {

        Object results[] = null;

        if (dir.getFile(src) == null && pendingCreates.get(src) != null) {

//文件系統dir中沒有此文件且此文件正在建立過程當中

            results = new Object[2];//建立返回的變量數組

            //

            // If we fail this, bad things happen!

            //這裏的checkFileProgress很關鍵,

            //後面會分析checkFileProgress的做用

            if (checkFileProgress(src)) {

                // Get the array of replication targets //這個仍然很簡單,以前分析過了

                DatanodeInfo targets[] = chooseTargets(this.desiredReplicationnull, clientMachine);

                if (targets.length < this.minReplication) {

                    return null;

                }

                // Create next block返回很少說!

                results[0] = allocateBlock(src);

                results[1] = targets;

            }

        }

        return results;

    }

===

接下來是函數

 

synchronized boolean checkFileProgress(UTF8 src) {

 

這個函數仍是蠻重要的!

 

/**

     * Check that the indicated file's blocks are present and

     * replicated.  If not, return false.

     */

    synchronized boolean checkFileProgress(UTF8 src) {

        Vector v = (VectorpendingCreates.get(src);//獲取文件名src對應的每一個塊

        for (Iterator it = v.iterator(); it.hasNext(); ) {

            Block b = (Block) it.next();//對於當前塊來講

            TreeSet containingNodes = (TreeSetblocksMap.get(b);//獲取這個塊對應的datanode節點信息

            if (containingNodes == null || containingNodes.size() < this.minReplication) {

                return false;//若是此Block對應的datanode個數不足最小備份數,則禁止返回Block

            }

        }

        return true;//順利返回true.

}

從這個函數,咱們得知,若是在寫入的過程當中有datanode掛機,可是受影響的block的總個數達到安全閥值,咱們仍是允許繼續寫入的!

相關文章
相關標籤/搜索