本文主要研究一下claudb的exportRDB
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java
public class DBServerState { //...... public void exportRDB(OutputStream output) throws IOException { RDBOutputStream rdb = new RDBOutputStream(output); rdb.preamble(RDB_VERSION); for (int i = 0; i < databases.size(); i++) { Database db = databases.get(i); if (!db.isEmpty()) { rdb.select(i); rdb.dabatase(db); } } rdb.end(); } //...... }
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBOutputStream.java
public class RDBOutputStream { private static final byte[] REDIS = safeString("REDIS").getBytes(); private static final int TTL_MILLISECONDS = 0xFC; private static final int END_OF_STREAM = 0xFF; private static final int SELECT = 0xFE; private final CheckedOutputStream out; public RDBOutputStream(OutputStream out) { super(); this.out = new CheckedOutputStream(out, new CRC64()); } public void preamble(int version) throws IOException { out.write(REDIS); out.write(version(version)); } private byte[] version(int version) { StringBuilder sb = new StringBuilder(String.valueOf(version)); for (int i = sb.length(); i < Integer.BYTES; i++) { sb.insert(0, '0'); } return sb.toString().getBytes(StandardCharsets.UTF_8); } public void select(int db) throws IOException { out.write(SELECT); length(db); } public void dabatase(Database db) throws IOException { for (Tuple2<DatabaseKey, DatabaseValue> entry : db.entrySet()) { value(entry.get1(), entry.get2()); } } private void value(DatabaseKey key, DatabaseValue value) throws IOException { expiredAt(value.getExpiredAt()); type(value.getType()); key(key); value(value); } private void expiredAt(Instant expiredAt) throws IOException { if (expiredAt != null) { out.write(TTL_MILLISECONDS); out.write(ByteUtils.toByteArray(expiredAt.toEpochMilli())); } } private void type(DataType type) throws IOException { out.write(type.ordinal()); } private void key(DatabaseKey key) throws IOException { string(key.getValue()); } private void value(DatabaseValue value) throws IOException { switch (value.getType()) { case STRING: string(value.getString()); break; case LIST: list(value.getList()); break; case HASH: hash(value.getHash()); break; case SET: set(value.getSet()); break; case ZSET: zset(value.getSortedSet()); break; default: break; } } private void length(int length) throws IOException { if (length < 0x40) { // 1 byte: 00XXXXXX out.write(length); } else if (length < 0x4000) { // 2 bytes: 01XXXXXX XXXXXXXX int b1 = length >> 8; int b2 = length & 
0xFF; out.write(0x40 | b1); out.write(b2); } else { // 5 bytes: 10...... XXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX out.write(0x80); out.write(toByteArray(length)); } } private void string(String value) throws IOException { string(safeString(value)); } private void string(SafeString value) throws IOException { byte[] bytes = value.getBytes(); length(bytes.length); out.write(bytes); } private void string(double value) throws IOException { string(String.valueOf(value)); } private void list(ImmutableList<SafeString> value) throws IOException { length(value.size()); for (SafeString item : value) { string(item); } } private void hash(ImmutableMap<SafeString, SafeString> value) throws IOException { length(value.size()); for (Tuple2<SafeString, SafeString> entry : value.entries()) { string(entry.get1()); string(entry.get2()); } } private void set(ImmutableSet<SafeString> value) throws IOException { length(value.size()); for (SafeString item : value) { string(item); } } private void zset(NavigableSet<Entry<Double, SafeString>> value) throws IOException { length(value.size()); for (Entry<Double, SafeString> item : value) { string(item.getValue()); string(item.getKey()); } } public void end() throws IOException { out.write(END_OF_STREAM); out.write(toByteArray(out.getChecksum().getValue())); out.flush(); } }
java.base/java/util/zip/CheckedOutputStream.java
/**
 * An output stream that forwards every write to the wrapped stream and then feeds the same
 * bytes to a {@link Checksum}.
 */
public class CheckedOutputStream extends FilterOutputStream {

    /** Checksum updated after each successful write; assigned once in the constructor. */
    private final Checksum cksum;

    /**
     * Creates an output stream with the specified Checksum.
     *
     * @param out the output stream
     * @param cksum the checksum
     */
    public CheckedOutputStream(OutputStream out, Checksum cksum) {
        super(out);
        this.cksum = cksum;
    }

    /**
     * Writes a byte. Will block until the byte is actually written.
     *
     * @param b the byte to be written
     * @throws IOException if an I/O error has occurred
     */
    @Override
    public void write(int b) throws IOException {
        // Write first: if the write throws, the checksum is left untouched.
        out.write(b);
        cksum.update(b);
    }

    /**
     * Writes an array of bytes. Will block until the bytes are actually written.
     *
     * @param b the data to be written
     * @param off the start offset of the data
     * @param len the number of bytes to be written
     * @throws IOException if an I/O error has occurred
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Same ordering as the single-byte variant: forward, then update the checksum.
        out.write(b, off, len);
        cksum.update(b, off, len);
    }

    /**
     * Returns the Checksum for this output stream.
     *
     * @return the Checksum
     */
    public Checksum getChecksum() {
        return cksum;
    }
}
/**
 * Table-driven 64-bit CRC implementing {@link Checksum}.
 *
 * <p>The register starts at all ones ({@code -1}), bytes are processed least-significant-bit
 * first using the reflected polynomial {@code 0xC96C5795D7870F42}, and {@link #getValue()}
 * returns the bitwise complement of the register.
 */
public class CRC64 implements Checksum {

  private static final int LOOKUPTABLE_SIZE = 256;
  private static final long POLY64REV = 0xC96C5795D7870F42L;

  /** Remainder for every possible byte value, filled once by the static initializer. */
  private static final long[] LOOKUPTABLE = new long[LOOKUPTABLE_SIZE];

  private long crc = -1;

  static {
    for (int b = 0; b < LOOKUPTABLE.length; ++b) {
      long r = b;
      // One shift/xor step per bit of the input byte.
      for (int bit = 0; bit < Byte.SIZE; ++bit) {
        if ((r & 1) == 1) {
          r = (r >>> 1) ^ POLY64REV;
        } else {
          r >>>= 1;
        }
      }
      LOOKUPTABLE[b] = r;
    }
  }

  /**
   * Feeds one byte into the checksum.
   *
   * @param b the byte to add; only the low eight bits are used
   */
  @Override
  public void update(int b) {
    crc = LOOKUPTABLE[((b & 0xFF) ^ (int) crc) & 0xFF] ^ (crc >>> 8);
  }

  /**
   * Feeds {@code len} bytes of {@code buf}, starting at {@code off}, into the checksum.
   *
   * @param buf source array
   * @param off index of the first byte to process
   * @param len number of bytes to process
   */
  @Override
  public void update(byte[] buf, int off, int len) {
    int end = off + len;
    while (off < end) {
      // The sign extension of buf[off] is discarded by the 0xFF mask.
      crc = LOOKUPTABLE[(buf[off++] ^ (int) crc) & 0xFF] ^ (crc >>> 8);
    }
  }

  /**
   * Returns the checksum of the bytes fed so far.
   *
   * @return the complemented register; {@code 0} when no bytes have been processed
   */
  @Override
  public long getValue() {
    return ~crc;
  }

  /** Restores the register to its initial all-ones state. */
  @Override
  public void reset() {
    crc = -1;
  }
}
exportRDB方法先經過rdb.preamble(RDB_VERSION)寫入REDIS魔數及版本;而後遍歷databases,對每個非空的db挨個執行rdb.select(i)寫入SELECT操作碼及db的索引(以length編碼),再執行rdb.dabatase(db),遍歷entry,挨個按expiredAt、type、key、value寫入數據;end方法寫入END_OF_STREAM,而後再寫入checksum