源碼|HDFS之DataNode:寫數據塊(2)

上一篇源碼|HDFS之DataNode:寫數據塊(1)分析了無管道無異常狀況下,datanode上的寫數據塊過程。本文分析管道寫無異常的狀況,假設副本系數3(即寫數據塊涉及1個客戶端+3個datanode),未發生任何異常java

源碼版本:Apache Hadoop 2.6.0node

本文內容雖短,倒是創建在前文的基礎之上。對於前文已經說明的內容,本文再也不贅述,建議讀者按順序閱讀。git

開始以前

總覽

根據源碼|HDFS之DataNode:寫數據塊(1),對於多副本的管道寫流程,主要影響DataXceiver#writeBlock()、BlockReceiver#receivePacket()、PacketResponder線程三部分。本文按照這三個分支展開。github

文章的組織結構

  1. 若是隻涉及單個分支的分析,則放在同一節。
  2. 若是涉及多個分支的分析,則在下一級分多個節,每節討論一個分支。
  3. 多線程的分析同多分支。
  4. 每個分支和線程的組織結構遵循規則1-3。

創建管道:DataXceiver#writeBlock()

準備接收數據塊:BlockReceiver.<init>()面試

public void writeBlock(final ExtendedBlock block, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientname, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, final long minBytesRcvd, final long maxBytesRcvd, final long latestGenerationStamp, DataChecksum requestedChecksum, CachingStrategy cachingStrategy, final boolean allowLazyPersist) throws IOException {
    ...// 檢查,設置參數等

    ...// 構建向上遊節點或客戶端回覆的輸出流(此處即爲客戶端)

    ...// 略
    
    try {
      if (isDatanode || 
          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // 建立BlockReceiver,準備接收數據塊
        blockReceiver = new BlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist);

        storageUuid = blockReceiver.getStorageUuid();
      } else {
        ...// 管道錯誤恢復相關
      }

      // 下游節點的處理:以當前節點爲「客戶端」,繼續觸發下游管道的創建
      if (targets.length > 0) {
        // 鏈接下游節點
        InetSocketAddress mirrorTarget = null;
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to datanode " + mirrorNode);
        }
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        // 嘗試創建管道(下面展開)
        try {
          // 設置創建socket的超時時間、寫packet的超時時間、寫buf大小等
          int timeoutValue = dnConf.socketTimeout
              + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout + 
                      (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
          
          // 設置當前節點到下游的輸出流mirrorOut、下游到當前節點的輸入流mirrorIn等
          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          // 向下遊節點發送創建管道的請求,將來將繼續使用mirrorOut做爲寫packet的輸出流
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
          mirrorOut.flush();

          // 若是是客戶端發起的寫數據塊請求(知足),則存在管道,須要從下游節點讀取創建管道的ack
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
            // 將下游節點的管道創建結果做爲整個管道的創建結果(要麼從尾節點到頭結點都是成功的,要麼都是失敗的)
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }

        } catch (IOException e) {
          ...// 異常處理:清理資源,響應ack等
        }
      }
      
      // 發送的第一個packet是空的,只用於創建管道。這裏當即返回ack表示管道是否創建成功
      // 因爲該datanode沒有下游節點,則執行到此處,表示管道已經創建成功
      if (isClient && !isTransfer) {
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
        }
        BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
        replyOut.flush();
      }

      // 接收數據塊(也負責發送到下游,不過此處沒有下游節點)
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets, false);

        ...// 數據塊複製相關
      }

      ...// 數據塊恢復相關
      
      ...// 數據塊複製相關
      
    } catch (IOException ioe) {
      LOG.info("opWriteBlock " + block + " received exception " + ioe);
      throw ioe;
    } finally {
      ...// 清理資源
    }

    ...// 更新metrics
  }
複製代碼

與副本系數1的狀況下相比,僅僅增長了「下游節點的處理」的部分:以當前節點爲「客戶端」,繼續觸發下游管道的創建;對於下游節點,仍然要走一遍當前節點的流程當客戶端收到第一個datanode管道創建成功的ack時,下游全部的節點的管道必定已經創建成功,加上客戶端,組成了完整的管道。多線程

另外,根據前文的分析,直到執行BlockReceiver.receiveBlock()纔開始管道寫數據塊內容,結合管道的關閉過程,可知管道的生命週期分爲三個階段:異步

  1. 管道創建:以管道的方式向下遊發送管道創建的請求,從下游接收管道創建的響應。
  2. 管道寫:當客戶端收到管道創建成功的ack時,才利用剛剛創建的管道開始管道寫數據塊的內容。
  3. 管道關閉:以管道的方式向下遊發送管道關閉的請求,從下游接收管道關閉的響應。

如圖說明幾個參數:socket

image.png

  • in:上游節點到當前節點的輸入流,當前節點經過in接收上游節點的packet。
  • replyOut::當前節點到上游節點的輸出流,當前節點經過replyOut向上遊節點發送ack。
  • mirrorOut:當前節點到下游節點的輸出流,當前節點經過mirrorOut向下遊節點鏡像發送packet。
  • mirrorIn:下游節點到當前節點的輸入流,當前節點經過mirrorIn接收下游節點的鏡像ack。

請求創建管道:Sender#writeBlock()

Sender#writeBlock():oop

public void writeBlock(final ExtendedBlock blk, final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, final long minBytesRcvd, final long maxBytesRcvd, final long latestGenerationStamp, DataChecksum requestedChecksum, final CachingStrategy cachingStrategy, final boolean allowLazyPersist) throws IOException {
    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
        blk, clientName, blockToken);
    
    ChecksumProto checksumProto =
      DataTransferProtoUtil.toProto(requestedChecksum);

    OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
      .setHeader(header)
      .setStorageType(PBHelper.convertStorageType(storageType))
      // 去掉targets中的第一個節點
      .addAllTargets(PBHelper.convert(targets, 1))
      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
      .setStage(toProto(stage))
      .setPipelineSize(pipelineSize)
      .setMinBytesRcvd(minBytesRcvd)
      .setMaxBytesRcvd(maxBytesRcvd)
      .setLatestGenerationStamp(latestGenerationStamp)
      .setRequestedChecksum(checksumProto)
      .setCachingStrategy(getCachingStrategy(cachingStrategy))
      .setAllowLazyPersist(allowLazyPersist);
    
    if (source != null) {
      proto.setSource(PBHelper.convertDatanodeInfo(source));
    }

    send(out, Op.WRITE_BLOCK, proto.build());
  }
  
  ...
  
  private static void send(final DataOutputStream out, final Op opcode, final Message proto) throws IOException {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
          + ": " + proto);
    }
    op(out, opcode);
    proto.writeDelimitedTo(out);
    out.flush();
  }
複製代碼

邏輯很是簡單。爲何要去掉targets中的第一個節點?假設客戶端發送的targets中順序存儲d一、d二、d3,當前節點爲d1,那麼d1的下游只剩下d二、d3,繼續向下遊發送管道創建請求時,天然要去掉當前targets中的第一個節點d1;d二、d3同理。性能

依靠這種targets逐漸減小的邏輯,DataXceiver#writeBlock()才能用targets.length > 0判斷是否還有下游節點須要創建管道。

客戶端也使用Sender#writeBlock()創建管道。但發送過程略有不一樣:客戶端經過自定義的字節流寫入數據,須要將字節流中的數據整合成packet,再寫入管道。

向下遊管道發送packet:BlockReceiver#receivePacket()

同步接收packet:BlockReceiver#receivePacket()

先看BlockReceiver#receivePacket()。

嚴格來講,BlockReceiver#receivePacket()負責接收上游的packet,並繼續向下遊節點管道寫

private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);

    PacketHeader header = packetReceiver.getHeader();
    ...// 略

    ...// 檢查packet頭

    long offsetInBlock = header.getOffsetInBlock();
    long seqno = header.getSeqno();
    boolean lastPacketInBlock = header.isLastPacketInBlock();
    final int len = header.getDataLen();
    boolean syncBlock = header.getSyncBlock();

    ...// 略
    
    // 若是不須要當即持久化也不須要校驗收到的數據,則能夠當即委託PacketResponder線程返回 SUCCESS 的ack,而後再進行校驗和持久化
    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    // 管道寫相關:將in中收到的packet鏡像寫入mirrorOut
    if (mirrorOut != null && !mirrorError) {
      try {
        long begin = Time.monotonicNow();
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
        long duration = Time.monotonicNow() - begin;
        if (duration > datanodeSlowLogThresholdMs) {
          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
        }
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }
    
    ByteBuffer dataBuf = packetReceiver.getDataSlice();
    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
    
    if (lastPacketInBlock || len == 0) {    // 收到空packet多是表示心跳或數據塊發送
      // 這兩種狀況均可以嘗試把以前的數據刷到磁盤
      if (syncBlock) {
        flushOrSync(true);
      }
    } else {    // 不然,須要持久化packet
      final int checksumLen = diskChecksum.getChecksumSize(len);
      final int checksumReceivedLen = checksumBuf.capacity();

      ...// 若是是管道中的最後一個節點,則持久化以前,要先對收到的packet作一次校驗(使用packet自己的校驗機制)
      ...// 若是校驗錯誤,則委託PacketResponder線程返回 ERROR_CHECKSUM 的ack

      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
          && streams.isTransientStorage();
      try {
        long onDiskLen = replicaInfo.getBytesOnDisk();
        if (onDiskLen<offsetInBlock) {
          ...// 若是校驗塊不完整,須要加載並調整舊的meta文件內容,供後續從新計算crc

          // 寫block文件
          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
              + dataBuf.arrayOffset() + dataBuf.position();
          int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
          
          // 寫meta文件
          final byte[] lastCrc;
          if (shouldNotWriteChecksum) {
            lastCrc = null;
          } else if (partialCrc != null) {  // 若是是校驗塊不完整(以前收到過一部分)
            ...// 從新計算crc
            ...// 更新lastCrc
            checksumOut.write(buf);
            partialCrc = null;
          } else { // 若是校驗塊完整
            ...// 更新lastCrc
            checksumOut.write(checksumBuf.array(), offset, checksumLen);
          }

          ...//略
        }
      } catch (IOException iex) {
        datanode.checkDiskErrorAsync();
        throw iex;
      }
    }

    // 相反的,若是須要當即持久化或須要校驗收到的數據,則如今已經完成了持久化和校驗,能夠委託PacketResponder線程返回 SUCCESS 的ack
    // if sync was requested, put in queue for pending acks here
    // (after the fsync finished)
    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    ...// 若是超過了響應時間,還要主動發送一個IN_PROGRESS的ack,防止超時

    ...// 節流器相關
    
    // 當整個數據塊都發送完成以前,客戶端會可能會發送有數據的packet,也由於維持心跳或表示結束寫數據塊發送空packet
    // 所以,當標誌位lastPacketInBlock爲true時,不能返回0,要返回一個負值,以區分未到達最後一個packet以前的狀況
    return lastPacketInBlock?-1:len;
  }
  
  ...
  
  private boolean shouldVerifyChecksum() {
    // 對於客戶端寫,只有管道中的最後一個節點知足`mirrorOut == null`
    return (mirrorOut == null || isDatanode || needsChecksumTranslation);
  }
複製代碼

因爲已經在中創建了管道,接下來,管道寫的工做很是簡單,只涉及「管道寫相關」部分:

每收到一個packet,就將in中收到的packet鏡像寫入mirrorOut;對於下游節點,仍然要走一遍當前節點的流程

另外,BlockReceiver#shouldVerifyChecksum()也發揮了做用:管道的中間節點在落盤前不須要校驗

向上遊管道響應ack:PacketResponder線程

異步發送ack:PacketResponder線程

與BlockReceiver#receivePacket()相對,PacketResponder線程負責接收下游節點的ack,並繼續向上遊管道響應

PacketResponder#run():

public void run() {
      boolean lastPacketInBlock = false;
      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
      while (isRunning() && !lastPacketInBlock) {
        long totalAckTimeNanos = 0;
        boolean isInterrupted = false;
        try {
          Packet pkt = null;
          long expected = -2;
          PipelineAck ack = new PipelineAck();
          long seqno = PipelineAck.UNKOWN_SEQNO;
          long ackRecvNanoTime = 0;
          try {
            // 若是當前節點不是管道的最後一個節點,且下游節點正常,則從下游讀取ack
            if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
              ack.readFields(downstreamIn);
              ...// 統計相關
              ...// OOB相關(暫時忽略)
              seqno = ack.getSeqno();
            }
            // 若是從下游節點收到了正常的 ack,或當前節點是管道的最後一個節點,則須要從隊列中消費pkt(即BlockReceiver#receivePacket()放入的ack)
            if (seqno != PipelineAck.UNKOWN_SEQNO
                || type == PacketResponderType.LAST_IN_PIPELINE) {
              pkt = waitForAckHead(seqno);
              if (!isRunning()) {
                break;
              }
              // 管道寫用seqno控制packet的順序:當且僅當下游正確接收的序號與當前節點正確處理完的序號相等時,當前節點才認爲該序號的packet已正確接收;上游同理
              expected = pkt.seqno;
              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
                  && seqno != expected) {
                throw new IOException(myString + "seqno: expected=" + expected
                    + ", received=" + seqno);
              }
              ...// 統計相關
              lastPacketInBlock = pkt.lastPacketInBlock;
            }
          } catch (InterruptedException ine) {
            ...// 異常處理
          } catch (IOException ioe) {
            ...// 異常處理
          }

          ...// 中斷退出

          // 若是是最後一個packet,將block的狀態轉換爲FINALIZE,並關閉BlockReceiver
          if (lastPacketInBlock) {
            finalizeBlock(startTime);
          }

          // 此時,必然知足 ack.seqno == pkt.seqno,構造新的 ack 發送給上游
          sendAckUpstream(ack, expected, totalAckTimeNanos,
              (pkt != null ? pkt.offsetInBlock : 0), 
              (pkt != null ? pkt.ackStatus : Status.SUCCESS));
          // 已經處理完隊頭元素,出隊
          // 只有一種狀況下知足pkt == null:PacketResponder#isRunning()返回false,即PacketResponder線程正在關閉。此時不管隊列中是否有元素,都不須要出隊了
          if (pkt != null) {
            removeAckHead();
          }
        } catch (IOException e) {
          ...// 異常處理
        } catch (Throwable e) {
          ...// 異常處理
        }
      }
      LOG.info(myString + " terminating");
    }
複製代碼

前文一不當心分析了PacketResponder線程如何處理以管道的方式響應ack,此處簡單複習,關注ack與pkt的關係。

總結起來,PacketResponder線程的核心工做以下:

  1. 接收下游節點的ack
  2. 比較ack.seqno與當前隊頭的pkt.seqno
  3. 若是相等,則向上遊發送pkt
  4. 若是是最後一個packet,將block的狀態轉換爲FINALIZED

一道面試題

早上碰巧看到一道面試題:

1個節點發送100G的數據到99個節點,硬盤、內存、網卡速度都是1G/s,如什麼時候間最短?

猴子有篇筆記裏分析了「管道寫」技術的優點。若是熟悉HDFS中的「管道寫」,就很容易解決該題:

單網卡1G/s,那麼同時讀寫的速度最大500M/s。假設硬盤大於100G,內存大於1G,忽略零碎的創建管道、響應ack的成本,管道寫一個100G大小的數據塊,至少須要100G / (500M/s) = 200s

能不能繼續優化呢?其實很容易估計,看集羣中閒置資源還有多少。在管道寫的方案中,兩個節點間的帶寬上始終佔着500M數據,所以,只有管道中的頭節點與尾節點剩餘500M/s的帶寬,其餘節點的帶寬都已經打滿。所以,已經沒法繼續優化。

若是題目的資源並無這麼理想,好比硬盤讀800M/s,寫200M/s,那麼明顯管道寫的速度最高也只能到200M/s,其餘資源和假設不變,則至少須要100G / (200M/s) = 500s。固然,實際狀況比這裏的假設要複雜的多,管道寫的最大好處在於性能平衡,讓每一個節點的資源佔用至關,不出現短板纔可能發揮最大的優點。

  • 忘記題目描述網卡1G/s,仍是帶寬1G/s。若是是後者,那麼速度快一倍,至少須要100s。
  • 題目還要求寫出僞碼。若是不考慮容錯性,徹底能夠按照這兩篇文章的分析,剝離出主幹代碼完成題目,猴子就不囉嗦了。

總結

引用一張圖作總結:

image.png

瞭解了管道寫的正常流程,下文將分析管道寫中的部分錯誤處理策略。


本文連接:源碼|HDFS之DataNode:寫數據塊(2)
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。

相關文章
相關標籤/搜索