HDFS 是一個分佈式文件系統,在 HDFS 上寫文件的過程與咱們平時使用的單機文件系統很是不一樣,從宏觀上來看,在 HDFS 文件系統上建立並寫一個文件,流程以下圖(來自《Hadoop:The Definitive Guide》一書)所示:node
具體過程描述以下:算法
下面代碼使用 Hadoop 的 API 來實現向 HDFS 的文件寫入數據,一樣也包括建立一個文件和寫數據兩個主要過程,代碼以下所示:緩存
static String[] contents = new String[] {
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
"dddddddddddddddddddddddddddddddd",
"eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};
public static void main(String[] args) {
String file = "hdfs://h1:8020/data/test/test.log";
Path path = new Path(file);
Configuration conf = new Configuration();
FileSystem fs = null;
FSDataOutputStream output = null;
try {
fs = path.getFileSystem(conf);
output = fs.create(path); // 建立文件
for(String line : contents) { // 寫入數據
output.write(line.getBytes("UTF-8"));
output.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
複製代碼
結合上面的示例代碼,咱們先從 fs.create(path); 開始,能夠看到 FileSystem 的實現 DistributedFileSystem 中給出了最終返回 FSDataOutputStream 對象的抽象邏輯,代碼以下所示:bash
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream
(dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
}
複製代碼
上面,DFSClient dfs 的 create 方法中建立了一個 OutputStream 對象,在 DFSClient 的 create 方法:數據結構
public OutputStream create(String src,
FsPermission permission,
boolean overwrite,
boolean createParent,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
... ...
}
複製代碼
建立了一個 DFSOutputStream 對象,以下所示:併發
final DFSOutputStream result = new DFSOutputStream(src, masked,
overwrite, createParent, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
複製代碼
下面,咱們從 DFSOutputStream 類開始,說明其內部實現原理。分佈式
DFSOutputStream 內部原理ide
打開一個 DFSOutputStream 流,Client 會寫數據到流內部的一個緩衝區中,而後數據被分解成多個 Packet,每一個 Packet 大小爲 64k 字節,每一個 Packet 又由一組 chunk 和這組 chunk 對應的 checksum 數據組成,默認 chunk 大小爲 512 字節,每一個 checksum 是對 512 字節數據計算的校驗和數據。 當 Client 寫入的字節流數據達到一個 Packet 的長度,這個 Packet 會被構建出來,而後會被放到隊列 dataQueue 中,接着 DataStreamer 線程會不斷地從 dataQueue 隊列中取出 Packet,發送到複製 Pipeline 中的第一個 DataNode 上,並將該 Packet 從 dataQueue 隊列中移到 ackQueue 隊列中。ResponseProcessor 線程接收從 Datanode 發送過來的 ack,若是是一個成功的 ack,表示複製 Pipeline 中的全部 Datanode 都已經接收到這個 Packet,ResponseProcessor 線程將 packet 從隊列 ackQueue 中刪除。 在發送過程當中,若是發生錯誤,全部未完成的 Packet 都會從 ackQueue 隊列中移除掉,而後從新建立一個新的 Pipeline,排除掉出錯的那些 DataNode 節點,接着 DataStreamer 線程繼續從 dataQueue 隊列中發送 Packet。 下面是 DFSOutputStream 的結構及其原理,如圖所示:oop
咱們從下面 3 個方面來描述內部流程:ui
Client 寫數據時,會將字節流數據緩存到內部的緩衝區中,當長度知足一個 Chunk 大小(512B)時,便會建立一個 Packet 對象,而後向該 Packet 對象中寫 Chunk Checksum 校驗和數據,以及實際數據塊 Chunk Data,校驗和數據是基於實際數據塊計算獲得的。每次知足一個 Chunk 大小時,都會向 Packet 中寫上述數據內容,直到達到一個 Packet 對象大小(64K),就會將該 Packet 對象放入到 dataQueue 隊列中,等待 DataStreamer 線程取出併發送到 DataNode 節點。
DataStreamer 線程從 dataQueue 隊列中取出 Packet 對象,放到 ackQueue 隊列中,而後向 DataNode 節點發送這個 Packet 對象所對應的數據。
發送一個 Packet 數據包之後,會有一個用來接收 ack 的 ResponseProcessor 線程,若是收到成功的 ack,則表示一個 Packet 發送成功。若是成功,則 ResponseProcessor 線程會將 ackQueue 隊列中對應的 Packet 刪除。
DFSOutputStream 初始化
首先看一下,DFSOutputStream 的初始化過程,構造方法以下所示:
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(writePacketSize, bytesPerChecksum); // 默認 writePacketSize=64*1024(即64K),bytesPerChecksum=512(沒512個字節計算一個校驗和),
try {
if (createParent) { // createParent爲true表示,若是待建立的文件的父級目錄不存在,則自動建立
namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
streamer.start(); // 啓動一個DataStreamer線程,用來將寫入的字節流打包成packet,而後發送到對應的Datanode節點上
}
上面computePacketChunkSize方法計算了一個packet的相關參數,咱們結合代碼來查看,以下所示:
int chunkSize = csize + checksum.getChecksumSize();
int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
packetSize = n + chunkSize*chunksPerPacket;
複製代碼
咱們用默認的參數值替換上面的參數,獲得:
int chunkSize = 512 + 4;
int n = 21 + 4;
chunksPerPacket = Math.max((64*1024 - 25 + 516-1)/516, 1); // 127
packetSize = 25 + 516*127;
複製代碼
上面對應的參數,說明以下表所示:
參數名稱 | 參數值 | 參數含義 |
---|---|---|
chunkSize | 512+4=516 | 每一個 chunk 的字節數(數據 + 校驗和) |
csize | 512 | 每一個 chunk 數據的字節數 |
psize | 64*1024 | 每一個 packet 的最大字節數(不包含 header) |
DataNode.PKT_HEADER_LEN | 21 | 每一個 packet 的 header 的字節數 |
chunksPerPacket | 127 | 組成每一個 packet 的 chunk 的個數 |
packetSize | 25+516*127=65557 | 每一個 packet 的字節數(一個 header + 一組 chunk) |
在計算好一個 packet 相關的參數之後,調用 create 方法與 Namenode 進行 RPC 請求,請求建立文件:
if (createParent) { // createParent爲true表示,若是待建立的文件的父級目錄不存在,則自動建立
namenode.create(src, masked, clientName, overwrite, replication, blockSize);
} else {
namenode.create(src, masked, clientName, overwrite, false, replication, blockSize);
}
複製代碼
遠程調用上面方法,會在 FSNamesystem 中建立對應的文件路徑,並初始化與該建立的文件相關的一些信息,如租約(向 Datanode 節點寫數據的憑據)。文件在 FSNamesystem 中建立成功,就要初始化並啓動一個 DataStreamer 線程,用來向 Datanode 寫數據,後面咱們詳細說明具體處理邏輯。
Packet 結構與定義
Client 向 HDFS 寫數據,數據會被組裝成 Packet,而後發送到 Datanode 節點。Packet 分爲兩類,一類是實際數據包,另外一類是 heatbeat 包。一個 Packet 數據包的組成結構,如圖所示:
上圖中,一個 Packet 是由 Header 和 Data 兩部分組成,其中 Header 部分包含了一個 Packet 的概要屬性信息,以下表所示:
字段名稱 | 字段類型 | 字段長度 | 字段含義 |
---|---|---|---|
pktLen | int | 4 | 4 + dataLen + checksumLen |
offsetInBlock | long | 8 | Packet 在 Block 中偏移量 |
seqNo | long | 8 | Packet 序列號,在同一個 Block 惟一 |
lastPacketInBlock | boolean | 1 | 是不是一個 Block 的最後一個 Packet |
dataLen | int | 4 | dataPos – dataStart,不包含 Header 和 Checksum 的長度 |
Data 部分是一個 Packet 的實際數據部分,主要包括一個 4 字節校驗和(Checksum)與一個 Chunk 部分,Chunk 部分最大爲 512 字節。 在構建一個 Packet 的過程當中,首先將字節流數據寫入一個 buffer 緩衝區中,也就是從偏移量爲 25 的位置(checksumStart)開始寫 Packet 數據的 Chunk Checksum 部分,從偏移量爲 533 的位置(dataStart)開始寫 Packet 數據的 Chunk Data 部分,直到一個 Packet 建立完成爲止。若是一個 Packet 的大小未能達到最大長度,也就是上圖對應的緩衝區中,Chunk Checksum 與 Chunk Data 之間還保留了一段未被寫過的緩衝區位置,這種狀況說明,已經在寫一個文件的最後一個 Block 的最後一個 Packet。在發送這個 Packet 以前,會檢查 Chunksum 與 Chunk Data 之間的緩衝區是否爲空白緩衝區(gap),若是有則將 Chunk Data 部分向前移動,使得 Chunk Data 1 與 Chunk Checksum N 相鄰,而後纔會被髮送到 DataNode 節點。 咱們看一下 Packet 對應的 Packet 類定義,定義了以下一些字段:
ByteBuffer buffer; // only one of buf and buffer is non-null
byte[] buf;
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // 該packet在block中的偏移量
boolean lastPacketInBlock; // is this the last packet in block?
int numChunks; // number of chunks currently in packet
int maxChunks; // 一個packet中包含的chunk的個數
int dataStart;
int dataPos;
int checksumStart;
int checksumPos;
複製代碼
Packet 類有一個默認的沒有參數的構造方法,它是用來作 heatbeat 的,以下所示:
Packet() {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO; // 值爲-1
buffer = null;
int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; // 21+4=25
buf = new byte[packetSize];
checksumStart = dataStart = packetSize;
checksumPos = checksumStart;
dataPos = dataStart;
maxChunks = 0;
}
複製代碼
經過代碼能夠看到,一個 heatbeat 的內容,實際上只有一個長度爲 25 字節的 header 數據。經過 this.seqno = HEART_BEAT_SEQNO; 的值能夠判斷一個 packet 是不是 heatbeat 包,若是 seqno 爲 - 1 表示這是一個 heatbeat 包。
Client 發送 Packet 數據
能夠 DFSClient 類中看到,發送一個 Packet 以前,首先須要向選定的 DataNode 發送一個 Header 數據包,代表要向 DataNode 寫數據,該 Header 的數據結構,如圖所示:
上圖顯示的是 Client 發送 Packet 到第一個 DataNode 節點的 Header 數據結構,主要包括待發送的 Packet 所在的 Block(先向 NameNode 分配 Block ID 等信息)的相關信息、Pipeline 中另外 2 個 DataNode 的信息、訪問令牌(Access Token)和校驗和信息,Header 中各個字段及其類型,詳見下表:
字段名稱 | 字段類型 | 字段長度 | 字段含義 |
---|---|---|---|
Transfer Version | short | 2 | Client 與 DataNode 之間數據傳輸版本號,由常量 DataTransferProtocol.DATA_TRANSFER_VERSION 定義,值爲 17 |
OP | int | 4 | 操做類型,由常量 DataTransferProtocol.OP_WRITE_BLOCK 定義,值爲 80 |
blkId | long | 8 | Block 的 ID 值,由 NameNode 分配 |
GS | long | 8 | 時間戳(Generation Stamp),NameNode 分配 blkId 的時候生成的時間戳 |
DNCnt | int | 4 | DataNode 複製 Pipeline 中 DataNode 節點的數量 |
Recovery Flag | boolean | 1 | Recover 標誌 |
Client | Text | Client 主機的名稱,在使用 Text 進行序列化的時候,實際包含長度 len 與主機名稱字符串 ClientHost | |
srcNode | boolean | 1 | 是否發送 src node 的信息,默認值爲 false,不發送 src node 的信息 |
nonSrcDNCnt | int | 4 | 由 Client 寫的該 Header 數據,該數不包含 Pipeline 中第一個節點(即爲 DNCnt-1) |
DN2 | DatanodeInfo | DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
DN3 | DatanodeInfo | DataNode 信息,包括 StorageID、InfoPort、IpcPort、capacity、DfsUsed、remaining、LastUpdate、XceiverCount、Location、HostName、AdminState | |
Access Token | Token | 訪問令牌信息,包括 IdentifierLength、Identifier、PwdLength、Pwd、KindLength、Kind、ServiceLength、Service | |
CheckSum Header | DataChecksum | 1+4 | 校驗和 Header 信息,包括 type、bytesPerChecksum |
Header 數據包發送成功,Client 會收到一個成功響應碼(DataTransferProtocol.OP_STATUS_SUCCESS = 0),接着將 Packet 數據發送到 Pipeline 中第一個 DataNode 上,以下所示:
Packet one = null;
one = dataQueue.getFirst(); // regular data packet
ByteBuffer buf = one.getBuffer();
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
if (one.lastPacketInBlock) { // 若是是Block中的最後一個Packet,還要寫入一個0標識該Block已經寫入完成
blockStream.writeInt(0); // indicate end-of-block
}
複製代碼
不然,若是失敗,則會與 NameNode 進行 RPC 調用,刪除該 Block,並把該 Pipeline 中第一個 DataNode 加入到 excludedNodes 列表中,代碼以下所示:
if (!success) {
LOG.info("Abandoning " + block);
namenode.abandonBlock(block, src, clientName);
if (errorIndex < nodes.length) {
LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.add(nodes[errorIndex]);
}
// Connection failed. Let's wait a little bit and retry retry = true; } 複製代碼
DataNode 端服務組件
數據最終會發送到 DataNode 節點上,在一個 DataNode 上,數據在各個組件之間流動,流程以下圖所示:
DataNode 服務中建立一個後臺線程 DataXceiverServer,它是一個 SocketServer,用來接收來自 Client(或者 DataNode Pipeline 中的非最後一個 DataNode 節點)的寫數據請求,而後在 DataXceiverServer 中將鏈接過來的 Socket 直接派發給一個獨立的後臺線程 DataXceiver 進行處理。因此,Client 寫數據時鏈接一個 DataNode Pipeline 的結構,實際流程如圖所示:
每一個 DataNode 服務中的 DataXceiver 後臺線程接收到來自前一個節點(Client/DataNode)的 Socket 鏈接,首先讀取 Header 數據:
Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, in.readLong());
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " + localAddress);
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
boolean hasSrcDataNode = in.readBoolean(); // is src node info present
if (hasSrcDataNode) {
srcDataNode = new DatanodeInfo();
srcDataNode.readFields(in);
}
int numTargets = in.readInt();
if (numTargets < 0) {
throw new IOException("Mislabelled incoming datastream.");
}
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i < targets.length; i++) {
DatanodeInfo tmp = new DatanodeInfo();
tmp.readFields(in);
targets[i] = tmp;
}
Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
複製代碼
上面代碼中,讀取 Header 的數據,與前一個 Client/DataNode 寫入 Header 字段的順序相對應,再也不累述。在完成讀取 Header 數據後,當前 DataNode 會首先將 Header 數據再發送到 Pipeline 中下一個 DataNode 結點,固然該 DataNode 確定不是 Pipeline 中最後一個 DataNode 節點。接着,該 DataNode 會接收來自前一個 Client/DataNode 節點發送的 Packet 數據,接收 Packet 數據的邏輯實際上在 BlockReceiver 中完成,包括未來自前一個 Client/DataNode 節點發送的 Packet 數據寫入本地磁盤。在 BlockReceiver 中,首先會將接收到的 Packet 數據發送寫入到 Pipeline 中下一個 DataNode 節點,而後再將接收到的數據寫入到本地磁盤的 Block 文件中。
DataNode 持久化 Packet 數據
在 DataNode 節點的 BlockReceiver 中進行 Packet 數據的持久化,一個 Packet 是一個 Block 中一個數據分組,咱們首先看一下,一個 Block 在持久化到磁盤上的物理存儲結構,以下圖所示:
每一個 Block 文件(如上圖中 blk_1084013198 文件)都對應一個 meta 文件(如上圖中 blk_1084013198_10273532.meta 文件),Block 文件是一個一個 Chunk 的二進制數據(每一個 Chunk 的大小是 512 字節),而 meta 文件是與每個 Chunk 對應的 Checksum 數據,是序列化形式存儲。
寫文件過程當中 Client/DataNode 與 NameNode 進行 RPC 調用
Client 在 HDFS 文件系統中寫文件過程當中,會發生屢次與 NameNode 節點進行 RPC 調用來完成寫數據相關操做,主要是在以下時機進行 RPC 調用:
具體 RPC 調用的詳細過程,能夠參考源碼。