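To guard against data loss when the system crashes or restarts, ZooKeeper persists its data to disk. It uses two persistence mechanisms:
1. Every transaction is appended to a log file, so the data can be recovered by replaying those log files.
2. To speed up recovery, ZooKeeper also takes data snapshots that persist the tree structure and the session information.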
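Log files
The log files record every transaction executed on the ZooKeeper server.
File name format: log.ZXID. The ZXID matters: it is the id (written in hexadecimal) of the first transaction in the file.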
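Data snapshots
A data snapshot records the full in-memory data of the ZooKeeper server at a point in time and writes it to a designated file on disk.
File name format: snapshot.ZXID. The ZXID matters: ZooKeeper uses it to determine the starting point for data recovery.
A snapshot file mainly stores ZooKeeper's tree structure and session information.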
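The naming scheme can be illustrated with a small sketch. It assumes the hexadecimal suffix seen in the append code later in this post; the class and method names (ZxidFileNames, makeName, zxidFromName, latestSnapshotZxid) are made up for illustration and are not ZooKeeper's own API. Choosing the snapshot with the highest zxid is a simplification: it does not check that the snapshot file is valid.

```java
import java.util.Arrays;
import java.util.List;

class ZxidFileNames {
    // Builds a name such as "log.1a" or "snapshot.1a"; the suffix is the zxid in hex.
    static String makeName(String prefix, long zxid) {
        return prefix + "." + Long.toHexString(zxid);
    }

    // Extracts the zxid from a file name with the given prefix, or -1 if it does not match.
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length != 2 || !parts[0].equals(prefix)) {
            return -1L;
        }
        try {
            return Long.parseLong(parts[1], 16);
        } catch (NumberFormatException e) {
            return -1L;
        }
    }

    // Simplified choice of recovery starting point: the snapshot with the highest zxid.
    static long latestSnapshotZxid(List<String> fileNames) {
        long best = -1L;
        for (String name : fileNames) {
            best = Math.max(best, zxidFromName(name, "snapshot"));
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("log.1", "snapshot.a", "log.b", "snapshot.1f");
        System.out.println(makeName("log", 26L));      // prints log.1a
        System.out.println(latestSnapshotZxid(files)); // prints 31
    }
}
```

Because the suffix encodes the first zxid of a log file, sorting the files by that value is enough to find the log that covers a given transaction.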
[Figure: class diagram]
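FileTxnSnapLog
FileTxnSnapLog is the core class for data persistence. Underneath, it delegates to TxnLog for the log files and to SnapShot for the data snapshots.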
Saving a data snapshot
    public void save(DataTree dataTree,
            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts) throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        LOG.info("Snapshotting: " + Long.toHexString(lastZxid));
        // The snapshot file is named after the last zxid applied to the tree.
        File snapshot = new File(snapDir, Util.makeSnapshotName(lastZxid));
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshot);
    }
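Log file operations
    // Append one request's transaction to the log.
    public boolean append(Request si) throws IOException {
        return txnLog.append(si.hdr, si.txn);
    }

    // Commit the appended transactions.
    public void commit() throws IOException {
        txnLog.commit();
    }

    // Roll over to a new log file.
    public void rollLog() throws IOException {
        txnLog.rollLog();
    }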
Data recovery
    public long restore(DataTree dt, Map<Long, Integer> sessions,
            PlayBackListener listener) throws IOException {
        // Load the snapshot first, then replay the log from the next zxid onward.
        snapLog.deserialize(dt, sessions);
        FileTxnLog txnLog = new FileTxnLog(dataDir);
        TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
        long highestZxid = dt.lastProcessedZxid;
        TxnHeader hdr;
        while (true) {
            hdr = itr.getHeader();
            ...
            if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                LOG.error(highestZxid + "(higestZxid) > "
                        + hdr.getZxid() + "(next log) for type "
                        + hdr.getType());
            } else {
                highestZxid = hdr.getZxid();
            }
            try {
                processTransaction(hdr, dt, sessions, itr.getTxn());
            } catch (KeeperException.NoNodeException e) {
                throw new IOException("Failed to process transaction type: "
                        + hdr.getType() + " error: " + e.getMessage());
            }
            if (!itr.next()) {
                break;
            }
        }
        return highestZxid;
    }
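The recovery idea (load the snapshot, then replay only the newer transactions) can be shown on a toy model. The sketch below replaces DataTree with a plain map and uses a made-up Txn type; it assumes the log is already ordered by zxid and is not how ZooKeeper itself represents transactions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestoreSketch {
    // Toy transaction (hypothetical): set one path to one value, tagged with its zxid.
    static final class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    // Replays every logged transaction newer than the snapshot onto the tree.
    // Returns the highest zxid that the tree now reflects.
    static long restore(Map<String, String> tree, long snapshotZxid, List<Txn> log) {
        long highest = snapshotZxid;
        for (Txn t : log) {
            if (t.zxid <= snapshotZxid) {
                continue; // already contained in the snapshot
            }
            tree.put(t.path, t.data);
            highest = Math.max(highest, t.zxid);
        }
        return highest;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new HashMap<>();
        tree.put("/a", "1"); // state captured by a snapshot taken at zxid 2

        List<Txn> log = new ArrayList<>();
        log.add(new Txn(1L, "/a", "0"));
        log.add(new Txn(2L, "/a", "1"));
        log.add(new Txn(3L, "/a", "3"));
        log.add(new Txn(4L, "/b", "x"));

        System.out.println(restore(tree, 2L, log)); // prints 4
        System.out.println(tree.get("/a"));         // prints 3
    }
}
```

The snapshot bounds how much of the log has to be replayed, which is why taking snapshots speeds up recovery.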
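FileTxnLog
FileTxnLog maintains the externally visible interface of the transaction log, including writing and reading transactions.
Writing a transaction
1. If a log file is already open, use it; otherwise create a new log file whose suffix is the zxid of this transaction.
2. If fewer than 4 KB remain in the current log file, extend the file by the preallocation size (64 MB by default), filling the new space with zeros. Preallocating improves I/O efficiency.
3. Serialize the transaction header and body.
4. Compute the checksum.
5. Write to the file stream.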
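Step 2 can be sketched as a pure size calculation. This is a minimal illustration, assuming the file grows in 64 MB steps whenever fewer than 4 KB are left; the class name, method name, and constants are hypothetical and only mirror the numbers given in the list above.

```java
class PadSketch {
    static final long PRE_ALLOC_SIZE = 64L * 1024 * 1024; // assumed step: 64 MB
    static final long MIN_FREE = 4096L;                   // assumed threshold: 4 KB

    // Returns the size the file should have after padding.
    // position:    current write offset in the file
    // currentSize: size the file has already been padded to
    static long paddedSize(long position, long currentSize) {
        if (position + MIN_FREE >= currentSize) {
            return currentSize + PRE_ALLOC_SIZE;
        }
        return currentSize;
    }

    public static void main(String[] args) {
        // A fresh file that holds only a 16-byte header gets padded right away.
        System.out.println(paddedSize(16L, 16L));
        // With plenty of room left, the size is unchanged.
        System.out.println(paddedSize(100L, PRE_ALLOC_SIZE));
    }
}
```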
    public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException {
        if (hdr != null) {
            ...
            if (logStream == null) {
                ...
                // No log file is open: create one named after this transaction's zxid.
                logFileWrite = new File(logDir, ("log." + Long.toHexString(hdr.getZxid())));
                fos = new FileOutputStream(logFileWrite);
                logStream = new BufferedOutputStream(fos);
                oa = BinaryOutputArchive.getArchive(logStream);
                FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
                fhdr.serialize(oa, "fileheader");
                // Make sure that the magic number is written before padding.
                logStream.flush();
                currentSize = fos.getChannel().position();
                streamsToFlush.add(fos);
            }
            // Preallocate space if the file is nearly full.
            padFile(fos);
            // Serialize the header and body, then write the checksum followed by the entry.
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            ...
            Checksum crc = makeChecksumAlgorithm();
            crc.update(buf, 0, buf.length);
            oa.writeLong(crc.getValue(), "txnEntryCRC");
            Util.writeTxnBytes(oa, buf);
            return true;
        }
        return false;
    }
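The checksum step can be tried in isolation. The sketch below frames a payload as an 8-byte checksum, a 4-byte length, and the bytes, and verifies the checksum when reading back. It is a simplified layout chosen for illustration, not the exact on-disk format; Adler32 is used here as the checksum algorithm, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

class ChecksummedEntry {
    // Frames the payload as: checksum (long), length (int), payload bytes.
    static byte[] writeEntry(byte[] payload) {
        Checksum crc = new Adler32();
        crc.update(payload, 0, payload.length);
        return ByteBuffer.allocate(8 + 4 + payload.length)
                .putLong(crc.getValue())
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Reads one framed entry and rejects it if the length or checksum is wrong.
    static byte[] readEntry(byte[] entry) {
        ByteBuffer in = ByteBuffer.wrap(entry);
        long expected = in.getLong();
        int len = in.getInt();
        if (len < 0 || len > in.remaining()) {
            throw new IllegalArgumentException("Unreasonable length = " + len);
        }
        byte[] payload = new byte[len];
        in.get(payload);
        Checksum crc = new Adler32();
        crc.update(payload, 0, len);
        if (crc.getValue() != expected) {
            throw new IllegalArgumentException("Checksum mismatch");
        }
        return payload;
    }

    public static void main(String[] args) {
        byte[] entry = writeEntry("create /a".getBytes(StandardCharsets.UTF_8));
        System.out.println(entry.length); // prints 21 (8 + 4 + 9)
        System.out.println(new String(readEntry(entry), StandardCharsets.UTF_8)); // prints create /a
    }
}
```

Storing the checksum next to each entry lets a reader detect a torn or corrupted write at the tail of a log file and stop replaying there.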
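Persistence essentially means storing in-memory objects on disk in binary form. Underneath, this is done with jute serialization.
Serialization and deserialization are, at bottom, conversions between a data stream and object data. jute's approach is to let each serializable object define its own serialization protocol. An object serialized with jute therefore has to implement the Record interface, which requires a serialize method and a deserialize method. jute also abstracts the streams themselves: OutputArchive represents the output stream and InputArchive represents the input stream, and each stream format is supported by implementing these two interfaces. In short, implementing Record defines how an object is serialized and deserialized, while implementing OutputArchive and InputArchive defines how the data is stored and read.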
Record code:
    import java.io.IOException;

    public interface Record {
        public void serialize(OutputArchive archive, String tag)
            throws IOException;
        public void deserialize(InputArchive archive, String tag)
            throws IOException;
    }
OutputArchive code:
    import java.io.IOException;
    import java.util.List;
    import java.util.TreeMap;

    public interface OutputArchive {
        public void writeByte(byte b, String tag) throws IOException;
        public void writeBool(boolean b, String tag) throws IOException;
        public void writeInt(int i, String tag) throws IOException;
        public void writeLong(long l, String tag) throws IOException;
        public void writeFloat(float f, String tag) throws IOException;
        public void writeDouble(double d, String tag) throws IOException;
        public void writeString(String s, String tag) throws IOException;
        public void writeBuffer(byte buf[], String tag)
            throws IOException;
        public void writeRecord(Record r, String tag) throws IOException;
        public void startRecord(Record r, String tag) throws IOException;
        public void endRecord(Record r, String tag) throws IOException;
        public void startVector(List v, String tag) throws IOException;
        public void endVector(List v, String tag) throws IOException;
        public void startMap(TreeMap v, String tag) throws IOException;
        public void endMap(TreeMap v, String tag) throws IOException;
    }
InputArchive code:
    import java.io.IOException;

    public interface InputArchive {
        public byte readByte(String tag) throws IOException;
        public boolean readBool(String tag) throws IOException;
        public int readInt(String tag) throws IOException;
        public long readLong(String tag) throws IOException;
        public float readFloat(String tag) throws IOException;
        public double readDouble(String tag) throws IOException;
        public String readString(String tag) throws IOException;
        public byte[] readBuffer(String tag) throws IOException;
        public void readRecord(Record r, String tag) throws IOException;
        public void startRecord(String tag) throws IOException;
        public void endRecord(String tag) throws IOException;
        public Index startVector(String tag) throws IOException;
        public void endVector(String tag) throws IOException;
        public Index startMap(String tag) throws IOException;
        public void endMap(String tag) throws IOException;
    }
For example, FileHeader defines its serialization protocol in its serialize and deserialize methods; calling those methods then serializes or deserializes the object.
    public class FileHeader implements Record {
        private int magic;
        private int version;
        private long dbid;

        // Fields are written in a fixed order: magic, version, dbid.
        public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
            a_.startRecord(this, tag);
            a_.writeInt(magic, "magic");
            a_.writeInt(version, "version");
            a_.writeLong(dbid, "dbid");
            a_.endRecord(this, tag);
        }

        // Fields are read back in the same order they were written.
        public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
            a_.startRecord(tag);
            magic = a_.readInt("magic");
            version = a_.readInt("version");
            dbid = a_.readLong("dbid");
            a_.endRecord(tag);
        }
    }
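To see what this protocol produces with a binary archive, the following self-contained sketch writes the same three fields in the same order using only java.io. The Header type stands in for FileHeader and is hypothetical, as are the field values in main; the point is that two ints and one long give a 16-byte record with no field names or separators.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class HeaderRoundTrip {
    // Stand-in for FileHeader (hypothetical): same fields, same order.
    static final class Header {
        final int magic;
        final int version;
        final long dbid;

        Header(int magic, int version, long dbid) {
            this.magic = magic;
            this.version = version;
            this.dbid = dbid;
        }
    }

    // Writes magic, version, dbid back to back in big-endian binary form.
    static byte[] serialize(Header h) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(h.magic);
            out.writeInt(h.version);
            out.writeLong(h.dbid);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields in exactly the order they were written.
    static Header deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int magic = in.readInt();
            int version = in.readInt();
            long dbid = in.readLong();
            return new Header(magic, version, dbid);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new Header(0x1234, 2, 0L)); // illustrative values
        System.out.println(data.length);                    // prints 16
        System.out.println(deserialize(data).version);      // prints 2
    }
}
```

Since the binary form carries no field names, the reader and writer must agree on field order, which is exactly what the paired serialize and deserialize methods guarantee.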
The concrete data format that an object is serialized to, and deserialized from, depends on the OutputArchive and InputArchive implementations.
Binary stream implementation:
BinaryOutputArchive:
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.TreeMap;

    public class BinaryOutputArchive implements OutputArchive {
        private ByteBuffer bb = ByteBuffer.allocate(1024);
        private DataOutput out;

        public static BinaryOutputArchive getArchive(OutputStream strm) {
            return new BinaryOutputArchive(new DataOutputStream(strm));
        }

        public BinaryOutputArchive(DataOutput out) {
            this.out = out;
        }

        public void writeByte(byte b, String tag) throws IOException {
            out.writeByte(b);
        }

        public void writeBool(boolean b, String tag) throws IOException {
            out.writeBoolean(b);
        }

        public void writeInt(int i, String tag) throws IOException {
            out.writeInt(i);
        }

        public void writeLong(long l, String tag) throws IOException {
            out.writeLong(l);
        }

        public void writeFloat(float f, String tag) throws IOException {
            out.writeFloat(f);
        }

        public void writeDouble(double d, String tag) throws IOException {
            out.writeDouble(d);
        }

        /**
         * create our own char encoder to utf8. This is faster
         * than string.getbytes(UTF8).
         * @param s the string to encode into utf8
         * @return utf8 byte sequence.
         */
        final private ByteBuffer stringToByteBuffer(CharSequence s) {
            bb.clear();
            final int len = s.length();
            for (int i = 0; i < len; i++) {
                // Grow the buffer when fewer than 3 bytes (the longest encoding used here) remain.
                if (bb.remaining() < 3) {
                    ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1);
                    bb.flip();
                    n.put(bb);
                    bb = n;
                }
                char c = s.charAt(i);
                if (c < 0x80) {
                    bb.put((byte) c);
                } else if (c < 0x800) {
                    bb.put((byte) (0xc0 | (c >> 6)));
                    bb.put((byte) (0x80 | (c & 0x3f)));
                } else {
                    bb.put((byte) (0xe0 | (c >> 12)));
                    bb.put((byte) (0x80 | ((c >> 6) & 0x3f)));
                    bb.put((byte) (0x80 | (c & 0x3f)));
                }
            }
            bb.flip();
            return bb;
        }

        // A string is written as its byte length followed by the bytes; null is written as -1.
        public void writeString(String s, String tag) throws IOException {
            if (s == null) {
                writeInt(-1, "len");
                return;
            }
            ByteBuffer bb = stringToByteBuffer(s);
            writeInt(bb.remaining(), "len");
            out.write(bb.array(), bb.position(), bb.limit());
        }

        // A buffer is written as its length followed by the bytes; null is written as -1.
        public void writeBuffer(byte barr[], String tag)
                throws IOException {
            if (barr == null) {
                out.writeInt(-1);
                return;
            }
            out.writeInt(barr.length);
            out.write(barr);
        }

        public void writeRecord(Record r, String tag) throws IOException {
            r.serialize(this, tag);
        }

        public void startRecord(Record r, String tag) throws IOException {}

        public void endRecord(Record r, String tag) throws IOException {}

        // Only the element count is written here; the caller writes the elements.
        public void startVector(List v, String tag) throws IOException {
            if (v == null) {
                writeInt(-1, tag);
                return;
            }
            writeInt(v.size(), tag);
        }

        public void endVector(List v, String tag) throws IOException {}

        public void startMap(TreeMap v, String tag) throws IOException {
            writeInt(v.size(), tag);
        }

        public void endMap(TreeMap v, String tag) throws IOException {}
    }
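The string format used by writeString (a 4-byte length, then the encoded bytes, with -1 standing for null) can be reproduced in a few lines. This sketch uses the standard UTF-8 encoder from java.nio.charset rather than the hand-written encoder above, so it is an approximation of the same layout, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class StringFraming {
    // Encodes a string as: byte length (int), then UTF-8 bytes. Null becomes length -1.
    static byte[] writeString(String s) {
        if (s == null) {
            return ByteBuffer.allocate(4).putInt(-1).array();
        }
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + utf8.length)
                .putInt(utf8.length)
                .put(utf8)
                .array();
    }

    // Decodes what writeString produced.
    static String readString(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        int len = in.getInt();
        if (len == -1) {
            return null;
        }
        byte[] utf8 = new byte[len];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(writeString("abc").length);       // prints 7 (4 + 3)
        System.out.println(writeString(null).length);        // prints 4
        System.out.println(readString(writeString("/zk/é"))); // prints /zk/é
    }
}
```

Note that the length prefix counts encoded bytes, not characters, so a non-ASCII character adds more than one to it.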
BinaryInputArchive:
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class BinaryInputArchive implements InputArchive {

        private DataInput in;

        static public BinaryInputArchive getArchive(InputStream strm) {
            return new BinaryInputArchive(new DataInputStream(strm));
        }

        // Counts down the remaining elements of a vector or map.
        static private class BinaryIndex implements Index {
            private int nelems;
            BinaryIndex(int nelems) {
                this.nelems = nelems;
            }
            public boolean done() {
                return (nelems <= 0);
            }
            public void incr() {
                nelems--;
            }
        }

        /** Creates a new instance of BinaryInputArchive */
        public BinaryInputArchive(DataInput in) {
            this.in = in;
        }

        public byte readByte(String tag) throws IOException {
            return in.readByte();
        }

        public boolean readBool(String tag) throws IOException {
            return in.readBoolean();
        }

        public int readInt(String tag) throws IOException {
            return in.readInt();
        }

        public long readLong(String tag) throws IOException {
            return in.readLong();
        }

        public float readFloat(String tag) throws IOException {
            return in.readFloat();
        }

        public double readDouble(String tag) throws IOException {
            return in.readDouble();
        }

        // Reads the length prefix, then that many bytes; -1 means null.
        public String readString(String tag) throws IOException {
            int len = in.readInt();
            if (len == -1) return null;
            byte b[] = new byte[len];
            in.readFully(b);
            return new String(b, "UTF8");
        }

        static public final int maxBuffer = determineMaxBuffer();

        // The limit comes from the jute.maxbuffer system property, defaulting to 0xfffff bytes.
        private static int determineMaxBuffer() {
            String maxBufferString = System.getProperty("jute.maxbuffer");
            try {
                return Integer.parseInt(maxBufferString);
            } catch (Exception e) {
                return 0xfffff;
            }
        }

        public byte[] readBuffer(String tag) throws IOException {
            int len = readInt(tag);
            if (len == -1) return null;
            // Reject lengths that are negative or exceed the configured maximum.
            if (len < 0 || len > maxBuffer) {
                throw new IOException("Unreasonable length = " + len);
            }
            byte[] arr = new byte[len];
            in.readFully(arr);
            return arr;
        }

        public void readRecord(Record r, String tag) throws IOException {
            r.deserialize(this, tag);
        }

        public void startRecord(String tag) throws IOException {}

        public void endRecord(String tag) throws IOException {}

        // Returns a counter over the elements, or null if the vector was written as null.
        public Index startVector(String tag) throws IOException {
            int len = readInt(tag);
            if (len == -1) {
                return null;
            }
            return new BinaryIndex(len);
        }

        public void endVector(String tag) throws IOException {}

        public Index startMap(String tag) throws IOException {
            return new BinaryIndex(readInt(tag));
        }

        public void endMap(String tag) throws IOException {}

    }
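The way startVector and the returned Index are meant to be used together can be sketched with a list of longs. The Counter class below mirrors BinaryIndex; it, the enclosing class, and the helper methods are illustrative names, and the byte layout (element count, then the elements) follows the startVector code shown above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class VectorFraming {
    // Mirrors BinaryIndex: counts down the elements still to be read.
    static final class Counter {
        private int remaining;

        Counter(int n) {
            remaining = n;
        }

        boolean done() {
            return remaining <= 0;
        }

        void incr() {
            remaining--;
        }
    }

    // Writes the element count (or -1 for null), followed by each element.
    static byte[] writeLongs(List<Long> values) {
        if (values == null) {
            return ByteBuffer.allocate(4).putInt(-1).array();
        }
        ByteBuffer out = ByteBuffer.allocate(4 + 8 * values.size());
        out.putInt(values.size());
        for (long v : values) {
            out.putLong(v);
        }
        return out.array();
    }

    // Reads the count, then loops with the counter until every element is consumed.
    static List<Long> readLongs(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        int len = in.getInt();
        if (len == -1) {
            return null;
        }
        List<Long> values = new ArrayList<>();
        for (Counter idx = new Counter(len); !idx.done(); idx.incr()) {
            values.add(in.getLong());
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] data = writeLongs(Arrays.asList(1L, 2L, 3L));
        System.out.println(data.length);     // prints 28 (4 + 3 * 8)
        System.out.println(readLongs(data)); // prints [1, 2, 3]
    }
}
```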
Other implementations exist as well: CSV (CsvInputArchive, CsvOutputArchive) and XML (XmlInputArchive, XmlOutputArchive).