As a distributed file system, HDFS is good at reading and writing large files. This comes from the idea of separating file metadata from file data and storing file data in blocks: the namenode manages file metadata, while datanodes manage the block-sized chunks of file data.
HDFS 2.x further abstracts the block storage service into block pools, but the block-write process is much the same as in 1.x. This article analyzes how a datanode writes a data block, assuming a replication factor of 1 (i.e., the write involves only 1 client and 1 datanode) and that no errors occur.
Source version: Apache Hadoop 2.6.0
You can refer to the quick notes I took while tracing the source, set breakpoints there, and debug the whole flow yourself.
With a replication factor of 1, a single datanode forms the smallest possible pipeline; compared with the more common pipelined write, this can be regarded as writing "without a pipeline". Two follow-up articles will analyze the pipelined write without errors and the pipelined write with errors, respectively.
From 源碼|HDFS之DataNode:啓動過程 (the DataNode startup article), we already have a rough picture of the important worker threads on a datanode. Among them, the ones most closely related to the block-write process are DataXceiverServer and BPServiceActor.
As described in HDFS-1.x、2.x的RPC接口 (the RPC interface article), the client and the datanodes read and write data blocks mainly through the streaming interface DataTransferProtocol. DataTransferProtocol handles the streaming communication between the client and the datanodes along the whole pipeline; in particular, DataTransferProtocol#writeBlock() is responsible for writing a data block:
public void writeBlock(final ExtendedBlock blk,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientName,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo source,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      final DataChecksum requestedChecksum,
      final CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException;
Note that DataTransferProtocol is not an RPC protocol, so the usual approach of locating the implementation class of the interface to determine "the remote method the client calls" does not really apply here. Still, we can trace backwards along the same lines: look at how the implementation class is created and whom it talks to, in order to verify that we have found the right class.
With the debugger, I traced backwards from the DataXceiver class to the DataXceiverServer class. Here the explanation goes forward, starting from DataXceiverServer.
The DataXceiverServer thread is started in DataNode#runDatanodeDaemon().
DataXceiverServer#run():
public void run() {
Peer peer = null;
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
peer = peerServer.accept();
...// Check the number of DataXceiver threads; throw an IOE if it exceeds the maximum
// Start a new DataXceiver thread
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
// another thread closed our listener socket - that's expected during shutdown,
// but not in other circumstances
if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
...// Cleanup
} catch (OutOfMemoryError ie) {
...// Clean up and sleep for 30s
} catch (Throwable te) {
// Any other throwable shuts down the datanode
LOG.error(datanode.getDisplayName()
+ ":DataXceiverServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
...// Close the peerServer and clean up all peers
}
The DataXceiverServer thread is a typical TCP socket server. For every incoming TCP connection from a client, as long as the number of DataXceiver threads on the datanode has not exceeded the limit, it starts a new DataXceiver thread.
The maximum number of DataXceiver threads defaults to 4096 and is configured via dfs.datanode.max.transfer.threads.
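For reference, the limit can be read like any other HDFS configuration key; the sketch below is only an illustration (the key name and the 4096 default are taken from the description above, and HdfsConfiguration simply pulls in hdfs-site.xml):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class MaxTransferThreads {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration(); // loads hdfs-default.xml / hdfs-site.xml from the classpath
    // Same key and default as described above
    int maxXceivers = conf.getInt("dfs.datanode.max.transfer.threads", 4096);
    System.out.println("max DataXceiver threads = " + maxXceivers);
  }
}
```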
DataXceiver#run():
public void run() {
int opsProcessed = 0;
Op op = null;
try {
...// Some initialization
// Loop so that the TCP connection can be reused when the client sends further operation requests
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
...// Timeout setup
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
...// An optimization here guarantees that, after an operation has been processed normally, an EOFException or ClosedChannelException is thrown, so the loop can exit
...// Any other exception indicates a real error and is rethrown to exit the loop
}
...// Timeout setup
opStartTime = now();
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
...// Exception handling
} finally {
...// Resource cleanup, including open files, sockets, etc.
}
}
I will not say much about that optimization here.
DataXceiver#readOp() is inherited from the Receiver class: it reads the op code from the socket opened by the client to decide which operation the client wants to perform. The op code for writing a data block is 80, and the returned enum value is op = Op.WRITE_BLOCK.
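The mapping from the one-byte op code to an enum value boils down to something like the sketch below (a hypothetical SketchOp enum, not the actual org.apache.hadoop.hdfs.protocol.datatransfer.Op class; only the WRITE_BLOCK code 80 comes from the text above, and READ_BLOCK = 81 is shown purely for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical sketch of a one-byte op-code dispatch
enum SketchOp {
  WRITE_BLOCK((byte) 80),
  READ_BLOCK((byte) 81); // illustrative value

  final byte code;

  SketchOp(byte code) {
    this.code = code;
  }

  // Read one byte from the stream and map it to an op
  static SketchOp read(DataInputStream in) throws IOException {
    byte b = in.readByte();
    for (SketchOp op : values()) {
      if (op.code == b) {
        return op;
      }
    }
    throw new IOException("Unknown op code " + b);
  }

  public static void main(String[] args) throws IOException {
    byte[] wire = { (byte) 80 };
    System.out.println(read(new DataInputStream(new ByteArrayInputStream(wire)))); // WRITE_BLOCK
  }
}
```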
DataXceiver#processOp() is also inherited from the Receiver class:
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
...// other cases
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
...
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
} finally {
if (traceScope != null) traceScope.close();
}
}
Another improvement of HDFS 2.x over 1.x: the streaming interface now also largely uses protobuf, instead of parsing raw byte streams over bare TCP.
The Receiver class implements the DataTransferProtocol interface but does not implement DataTransferProtocol#writeBlock(). Polymorphism tells us that the call here lands in DataXceiver#writeBlock().
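In other words, the base class drives the parsing and then dispatches to whichever subclass was instantiated for this connection. A minimal, hypothetical sketch of that pattern (made-up class names, not Hadoop code):

```java
// The base class parses the request and calls an abstract method implemented by the subclass.
abstract class ReceiverSketch {
  final void processWrite(String parsedBlockName) {
    writeBlock(parsedBlockName); // virtual call, resolved to the subclass at runtime
  }

  abstract void writeBlock(String block);
}

class DataXceiverSketch extends ReceiverSketch {
  @Override
  void writeBlock(String block) {
    System.out.println("writing " + block);
  }

  public static void main(String[] args) {
    new DataXceiverSketch().processWrite("blk_42");
  }
}
```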
At last, back to DataXceiver#writeBlock():
public void writeBlock(final ExtendedBlock block,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException {
...// Checks, parameter setup, etc.
...// Build the output stream used to reply to the upstream node or the client (here, the client)
...// omitted
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// Create the BlockReceiver, getting ready to receive the data block
blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist);
storageUuid = blockReceiver.getStorageUuid();
} else {
...// pipeline error recovery related
}
...// Handling of downstream nodes; a single datanode has no downstream node.
// The first packet sent is empty and is only used to set up the pipeline; an ack is returned immediately here to indicate whether the pipeline was established
// Since this datanode has no downstream node, reaching this point means the pipeline has been established successfully
if (isClient && !isTransfer) {
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// Receive the data block (this also forwards it downstream, but there is no downstream node here)
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets, false);
...// block replication related
}
...// block recovery related
...// block replication related
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
...// Clean up resources
}
...// Update metrics
}
A few parameters and flags deserve special mention:
- stage: for a client-initiated create, this is BlockConstructionStage.PIPELINE_SETUP_CREATE.
- isTransfer: it is true only when the stage is BlockConstructionStage.TRANSFER_RBW or BlockConstructionStage.TRANSFER_FINALIZED, which indicates block replication; here it is false.
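The isDatanode / isClient / isTransfer flags seen in the listing above are derived from the client name and the stage; a minimal sketch with hypothetical stand-in types (not the Hadoop ones) looks like this:

```java
// An empty client name means the request came from another datanode (replication/move);
// the TRANSFER_* stages mark block replication.
enum Stage { PIPELINE_SETUP_CREATE, TRANSFER_RBW, TRANSFER_FINALIZED }

public class WriteBlockFlags {
  public static void main(String[] args) {
    String clientname = "DFSClient_NONMAPREDUCE_123"; // non-empty: a client-initiated write
    Stage stage = Stage.PIPELINE_SETUP_CREATE;

    boolean isDatanode = clientname.isEmpty();
    boolean isClient = !isDatanode;
    boolean isTransfer = stage == Stage.TRANSFER_RBW || stage == Stage.TRANSFER_FINALIZED;

    System.out.println("isDatanode=" + isDatanode
        + ", isClient=" + isClient + ", isTransfer=" + isTransfer);
  }
}
```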
The rest of the article walks through the two steps "preparing to receive the data block" and "receiving the data block".
BlockReceiver.<init>():
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
try{
...// Checks, parameter setup, etc.
// Open the files, getting ready to receive the data block
if (isDatanode) { // Block replication and block moves are initiated by a datanode; in that case the block file is created under the tmp directory
replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
// For a client-initiated write (considering only create, not append), create the replica under the rbw directory (block file + meta file, replica in RBW state)
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
...// other cases
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
}
...// omitted
// Block replication, block moves, and client-initiated creates all essentially create a new block file; in these cases isCreate is true
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
...// Compute the header of the meta file
// If a new block file needs to be created, a new meta file must be created as well, with its header written out
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
}
} catch (ReplicaAlreadyExistsException bae) {
throw bae;
} catch (ReplicaNotFoundException bne) {
throw bne;
} catch(IOException ioe) {
...// An IOE usually involves resources such as files, so extra cleanup is needed
}
}
Although the code above carries plenty of comments, the create case is fairly simple: just remember that a block file and a meta file are created under the rbw directory.
After the replica is created under the rbw directory, DataNode#notifyNamenodeReceivingBlock() is also called to tell the namenode that a block is being received. That method merely puts the block into a buffer; the BPServiceActor thread reports it asynchronously.
I will not expand on it here; a similar method, DataNode#notifyNamenodeReceivedBlock(), is covered later.
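For reference, the on-disk layout of such a replica looks roughly like the sketch below (the block id, generation stamp, and storage path are made-up values; only the blk_<id> / blk_<id>_<genstamp>.meta naming and the rbw directory come from the discussion above):

```java
// Hypothetical values; illustrates the file names of a replica in RBW state.
public class RbwLayout {
  public static void main(String[] args) {
    long blockId = 1073741825L;   // made-up block id
    long genStamp = 1001L;        // made-up generation stamp
    String rbwDir = "/data/dfs/dn/current/BP-.../current/rbw"; // made-up storage path

    String blockFile = rbwDir + "/blk_" + blockId;                            // block data
    String metaFile = rbwDir + "/blk_" + blockId + "_" + genStamp + ".meta";  // header + checksums

    System.out.println(blockFile);
    System.out.println(metaFile);
  }
}
```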
BlockReceiver#receiveBlock():
void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,  // output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      DatanodeInfo[] downstreams,
      boolean isReplaceBlock) throws IOException {
...// Parameter setup
try {
// For a client-initiated write request (here, a block create), start the PacketResponder to send acks
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
responder.start(); // start thread to processes responses
}
// Synchronously receive packets, writing the block file and the meta file
while (receivePacket() >= 0) {}
// At this point the node has received all packets; wait until all acks have been sent, then close the responder
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}
...// block replication related
} catch (IOException ioe) {
if (datanode.isRestarting()) {
LOG.info("Shutting down for restart (" + block + ").");
} else {
LOG.info("Exception for " + block, ioe);
throw ioe;
}
} finally {
...// Cleanup
}
}
Let's look at BlockReceiver#receivePacket() first.
Strictly speaking, BlockReceiver#receivePacket() receives packets from upstream and continues the pipelined write to the downstream node:
private int receivePacket() throws IOException {
// read the next packet
packetReceiver.receiveNextPacket(in);
PacketHeader header = packetReceiver.getHeader();
...// omitted
...// Check the packet header
long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock();
final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock();
...// omitted
// If the data does not need to be persisted immediately and does not need to be verified, we can immediately ask the PacketResponder thread to return a SUCCESS ack, and do the verification and persistence afterwards
if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
...// pipelined-write related
ByteBuffer dataBuf = packetReceiver.getDataSlice();
ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
if (lastPacketInBlock || len == 0) { // An empty packet may be a heartbeat or signal that the whole block has been sent
// In either case we can try to flush the previously received data to disk
if (syncBlock) {
flushOrSync(true);
}
} else { // Otherwise, the packet needs to be persisted
final int checksumLen = diskChecksum.getChecksumSize(len);
final int checksumReceivedLen = checksumBuf.capacity();
...// If this is the last node in the pipeline, verify the received packet before persisting it (using the packet's own checksum)
...// If the verification fails, ask the PacketResponder thread to return an ERROR_CHECKSUM ack
final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
&& streams.isTransientStorage();
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
...// If the last checksum chunk is incomplete, load and adjust the old meta file content so the crc can be recomputed later
// Write the block file
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
// Write the meta file
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
lastCrc = null;
} else if (partialCrc != null) { // The checksum chunk is incomplete (part of it was received earlier)
...// Recompute the crc
...// Update lastCrc
checksumOut.write(buf);
partialCrc = null;
} else { // The checksum chunk is complete
...// Update lastCrc
checksumOut.write(checksumBuf.array(), offset, checksumLen);
}
...// omitted
}
} catch (IOException iex) {
datanode.checkDiskErrorAsync();
throw iex;
}
}
// Conversely, if the data had to be persisted immediately or verified, both have been done by now, so we can ask the PacketResponder thread to return a SUCCESS ack
// if sync was requested, put in queue for pending acks here
// (after the fsync finished)
if (responder != null && (syncBlock || shouldVerifyChecksum())) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
...// If the response has taken too long, proactively send an IN_PROGRESS ack to avoid a timeout
...// throttler related
// Before the whole block has been sent, the client may send packets carrying data, and it may also send empty packets to keep the heartbeat alive or to mark the end of the block
// Therefore, when lastPacketInBlock is true we must not return 0; a negative value is returned to distinguish this case from "the last packet has not arrived yet"
return lastPacketInBlock?-1:len;
}
...
private boolean shouldVerifyChecksum() {
// For a client write, only the last node in the pipeline satisfies `mirrorOut == null`
return (mirrorOut == null || isDatanode || needsChecksumTranslation);
}
BlockReceiver#shouldVerifyChecksum() mainly matters for pipelined writes; with only one datanode in this article, mirrorOut == null always holds.
The code above looks long, but its main work boils down to four things:
- read the next packet from upstream and check its header;
- if neither an immediate sync nor checksum verification is needed, enqueue a SUCCESS ack for the PacketResponder thread right away;
- verify the checksum if this node is the last one in the pipeline;
- persist the packet to the block file and the meta file (enqueuing the SUCCESS ack afterwards in the sync/verify case).
BlockReceiver#receivePacket(), the PacketResponder thread, and PacketResponder#ackQueue together form a producer-consumer model. What is produced and consumed are acks: BlockReceiver#receivePacket() is the producer and the PacketResponder thread is the consumer.
A quick look at PacketResponder#enqueue():
void enqueue(final long seqno, final boolean lastPacketInBlock, final long offsetInBlock, final Status ackStatus) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime(), ackStatus);
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
synchronized(ackQueue) {
if (running) {
ackQueue.addLast(p);
ackQueue.notifyAll();
}
}
}
ackQueue is a LinkedList, which is not thread-safe.
For how to build a producer-consumer model on top of a thread-unsafe container, see implementation 3 in Java實現生產者-消費者模型 (my article on implementing the producer-consumer model in Java).
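The pattern used here boils down to the minimal sketch below (a hypothetical AckQueueSketch class, not Hadoop code): every access to the LinkedList is guarded by a synchronized block on the list itself, and wait/notifyAll coordinate the producer and the consumer.

```java
import java.util.LinkedList;

// Hypothetical sketch of the enqueue / waitForHead / removeHead trio used by
// BlockReceiver#receivePacket() (producer) and the PacketResponder thread (consumer).
public class AckQueueSketch {
  private final LinkedList<Long> ackQueue = new LinkedList<>(); // not thread-safe by itself
  private volatile boolean running = true;

  // Producer side: add an ack and wake up the consumer
  void enqueue(long seqno) {
    synchronized (ackQueue) {
      if (running) {
        ackQueue.addLast(seqno);
        ackQueue.notifyAll();
      }
    }
  }

  // Consumer side: block until an ack is available (or we are shut down)
  Long waitForHead() throws InterruptedException {
    synchronized (ackQueue) {
      while (running && ackQueue.isEmpty()) {
        ackQueue.wait();
      }
      return running ? ackQueue.getFirst() : null;
    }
  }

  // Consumer side: drop the head once it has been fully processed
  void removeHead() {
    synchronized (ackQueue) {
      ackQueue.removeFirst();
      ackQueue.notifyAll(); // lets a close()-style method observe the shrinking queue
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AckQueueSketch q = new AckQueueSketch();
    Thread consumer = new Thread(() -> {
      try {
        System.out.println("processed ack " + q.waitForHead());
        q.removeHead();
      } catch (InterruptedException ignored) {
      }
    });
    consumer.start();
    q.enqueue(1L);
    consumer.join();
  }
}
```

The real PacketResponder adds seqno matching against the downstream ack and the close() logic shown later, but the synchronization skeleton is essentially this.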
As the counterpart of BlockReceiver#receivePacket(), the PacketResponder thread receives acks from the downstream node and continues responding upstream along the pipeline.
PacketResponder#run():
public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
// If this node is not the last in the pipeline and the downstream node is healthy, read an ack from downstream
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
ack.readFields(downstreamIn);
...// statistics related
...// OOB related (ignored for now)
seqno = ack.getSeqno();
}
// If a normal ack was received from downstream, or this node is the last in the pipeline, consume a pkt from the queue (i.e., the ack enqueued by BlockReceiver#receivePacket())
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
pkt = waitForAckHead(seqno);
if (!isRunning()) {
break;
}
// The pipelined write orders packets by seqno: only when the seqno correctly received downstream equals the seqno this node has finished processing does this node consider that packet correctly received; the same holds for the upstream node
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected=" + expected
+ ", received=" + seqno);
}
...// statistics related
lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
...// Exception handling
} catch (IOException ioe) {
...// Exception handling
}
...// exit on interrupt
// If this is the last packet, transition the block to the FINALIZED state and close the BlockReceiver
if (lastPacketInBlock) {
finalizeBlock(startTime);
}
// At this point ack.seqno == pkt.seqno must hold; build a new ack and send it upstream
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
(pkt != null ? pkt.ackStatus : Status.SUCCESS));
// The head of the queue has been fully processed; dequeue it
// pkt == null holds only when PacketResponder#isRunning() returns false, i.e., the PacketResponder thread is shutting down; in that case there is no need to dequeue, whether or not the queue has elements
if (pkt != null) {
removeAckHead();
}
} catch (IOException e) {
...// Exception handling
} catch (Throwable e) {
...// Exception handling
}
}
LOG.info(myString + " terminating");
}
To sum up, the core work of the PacketResponder thread is:
- read an ack from the downstream node (skipped when this node is the last in the pipeline);
- take the head of ackQueue (the ack enqueued by BlockReceiver#receivePacket()) and check that its seqno matches the downstream ack;
- when the last packet has been acked, finalize the block and close the BlockReceiver;
- send a new ack upstream, then dequeue the head.
(I ended up covering the pipeline-response logic along the way as well...)
A quick look at the dequeue and peek-head methods used by the PacketResponder thread:
// Peek at the head of the queue
Packet waitForAckHead(long seqno) throws InterruptedException {
synchronized(ackQueue) {
while (isRunning() && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
ackQueue.wait();
}
return isRunning() ? ackQueue.getFirst() : null;
}
}
...
// Dequeue
private void removeAckHead() {
synchronized(ackQueue) {
ackQueue.removeFirst();
ackQueue.notifyAll();
}
}
Elements are enqueued at the tail and dequeued from the head.
- After peeking at the head, if the queue is non-empty, then as long as nothing is dequeued the queue stays non-empty and the head element does not change.
- The first dequeue after a peek therefore removes exactly the element that the peek returned.
We also need to look at PacketResponder#finalizeBlock():
private void finalizeBlock(long startTime) throws IOException {
// Close the BlockReceiver and clean up resources
BlockReceiver.this.close();
...// log
block.setNumBytes(replicaInfo.getNumBytes());
// Closing the block on the datanode side is delegated to FsDatasetImpl#finalizeBlock()
datanode.data.finalizeBlock(block);
// Closing the block on the namenode side is delegated to DataNode#closeBlock()
datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
...// log
}
FsDatasetImpl#finalizeBlock():
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
}
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
// been opened for append but never modified
return;
}
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
...
private synchronized FinalizedReplica finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws IOException {
FinalizedReplica newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
ReplicaState.FINALIZED) { // block recovery related (omitted)
newReplicaInfo = (FinalizedReplica)
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
} else {
FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
// Recall from the BlockReceiver.<init>() analysis that the replica we created is in RBW state and its block file lives in the rbw directory (in fact its current location does not matter, for reasons given below)
File f = replicaInfo.getBlockFile();
if (v == null) {
throw new IOException("No volume for temporary file " + f +
" for block " + replicaInfo);
}
// Perform the state transition of the block file and meta file on the FsVolumeImpl volume
File dest = v.addFinalizedBlock(
bpid, replicaInfo, f, replicaInfo.getBytesReserved());
// This replica is the final block replica, in FINALIZED state
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
...// omitted
}
volumeMap.add(bpid, newReplicaInfo);
return newReplicaInfo;
}
FsVolumeImpl#addFinalizedBlock():
File addFinalizedBlock(String bpid, Block b, File f, long bytesReservedForRbw) throws IOException {
releaseReservedSpace(bytesReservedForRbw);
return getBlockPoolSlice(bpid).addBlock(b, f);
}
Remember the relationship between FsVolumeImpl and BlockPoolSlice analyzed in the datanode startup article? The operation is delegated further to BlockPoolSlice#addBlock():
Note that BlockPoolSlice#addBlock() only deals with blocks entering the FINALIZED state.
File addBlock(Block b, File f) throws IOException {
File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
if (!blockDir.exists()) {
if (!blockDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + blockDir);
}
}
File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
...// statistics related
return blockFile;
}
BlockPoolSlice in turn relies on the static helper FsDatasetImpl.moveBlockFiles():
static File moveBlockFiles(Block b, File srcfile, File destdir) throws IOException {
final File dstfile = new File(destdir, b.getBlockName());
final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
try {
NativeIO.renameTo(srcmeta, dstmeta);
} catch (IOException e) {
throw new IOException("Failed to move meta file for " + b
+ " from " + srcmeta + " to " + dstmeta, e);
}
try {
NativeIO.renameTo(srcfile, dstfile);
} catch (IOException e) {
throw new IOException("Failed to move block file for " + b
+ " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
}
...// logging
return dstfile;
}
It simply moves the block file and the meta file from their original directory (the rbw directory, corresponding to the RBW state) to the finalized directory (corresponding to the FINALIZED state).
- Thread safety is guaranteed by FsDatasetImpl#finalizeReplica() (it is synchronized).
- Throughout FsDatasetImpl#finalizeReplica(), the original location of the block does not matter; the state-transition logic itself guarantees correctness.
At this point, writing the data block on the datanode is complete.
However, the metadata on the namenode has not been updated yet, so the datanode still has to report the received block to the namenode.
Datanode#closeBlock():
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
// Report the received block to the namenode
bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
// Add the new block to the blockScanner's scan scope (not discussed here)
FsVolumeSpi volume = getFSDataset().getVolume(block);
if (blockScanner != null && !volume.isTransientStorage()) {
blockScanner.addBlock(block);
}
}
BPOfferService#notifyNamenodeReceivedBlock():
void notifyNamenodeReceivedBlock( ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block);
// Received blocks (additions) and deleted blocks (removals) are reported together; both are wrapped as ReceivedDeletedBlockInfo
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
delHint);
// Each BPServiceActor sends the report to the namenode it is responsible for
for (BPServiceActor actor : bpServices) {
actor.notifyNamenodeBlock(bInfo, storageUuid, true);
}
}
BPServiceActor#notifyNamenodeBlock():
void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo, String storageUuid, boolean now) {
synchronized (pendingIncrementalBRperStorage) {
// Update pendingIncrementalBRperStorage
addPendingReplicationBlockInfo(
bInfo, dn.getFSDataset().getStorage(storageUuid));
// sendImmediateIBR is a volatile flag that controls whether a block report (an incremental one) is sent immediately
sendImmediateIBR = true;
// now was passed in as true, so all threads blocked on pendingIncrementalBRperStorage are woken up next
if (now) {
pendingIncrementalBRperStorage.notifyAll();
}
}
}
The heart of this method is pendingIncrementalBRperStorage, which tracks the blocks received and deleted between two reports. It is a buffer: once the received block has been put into the buffer, the notification is considered done (though not necessarily successful); another thread reads the buffer and reports to the namenode asynchronously.
I have not read that much source code, but this buffer-based design shows up all over HDFS and YARN. The buffer decouples the two sides, which not only improves extensibility but also lets each side work at its own pace and scale. With pendingIncrementalBRperStorage, the producer drops blocks in irregularly and one at a time, while the consumer processes them periodically and in batches. As long as a certain timeliness is guaranteed, batched reporting reduces the RPC pressure.
With the IDE it is easy to see that only the BPServiceActor thread, which sends heartbeats to its namenode, blocks on pendingIncrementalBRperStorage. How that thread performs the actual report is analyzed below.
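The buffering pattern itself can be reduced to the minimal sketch below (a hypothetical PendingReportBuffer class; the real code keys the buffer per storage and also batches deletions):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the "notify = put into buffer, report = drain buffer" pattern.
public class PendingReportBuffer {
  private final Deque<String> pending = new ArrayDeque<>();
  private volatile boolean sendImmediately = false;
  private volatile boolean running = true;

  // Producer side (analogous to notifyNamenodeBlock): cheap, never blocks on an RPC
  void notifyBlock(String blockId) {
    synchronized (pending) {
      pending.addLast(blockId);
      sendImmediately = true;
      pending.notifyAll(); // wake the reporter early
    }
  }

  // Consumer side (analogous to the BPServiceActor loop): waits up to the heartbeat
  // interval, then drains and "reports" whatever has accumulated.
  void reporterLoop(long heartbeatIntervalMs) throws InterruptedException {
    while (running) {
      List<String> batch = new ArrayList<>();
      synchronized (pending) {
        if (!sendImmediately) {
          pending.wait(heartbeatIntervalMs);
        }
        batch.addAll(pending);
        pending.clear();
        sendImmediately = false;
      }
      if (!batch.isEmpty()) {
        System.out.println("reporting batch: " + batch); // stand-in for the RPC
      }
      running = false; // stop after one round so this demo terminates
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PendingReportBuffer buf = new PendingReportBuffer();
    Thread reporter = new Thread(() -> {
      try {
        buf.reporterLoop(3000);
      } catch (InterruptedException ignored) {
      }
    });
    reporter.start();
    buf.notifyBlock("blk_1");
    reporter.join();
  }
}
```

Failure handling is the other half of the pattern: as BPServiceActor#reportReceivedDeletedBlocks() below shows, if the RPC fails the drained entries are put back into the buffer and the flag is raised again.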
From the analysis of BlockReceiver#receivePacket() and the PacketResponder thread, we know that when the node has received all packets, some acks may not have been sent yet.
Therefore PacketResponder#close() is called to wait until all acks have been sent before closing the responder:
public void close() {
synchronized(ackQueue) {
// A non-empty ackQueue means some acks have not been sent yet
while (isRunning() && ackQueue.size() != 0) {
try {
ackQueue.wait();
} catch (InterruptedException e) {
running = false;
Thread.currentThread().interrupt();
}
}
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": closing");
}
// Notify the PacketResponder thread blocked in PacketResponder#waitForAckHead() so that it can detect the shutdown condition
running = false;
ackQueue.notifyAll();
}
// ???
synchronized(this) {
running = false;
notifyAll();
}
}
I do not understand what the second synchronized block (the one marked // ??? above) is for... an explanation would be welcome.
As discussed above, the next step is to analyze how the BPServiceActor thread reads the pendingIncrementalBRperStorage buffer and performs the actual report.
BPServiceActor#offerService() calls pendingIncrementalBRperStorage.wait(). Because blocking and wake-ups are involved, we cannot follow the normal control flow; instead, the analysis starts from the point where the thread wakes up:
// If no report is needed right now, wait for a while
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) {
try {
// The BPServiceActor thread wakes up here and then leaves the synchronized block
pendingIncrementalBRperStorage.wait(waitTime);
} catch (InterruptedException ie) {
LOG.warn("BPOfferService for " + this + " interrupted");
}
}
} // synchronized
Readers of my article 條件隊列大法好:使用wait、notify和notifyAll的正確姿式 (on the correct use of wait, notify, and notifyAll) may feel that the if(){wait} pattern here is incorrect. Revisit the "version2: premature wakeup" part of that article and, keeping HDFS's heartbeat mechanism in mind, think about why the code here is fine. In fact, this is exactly how it should be written here.
If no report is needed at the moment, the BPServiceActor thread waits for a while; it is precisely this wait window that makes the wake-up from BPServiceActor#notifyNamenodeBlock() meaningful.
After being woken up, the BPServiceActor thread continues its heartbeat loop:
while (shouldRun()) {
try {
final long startTime = now();
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
Assume the heartbeat interval has not been reached yet; then the if block is not executed.
At this point, the volatile variable sendImmediateIBR, which was modified in BPServiceActor#notifyNamenodeBlock(), comes into play:
// When sendImmediateIBR is true, report the received and deleted blocks immediately
if (sendImmediateIBR ||
(startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
// Report the received and deleted blocks
reportReceivedDeletedBlocks();
// Update lastDeletedReport
lastDeletedReport = startTime;
}
// Then do a full block report
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
// Process the commands returned by the namenode
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
Interestingly, the received/deleted blocks are first reported on their own, and that RPC does not wait for a return value from the namenode; then an overall report is made, and this time the RPC's return value is waited for.
So even though block additions and deletions are reported incrementally, the incremental report is necessarily followed by a full report, which keeps the cost of incremental reporting quite high. To improve concurrency, BPServiceActor#notifyNamenodeBlock() returns immediately after updating the buffer, without caring whether the report succeeds. Nor is there any need to worry about the consequences of a failed report: before reporting, the block has already been transitioned to FINALIZED, persisted to disk, and recorded in the buffer; if the report fails it can simply be retried, and if the datanode dies before sending it, the block will be reported again after restart, so consistency is guaranteed.
Setting the full report aside for now, let's look only at the standalone BPServiceActor#reportReceivedDeletedBlocks():
private void reportReceivedDeletedBlocks() throws IOException {
// Build the reports and reset sendImmediateIBR to false
ArrayList<StorageReceivedDeletedBlocks> reports =
new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
synchronized (pendingIncrementalBRperStorage) {
for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
pendingIncrementalBRperStorage.entrySet()) {
final DatanodeStorage storage = entry.getKey();
final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
if (perStorageMap.getBlockInfoCount() > 0) {
ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
}
}
sendImmediateIBR = false;
}
// If there is nothing to report, return right away
if (reports.size() == 0) {
return;
}
// Otherwise send the report via RPC to the namenode this actor is responsible for
boolean success = false;
try {
bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpos.getBlockPoolId(),
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
success = true;
} finally {
// If the report fails, put the block add/delete info back into the buffer to be reported again later
if (!success) {
synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
PerStoragePendingIncrementalBR perStorageMap =
pendingIncrementalBRperStorage.get(report.getStorage());
perStorageMap.putMissingBlockInfos(report.getBlocks());
sendImmediateIBR = true;
}
}
}
}
}
Two points are worth noting:
- the reports are built and sendImmediateIBR is reset to false inside the synchronized block, keeping the flag consistent with the buffer's contents;
- if the RPC fails, the dequeued block infos are put back into pendingIncrementalBRperStorage and sendImmediateIBR is set to true again, so they will be reported on a later attempt.
One client plus one datanode forms the smallest possible pipeline. This article walked through the block-write process on this minimal pipeline with no errors; building on it, analyzing the pipelined write with errors becomes much easier.
Link to this article: 源碼|HDFS之DataNode:寫數據塊(1)
Author: 猴子007
Source: monkeysayhi.github.io
This article is published under the Creative Commons Attribution-ShareAlike 4.0 International license. You are welcome to repost, adapt, or use it for commercial purposes, but you must keep the attribution and the link to this article.