HDFS讀寫過程-從類調用角度

1、HDFS相關類說明java

FileSystem:通用文件系統的抽象基類,能夠被分佈式文件系統繼承,全部可能使用Hadoop文件系統的代碼都要使用到這個類。apache

DistributedFileSystem:Hadoop爲FileSystem這個抽象類提供了多種具體的實現,DistributedFileSystem就是FileSystem在HDFS文件系統中的實現。緩存

FSDataInputStream:FileSystem的open()方法返回的是一個輸入流FSDataInputStream對象,在HDSF文件系統中具體的輸入流就是DFSInputStream。服務器

FSDataOutputStream:FileSystem的create()方法返回的是一個輸出流FSDataOutputStream對象,在HDFS文件系統中具體的輸出流就是DFSOutputStream。網絡

2、讀數據的過程分佈式

客戶端連續調用open()、read()、close()讀取數據時,HDFS內部執行流程以下:
函數

  1. 客戶端經過FileSystem.open()打開文件,相應的,在HDFS文件系統中DistributedFileSystem具體實現了FileSystem。所以,調用open()方法後,DistributedFileSystem會建立輸入流FSDataInputStream,對於HDFS而言,具體的輸入流就是DFSInputStream。
  2. 在DFSInputStream的構造函數中,輸入流經過ClienProtocal.getBlockLocations()遠程調用名稱節點,得到文件開始部分數據塊的保存位置。對於該數據塊,名稱節點返回保存該數據塊的全部數據節點的地址,同時根據距離客戶端的遠近對數據節點進行排序;而後,DistributedFileSystem會利用DFSInputStream來實例化FSDataInputStream,返回給客戶端,同時返回了數據塊的數據節點地址。
  3. 得到輸入流FSDataInputStream後,客戶端調用read()函數開始讀取數據。輸入流根據前面的排序結果,選擇距離客戶端最近的數據節點創建鏈接並讀取數據。
  4. 數據從該數據節點讀到客戶端;當該數據塊讀取完畢時,FSDataInputStream關閉和該數據節點的鏈接。
  5. 輸入流經過getBlockLocations()方法查找下一個數據塊(若是客戶端緩存中已經包含了該數據塊的位置信息,就不須要調用該方法)。
  6. 找到該數據塊的最佳數據節點,讀取數據。
    當客戶端讀取完畢數據的時候,調用FSDataInputStream的close()函數,關閉輸入流。

ps:在讀取數據的過程當中,若是客戶端與數據節點通訊時出現錯誤,就會嘗試鏈接包含此數據塊的下一個數據節點oop

代碼舉例:學習

import java.io.BufferedReader;
import java.io.InputStreamReader; 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
 
public class Hdfsread {
        public static void main(String[] args) {
                try {
                        Configuration conf = new Configuration();
                        conf.set("fs.defaultFS","hdfs://localhost:9000"); 
                        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                        FileSystem fs = FileSystem.get(conf);
                        Path file = new Path("test"); 
                        FSDataInputStream getIt = fs.open(file);
                        BufferedReader d = new BufferedReader(new InputStreamReader(getIt));
                        String content = d.readLine(); //讀取文件一行
                        System.out.println(content);
                        d.close(); //關閉文件
                        fs.close(); //關閉hdfs
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}

3、寫數據過程.net

客戶端向HDFS寫數據是一個複雜的過程,客戶端連續調用create()、write()和close()時,HDFS內部執行過程以下:(ps:不發生任何異常狀況)

  1. 客戶端經過FileSystem.create()建立文件,相應的,在HDFS文件系統中DistributedFileSystem具體實現了FileSystem。所以,調用create()方法後,DistributedFileSystem會建立輸出流FSDataOutputStream,對於HDFS而言,具體的輸出流就是DFSOutputStream。
  2. DistributedFileSystem經過RPC遠程調用名稱節點,在文件系統的命名空間中建立一個新的文件。名稱節點會執行一些檢查,好比文件是否已經存在、客戶端是否有權限建立文件等。檢查經過以後,名稱節點會構造一個新文件,並添加文件信息。遠程方法調用結束後,DistributedFileSystem會利用DFSOutputStream來實例化FSDataOutputStream,返回給客戶端,客戶端使用這個輸入流寫入數據。
  3. 得到輸出流FSDataOutputStream之後,客戶端調用輸出流的write()方法向HDFS中對應的文件寫入數據。
  4. 客戶端向輸出流FSDataOutputStream中寫入的數據會首先被分紅一個個的分包,這些分包被放入DFSOutputStream對象的內部隊列。輸出流FSDataOutputStream會向名稱節點申請保存文件和副本數據塊的若干個數據節點,這些數據節點造成一個數據流通道。隊列中的分包最後被打包成數據包,發往數據流管道中的第一個數據節點,第一個數據節點將數據包發送給第二個數據節點,第二個數據節點將數據包發送給第三個數據節點,這樣,數據包會流經管道上的各個數據節點(流水線複製策略)。
  5. 由於各個數據節點位於不一樣的機器上,數據須要經過網絡發送。所以,爲了保證全部的數據節點的數據都是準確的,接收到數據的數據節點要向發送者發送「確認包」(ACK Packet)。確認包沿着數據流管道逆流而上,從數據流管道依次經過各個數據節點並最終發往客戶端,當客戶端收到應答時,它將對應的分包從內部隊列移除。不斷執行3~5步驟,直到數據所有寫完。
  6. 客戶端調用close()方法關閉輸出流,此時開始,客戶端不會再向輸出流寫入數據,因此,當DFSOutputStream對象內部隊列的分包都收到應答後,就能夠使用ClientProtocol.complete()方法通知名稱節點關閉文件,完成一次正常的寫文件過程。

代碼舉例:

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; 
public class Hdfswrite {    
        public static void main(String[] args) { 
                try {
                        Configuration conf = new Configuration();  
                        conf.set("fs.defaultFS","hdfs://localhost:9000");
                        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
                        FileSystem fs = FileSystem.get(conf);
                        byte[] buff = "Hello world".getBytes(); // 要寫入的內容
                        String filename = "test"; //要寫入的文件名
                        FSDataOutputStream os = fs.create(new Path(filename));
                        os.write(buff,0,buff.length);
                        System.out.println("Create:"+ filename);
                        os.close();
                        fs.close();
                } catch (Exception e) {  
                        e.printStackTrace();  
                }  
        }  
}

4、簡單總結

讀的過程:

  1. 客戶端訪問名稱節點,查詢並獲取文件的數據塊位置列表,返回輸入流對象。
  2. 就近挑選一臺數據節點服務器,請求創建輸入流 。
  3. 數據節點向輸入流中中寫數據。
  4. 關閉輸入流。

寫的過程:

  1. 客戶端向名稱發出寫文件請求。
  2. 檢查是否已存在文件、檢查權限。若經過檢查,返回輸出流對象。
  3. 客戶端按128MB的塊切分文件。
  4. 客戶端將名稱節點返回的分配的可寫的數據節點列表和Data數據一同發送給最近的第一個數據節點,此後客戶端和名稱節點分配的多個數據節點構成pipeline管道,客戶端向輸出流對象中寫數據。客戶端每向第一個寫入一個packet,這個packet便會直接在pipeline裏傳給第二個、第三個…數據節點。
  5. 每一個數據節點寫完一個塊後,會返回確認信息。
  6. 寫完數據,關閉輸輸出流。
  7. 發送完成信號給名稱節點。

補充2:若經過檢查,直接先將操做寫入EditLog,WAL(write aheadlog)操做,先寫log在寫內存,寫入失敗經過EditLog記錄校驗。
補充4:packet默認64k。
補充5:寫完一個block塊後彙總確認,不會每一個packet確認。
補充7:HDFS通常狀況下都是強調強一致性,即全部數據節點寫完後才向名稱節點彙報。

Reference:

dblab.xmu.edu.cn

blog.csdn.net/qq_38202756/article/details/82262453


學習交流,有任何問題還請隨時評論指出交流。

相關文章
相關標籤/搜索