HDFS寫文件過程分析

參考:
  HDFS寫文件過程分析http://shiyanjun.cn/archives/942.html
  HDFS的工做流程分析https://blog.csdn.net/z66261123/article/details/51194204
  簡單搞定hdfs讀寫流程https://blog.csdn.net/github_36444580/article/details/77840481
  Hadoop核心-HDFS讀寫流程https://yq.aliyun.com/articles/325428
 
HDFS的工做機制概述
  • HDFS集羣分爲兩大角色:NameNode、DataNode
  • NameNode負責管理整個文件系統的元數據
  • DataNode 負責管理用戶的文件數據塊
  • 文件會按照固定的大小(blocksize)切成若干塊後分布式存儲在若干臺datanode上
  • 每個文件塊能夠有多個副本,並存放在不一樣的datanode上
  • Datanode會按期向Namenode彙報自身所保存的文件block信息,而namenode則會負責保持文件的副本數量
  • HDFS的內部工做機制對客戶端保持透明,客戶端請求訪問HDFS都是經過向namenode申請來進行

HDFS寫文件過程分析html

  HDFS是一個分佈式文件系統,在HDFS上寫文件的過程與咱們平時使用的單機文件系統很是不一樣,從宏觀上來看,在HDFS文件系統上建立並寫一個文件,流程以下圖(來自《Hadoop:The Definitive Guide》一書)所示:node

  

  

具體過程描述以下:git

  1. Client調用DistributedFileSystem對象的create方法,建立一個文件輸出流(FSDataOutputStream)對象
  2. 經過DistributedFileSystem對象與Hadoop集羣的NameNode進行一次RPC遠程調用,在HDFS的Namespace中建立一個文件條目(Entry),該條目沒有任何的Block
  3. 經過FSDataOutputStream對象,向DataNode寫入數據,數據首先被寫入FSDataOutputStream對象內部的Buffer中,而後數據被分割成一個個Packet數據包
  4. 以Packet最小單位,基於Socket鏈接發送到按特定算法選擇的HDFS集羣中一組DataNode(正常是3個,可能大於等於1)中的一個節點上,在這組DataNode組成的Pipeline上依次傳輸Packet
  5. 這組DataNode組成的Pipeline反方向上,發送ack,最終由Pipeline中第一個DataNode節點將Pipeline ack發送給Client
  6. 完成向文件寫入數據,Client在文件輸出流(FSDataOutputStream)對象上調用close方法,關閉流
  7. 調用DistributedFileSystem對象的complete方法,通知NameNode文件寫入成功

更詳細的流程:github

  1. client發起文件上傳請求,經過RPC與NameNode創建鏈接,NameNode檢查目標文件是否已經存在,父目錄是否存在,並檢查用戶是否有相應的權限,若檢查經過,會爲該文件建立一個新的記錄,不然的話文件建立失敗,客戶端獲得異常信息
  2. client經過請求NameNode,第一個block應該傳輸到哪些DataNode服務器上
  3. NameNode根據配置文件中指定的備份(replica)數量及機架感知原理進行文件分配,返回可用的DataNode的地址 以三臺DataNode爲例:A B C。注: Hadoop在設計時考慮到數據的安全與高效,數據文件默認在HDFS上存放三份,存儲策略爲:第一個備份放在客戶端相同的datanode上(若客戶端在集羣外運行,就隨機選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不一樣機架的一個隨機datanode上,第三個replica放在與第二個replica相同機架的隨機datanode上,若是replica數大於三,則隨後的replica在集羣中隨機存放,Hadoop會盡可能避免過多的replica存放在同一個機架上.選取replica存放在同一個機架上.(Hadoop 1.x之後容許replica是可插拔的,意思是說能夠定製本身須要的replica分配策略)
  4. client請求3臺的DataNode的一臺A上傳數據,(本質是一個RPC調用,創建pipeline),A收到請求會繼續調用B,而後B調用C,將整個pipeline創建完成後,逐級返回client
  5. client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet爲單位(默認 64K),A收到一個packet就會傳給B,B傳遞給C;A每傳一個packet會放入一個應答隊列等待應答。注: 若是某個datanode在寫數據的時候宕掉了下面這些對用戶透明的步驟會被執行:數據被分割成一個個packet數據包在pipeline上一次傳輸,在pipeline反方向上,逐個發送ack(命令正確應答),最終由pipeline中第一個DataNode節點A將pipeline ack發送給client
    1. 管道線關閉,全部確認隊列上的數據會被挪到數據隊列的首部從新發送,這樣也就確保管道線中宕掉的datanode下流的datanode不會由於宕掉的datanode而丟失數據包
    2. 在還在正常運行datanode上的當前block上作一個標誌,這樣當宕掉的datanode從新啓動之後namenode就會知道該datanode上哪一個block是剛纔宕機殘留下的局部損壞block,從而把他刪除掉
    3. 已經宕掉的datanode從管道線中被移除,未寫完的block的其餘數據繼續唄寫入到其餘兩個還在正常運行的datanode中,namenode知道這個block還處在under-replicated狀態(即備份數不足的狀態)下,而後它會安排一個新的replica從而達到要求的備份數,後續的block寫入方法同前面正常時候同樣
    4. 有可能管道線中的多個datanode宕掉(通常這種狀況不多),但只要dfs.relication.min(默認值爲1)個replica被建立,我麼就認爲該建立成功了,剩餘的relica會在之後異步建立以達到指定的replica數
  6. 當一個block傳輸完成後,client再次發送請求NameNode上傳第二個block到服務器

機架感知(副本節點選擇):算法

  1. 第一個副本在client所處的節點上。若是客戶端在集羣外,隨機選一個
  2. 第二個副本和第一個副本位於相同機架,隨機節點。
  3. 第三個副本位於不一樣機架,隨機節點

  

 

下面代碼使用Hadoop的API來實現向HDFS的文件寫入數據,一樣也包括建立一個文件和寫數據兩個主要過程,代碼以下所示:緩存

static String[] contents = new String[] {
     "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
     "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
     "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
     "dddddddddddddddddddddddddddddddd",
     "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};
 
public static void main(String[] args) {
     String file = "hdfs://h1:8020/data/test/test.log";
   Path path = new Path(file);
   Configuration conf = new Configuration();
   FileSystem fs = null;
   FSDataOutputStream output = null;
   try {
          fs = path.getFileSystem(conf);
          output = fs.create(path); // 建立文件
          for(String line : contents) { // 寫入數據
               output.write(line.getBytes("UTF-8"));
               output.flush();
          }
     } catch (IOException e) {
          e.printStackTrace();
     } finally {
          try {
               output.close();
          } catch (IOException e) {
               e.printStackTrace();
          }
     }
}

結合上面的示例代碼,咱們先從fs.create(path);開始,能夠看到FileSystem的實現DistributedFileSystem中給出了最終返回FSDataOutputStream對象的抽象邏輯,代碼以下所示:安全

public FSDataOutputStream create(Path f, FsPermission permission,
  boolean overwrite,
  int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {
 
  statistics.incrementWriteOps(1);
  return new FSDataOutputStream
     (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
}

上面,DFSClient dfs的create方法中建立了一個OutputStream對象,在DFSClient的create方法:服務器

 public OutputStream create(String src,
                             FsPermission permission,
                             boolean overwrite,
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize
                             ) throws IOException {
   ... ...
}
建立了一個DFSOutputStream對象,以下所示:
final DFSOutputStream result = new DFSOutputStream(src, masked,
    overwrite, createParent, replication, blockSize, progress, buffersize,
    conf.getInt("io.bytes.per.checksum", 512));

下面,咱們從DFSOutputStream類開始,說明其內部實現原理。數據結構

DFSOutputStream內部原理
  打開一個DFSOutputStream流,Client會寫數據到流內部的一個緩衝區中,而後數據被分解成多個Packet,每一個Packet大小爲64k字節,每一個Packet又由一組chunk和這組chunk對應的checksum數據組成,默認chunk大小爲512字節,每一個checksum是對512字節數據計算的校驗和數據。
  當Client寫入的字節流數據達到一個Packet的長度,這個Packet會被構建出來,而後會被放到隊列dataQueue中,接着DataStreamer線程會不斷地從dataQueue隊列中取出Packet,發送到複製Pipeline中的第一個DataNode上,並將該Packet從dataQueue隊列中移到ackQueue隊列中。
  ResponseProcessor線程接收從Datanode發送過來的ack,若是是一個成功的ack,表示複製Pipeline中的全部Datanode都已經接收到這個Packet,ResponseProcessor線程將packet從隊列ackQueue中刪除。
  在發送過程當中,若是發生錯誤,全部未完成的Packet都會從ackQueue隊列中移除掉,而後從新建立一個新的Pipeline,排除掉出錯的那些DataNode節點,接着DataStreamer線程繼續從dataQueue隊列中發送Packet。
  下面是DFSOutputStream的結構及其原理,如圖所示:
  
  咱們從下面3個方面來描述內部流程:
  • 建立Packet,Client寫數據時,會將字節流數據緩存到內部的緩衝區中,當長度知足一個Chunk大小(512B)時,便會建立一個Packet對象,而後向該Packet對象中寫Chunk Checksum校驗和數據,以及實際數據塊Chunk Data,校驗和數據是基於實際數據塊計算獲得的。每次知足一個Chunk大小時,都會向Packet中寫上述數據內容,直到達到一個Packet對象大小(64K),就會將該Packet對象放入到dataQueue隊列中,等待DataStreamer線程取出併發送到DataNode節點。
  • 發送Packet,DataStreamer線程從dataQueue隊列中取出Packet對象,放到ackQueue隊列中,而後向DataNode節點發送這個Packet對象所對應的數據
  • 接收ack,發送一個Packet數據包之後,會有一個用來接收ack的ResponseProcessor線程,若是收到成功的ack,則表示一個Packet發送成功。若是成功,則ResponseProcessor線程會將ackQueue隊列中對應的Packet刪除
 
DFSOutputStream初始化
首先看一下,DFSOutputStream的初始化過程,構造方法以下所示:
    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        boolean createParent, short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException {
      this(src, blockSize, progress, bytesPerChecksum, replication);
 
      computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默認 writePacketSize=64*1024(即64K),bytesPerChecksum=512(沒512個字節計算一個校驗和),
 
      try {
        if (createParent) { // createParent爲true表示,若是待建立的文件的父級目錄不存在,則自動建立
          namenode.create(src, masked, clientName, overwrite, replication, blockSize);
        } else {
          namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
        }
      } catch(RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
                                       FileAlreadyExistsException.class,
                                       FileNotFoundException.class,
                                       NSQuotaExceededException.class,
                                       DSQuotaExceededException.class);
      }
      streamer.start(); // 啓動一個DataStreamer線程,用來將寫入的字節流打包成packet,而後發送到對應的Datanode節點上
    }
上面computePacketChunkSize方法計算了一個packet的相關參數,咱們結合代碼來查看,以下所示:
      int chunkSize = csize + checksum.getChecksumSize();
      int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
      chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
      packetSize = n + chunkSize*chunksPerPacket;

咱們用默認的參數值替換上面的參數,獲得:併發

int chunkSize = 512 + 4;
int n = 21 + 4;
chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1);  // 127
packetSize = 25 + 516*127;

上面對應的參數,說明以下表所示:

參數名稱 參數值 參數含義
chunkSize 512+4=516 每一個chunk的字節數(數據+校驗和)
csize 512 每一個chunk數據的字節數
psize 64*1024 每一個packet的最大字節數(不包含header)
DataNode.PKT_HEADER_LEN 21 每一個packet的header的字節數
chunksPerPacket 127 組成每一個packet的chunk的個數
packetSize 25+516*127=65557 每一個packet的字節數(一個header+一組chunk)

在計算好一個packet相關的參數之後,調用create方法與Namenode進行RPC請求,請求建立文件:

if (createParent) { // createParent爲true表示,若是待建立的文件的父級目錄不存在,則自動建立
  namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
  namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}

遠程調用上面方法,會在FSNamesystem中建立對應的文件路徑,並初始化與該建立的文件相關的一些信息,如租約(向Datanode節點寫數據的憑據)。文件在FSNamesystem中建立成功,就要初始化並啓動一個DataStreamer線程,用來向Datanode寫數據,後面咱們詳細說明具體處理邏輯。

Packet結構與定義

Client向HDFS寫數據,數據會被組裝成Packet,而後發送到Datanode節點。Packet分爲兩類,一類是實際數據包,另外一類是heatbeat包。一個Packet數據包的組成結構,如圖所示:
上圖中,一個Packet是由Header和Data兩部分組成,其中Header部分包含了一個Packet的概要屬性信息,以下表所示:
字段名稱 字段類型 字段長度 字段含義
pktLen int 4 4 + dataLen + checksumLen
offsetInBlock long 8 Packet在Block中偏移量
seqNo long 8 Packet序列號,在同一個Block惟一
lastPacketInBlock boolean 1 是不是一個Block的最後一個Packet
dataLen int 4 dataPos – dataStart,不包含Header和Checksum的長度
Data部分是一個Packet的實際數據部分,主要包括一個4字節校驗和(Checksum)與一個Chunk部分,Chunk部分最大爲512字節。
在構建一個Packet的過程當中,首先將字節流數據寫入一個buffer緩衝區中,也就是從偏移量爲25的位置(checksumStart)開始寫Packet數據的Chunk Checksum部分,從偏移量爲533的位置(dataStart)開始寫Packet數據的Chunk Data部分,直到一個Packet建立完成爲止。若是一個Packet的大小未能達到最大長度,也就是上圖對應的緩衝區中,Chunk Checksum與Chunk Data之間還保留了一段未被寫過的緩衝區位置,這種狀況說明,已經在寫一個文件的最後一個Block的最後一個Packet。在發送這個Packet以前,會檢查Chunksum與Chunk Data之間的緩衝區是否爲空白緩衝區(gap),若是有則將Chunk Data部分向前移動,使得Chunk Data 1與Chunk Checksum N相鄰,而後纔會被髮送到DataNode節點。
咱們看一下Packet對應的Packet類定義,定義了以下一些字段:
ByteBuffer buffer;           // only one of buf and buffer is non-null
byte[]  buf;
long    seqno;               // sequencenumber of buffer in block
long    offsetInBlock;       // 該packet在block中的偏移量
boolean lastPacketInBlock;   // is this the last packet in block?
int     numChunks;           // number of chunks currently in packet
int     maxChunks;           // 一個packet中包含的chunk的個數
int     dataStart;
int     dataPos;
int     checksumStart;
int     checksumPos;

Packet類有一個默認的沒有參數的構造方法,它是用來作heatbeat的,以下所示:

Packet() {
  this.lastPacketInBlock = false;
  this.numChunks = 0;
  this.offsetInBlock = 0;
  this.seqno = HEART_BEAT_SEQNO; // 值爲-1
 
  buffer = null;
  int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25
  buf = new byte[packetSize];
 
  checksumStart = dataStart = packetSize;
  checksumPos = checksumStart;
  dataPos = dataStart;
  maxChunks = 0;
}

經過代碼能夠看到,一個heatbeat的內容,實際上只有一個長度爲25字節的header數據。經過this.seqno = HEART_BEAT_SEQNO;的值能夠判斷一個packet是不是heatbeat包,若是seqno爲-1表示這是一個heatbeat包。

Client發送Packet數據

能夠DFSClient類中看到,發送一個Packet以前,首先須要向選定的DataNode發送一個Header數據包,代表要向DataNode寫數據,該Header的數據結構,如圖所示:
上圖顯示的是Client發送Packet到第一個DataNode節點的Header數據結構,主要包括待發送的Packet所在的Block(先向NameNode分配Block ID等信息)的相關信息、Pipeline中另外2個DataNode的信息、訪問令牌(Access Token)和校驗和信息,Header中各個字段及其類型,詳見下表:
字段名稱 字段類型 字段長度 字段含義
Transfer Version short 2 Client與DataNode之間數據傳輸版本號,由常量DataTransferProtocol.DATA_TRANSFER_VERSION定義,值爲17
OP int 4 操做類型,由常量DataTransferProtocol.OP_WRITE_BLOCK定義,值爲80
blkId long 8 Block的ID值,由NameNode分配
GS long 8 時間戳(Generation Stamp),NameNode分配blkId的時候生成的時間戳
DNCnt int 4 DataNode複製Pipeline中DataNode節點的數量
Recovery Flag boolean 1 Recover標誌
Client Text   Client主機的名稱,在使用Text進行序列化的時候,實際包含長度len與主機名稱字符串ClientHost
srcNode boolean 1 是否發送src node的信息,默認值爲false,不發送src node的信息
nonSrcDNCnt int 4 由Client寫的該Header數據,該數不包含Pipeline中第一個節點(即爲DNCnt-1)
DN2 DatanodeInfo   DataNode信息,包括StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
DN3 DatanodeInfo   DataNode信息,包括StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState
Access Token Token   訪問令牌信息,包括IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service
CheckSum Header DataChecksum 1+4 校驗和Header信息,包括type、bytesPerChecksum

Header數據包發送成功,Client會收到一個成功響應碼(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着將Packet數據發送到Pipeline中第一個DataNode上,以下所示:

Packet one = null;
one = dataQueue.getFirst(); // regular data packet
ByteBuffer buf = one.getBuffer();
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
 
if (one.lastPacketInBlock) { // 若是是Block中的最後一個Packet,還要寫入一個0標識該Block已經寫入完成
    blockStream.writeInt(0); // indicate end-of-block
}
不然,若是失敗,則會與NameNode進行RPC調用,刪除該Block,並把該Pipeline中第一個DataNode加入到excludedNodes列表中,代碼以下所示:
if (!success) {
  LOG.info("Abandoning " + block);
  namenode.abandonBlock(block, src, clientName);
 
  if (errorIndex < nodes.length) {
    LOG.info("Excluding datanode " + nodes[errorIndex]);
    excludedNodes.add(nodes[errorIndex]);
  }
 
  // Connection failed.  Let's wait a little bit and retry
  retry = true;
}

 

DataNode端服務組件
數據最終會發送到DataNode節點上,在一個DataNode上,數據在各個組件之間流動,流程以下圖所示:
DataNode服務中建立一個後臺線程DataXceiverServer,它是一個SocketServer,用來接收來自Client(或者DataNode Pipeline中的非最後一個DataNode節點)的寫數據請求,而後在DataXceiverServer中將鏈接過來的Socket直接派發給一個獨立的後臺線程DataXceiver進行處理。因此,Client寫數據時鏈接一個DataNode Pipeline的結構,實際流程如圖所示:
每一個DataNode服務中的DataXceiver後臺線程接收到來自前一個節點(Client/DataNode)的Socket鏈接,首先讀取Header數據:
Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress);
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
boolean hasSrcDataNode = in.readBoolean(); // is src node info present
if (hasSrcDataNode) {
  srcDataNode = new DatanodeInfo();
  srcDataNode.readFields(in);
}
int numTargets = in.readInt();
if (numTargets < 0) {
  throw new IOException("Mislabelled incoming datastream.");
}
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i < targets.length; i++) {
  DatanodeInfo tmp = new DatanodeInfo();
  tmp.readFields(in);
  targets[i] = tmp;
}
Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
上面代碼中,讀取Header的數據,與前一個Client/DataNode寫入Header字段的順序相對應,再也不累述。在完成讀取Header數據後,當前DataNode會首先將Header數據再發送到Pipeline中下一個DataNode結點,固然該DataNode確定不是Pipeline中最後一個DataNode節點。接着,該DataNode會接收來自前一個Client/DataNode節點發送的Packet數據,接收Packet數據的邏輯實際上在BlockReceiver中完成,包括未來自前一個Client/DataNode節點發送的Packet數據寫入本地磁盤。在BlockReceiver中,首先會將接收到的Packet數據發送寫入到Pipeline中下一個DataNode節點,而後再將接收到的數據寫入到本地磁盤的Block文件中。
DataNode持久化Packet數據
在DataNode節點的BlockReceiver中進行Packet數據的持久化,一個Packet是一個Block中一個數據分組,咱們首先看一下,一個Block在持久化到磁盤上的物理存儲結構,以下圖所示:
每一個Block文件(如上圖中blk_1084013198文件)都對應一個meta文件(如上圖中blk_1084013198_10273532.meta文件),Block文件是一個一個Chunk的二進制數據(每一個Chunk的大小是512字節),而meta文件是與每個Chunk對應的Checksum數據,是序列化形式存儲。
 
寫文件過程當中Client/DataNode與NameNode進行RPC調用
Client在HDFS文件系統中寫文件過程當中,會發生屢次與NameNode節點進行RPC調用來完成寫數據相關操做,主要是在以下時機進行RPC調用:
  • 寫文件開始時建立文件:Client調用create在NameNode節點的Namespace中建立一個標識該文件的條目
  • 在Client鏈接Pipeline中第一個DataNode節點以前,Client調用addBlock分配一個Block(blkId+DataNode列表+租約)
  • 若是與Pipeline中第一個DataNode節點鏈接失敗,Client調用abandonBlock放棄一個已經分配的Block
  • 一個Block已經寫入到DataNode節點磁盤,Client調用fsync讓NameNode持久化Block的位置信息數據
  • 文件寫完之後,Client調用complete方法通知NameNode寫入文件成功
  • DataNode節點接收到併成功持久化一個Block的數據後,DataNode調用blockReceived方法通知NameNode已經接收到Block
相關文章
相關標籤/搜索