The FSEditLog edit recording process

In HA mode, every modification to the namespace is first recorded by the active NameNode [HDFS-2874] to the JournalNode cluster, so that it can be shared with the standby NameNode, and only then written to the NameNode's local edit log file.

The edit recording process

JournalSet typically maintains two kinds of edit-recording output streams:

  1. QuorumOutputStream, which records edits to all JournalNodes (steps 4→5 in the figure)
  2. EditLogFileOutputStream, which records edits to the NameNode's local file (steps 6→7 in the figure)

Both streams first write operation records into the double buffer EditsDoubleBuffer, and only send (write) them out in a batch once a certain amount has accumulated.

The sync process

EditsDoubleBuffer is implemented with two TxnBuffer instances, bufCurrent and bufReady, each with a default initial capacity (initBufferSize) of 512 KB. Operation records are written directly into bufCurrent; once it fills up, FSEditLog calls logSync() to send the buffered records out.
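The double-buffering idea can be sketched in a few lines. This is an illustrative stand-in, not the actual HDFS class: SimpleDoubleBuffer and its append/drainReady methods are made up for the sketch, and ByteArrayOutputStream stands in for TxnBuffer.

```java
import java.io.ByteArrayOutputStream;

public class SimpleDoubleBuffer {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

    // Writers always append to bufCurrent.
    public void append(byte[] record) {
        bufCurrent.write(record, 0, record.length);
    }

    // Swap the two buffers: the filled buffer becomes the ready one,
    // while the (already drained) buffer starts accepting new records.
    public void setReadyToFlush() {
        assert bufReady.size() == 0 : "previous data not flushed yet";
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    // Drain the ready buffer, e.g. toward a JournalNode RPC or a local file.
    public byte[] drainReady() {
        byte[] data = bufReady.toByteArray();
        bufReady.reset();
        return data;
    }

    public static void main(String[] args) {
        SimpleDoubleBuffer buf = new SimpleDoubleBuffer();
        buf.append("OP_MKDIR".getBytes());
        buf.setReadyToFlush();
        buf.append("OP_DELETE".getBytes()); // lands in the other buffer
        System.out.println(new String(buf.drainReady())); // prints OP_MKDIR
    }
}
```

The point of the swap is that writers never block on I/O: while one buffer is being flushed, the other keeps accepting records.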

Step 1: QuorumOutputStream and EditLogFileOutputStream each call setReadyToFlush() to swap the two buffers of their EditsDoubleBuffer: the buffer currently receiving writes, bufCurrent, becomes the ready buffer so its contents can be sent out, while the previously flushed buffer, bufReady, starts accepting new operation records.

public void setReadyToFlush() {
  assert isFlushed() : "previous data not flushed yet";
  TxnBuffer tmp = bufReady;
  bufReady = bufCurrent;
  bufCurrent = tmp;
}

Step 2: QuorumOutputStream and EditLogFileOutputStream each call flush() (QuorumOutputStream before EditLogFileOutputStream) to send out the ready data in their bufReady buffers. QuorumOutputStream writes the operation records to the JournalNodes, while EditLogFileOutputStream writes them to the local edit log file. Once finished, the EditsDoubleBuffer is reset to an empty buffer.
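The two steps above can be sketched as a simplified logSync sequence: every stream first swaps its buffers, then each stream flushes in priority order (the quorum stream before the local-file stream). The EditStream interface and LogSyncSketch class here are simplified stand-ins for the HDFS EditLogOutputStream hierarchy, not its real API.

```java
import java.util.ArrayList;
import java.util.List;

interface EditStream {
    void setReadyToFlush();
    void flush();
}

public class LogSyncSketch {
    // Streams are kept in priority order: quorum (JournalNodes) first,
    // then the local edit log file.
    private final List<EditStream> streams = new ArrayList<>();

    public void add(EditStream s) { streams.add(s); }

    public void logSync() {
        for (EditStream s : streams) s.setReadyToFlush(); // swap all buffers
        for (EditStream s : streams) s.flush();           // then send ready data
    }

    public static void main(String[] args) {
        final List<String> order = new ArrayList<>();
        LogSyncSketch sync = new LogSyncSketch();
        sync.add(new EditStream() {
            public void setReadyToFlush() { order.add("quorum:swap"); }
            public void flush() { order.add("quorum:flush"); }
        });
        sync.add(new EditStream() {
            public void setReadyToFlush() { order.add("file:swap"); }
            public void flush() { order.add("file:flush"); }
        });
        sync.logSync();
        System.out.println(order);
        // [quorum:swap, file:swap, quorum:flush, file:flush]
    }
}
```

Swapping every buffer before flushing any of them keeps the set of transactions in a single sync batch identical across all destinations.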

QuorumOutputStream:

  protected void flushAndSync(boolean durable) throws IOException {
    int numReadyBytes = buf.countReadyBytes();
    if (numReadyBytes > 0) {
      int numReadyTxns = buf.countReadyTxns();
      long firstTxToFlush = buf.getFirstReadyTxId();

      assert numReadyTxns > 0;

      // Copy from our double-buffer into a new byte array. This is for
      // two reasons:
      // 1) The IPC code has no way of specifying to send only a slice of
      //    a larger array.
      // 2) because the calls to the underlying nodes are asynchronous, we
      //    need a defensive copy to avoid accidentally mutating the buffer
      //    before it is sent.
      DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
      buf.flushTo(bufToSend);
      assert bufToSend.getLength() == numReadyBytes;
      byte[] data = bufToSend.getData();
      assert data.length == bufToSend.getLength();
      // Send the data to each JournalNode
      QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
          segmentTxId, firstTxToFlush,
          numReadyTxns, data);
      loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
      
      // Since we successfully wrote this batch, let the loggers know. Any future
      // RPCs will thus let the loggers know of the most recent transaction, even
      // if a logger has fallen behind.
      loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
    }
  }
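The waitForWriteQuorum call above blocks until a majority of JournalNodes acknowledge the batch. The majority rule itself is simple; this standalone helper is illustrative only (the real QuorumCall also tracks per-logger exceptions and timeouts):

```java
public class QuorumMath {
    // Majority threshold for n loggers: floor(n / 2) + 1.
    public static int majority(int numLoggers) {
        return numLoggers / 2 + 1;
    }

    // A sendEdits batch succeeds once a majority has acknowledged it.
    public static boolean hasWriteQuorum(int acks, int numLoggers) {
        return acks >= majority(numLoggers);
    }

    public static void main(String[] args) {
        System.out.println(majority(3));          // 2
        System.out.println(hasWriteQuorum(2, 3)); // true
        System.out.println(hasWriteQuorum(1, 3)); // false
    }
}
```

With the typical 3-JournalNode deployment, a write therefore survives the loss of any single JournalNode.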
EditLogFileOutputStream:

public EditLogFileOutputStream(Configuration conf, File name, int size)
      throws IOException {
    super();
    shouldSyncWritesAndSkipFsync = conf.getBoolean(
            DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH,
            DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT);

    file = name;
    doubleBuf = new EditsDoubleBuffer(size);
    RandomAccessFile rp;
    if (shouldSyncWritesAndSkipFsync) {
      rp = new RandomAccessFile(name, "rws");
    } else {
      rp = new RandomAccessFile(name, "rw");
    }
    fp = new FileOutputStream(rp.getFD()); // open for append
    fc = rp.getChannel();
    fc.position(fc.size());
  }

 public void flushAndSync(boolean durable) throws IOException {
    if (fp == null) {
      throw new IOException("Trying to use aborted output stream");
    }
    if (doubleBuf.isFlushed()) {
      LOG.info("Nothing to flush");
      return;
    }
    preallocate(); // preallocate file if necessary
    doubleBuf.flushTo(fp);
    if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
      fc.force(false); // metadata updates not needed
    }
  }
