In HA mode, every modification to the namespace is first recorded by the NameNode (NN) to the JournalNode cluster [HDFS-2874] so that it can be shared with the Standby NameNode (SNN), and only then written to the NN's local edits file.
The JournalSet normally maintains two kinds of edit-recording output streams: QuorumOutputStream, which writes to the JournalNodes, and EditLogFileOutputStream, which writes to the local edits file.
Both streams first write each operation record into a double buffer, EditsDoubleBuffer, and send (write) the accumulated records out in batches once enough have built up.
EditsDoubleBuffer is backed by two TxnBuffer instances, bufCurrent and bufReady, each with a default initial capacity (initBufferSize) of 512 KB. Operation records are written directly into bufCurrent; once it fills up, FSEditLog calls logSync() to send the buffered records out.
Step 1: QuorumOutputStream and EditLogFileOutputStream each call setReadyToFlush() to swap the two buffers of their EditsDoubleBuffer, so that bufCurrent, which has been receiving writes, becomes the ready buffer to be sent out, while bufReady, whose previous contents have already been flushed, takes over buffering new operation records.
public void setReadyToFlush() {
  assert isFlushed() : "previous data not flushed yet";
  TxnBuffer tmp = bufReady;
  bufReady = bufCurrent;
  bufCurrent = tmp;
}
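The write/swap/flush cycle can be sketched end-to-end as follows. This is a minimal, self-contained illustration using ByteArrayOutputStream in place of TxnBuffer; the class name and methods are hypothetical, and the real EditsDoubleBuffer also tracks transaction ids per buffer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of the double-buffer idea: writers fill one buffer while the
// other is being flushed, so flushing never blocks incoming writes.
public class DoubleBufferSketch {
    private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream(512 * 1024);
    private ByteArrayOutputStream bufReady = new ByteArrayOutputStream(512 * 1024);

    // Writers always append to bufCurrent.
    public synchronized void write(byte[] record) {
        bufCurrent.write(record, 0, record.length);
    }

    // Swap the buffers: the filled buffer becomes the one to flush, and
    // the already-flushed (hence empty) buffer starts taking new writes.
    public synchronized void setReadyToFlush() {
        assert bufReady.size() == 0 : "previous data not flushed yet";
        ByteArrayOutputStream tmp = bufReady;
        bufReady = bufCurrent;
        bufCurrent = tmp;
    }

    // Drain the ready buffer to the real destination and reset it.
    public synchronized void flushTo(OutputStream out) throws IOException {
        bufReady.writeTo(out);
        bufReady.reset();
    }
}
```

Because the swap is the only point where both buffers are touched, writers contend with the flusher only for that brief exchange, not for the whole duration of the network or disk write.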
Step 2: QuorumOutputStream and EditLogFileOutputStream each call flush() (QuorumOutputStream before EditLogFileOutputStream) to send out the ready data in their bufReady. QuorumOutputStream writes the operation records to the JournalNodes, while EditLogFileOutputStream writes them to the local editlog file. Once done, the EditsDoubleBuffer is reset to an empty buffer.
QuorumOutputStream:

protected void flushAndSync(boolean durable) throws IOException {
  int numReadyBytes = buf.countReadyBytes();
  if (numReadyBytes > 0) {
    int numReadyTxns = buf.countReadyTxns();
    long firstTxToFlush = buf.getFirstReadyTxId();
    assert numReadyTxns > 0;

    // Copy from our double-buffer into a new byte array. This is for
    // two reasons:
    // 1) The IPC code has no way of specifying to send only a slice of
    //    a larger array.
    // 2) because the calls to the underlying nodes are asynchronous, we
    //    need a defensive copy to avoid accidentally mutating the buffer
    //    before it is sent.
    DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
    buf.flushTo(bufToSend);
    assert bufToSend.getLength() == numReadyBytes;
    byte[] data = bufToSend.getData();
    assert data.length == bufToSend.getLength();

    // send the data to each JournalNode
    QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
        segmentTxId, firstTxToFlush, numReadyTxns, data);
    loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");

    // Since we successfully wrote this batch, let the loggers know. Any future
    // RPCs will thus let the loggers know of the most recent transaction, even
    // if a logger has fallen behind.
    loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
  }
}
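The key property of waitForWriteQuorum is that the flush succeeds as soon as a majority of JournalNodes have acknowledged the batch, without waiting for stragglers. A minimal sketch of that majority-wait, modeling each JournalNode write as a CompletableFuture (the class and method names here are hypothetical; the real QuorumCall also tracks per-logger exceptions):

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: block until a majority of asynchronous writes have completed
// successfully, or fail if the quorum is not reached within the timeout.
public class QuorumWaitSketch {
    public static void waitForWriteQuorum(List<CompletableFuture<Void>> calls,
                                          long timeoutMs)
            throws IOException, InterruptedException {
        int needed = calls.size() / 2 + 1;          // simple majority
        CountDownLatch majority = new CountDownLatch(needed);
        for (CompletableFuture<Void> c : calls) {
            // thenRun only fires on successful completion, so failed
            // writes do not count toward the quorum.
            c.thenRun(majority::countDown);
        }
        if (!majority.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IOException("timed out waiting for write quorum");
        }
    }
}
```

With 3 JournalNodes, 2 acknowledgements are enough for the flush to return, which is why a single slow or failed JournalNode does not stall logSync.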
EditLogFileOutputStream:

public EditLogFileOutputStream(Configuration conf, File name, int size)
    throws IOException {
  super();
  shouldSyncWritesAndSkipFsync = conf.getBoolean(
      DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH,
      DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT);
  file = name;
  doubleBuf = new EditsDoubleBuffer(size);
  RandomAccessFile rp;
  if (shouldSyncWritesAndSkipFsync) {
    rp = new RandomAccessFile(name, "rws");
  } else {
    rp = new RandomAccessFile(name, "rw");
  }
  fp = new FileOutputStream(rp.getFD()); // open for append
  fc = rp.getChannel();
  fc.position(fc.size());
}

public void flushAndSync(boolean durable) throws IOException {
  if (fp == null) {
    throw new IOException("Trying to use aborted output stream");
  }
  if (doubleBuf.isFlushed()) {
    LOG.info("Nothing to flush");
    return;
  }
  preallocate(); // preallocate file if necessary
  doubleBuf.flushTo(fp);
  if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
    fc.force(false); // metadata updates not needed
  }
}
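The essence of this local write path, stripped of the Hadoop plumbing, is: position the channel at the end of the file, write the batch, and force() the data to disk. A minimal sketch (the class and method names are illustrative, not Hadoop APIs):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of a durable append, mirroring what flushAndSync does when
// durability is requested: write the bytes, then fsync the data.
public class LocalEditsSketch {
    public static void appendDurably(String path, byte[] data) throws IOException {
        try (RandomAccessFile rp = new RandomAccessFile(path, "rw")) {
            FileChannel fc = rp.getChannel();
            fc.position(fc.size());       // seek to end: open for append
            fc.write(ByteBuffer.wrap(data));
            // force(false) flushes file content but not metadata such as
            // the modification time, saving one metadata write per sync.
            fc.force(false);
        }
    }
}
```

Note the "rws" mode in the real constructor: it makes every write synchronous at the RandomAccessFile level, which is why that branch can skip the explicit force() call later.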