java從零手寫實現redis(一)如何實現固定大小的緩存?java
java從零手寫實現redis(三)redis expire 過時原理git
java從零手寫實現redis(三)內存數據如何重啓不丟失?github
java從零手寫實現redis(五)過時策略的另外一種實現思路spring
咱們前面簡單實現了 redis 的幾個特性,java從零手寫實現redis(三)內存數據如何重啓不丟失? 中實現了相似 redis 的 RDB 模式。json
AOF 模式的性能特別好,有多好呢?緩存
用過 kafka 的同窗確定知道,kafka 也用到了順序寫這個特性。安全
順序寫添加文件內容,避免了文件 IO 的隨機寫問題,性能基本能夠和內存媲美。多線程
AOF 的實時性更好,這個是相對於 RDB 模式而言的。
咱們原來使用 RDB 模式,將緩存內容所有持久化,這個是比較耗時的動做,通常是幾分鐘持久化一次。
AOF 模式主要是針對修改內容的指令,而後將全部的指令順序添加到文件中。這樣的話,實時性會好不少,能夠提高到秒級別,甚至秒級別。
AOF 模式能夠每次操做都進行持久化,可是這樣會致使吞吐量大大降低。
提高吞吐量最經常使用的方式就是批量,這個 kafka 中也是相似的,好比咱們能夠 1s 持久化一次,將 1s 內的操做所有放入 buffer 中。
這裏其實就是一個 trade-off 問題,實時性與吞吐量的平衡藝術。
實際業務中,1s 的偏差通常都是能夠接受的,因此這個也是業界比較承認的方式。
kafka 中全部的操做實際上都是異步+回調的方式實現的。
異步+多線程,確實能夠提高操做的性能。
固然 redis 6 之前,其實一直是單線程的。那爲何性能依然這麼好呢?
其實多線程也有代價,那就是線程上下文的切換是須要耗時的,保持併發的安全問題,也須要加鎖,從而下降性能。
因此這裏要考慮異步的收益,與付出的耗時是否成正比的問題。
咱們 AOF 與 RDB 模式,歸根結底都是基於操做系統的文件系統作持久化的。
對於開發者而言,可能就是調用一個 api 就實現了,可是實際持久化落盤的動做並不見得就是一步完成的。
文件系統爲了提高吞吐量,也會採用相似 buffer 的方式。這突然有一點俄羅斯套娃的味道。
可是優秀的設計老是類似的,好比說緩存從 cpu 的設計中就有 L1/L2 等等,思路是一致的。
阿里的不少開源技術,都會針對操做系統的落盤作進一步的優化,這個咱們後續作深刻學習。
大道缺一,沒有銀彈。
AOF 千好萬好,和 RDB 對比也存在一個缺陷,那就是指令
接口和 rdb 的保持一致
/** * 持久化緩存接口 * @author binbin.hou * @since 0.0.7 * @param <K> key * @param <V> value */ public interface ICachePersist<K, V> { /** * 持久化緩存信息 * @param cache 緩存 * @since 0.0.7 */ void persist(final ICache<K, V> cache); }
爲了和耗時統計,刷新等特性保持一致,對於操做類的動做才添加到文件中(append to file)咱們也基於註解屬性來指定,而不是固定寫死在代碼中,便於後期拓展調整。
/** * 緩存攔截器 * @author binbin.hou * @since 0.0.5 */ @Documented @Inherited @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface CacheInterceptor { /** * 操做是否須要 append to file,默認爲 false * 主要針對 cache 內容有變動的操做,不包括查詢操做。 * 包括刪除,添加,過時等操做。 * @return 是否 * @since 0.0.10 */ boolean aof() default false; }
咱們在原來的 @CacheInterceptor
註解中添加 aof 屬性,用於指定是否對操做開啓 aof 模式。
咱們在會對數據形成變動的方法上指定這個註解屬性:
相似於 spring 的事務攔截器,咱們使用代理類調用 expireAt。
expire 方法就不須要添加 aof 攔截了。
/** * 設置過時時間 * @param key key * @param timeInMills 毫秒時間以後過時 * @return this */ @Override @CacheInterceptor public ICache<K, V> expire(K key, long timeInMills) { long expireTime = System.currentTimeMillis() + timeInMills; // 使用代理調用 Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this); return cachePoxy.expireAt(key, expireTime); } /** * 指定過時信息 * @param key key * @param timeInMills 時間戳 * @return this */ @Override @CacheInterceptor(aof = true) public ICache<K, V> expireAt(K key, long timeInMills) { this.expire.expire(key, timeInMills); return this; }
@Override @CacheInterceptor(aof = true) public V put(K key, V value) { //1.1 嘗試驅除 CacheEvictContext<K,V> context = new CacheEvictContext<>(); context.key(key).size(sizeLimit).cache(this); boolean evictResult = evict.evict(context); if(evictResult) { // 執行淘汰監聽器 ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code()); for(ICacheRemoveListener<K,V> listener : this.removeListeners) { listener.listen(removeListenerContext); } } //2. 判斷驅除後的信息 if(isSizeLimit()) { throw new CacheRuntimeException("當前隊列已滿,數據添加失敗!"); } //3. 執行添加 return map.put(key, value); } @Override @CacheInterceptor(aof = true) public V remove(Object key) { return map.remove(key); } @Override @CacheInterceptor(aof = true) public void putAll(Map<? extends K, ? extends V> m) { map.putAll(m); } @Override @CacheInterceptor(refresh = true, aof = true) public void clear() { map.clear(); }
/** * AOF 持久化明細 * @author binbin.hou * @since 0.0.10 */ public class PersistAofEntry { /** * 參數信息 * @since 0.0.10 */ private Object[] params; /** * 方法名稱 * @since 0.0.10 */ private String methodName; //getter & setter &toString }
這裏咱們只須要方法名,和參數對象。
暫時實現的簡單一些便可。
咱們定義攔截器,當 cache 中定義的持久化類爲 CachePersistAof
時,將操做的信息放入到 CachePersistAof 的 buffer 列表中。
public class CacheInterceptorAof<K,V> implements ICacheInterceptor<K, V> { private static final Log log = LogFactory.getLog(CacheInterceptorAof.class); @Override public void before(ICacheInterceptorContext<K,V> context) { } @Override public void after(ICacheInterceptorContext<K,V> context) { // 持久化類 ICache<K,V> cache = context.cache(); ICachePersist<K,V> persist = cache.persist(); if(persist instanceof CachePersistAof) { CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist; String methodName = context.method().getName(); PersistAofEntry aofEntry = PersistAofEntry.newInstance(); aofEntry.setMethodName(methodName); aofEntry.setParams(context.params()); String json = JSON.toJSONString(aofEntry); // 直接持久化 log.debug("AOF 開始追加文件內容:{}", json); cachePersistAof.append(json); log.debug("AOF 完成追加文件內容:{}", json); } } }
當 AOF 的註解屬性爲 true 時,調用上述攔截器便可。
這裏爲了不浪費,只有當持久化類爲 AOF 模式時,才進行調用。
//3. AOF 追加 final ICachePersist cachePersist = cache.persist(); if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) { if(before) { persistInterceptors.before(interceptorContext); } else { persistInterceptors.after(interceptorContext); } }
這裏的 AOF 模式和之前的 RDB 持久化類只是不一樣的模式,實際上兩者是相同的接口。
這裏咱們統必定義了不一樣的持久化類的時間,便於 RDB 與 AOF 不一樣任務的不一樣時間間隔觸發。
public interface ICachePersist<K, V> { /** * 持久化緩存信息 * @param cache 緩存 * @since 0.0.7 */ void persist(final ICache<K, V> cache); /** * 延遲時間 * @return 延遲 * @since 0.0.10 */ long delay(); /** * 時間間隔 * @return 間隔 * @since 0.0.10 */ long period(); /** * 時間單位 * @return 時間單位 * @since 0.0.10 */ TimeUnit timeUnit(); }
實現一個 Buffer 列表,用於每次攔截器直接順序添加。
持久化的實現也比較簡單,追加到文件以後,直接清空 buffer 列表便可。
/** * 緩存持久化-AOF 持久化模式 * @author binbin.hou * @since 0.0.10 */ public class CachePersistAof<K,V> extends CachePersistAdaptor<K,V> { private static final Log log = LogFactory.getLog(CachePersistAof.class); /** * 緩存列表 * @since 0.0.10 */ private final List<String> bufferList = new ArrayList<>(); /** * 數據持久化路徑 * @since 0.0.10 */ private final String dbPath; public CachePersistAof(String dbPath) { this.dbPath = dbPath; } /** * 持久化 * key長度 key+value * 第一個空格,獲取 key 的長度,而後截取 * @param cache 緩存 */ @Override public void persist(ICache<K, V> cache) { log.info("開始 AOF 持久化到文件"); // 1. 建立文件 if(!FileUtil.exists(dbPath)) { FileUtil.createFile(dbPath); } // 2. 持久化追加到文件中 FileUtil.append(dbPath, bufferList); // 3. 清空 buffer 列表 bufferList.clear(); log.info("完成 AOF 持久化到文件"); } @Override public long delay() { return 1; } @Override public long period() { return 1; } @Override public TimeUnit timeUnit() { return TimeUnit.SECONDS; } /** * 添加文件內容到 buffer 列表中 * @param json json 信息 * @since 0.0.10 */ public void append(final String json) { if(StringUtil.isNotEmpty(json)) { bufferList.add(json); } } }
ICache<String, String> cache = CacheBs.<String,String>newInstance() .persist(CachePersists.<String, String>aof("1.aof")) .build(); cache.put("1", "1"); cache.expire("1", 10); cache.remove("2"); TimeUnit.SECONDS.sleep(1);
expire 實際上調用的是 expireAt。
[DEBUG] [2020-10-02 12:20:41.979] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 開始追加文件內容:{"methodName":"put","params":["1","1"]} [DEBUG] [2020-10-02 12:20:41.980] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件內容:{"methodName":"put","params":["1","1"]} [DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 開始追加文件內容:{"methodName":"expireAt","params":["1",1601612441990]} [DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件內容:{"methodName":"expireAt","params":["1",1601612441990]} [DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 開始追加文件內容:{"methodName":"remove","params":["2"]} [DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件內容:{"methodName":"remove","params":["2"]} [DEBUG] [2020-10-02 12:20:42.088] [pool-1-thread-1] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: expire [INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 開始持久化緩存信息 [INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 開始 AOF 持久化到文件 [INFO] [2020-10-02 12:20:42.798] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 完成 AOF 持久化到文件 [INFO] [2020-10-02 12:20:42.799] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 完成持久化緩存信息
1.aof
的文件內容以下
{"methodName":"put","params":["1","1"]} {"methodName":"expireAt","params":["1",1601612441990]} {"methodName":"remove","params":["2"]}
將每一次的操做,簡單的存儲到文件中。
相似於 RDB 的加載模式,aof 的加載模式也是相似的。
咱們須要根據文件的內容,還原之前的緩存的內容。
實現思路:遍歷文件內容,反射調用原來的方法。
@Override public void load(ICache<K, V> cache) { List<String> lines = FileUtil.readAllLines(dbPath); log.info("[load] 開始處理 path: {}", dbPath); if(CollectionUtil.isEmpty(lines)) { log.info("[load] path: {} 文件內容爲空,直接返回", dbPath); return; } for(String line : lines) { if(StringUtil.isEmpty(line)) { continue; } // 執行 // 簡單的類型還行,複雜的這種反序列化會失敗 PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class); final String methodName = entry.getMethodName(); final Object[] objects = entry.getParams(); final Method method = METHOD_MAP.get(methodName); // 反射調用 ReflectMethodUtil.invoke(cache, method, objects); } }
Method 反射是固定的,爲了提高性能,咱們作一下預處理。
/** * 方法緩存 * * 暫時比較簡單,直接經過方法判斷便可,沒必要引入參數類型增長複雜度。 * @since 0.0.10 */ private static final Map<String, Method> METHOD_MAP = new HashMap<>(); static { Method[] methods = Cache.class.getMethods(); for(Method method : methods){ CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class); if(cacheInterceptor != null) { // 暫時 if(cacheInterceptor.aof()) { String methodName = method.getName(); METHOD_MAP.put(methodName, method); } } } }
{"methodName":"put","params":["1","1"]}
ICache<String, String> cache = CacheBs.<String,String>newInstance() .load(CacheLoads.<String, String>aof("default.aof")) .build(); Assert.assertEquals(1, cache.size()); System.out.println(cache.keySet());
直接將 default.aof 文件加載到 cache 緩存中。
redis 的文件持久化,實際上更加豐富。
能夠支持 rdb 和 aof 兩種模式混合使用。
aof 模式的文件體積會很是大,redis 爲了解決這個問題,會定時對命令進行壓縮處理。
能夠理解爲 aof 就是一個操做流水錶,咱們實際上關心的只是一個終態,不論中間通過了多少步驟,咱們只關心最後的值。
文中主要講述了思路,實現部分由於篇幅限制,沒有所有貼出來。
開源地址: https://github.com/houbb/cache
以爲本文對你有幫助的話,歡迎點贊評論收藏關注一波~
你的鼓勵,是我最大的動力~