The previous article, 源碼|HDFS之DataNode:寫數據塊(1), analyzed the block-write process on a datanode in the case with no pipeline and no failures. This article analyzes the pipelined write without failures, assuming a replication factor of 3 (i.e., one client plus three datanodes participate in writing the block) and that no exception occurs.
Source version: Apache Hadoop 2.6.0
Although this article is short, it builds on the previous one. Content already covered there is not repeated here; readers are advised to read the articles in order.
According to 源碼|HDFS之DataNode:寫數據塊(1), the multi-replica pipelined write mainly affects three parts: DataXceiver#writeBlock(), BlockReceiver#receivePacket(), and the PacketResponder thread. This article follows these three branches.
Establishing the pipeline: DataXceiver#writeBlock()
public void writeBlock(final ExtendedBlock block,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientname,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo srcDataNode,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  ...// Checks, parameter setup, etc.
  ...// Build the output stream for replying to the upstream node or client (here, the client)
  ...// Omitted
  try {
    if (isDatanode ||
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // Create the BlockReceiver and prepare to receive the block
      blockReceiver = new BlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist);
      storageUuid = blockReceiver.getStorageUuid();
    } else {
      ...// Pipeline error recovery
    }

    // Handling of downstream nodes: acting as the "client" of the next node,
    // the current node continues triggering pipeline setup downstream
    if (targets.length > 0) {
      // Connect to the downstream node
      InetSocketAddress mirrorTarget = null;
      mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to datanode " + mirrorNode);
      }
      mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
      mirrorSock = datanode.newSocket();
      // Try to establish the pipeline (expanded below)
      try {
        // Set the socket connect timeout, packet write timeout, write buffer size, etc.
        int timeoutValue = dnConf.socketTimeout
            + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
        int writeTimeout = dnConf.socketWriteTimeout +
            (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
        NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
        mirrorSock.setSoTimeout(timeoutValue);
        mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);

        // Set up the output stream to the downstream node (mirrorOut),
        // the input stream from the downstream node (mirrorIn), etc.
        OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
            writeTimeout);
        InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
        DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
        IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
        unbufMirrorOut = saslStreams.out;
        unbufMirrorIn = saslStreams.in;
        mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
            HdfsConstants.SMALL_BUFFER_SIZE));
        mirrorIn = new DataInputStream(unbufMirrorIn);

        // Send the pipeline-setup request to the downstream node; mirrorOut will
        // continue to be used later as the output stream for writing packets
        new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
            blockToken, clientname, targets, targetStorageTypes, srcDataNode,
            stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
            latestGenerationStamp, requestedChecksum, cachingStrategy, false);
        mirrorOut.flush();

        // If the block write was initiated by a client (true in our scenario), a
        // pipeline exists, so read the pipeline-setup ack from the downstream node
        if (isClient) {
          BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
          // Take the downstream node's setup result as the setup result of the whole
          // pipeline (either every node from tail to head succeeded, or all failed)
          mirrorInStatus = connectAck.getStatus();
          firstBadLink = connectAck.getFirstBadLink();
          if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
            LOG.info("Datanode " + targets.length +
                     " got response for connect ack " +
                     " from downstream datanode with firstbadlink as " +
                     firstBadLink);
          }
        }
      } catch (IOException e) {
        ...// Exception handling: clean up resources, reply with an ack, etc.
      }
    }

    // The first packet sent is empty and is used only to set up the pipeline; the
    // ack reporting whether pipeline setup succeeded (the downstream result obtained
    // above, or SUCCESS if this node is the tail) is returned to the upstream node
    // or client right away
    if (isClient && !isTransfer) {
      if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
        LOG.info("Datanode " + targets.length +
                 " forwarding connect ack to upstream firstbadlink is " +
                 firstBadLink);
      }
      BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
    }

    // Receive the block (this also forwards packets to the downstream node, if one exists)
    if (blockReceiver != null) {
      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
          mirrorAddr, null, targets, false);
      ...// Block replication related
    }

    ...// Block recovery related
    ...// Block replication related
  } catch (IOException ioe) {
    LOG.info("opWriteBlock " + block + " received exception " + ioe);
    throw ioe;
  } finally {
    ...// Clean up resources
  }

  ...// Update metrics
}
Compared with the replication-factor-1 case, the only addition is the "handling of downstream nodes" part: the current node acts as the "client" of the next node and continues triggering pipeline setup downstream; each downstream node then goes through the same procedure as the current node. By the time the client receives the pipeline-setup ack from the first datanode, the pipelines of all downstream nodes must already have been established successfully; together with the client, they form the complete pipeline.
Also, as analyzed in the previous article, the block data is not written through the pipeline until BlockReceiver#receiveBlock() runs. Combined with how the pipeline is closed, the pipeline's lifecycle can be divided into three stages: pipeline setup, data streaming, and pipeline close.
(Figure: illustration of several of the parameters above.)
Sender#writeBlock():
public void writeBlock(final ExtendedBlock blk,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo source,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    final CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
      blk, clientName, blockToken);

  ChecksumProto checksumProto =
      DataTransferProtoUtil.toProto(requestedChecksum);

  OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
      .setHeader(header)
      .setStorageType(PBHelper.convertStorageType(storageType))
      // Drop the first node in targets
      .addAllTargets(PBHelper.convert(targets, 1))
      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
      .setStage(toProto(stage))
      .setPipelineSize(pipelineSize)
      .setMinBytesRcvd(minBytesRcvd)
      .setMaxBytesRcvd(maxBytesRcvd)
      .setLatestGenerationStamp(latestGenerationStamp)
      .setRequestedChecksum(checksumProto)
      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .setAllowLazyPersist(allowLazyPersist);

  if (source != null) {
    proto.setSource(PBHelper.convertDatanodeInfo(source));
  }

  send(out, Op.WRITE_BLOCK, proto.build());
}

...

private static void send(final DataOutputStream out, final Op opcode,
    final Message proto) throws IOException {
  if (LOG.isTraceEnabled()) {
    LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
        + ": " + proto);
  }
  op(out, opcode);
  proto.writeDelimitedTo(out);
  out.flush();
}
The logic is very simple. Why drop the first node in targets? Suppose the targets sent by the client are, in order, d1, d2, d3, and the current node is d1; then d1's downstream consists only of d2 and d3, so when forwarding the pipeline-setup request downstream, the first node of the current targets, d1, naturally has to be removed. The same applies to d2 and d3.
It is this progressive shrinking of targets that lets DataXceiver#writeBlock() use targets.length > 0 to decide whether there are still downstream nodes that need to join the pipeline.
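To make the shrinking-targets recursion concrete, here is a minimal, self-contained sketch (my own illustration, not Hadoop code; all names are hypothetical) of how each hop drops itself from targets before forwarding, and how the setup result propagates back as an all-or-nothing ack:

// Hypothetical simulation: each hop forwards to targets[0] with the remaining
// targets, mirroring PBHelper.convert(targets, 1), which skips the first element.
import java.util.Arrays;

public class PipelineTargetsDemo {
  static String setupPipeline(String current, String[] targets) {
    if (targets.length == 0) {
      // Tail of the pipeline: no downstream, so it acks success directly.
      return current + " (tail, ack SUCCESS)";
    }
    String downstreamAck = setupPipeline(targets[0],
        Arrays.copyOfRange(targets, 1, targets.length));
    // The downstream result becomes this node's result (all-or-nothing).
    return current + " -> " + downstreamAck;
  }

  public static void main(String[] args) {
    // Prints: client -> d1 -> d2 -> d3 (tail, ack SUCCESS)
    System.out.println(setupPipeline("client", new String[]{"d1", "d2", "d3"}));
  }
}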
The client also uses Sender#writeBlock() to set up the pipeline, but its sending process differs slightly: the client writes data through its own byte-stream abstraction and must assemble the bytes from that stream into packets before writing them into the pipeline.
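For intuition only, here is a rough sketch (hypothetical, not the real DFSOutputStream logic, which also handles chunks, checksums, and heartbeats) of how a client-side byte buffer could be chopped into fixed-size packets before being written into the pipeline:

// Hypothetical illustration of packetizing a byte stream; names are made up.
import java.util.ArrayList;
import java.util.List;

public class PacketizeDemo {
  static List<byte[]> packetize(byte[] stream, int packetSize) {
    List<byte[]> packets = new ArrayList<>();
    for (int off = 0; off < stream.length; off += packetSize) {
      int len = Math.min(packetSize, stream.length - off);
      byte[] pkt = new byte[len];
      System.arraycopy(stream, off, pkt, 0, len);
      packets.add(pkt);
    }
    return packets;
  }

  public static void main(String[] args) {
    System.out.println(packetize(new byte[200], 64).size()); // 4 packets
  }
}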
Synchronously receiving packets: BlockReceiver#receivePacket()
Let's look at BlockReceiver#receivePacket() first.
Strictly speaking, BlockReceiver#receivePacket() is responsible for receiving packets from upstream and continuing to write them down the pipeline to the downstream node:
private int receivePacket() throws IOException {
  // read the next packet
  packetReceiver.receiveNextPacket(in);

  PacketHeader header = packetReceiver.getHeader();
  ...// Omitted

  ...// Sanity-check the packet header

  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();

  ...// Omitted

  // If the data neither needs to be persisted immediately nor verified on this
  // node, the PacketResponder thread can be asked to reply a SUCCESS ack right
  // away; verification and persistence then happen afterwards
  if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }

  // Pipeline write: mirror the packet received on `in` into mirrorOut
  if (mirrorOut != null && !mirrorError) {
    try {
      long begin = Time.monotonicNow();
      packetReceiver.mirrorPacketTo(mirrorOut);
      mirrorOut.flush();
      long duration = Time.monotonicNow() - begin;
      if (duration > datanodeSlowLogThresholdMs) {
        LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
      }
    } catch (IOException e) {
      handleMirrorOutError(e);
    }
  }

  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();

  if (lastPacketInBlock || len == 0) { // An empty packet may be a heartbeat or mark the end of the block
    // In either case, it is a chance to flush previously received data to disk
    if (syncBlock) {
      flushOrSync(true);
    }
  } else { // Otherwise, the packet needs to be persisted
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();

    ...// If this is the last node in the pipeline, verify the packet before
    ...// persisting it (using the packet's own checksum mechanism)
    ...// On a checksum error, ask the PacketResponder thread to reply an ERROR_CHECKSUM ack

    final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
        && streams.isTransientStorage();
    try {
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        ...// If the last checksum chunk is incomplete, load and adjust the old meta
        ...// file content so that the crc can be recomputed later

        // Write the block file
        int startByteToDisk = (int)(onDiskLen - firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();
        int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
        out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);

        // Write the meta file
        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else if (partialCrc != null) { // The checksum chunk is incomplete (part of it was received earlier)
          ...// Recompute the crc
          ...// Update lastCrc
          checksumOut.write(buf);
          partialCrc = null;
        } else { // The checksum chunk is complete
          ...// Update lastCrc
          checksumOut.write(checksumBuf.array(), offset, checksumLen);
        }
        ...// Omitted
      }
    } catch (IOException iex) {
      datanode.checkDiskErrorAsync();
      throw iex;
    }
  }

  // Conversely, if the data had to be persisted immediately or verified on this
  // node, both have been completed by now, so the PacketResponder thread can be
  // asked to reply a SUCCESS ack
  // if sync was requested, put in queue for pending acks here
  // (after the fsync finished)
  if (responder != null && (syncBlock || shouldVerifyChecksum())) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }

  ...// If the response has taken too long, proactively send an IN_PROGRESS ack to avoid a timeout
  ...// Throttler related

  // Before the whole block is finished, the client may send packets that carry data,
  // and may also send empty packets as heartbeats or to mark the end of the block.
  // Therefore, when the lastPacketInBlock flag is true, do not return 0; return a
  // negative value to distinguish it from the packets before the last one
  return lastPacketInBlock ? -1 : len;
}

...

private boolean shouldVerifyChecksum() {
  // For a client write, only the last node in the pipeline satisfies `mirrorOut == null`
  return (mirrorOut == null || isDatanode || needsChecksumTranslation);
}
Since the pipeline has already been established in DataXceiver#writeBlock(), the pipeline-write work here is very simple and only involves the "pipeline write" part:
for every packet received on `in`, the packet is mirrored into mirrorOut; each downstream node then goes through the same procedure as the current node.
In addition, BlockReceiver#shouldVerifyChecksum() plays its part here: intermediate nodes in the pipeline do not need to verify checksums before writing to disk.
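To summarize the per-packet order of operations, here is a minimal sketch (my own, not Hadoop code; class and method names are made up) of the relay pattern in receivePacket(): ack early when neither sync nor local verification is required, mirror the packet downstream, persist it, and otherwise ack only at the end:

// Hypothetical sketch of the receivePacket() control flow described above.
import java.util.ArrayDeque;
import java.util.Deque;

public class ReceiveOrderSketch {
  static Deque<Long> ackQueue = new ArrayDeque<>();

  static void receivePacket(long seqno, byte[] data,
                            boolean syncBlock, boolean isTail, boolean hasMirror) {
    boolean verifyHere = isTail;                 // for a client write, only the tail verifies
    boolean ackEarly = !syncBlock && !verifyHere;
    if (ackEarly) {
      ackQueue.add(seqno);                       // delegate a SUCCESS ack before doing the work
    }
    if (hasMirror) {
      forwardDownstream(seqno, data);            // packetReceiver.mirrorPacketTo(mirrorOut) analogue
    }
    persist(data, syncBlock);                    // write block/meta file, fsync if requested
    if (!ackEarly) {
      ackQueue.add(seqno);                       // ack only after verification/sync completed
    }
  }

  static void forwardDownstream(long seqno, byte[] data) { /* network write omitted */ }
  static void persist(byte[] data, boolean sync) { /* disk write omitted */ }

  public static void main(String[] args) {
    receivePacket(1, new byte[64 * 1024], false, false, true);
    System.out.println("acks pending: " + ackQueue);
  }
}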
Asynchronously sending acks: the PacketResponder thread
In contrast to BlockReceiver#receivePacket(), the PacketResponder thread is responsible for receiving acks from the downstream node and continuing to respond up the pipeline.
PacketResponder#run():
public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        // If this node is not the last one in the pipeline and the downstream node
        // is healthy, read an ack from downstream
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          ack.readFields(downstreamIn);
          ...// Statistics related
          ...// OOB related (ignored for now)
          seqno = ack.getSeqno();
        }
        // If a valid ack was received from downstream, or this node is the last one
        // in the pipeline, consume a pkt from the queue (i.e., the ack enqueued by
        // BlockReceiver#receivePacket())
        if (seqno != PipelineAck.UNKOWN_SEQNO
            || type == PacketResponderType.LAST_IN_PIPELINE) {
          pkt = waitForAckHead(seqno);
          if (!isRunning()) {
            break;
          }
          // The pipeline uses seqno to keep packets ordered: only when the seqno
          // acknowledged by the downstream node equals the seqno this node has
          // finished processing does this node consider that packet correctly
          // received; the upstream node applies the same rule
          expected = pkt.seqno;
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
              && seqno != expected) {
            throw new IOException(myString + "seqno: expected=" + expected
                + ", received=" + seqno);
          }
          ...// Statistics related
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        ...// Exception handling
      } catch (IOException ioe) {
        ...// Exception handling
      }

      ...// Exit on interruption

      // If this is the last packet, convert the replica state to FINALIZED and close the BlockReceiver
      if (lastPacketInBlock) {
        finalizeBlock(startTime);
      }

      // At this point ack.seqno == pkt.seqno necessarily holds; build a new ack and send it upstream
      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          (pkt != null ? pkt.ackStatus : Status.SUCCESS));

      // The head of the queue has been handled; dequeue it.
      // pkt == null in only one case: PacketResponder#isRunning() returned false,
      // i.e. the PacketResponder thread is shutting down; then nothing needs to be
      // dequeued regardless of whether the queue holds elements
      if (pkt != null) {
        removeAckHead();
      }
    } catch (IOException e) {
      ...// Exception handling
    } catch (Throwable e) {
      ...// Exception handling
    }
  }
  LOG.info(myString + " terminating");
}
The previous article, somewhat ahead of itself, already analyzed how the PacketResponder thread responds with acks along the pipeline. Here we only review it briefly, focusing on the relationship between ack and pkt.
To summarize, the core work of the PacketResponder thread is as follows:
- read the ack from the downstream node (unless the current node is the tail of the pipeline);
- take the packet at the head of the local ack queue and verify that its seqno matches the seqno in the downstream ack;
- if the last packet of the block has been acknowledged, finalize the block and close the BlockReceiver;
- send a new ack upstream, then remove the head of the ack queue.
(A minimal sketch of the seqno matching follows.)
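The ack/pkt matching rule can be sketched as a simplified illustration (not Hadoop code; names are hypothetical): the seqno carried by the downstream ack must equal the seqno at the head of the queue filled by BlockReceiver#receivePacket():

// Hypothetical sketch of the ack-queue matching between the two threads.
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;

public class AckMatchingSketch {
  static class PendingPacket {
    final long seqno;
    final boolean lastPacketInBlock;
    PendingPacket(long seqno, boolean last) { this.seqno = seqno; this.lastPacketInBlock = last; }
  }

  private final LinkedBlockingDeque<PendingPacket> ackQueue = new LinkedBlockingDeque<>();

  // BlockReceiver#receivePacket() side: enqueue after handling a packet locally.
  void enqueue(long seqno, boolean last) {
    ackQueue.addLast(new PendingPacket(seqno, last));
  }

  // PacketResponder side: called with the seqno read from the downstream ack.
  void onDownstreamAck(long ackSeqno) throws InterruptedException, IOException {
    PendingPacket head = ackQueue.takeFirst(); // the real code peeks (waitForAckHead) and
                                               // removes only after the upstream ack is sent
    if (ackSeqno != head.seqno) {
      throw new IOException("seqno: expected=" + head.seqno + ", received=" + ackSeqno);
    }
    sendAckUpstream(head);
  }

  void sendAckUpstream(PendingPacket p) { /* network write to upstream omitted */ }
}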
This morning I happened to come across an interview question:

One node needs to send 100 GB of data to 99 nodes; the disk, memory, and NIC can all do 1 GB/s. How can the total time be minimized?

In an earlier note I analyzed the advantages of the "pipeline write" technique. If you are familiar with pipeline writes in HDFS, the problem is easy to solve:
With a single 1 GB/s NIC, reading and writing at the same time can each run at no more than 500 MB/s. Assuming the disk is larger than 100 GB and memory is larger than 1 GB, and ignoring the minor costs of pipeline setup and ack responses, a pipelined write of a 100 GB block takes at least 100 GB / (500 MB/s) = 200 s.
Can this be optimized further? It is easy to estimate by checking how much idle capacity remains in the cluster. In the pipeline-write scheme, the link between every pair of adjacent nodes constantly carries 500 MB/s of data, so only the head and tail nodes of the pipeline have 500 MB/s of bandwidth to spare; every other node's bandwidth is already saturated. Therefore no further optimization is possible.
If the resources are less ideal, say the disk reads at 800 MB/s but writes at only 200 MB/s, then the pipelined write can clearly run at no more than 200 MB/s; with the other resources and assumptions unchanged, it takes at least 100 GB / (200 MB/s) = 500 s. Of course, real situations are far more complex than these assumptions. The greatest benefit of the pipelined write is balance: every node's resource usage is roughly the same, and the technique only delivers its full advantage when no node becomes a bottleneck.
- I forget whether the question said the NIC is 1 GB/s or the bandwidth is 1 GB/s. If it is the latter, the speed doubles and at least 100 s is needed.
- The question also asked for pseudocode. Ignoring fault tolerance, you can simply strip the main-path code out of the flow analyzed in these two articles, as in the sketch after this list; I won't belabor it here.
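For reference, a rough relay loop for the interview problem might look like the following sketch (my own, ignoring fault tolerance, checksums, and flow control; all names are hypothetical). Each of the 99 receiving nodes reads a chunk from upstream, writes it to local disk, and forwards it downstream; the tail has no downstream:

// Hypothetical relay node for the pipelined transfer described above.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class RelayNode {
  static final int CHUNK = 64 * 1024 * 1024; // 64 MB chunks, an arbitrary choice

  static void relay(InputStream upstream, OutputStream disk, OutputStream downstream)
      throws IOException {
    byte[] buf = new byte[CHUNK];
    int n;
    while ((n = upstream.read(buf)) > 0) {
      disk.write(buf, 0, n);          // local persistence
      if (downstream != null) {
        downstream.write(buf, 0, n);  // forward to the next node
      }
    }
    if (downstream != null) {
      downstream.flush();
    }
    disk.flush();
  }
}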
(Figure: a diagram summarizing the pipelined write flow.)
Having covered the normal flow of the pipelined write, the next article will analyze some of the error-handling strategies used during pipelined writes.
Link to this article: 源碼|HDFS之DataNode:寫數據塊(2)
Author: 猴子007
Source: monkeysayhi.github.io
This article is published under the Creative Commons Attribution-ShareAlike 4.0 International license. Reposting, adaptation, and commercial use are welcome, provided the attribution and the link to this article are preserved.