Source Code | HDFS DataNode: Writing a Block (3)

Source Code | HDFS DataNode: Writing a Block (1) and Source Code | HDFS DataNode: Writing a Block (2) analyzed the block-write process on a datanode in the no-pipeline, no-exception case and in the pipeline-write, no-exception case, respectively. This post analyzes the pipeline write with exceptions. Assuming a replication factor of 3 (i.e. a block write involves 1 client + 3 datanodes), it discusses how a datanode handles different kinds of exceptions occurring at different points in time.

Source code version: Apache Hadoop 2.6.0

Both the conclusions and the implementation are relatively simple; reading only the overview is enough.

Before we start

Overview

The datanode's handling of exceptions during block writes is fairly simple and generally follows one of two strategies:

  1. The node that hits the exception closes the IO streams and sockets to its upstream and downstream neighbors, tearing down the pipeline.
  2. The node sends an ack carrying the failure information to its upstream neighbor.

Only a few cases use strategy 2; most cases use strategy 1 and simply tear the pipeline down; some cases combine both. A minimal sketch of the two patterns is shown below.
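To make the two strategies concrete, here is a minimal, hypothetical Java sketch. It is not Hadoop code: the class, fields, and method names are invented, and a plain string ack stands in for the real protobuf-based PipelineAck; it only illustrates the shape of the two patterns.

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

// Hypothetical sketch of the two strategies; names are invented, not Hadoop's.
class PipelineErrorHandlingSketch {
  private DataOutputStream mirrorOut; // to the downstream node
  private DataInputStream mirrorIn;   // from the downstream node
  private DataOutputStream replyOut;  // to the upstream node (or the client)
  private Socket mirrorSock;

  // Strategy 1: on any failure, just close everything. The peers' blocked
  // reads and writes then fail in turn, so the whole pipeline collapses.
  void closePipeline() {
    closeQuietly(mirrorOut);
    closeQuietly(mirrorIn);
    closeQuietly(replyOut);
    closeQuietly(mirrorSock);
  }

  // Strategy 2: before tearing down, tell the upstream node which link failed,
  // then rethrow so that the outer finally block (strategy 1) still runs.
  void reportErrorUpstream(String firstBadLink, IOException cause) throws IOException {
    replyOut.writeUTF("ERROR:" + firstBadLink); // stand-in for the protobuf ack
    replyOut.flush();
    throw cause;
  }

  private static void closeQuietly(Closeable c) {
    if (c == null) {
      return;
    }
    try {
      c.close();
    } catch (IOException ignored) {
      // best-effort cleanup, in the same spirit as IOUtils.closeStream()
    }
  }
}

In the real code below, strategy 1 corresponds to the IOUtils.closeStream()/IOUtils.closeSocket() calls in the finally block of DataXceiver#writeBlock(), and strategy 2 to the ERROR ack built with BlockOpResponseProto in its catch block.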

Although the exception-handling strategies are simple, a fair amount of code is involved. The overall structure follows the DataXceiver#writeBlock() method from the main flow in Source Code | HDFS DataNode: Writing a Block (1), partly merged with the pipeline-write content from Source Code | HDFS DataNode: Writing a Block (2). This post starts from the main flow in DataXceiver#writeBlock() and occasionally touches on the methods that wrap DataXceiver#writeBlock().

More worthy of attention is the failure-recovery flow for block writes. That work is driven by the client, and I will discuss it in the analysis of the client.

How this post is organized

  1. If only a single branch is analyzed, it stays within the same section.
  2. If multiple branches are analyzed, they are split into subsections at the next level, one per branch.
  3. Multi-threaded analysis is organized the same way as multiple branches.
  4. Each branch and each thread is organized following rules 1-3.

Main flow: DataXceiver#writeBlock()

DataXceiver#writeBlock():

public void writeBlock(final ExtendedBlock block,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException {
    ...// Check and set parameters, etc.

    ...// Build the output stream for replying to the upstream node or the client (here, the client)

    ...// Omitted
    
    try {
      if (isDatanode || 
          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // Create the BlockReceiver, getting ready to receive the block
        blockReceiver = new BlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist);

        storageUuid = blockReceiver.getStorageUuid();
      } else {
        ...// Pipeline error recovery
      }

      // Handling of downstream nodes: acting as the "client" of the next node, continue setting up the downstream pipeline
      if (targets.length > 0) {
        // Connect to the downstream node
        InetSocketAddress mirrorTarget = null;
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to datanode " + mirrorNode);
        }
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        // Try to set up the pipeline (expanded below)
        try {
          // Set the socket connect timeout, packet write timeout, write buffer size, etc.
          int timeoutValue = dnConf.socketTimeout
              + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout + 
                      (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
          
          // Set up mirrorOut, the output stream from this node to downstream, and mirrorIn, the input stream from downstream to this node
          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          // Send the pipeline setup request to the downstream node; mirrorOut will keep being used as the output stream for writing packets
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
          mirrorOut.flush();

          // If the block write was initiated by a client (true here), a pipeline exists, so the pipeline setup ack must be read from the downstream node
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
            // Take the downstream node's pipeline setup result as the result for the whole pipeline (either every node from tail to head succeeded, or all failed)
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }

        } catch (IOException e) {
          // If the block write was initiated by a client (true here), immediately send an ack with status ERROR upstream (although there is no guarantee the upstream has received it)
          if (isClient) {
            BlockOpResponseProto.newBuilder()
              .setStatus(ERROR)
               // NB: Unconditionally using the xfer addr w/o hostname
              .setFirstBadLink(targets[0].getXferAddr())
              .build()
              .writeDelimitedTo(replyOut);
            replyOut.flush();
          }
          // Close the downstream IO streams and socket
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          // If the block write was initiated by a client (true here), rethrow the exception
          // Control then jumps to the outer catch block
          if (isClient) {
            LOG.error(datanode + ":Exception transfering block " +
                      block + " to mirror " + mirrorNode + ": " + e);
            throw e;
          } else {
            LOG.info(datanode + ":Exception transfering " +
                     block + " to mirror " + mirrorNode +
                     "- continuing without the mirror", e);
          }
        }
      }
      
      // The first packet sent is empty and is only used to set up the pipeline. An ack is returned here right away to indicate whether the pipeline was set up successfully
      // For a datanode with no downstream node, reaching this point means the pipeline has been set up successfully
      if (isClient && !isTransfer) {
        if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
        }
        BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
        replyOut.flush();
      }

      // Receive the block (also responsible for forwarding it downstream, though there is no downstream node here)
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets, false);

        ...// 數據塊複製相關
      }

      ...// 數據塊恢復相關
      
      ...// 數據塊複製相關
      
    } catch (IOException ioe) {
      // If an IOE is caught, rethrow it directly
      LOG.info("opWriteBlock " + block + " received exception " + ioe);
      throw ioe;
    } finally {
      // Whether the flow ended normally or with an exception, close the IO streams and socket directly
      IOUtils.closeStream(mirrorOut);
      IOUtils.closeStream(mirrorIn);
      IOUtils.closeStream(replyOut);
      IOUtils.closeSocket(mirrorSock);
      IOUtils.closeStream(blockReceiver);
      blockReceiver = null;
    }

    ...// 更新metrics
  }

The final finally block is crucial to exception handling:

The normal case needs no discussion. In the exceptional case, it closes all downstream IO streams (mirrorOut, mirrorIn) and the downstream socket (mirrorSock), closes the output stream to upstream (replyOut), and closes most of the resources wrapped inside blockReceiver (via BlockReceiver#close()); the remaining resources, such as the input stream from upstream (in), are closed by the finally block in the outer DataXceiver#run().

replyOut is just a filter stream; the underlying output stream it wraps can also be closed by the finally block in DataXceiver#run(). For reasons of space, this post does not expand on that.

Keep the role of this finally block in mind: the same code will be repeated several times below, and it constitutes strategy 1 from the overview.

The following uses three key stages as examples, analyzing how exceptions are handled within each stage and how they interact with the outer exception-handling logic.

Local preparation: BlockReceiver.<init>()

As analyzed in the earlier posts, the main work of BlockReceiver.<init>() is fairly simple: create the block file and the meta file under the rbw directory:

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
      final DataInputStream in,
      final String inAddr, final String myAddr,
      final BlockConstructionStage stage, 
      final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
      final String clientname, final DatanodeInfo srcDataNode,
      final DataNode datanode, DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      final boolean allowLazyPersist) throws IOException {
    try{
      ...// Check and set parameters, etc.

      // Open the files, getting ready to receive the block
      if (isDatanode) { // Block replication and block moves are initiated by a datanode; in those cases the block file is created under the tmp directory
        replicaInfo = datanode.data.createTemporary(storageType, block);
      } else {
        switch (stage) {
        // For a client-initiated write (considering only create, not append), the replica is created under the rbw directory (block file and meta file; the replica is in the RBW state)
        case PIPELINE_SETUP_CREATE:
          replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
          datanode.notifyNamenodeReceivingBlock(
              block, replicaInfo.getStorageUuid());
          break;
        ...// Other cases
        default: throw new IOException("Unsupported stage " + stage + 
              " while receiving block " + block + " from " + inAddr);
        }
      }
      ...// Omitted
      
      // Block replication, block moves, and client-created blocks all essentially create a new block file. In those cases, isCreate is true
      final boolean isCreate = isDatanode || isTransfer 
          || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
      streams = replicaInfo.createStreams(isCreate, requestedChecksum);
      assert streams != null : "null streams!";

      ...// Compute the meta file header
      // If a new block file needs to be created, a new meta file must be created at the same time and the header written to it
      if (isCreate) {
        BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
      } 
    } catch (ReplicaAlreadyExistsException bae) {
      throw bae;
    } catch (ReplicaNotFoundException bne) {
      throw bne;
    } catch(IOException ioe) {
      // An IOE usually involves resources such as files, so extra cleanup is needed
      IOUtils.closeStream(this);
      cleanupBlock();
      
      // check if there is a disk error
      IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
      DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
          cause);
      
      if (cause != null) { // possible disk error
        ioe = cause;
        datanode.checkDiskErrorAsync();
      }
      
      // Rethrow the IOE
      throw ioe;
    }
  }

A special mention for DataNode#checkDiskErrorAsync(): this method checks for disk errors asynchronously and shuts the datanode down if the number of failed disks exceeds a threshold. I have not yet figured out how the threshold is computed; it seems my understanding of DataStorage is lacking.

The work done by BlockReceiver#close() has already been covered. What needs attention is the handling of ReplicaAlreadyExistsException and the other IOExceptions: they are rethrown.

ReplicaAlreadyExistsException is a subclass of IOException and is thrown by FsDatasetImpl#createRbw().

As for the situations that throw IOException, there are far too many to list: missing permissions, disk errors, and many other causes.

What happens when these exceptions are rethrown? The catch block and finally block in the outer DataXceiver#writeBlock() are triggered.

Since the downstream pipeline has not been set up yet at this point, let us first look at the consequences for the upstream node when the finally block runs because of the exception:

  • After the DataXceiver thread starts, DataXceiver#peer wraps the output stream from this node to the upstream node (out) and the input stream from the upstream node to this node (in).
  • These IO streams are backed by sockets; after the socket on this node's side is closed, the socket on the upstream node's side also closes after a timeout period and throws a SocketException (a subclass of IOException).
  • Because the upstream node catches an IOException due to the closed socket, it also runs its finally block and repeats this node's procedure.

In this way, the pipeline is torn down on the upstream nodes level by level, until the client handles the pipeline-close exception. A minimal sketch of the underlying mechanism is shown below.
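The propagation mechanism itself is plain TCP behavior. The following self-contained sketch (not HDFS code; the class name and port handling are made up for illustration) shows how closing one end surfaces as an IOException on a peer that is blocked in a read, which is exactly what pushes each upstream node into its own finally block:

import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo: closing one end of a TCP connection makes the peer's
// blocking read fail with an IOException instead of hanging forever.
public class PipelineTeardownDemo {
  public static void main(String[] args) throws Exception {
    try (ServerSocket server = new ServerSocket(0);
         Socket downstream = new Socket("localhost", server.getLocalPort());
         Socket upstream = server.accept()) {
      upstream.setSoTimeout(2000); // plays the role of dnConf.socketTimeout

      // The "downstream" side closes, as the finally block in writeBlock() does.
      downstream.close();

      DataInputStream in = new DataInputStream(upstream.getInputStream());
      try {
        in.readInt(); // the blocked read now fails instead of waiting forever
      } catch (IOException e) {
        // Typically an EOFException here; a SocketTimeoutException instead if
        // the peer merely stops sending without closing the connection.
        System.out.println("Upstream read failed: " + e);
      }
    }
  }
}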

If an exception is thrown while creating the block file or the meta file, there is currently no strategy to promptly clean up the "orphaned" replica under the rbw directory. Readers can try debugging BlockReceiver.<init>(): after the replica is created under the rbw directory, keep the thread paused for a long time; the pipeline eventually times out and closes, but the files under the rbw directory remain.

However, the block recovery process can finish the cleanup; this post does not expand on it.

Setting up the pipeline: the if (targets.length > 0) { block

If local preparation completes without exceptions, pipeline setup begins:

// Handling of downstream nodes: acting as the "client" of the next node, continue setting up the downstream pipeline
      if (targets.length > 0) {
        // Connect to the downstream node
        InetSocketAddress mirrorTarget = null;
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Connecting to datanode " + mirrorNode);
        }
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        // Try to set up the pipeline (expanded below)
        try {
          // Set the socket connect timeout, packet write timeout, write buffer size, etc.
          int timeoutValue = dnConf.socketTimeout
              + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout + 
                      (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
          
          // Set up mirrorOut, the output stream from this node to downstream, and mirrorIn, the input stream from downstream to this node
          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          // Send the pipeline setup request to the downstream node; mirrorOut will keep being used as the output stream for writing packets
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
          mirrorOut.flush();

          // If the block write was initiated by a client (true here), a pipeline exists, so the pipeline setup ack must be read from the downstream node
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
            // Take the downstream node's pipeline setup result as the result for the whole pipeline (either every node from tail to head succeeded, or all failed)
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }

        } catch (IOException e) {
          // If the block write was initiated by a client (true here), immediately send an ack with status ERROR upstream (although there is no guarantee the upstream has received it)
          if (isClient) {
            BlockOpResponseProto.newBuilder()
              .setStatus(ERROR)
               // NB: Unconditionally using the xfer addr w/o hostname
              .setFirstBadLink(targets[0].getXferAddr())
              .build()
              .writeDelimitedTo(replyOut);
            replyOut.flush();
          }
          // Close the downstream IO streams and socket
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          // If the block write was initiated by a client (true here), rethrow the exception
          // Control then jumps to the outer catch block
          if (isClient) {
            LOG.error(datanode + ":Exception transfering block " +
                      block + " to mirror " + mirrorNode + ": " + e);
            throw e;
          } else {
            LOG.info(datanode + ":Exception transfering " +
                     block + " to mirror " + mirrorNode +
                     "- continuing without the mirror", e);
          }
        }
      }

Based on the earlier analysis of pipeline setup, this is where some of the IO streams and the socket to the downstream node are created.

Failures can occur both while creating these resources and while sending the pipeline setup request, throwing IOException or one of its subclasses. The catch block handles these IOExceptions using strategy 2: first send an ack upstream reporting ERROR, then close the IO streams to the downstream node (mirrorOut, mirrorIn) and the downstream socket (mirrorSock). Finally, rethrow the exception to trigger the outer finally block.

The cleanup performed here is a subset of the outer finally block; the key difference is that an extra ack is sent. How that ack is handled is left to the analysis of the PacketResponder thread.

However, by this point setup of the downstream pipeline has already started, so let us also look at the consequences for the downstream node when the catch block runs because of the exception (see above for the analysis of the outer finally block):

  • After mirrorOut, mirrorIn, and mirrorSock are initialized, the downstream node has also set up the matching IO streams, socket, etc. via DataXceiverServer.
  • These IO streams are backed by sockets; after the socket on this node's side is closed, the socket on the downstream node's side also closes after a timeout period and throws a SocketException (a subclass of IOException).
  • Because the downstream node catches an IOException due to the closed socket, it also runs this catch block or the outer finally block and repeats this node's procedure.

In this way, the pipeline is torn down on the downstream nodes level by level, until the client handles the pipeline-close exception. At the same time, since the outer finally block eventually runs, the pipeline on the upstream nodes is also torn down level by level.

The IO streams mirrorOut and mirrorIn actually share the TCP socket mirrorSock; the same goes for in and out. But when closing the IO streams, resources such as buffers must be cleaned up in addition to the underlying socket, so listing them separately is reasonable.

Pipeline write: BlockReceiver#receiveBlock()

As analyzed earlier, once the pipeline is set up successfully, BlockReceiver#receiveBlock() starts receiving packets and responding with acks:

void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut, // output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      DatanodeInfo[] downstreams,
      boolean isReplaceBlock) throws IOException {
    ...// Parameter setup

    try {
      // If the write was initiated by a client (here, a block create), start the PacketResponder to send acks
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup, 
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start(); // start thread to processes responses
      }

      // Synchronously receive packets, writing the block file and the meta file
      while (receivePacket() >= 0) {}

      // At this point, the node has received all packets and can close the responder after all acks have been sent
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();
        responderClosed = true;
      }

      ...// Block replication

    } catch (IOException ioe) {
      if (datanode.isRestarting()) {
        LOG.info("Shutting down for restart (" + block + ").");
      } else {
        LOG.info("Exception for " + block, ioe);
        throw ioe;
      }
    } finally {
      ...// Cleanup
    }
  }

As before, the discussion is split into two parts: receiving packets and responding with acks.

Receiving packets synchronously: BlockReceiver#receivePacket()

As analyzed earlier, BlockReceiver#receivePacket() is responsible for receiving packets from upstream and continuing the pipeline write to the downstream node:

private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);

    PacketHeader header = packetReceiver.getHeader();
    ...// Omitted

    // Check the packet header
    if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
      throw new IOException("Received an out-of-sequence packet for " + block + 
          "from " + inAddr + " at offset " + header.getOffsetInBlock() +
          ". Expecting packet starting at " + replicaInfo.getNumBytes());
    }
    if (header.getDataLen() < 0) {
      throw new IOException("Got wrong length during writeBlock(" + block + 
                            ") from " + inAddr + " at offset " + 
                            header.getOffsetInBlock() + ": " +
                            header.getDataLen()); 
    }

    long offsetInBlock = header.getOffsetInBlock();
    long seqno = header.getSeqno();
    boolean lastPacketInBlock = header.isLastPacketInBlock();
    final int len = header.getDataLen();
    boolean syncBlock = header.getSyncBlock();

    ...// Omitted
    
    // If the data does not need to be persisted immediately and the received data does not need to be verified, the PacketResponder thread can be asked right away to return a SUCCESS ack, with verification and persistence happening afterwards
    if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    // Pipeline write: mirror the packet received on in to mirrorOut
    if (mirrorOut != null && !mirrorError) {
      try {
        long begin = Time.monotonicNow();
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
        long duration = Time.monotonicNow() - begin;
        if (duration > datanodeSlowLogThresholdMs) {
          LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
              + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
        }
      } catch (IOException e) {
        // Assuming no interrupt occurred, this merely sets mirrorError = true
        handleMirrorOutError(e);
      }
    }
    
    ByteBuffer dataBuf = packetReceiver.getDataSlice();
    ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
    
    if (lastPacketInBlock || len == 0) {    // An empty packet may indicate a heartbeat or the end of the block
      // In both cases, we can try to flush the previously received data to disk
      if (syncBlock) {
        flushOrSync(true);
      }
    } else {    // Otherwise, the packet needs to be persisted
      final int checksumLen = diskChecksum.getChecksumSize(len);
      final int checksumReceivedLen = checksumBuf.capacity();

      // The packet header is wrong; throw an IOE directly
      if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
        throw new IOException("Invalid checksum length: received length is "
            + checksumReceivedLen + " but expected length is " + checksumLen);
      }

      // If this is the last node in the pipeline, the received packet must be verified before persisting (using the packet's own checksum mechanism)
      if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
        try {
          // If verification fails, an IOE is thrown
          verifyChunks(dataBuf, checksumBuf);
        } catch (IOException ioe) {
          // If there is a checksum error, ask the PacketResponder thread to return an ERROR_CHECKSUM ack
          if (responder != null) {
            try {
              ((PacketResponder) responder.getRunnable()).enqueue(seqno,
                  lastPacketInBlock, offsetInBlock,
                  Status.ERROR_CHECKSUM);
              // Wait 3s, hoping the PacketResponder thread can finish sending all acks (so that fewer packets have to be resent)
              Thread.sleep(3000);
            } catch (InterruptedException e) {
              // Do nothing and do not clear the interrupt flag; just stop sleeping
            }
          }
          // If there is a checksum error, the packets received by the upstream nodes are assumed to be corrupt too, so throw an IOE directly
          throw new IOException("Terminating due to a checksum error." + ioe);
        }
 
        ...// Checksum translation
      }

      if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
        // checksum is missing, need to calculate it
        checksumBuf = ByteBuffer.allocate(checksumLen);
        diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
      }

      final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
          && streams.isTransientStorage();
      try {
        long onDiskLen = replicaInfo.getBytesOnDisk();
        if (onDiskLen<offsetInBlock) {
          ...// If the last checksum chunk is incomplete, the old meta file content must be loaded and adjusted so the crc can be recomputed later

          // Write the block file
          int startByteToDisk = (int)(onDiskLen-firstByteInBlock) 
              + dataBuf.arrayOffset() + dataBuf.position();
          int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
          out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
          
          // Write the meta file
          final byte[] lastCrc;
          if (shouldNotWriteChecksum) {
            lastCrc = null;
          } else if (partialCrc != null) {  // If the checksum chunk is incomplete (part of it was received earlier)
            ...// Recompute the crc
            ...// Update lastCrc
            checksumOut.write(buf);
            partialCrc = null;
          } else { // If the checksum chunk is complete
            ...// Update lastCrc
            checksumOut.write(checksumBuf.array(), offset, checksumLen);
          }

          ...// Omitted
        }
      } catch (IOException iex) {
        // Check the disk asynchronously
        datanode.checkDiskErrorAsync();
        // Rethrow the IOE
        throw iex;
      }
    }

    // Conversely, if the data had to be persisted immediately or verified, both have been completed by now, so the PacketResponder thread can be asked to return a SUCCESS ack
    // if sync was requested, put in queue for pending acks here
    // (after the fsync finished)
    if (responder != null && (syncBlock || shouldVerifyChecksum())) {
      ((PacketResponder) responder.getRunnable()).enqueue(seqno,
          lastPacketInBlock, offsetInBlock, Status.SUCCESS);
    }

    ...// If the response time has been exceeded, proactively send an IN_PROGRESS ack to prevent a timeout

    ...// Throttler
    
    // Before the whole block has been sent, the client may send packets carrying data, and may also send empty packets to maintain the heartbeat or to signal the end of the block write
    // Therefore, when lastPacketInBlock is true, 0 must not be returned; a negative value is returned to distinguish this from the cases before the last packet arrives
    return lastPacketInBlock?-1:len;
  }
  
  ...
  
  private boolean shouldVerifyChecksum() {
    // For a client write, only the last node in the pipeline satisfies `mirrorOut == null`
    return (mirrorOut == null || isDatanode || needsChecksumTranslation);
  }
  
  ...
  
  private void handleMirrorOutError(IOException ioe) throws IOException {
    String bpid = block.getBlockPoolId();
    LOG.info(datanode.getDNRegistrationForBP(bpid)
        + ":Exception writing " + block + " to mirror " + mirrorAddr, ioe);
    if (Thread.interrupted()) { // If the BlockReceiver thread has been interrupted, rethrow the IOE
      throw ioe;
    } else {    // Otherwise, merely mark the downstream node as failed and leave it to the outer logic
      mirrorError = true;
    }
  }

The analysis of the pipeline-write process splits into two cases, the tail node and an intermediate node:

  • If this is the tail node, the received packet must be verified before it is persisted (using the packet's own checksum mechanism). If verification fails, the PacketResponder thread is asked to send an ack with the ERROR_CHECKSUM status, and an IOE is then rethrown.
  • If this is an intermediate node, it only needs to mirror the packet to the downstream node. Assuming the exception occurs without an interrupt, the node merely sets mirrorError = true. This has two effects:
    1. No subsequent packets are written to the downstream node; eventually the socket times out and closes, and the upstream and downstream pipelines are torn down level by level.
    2. The upstream node learns about the downstream failure through the ack (see below).

Exception handling on the tail node still follows strategy 1; an intermediate node follows strategy 1 and strategy 2 at the same time. A simplified sketch of the intermediate-node case follows.
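The combination at an intermediate node spans two threads: BlockReceiver marks the failure, and PacketResponder turns it into an error status for the upstream node. The following simplified, hypothetical sketch captures that interplay; the names and plain-string statuses are invented, whereas in Hadoop the reply in this case is the prebuilt MIRROR_ERROR_STATUS array and the statuses are protobuf enums.

import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the mirrorError pattern on an intermediate node.
class MirrorErrorSketch {
  private volatile boolean mirrorError = false;
  private DataOutputStream mirrorOut; // to the downstream node

  // Called by the receiver thread for every packet.
  void mirrorPacket(byte[] packet) {
    if (mirrorOut == null || mirrorError) {
      return; // once the mirror has failed, stop writing downstream
    }
    try {
      mirrorOut.write(packet);
      mirrorOut.flush();
    } catch (IOException e) {
      mirrorError = true; // step 1 of strategy 2: only record the failure
    }
  }

  // Called by the responder thread when building the ack for the upstream node.
  String[] buildAckStatuses(String[] downstreamStatuses) {
    if (mirrorError) {
      // step 2 of strategy 2: report SUCCESS for this node, ERROR for downstream
      return new String[] { "SUCCESS", "ERROR" };
    }
    String[] replies = new String[downstreamStatuses.length + 1];
    replies[0] = "SUCCESS";
    System.arraycopy(downstreamStatuses, 0, replies, 1, downstreamStatuses.length);
    return replies;
  }
}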

Sending acks asynchronously: the PacketResponder thread

As analyzed earlier, the PacketResponder thread is responsible for receiving acks from the downstream node and continuing to respond up the pipeline:

public void run() {
      boolean lastPacketInBlock = false;
      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
      while (isRunning() && !lastPacketInBlock) {
        long totalAckTimeNanos = 0;
        boolean isInterrupted = false;
        try {
          Packet pkt = null;
          long expected = -2;
          PipelineAck ack = new PipelineAck();
          long seqno = PipelineAck.UNKOWN_SEQNO;
          long ackRecvNanoTime = 0;
          try {
            // If this node is not the last node in the pipeline and the downstream node is healthy, read the ack from downstream
            if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
              ack.readFields(downstreamIn);
              ...// Metrics
              // If the downstream status is OOB, keep relaying the OOB upstream
              Status oobStatus = ack.getOOBStatus();
              if (oobStatus != null) {
                LOG.info("Relaying an out of band ack of type " + oobStatus);
                sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
                    Status.SUCCESS);
                continue;
              }
              seqno = ack.getSeqno();
            }
            // If a normal ack was received from the downstream node, or this node is the last node in the pipeline, consume a pkt from the queue (i.e. the ack enqueued by BlockReceiver#receivePacket())
            if (seqno != PipelineAck.UNKOWN_SEQNO
                || type == PacketResponderType.LAST_IN_PIPELINE) {
              pkt = waitForAckHead(seqno);
              if (!isRunning()) {
                break;
              }
              // The pipeline write uses seqno to order packets: only when the sequence number correctly received downstream equals the sequence number this node has finished processing does this node consider that packet correctly received; the same applies upstream
              expected = pkt.seqno;
              if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
                  && seqno != expected) {
                throw new IOException(myString + "seqno: expected=" + expected
                    + ", received=" + seqno);
              }
              ...// Metrics
              lastPacketInBlock = pkt.lastPacketInBlock;
            }
          } catch (InterruptedException ine) {
            // Record the flag marking the current InterruptedException
            isInterrupted = true;
          } catch (IOException ioe) {
            ...// Exception handling
            if (Thread.interrupted()) { // If an interrupt occurred (distinct from the local variable isInterrupted), record the interrupt flag
              isInterrupted = true;
            } else {
              // Marking mirrorError = true for all exceptions here is not very reasonable, but the impact is small
              mirrorError = true;
              LOG.info(myString, ioe);
            }
          }

          // Exit on interrupt
          if (Thread.interrupted() || isInterrupted) {
            LOG.info(myString + ": Thread is interrupted.");
            running = false;
            continue;
          }

          // If this is the last packet, transition the block to the FINALIZED state and close the BlockReceiver
          if (lastPacketInBlock) {
            finalizeBlock(startTime);
          }

          // At this point, ack.seqno == pkt.seqno is guaranteed; build a new ack and send it upstream
          sendAckUpstream(ack, expected, totalAckTimeNanos,
              (pkt != null ? pkt.offsetInBlock : 0), 
              (pkt != null ? pkt.ackStatus : Status.SUCCESS));
          // The head of the queue has been processed, so dequeue it
          // pkt == null holds in only one case: PacketResponder#isRunning() returns false, i.e. the PacketResponder thread is shutting down. In that case, there is no need to dequeue whether or not the queue has elements
          if (pkt != null) {
            removeAckHead();
          }
        } catch (IOException e) {
          // Once an IOE is found, interrupt the thread unless the IOE was caused by an interrupt
          LOG.warn("IOException in BlockReceiver.run(): ", e);
          if (running) {
            datanode.checkDiskErrorAsync();
            LOG.info(myString, e);
            running = false;
            if (!Thread.interrupted()) { // failure not caused by interruption
              receiverThread.interrupt();
            }
          }
        } catch (Throwable e) {
          // For any other exception, interrupt directly
          if (running) {
            LOG.info(myString, e);
            running = false;
            receiverThread.interrupt();
          }
        }
      }
      LOG.info(myString + " terminating");
    }
    
    ...
    
    // PacketResponder#sendAckUpstream() wraps PacketResponder#sendAckUpstreamUnprotected()
    private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
        long totalAckTimeNanos, long offsetInBlock, Status myStatus)
        throws IOException {
      Status[] replies = null;
      if (ack == null) { // When sending an OOB ack, ack must be null and myStatus must be OOB. What a terrible design...
        replies = new Status[1];
        replies[0] = myStatus;
      } else if (mirrorError) { // The mirrorError set to true earlier comes into play here
        replies = MIRROR_ERROR_STATUS;
      } else {  // Otherwise, build replies normally
        short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
            .getNumOfReplies();
        replies = new Status[1 + ackLen];
        replies[0] = myStatus;
        for (int i = 0; i < ackLen; i++) {
          replies[i + 1] = ack.getReply(i);
        }
        // If downstream reports ERROR_CHECKSUM, throw an IOE to stop this node's PacketResponder thread (combined with the code below, this guarantees that starting from the first ERROR_CHECKSUM node, every upstream node ends up in the ERROR_CHECKSUM state)
        if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
          throw new IOException("Shutting down writer and responder "
              + "since the down streams reported the data sent by this "
              + "thread is corrupt");
        }
      }
      
      // Build the replyAck and send it upstream
      PipelineAck replyAck = new PipelineAck(seqno, replies,
          totalAckTimeNanos);
      if (replyAck.isSuccess()
          && offsetInBlock > replicaInfo.getBytesAcked()) {
        replicaInfo.setBytesAcked(offsetInBlock);
      }
      long begin = Time.monotonicNow();
      replyAck.write(upstreamOut);
      upstreamOut.flush();
      long duration = Time.monotonicNow() - begin;
      if (duration > datanodeSlowLogThresholdMs) {
        LOG.warn("Slow PacketResponder send ack to upstream took " + duration
            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
            + ", replyAck=" + replyAck);
      } else if (LOG.isDebugEnabled()) {
        LOG.debug(myString + ", replyAck=" + replyAck);
      }

      // If this node's status is ERROR_CHECKSUM, throw an IOE after sending the ack
      if (myStatus == Status.ERROR_CHECKSUM) {
        throw new IOException("Shutting down writer and responder "
            + "due to a checksum error in received data. The error "
            + "response has been sent upstream.");
      }
    }

For OOB, PipelineAck#getOOBStatus() also deserves attention:

public Status getOOBStatus() {
    // If seqno does not equal UNKOWN_SEQNO, this is definitely not an OOB status
    if (getSeqno() != UNKOWN_SEQNO) {
      return null;
    }
    // If any downstream node is OOB, the downstream pipeline is considered to be in the OOB state (naturally, this mechanism guarantees that starting from the first OOB node, every node that inspects the ack can find a downstream node in the OOB state)
    for (Status reply : proto.getStatusList()) {
      // The following check is valid because protobuf guarantees to
      // preserve the ordering of enum elements.
      if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
        return reply;
      }
    }
    return null;
  }

Compared with the earlier branches, the PacketResponder thread makes heavy use of interrupts, rather than thrown exceptions, to terminate the thread. Beyond that, the handling of the OOB and ERROR_CHECKSUM statuses is somewhat special:

  • OOB status: the status of the first OOB node is propagated to the client. OOB is caused by a datanode restart, so after the first OOB node sends the OOB ack it sends no further acks; eventually the resulting socket timeouts close the whole pipeline.
  • ERROR_CHECKSUM status: only the tail node can send an ack with the ERROR_CHECKSUM status; after sending it, it throws an IOE to proactively stop its PacketResponder thread. An upstream node that receives an ERROR_CHECKSUM ack also throws an IOE and stops its PacketResponder thread, but sends no further acks; if there are more nodes upstream, their sockets eventually time out and close because no acks arrive. In the end the whole pipeline is closed.

Note that OOB can usually be guaranteed to reach the client, but the ERROR_CHECKSUM sent by the tail node is not guaranteed to be noticed by its upstream node (sending the ack before throwing the IOE is only a best effort, though it usually succeeds); with more than two replicas, it will certainly not be seen by the client.

I have not figured out why interrupts are used here to terminate the thread.

Summary

Although the overview lists two strategies, as we have seen, the primary means of exception handling is still strategy 1: throw the exception and close the sockets, which then closes the pipeline level by level.

After the pipeline is closed, the client decides what happens next, such as block recovery.


Link to this post: Source Code | HDFS DataNode: Writing a Block (3)
Author: 猴子007
Source: monkeysayhi.github.io
This post is published under the Creative Commons Attribution-ShareAlike 4.0 International license. Reposting, adaptation, and commercial use are welcome, provided that the attribution and the link to this post are retained.
