在互聯網項目中,通常以堆內緩存的使用居多,不管是Guava,Memcache,仍是JDK自帶的HashMap,ConcurrentHashMap等,都是在堆內內存中作數據計算操做。這樣作的好處顯而易見,用戶徹底沒必要在乎數據的分配,溢出,回收等操做,所有交由JVM來進行處理。因爲JVM提供了諸多的垃圾回收算法,能夠保證在不影響甚至微影響系統的前提下,作到堆內內存接近完美的管控。君不見,小如圖書管理這樣的系統,大如整個電商交易平臺,都在JVM的加持下,服務於幾個,十幾個,乃至於上億用戶,而在這些系統中,堆內緩存組件所帶來的收益但是居功至偉。在自下而上的互聯網架構中,堆內緩存就像把衛這宮廷入口的劍士,神聖而莊嚴,真可謂誰敢橫刀立馬,惟我堆內緩存將軍。html
堆內緩存的劣勢java
可是,事物都是有兩面性的,堆內緩存在JVM的管理下,縱然無可挑剔,可是在GC過程當中產生的程序小停頓和程序大停頓,則像一把利劍同樣,斬斷了對構造出完美高併發系統的念想。簡單的以HashMap這個JDK自帶的緩存組件爲例,benchmark結果以下:git
Benchmark Mode Cnt Score Error Units localCacheBenchmark.testlocalCacheSet thrpt 20 85056.759 ± 126702.544 ops/s
其插入速度最快爲85056.759+126702.544=211759.303ops,最慢爲0,也就是每秒插入速度最快爲20w,最慢爲0。之因此爲0,是由於HashMap中的數據在快速的增加過程當中,引發了頻繁的GC操做,爲了給當前HashMap騰出足夠的空間進行插入操做,不得不釋放一些對象。頻繁的GC,勢必對插入速度有不小的影響,形成應用的偶爾性暫停。因此這也能解釋爲啥最慢的時候,ops爲0了。 同時從benchmark數據,咱們能夠看到偏差率爲126702.544ops,比正常操做的85056.756要大不少,說明GC的影響,對HashMap的插入操做影響特別的大。github
因爲GC的存在,堆內緩存操做的ops會受到不小的影響,會形成本來小流量下10ms可以完成的內存計算,大流量下500ms還未完成。若是內存計算過於龐雜,則形成總體流程的ops吞吐量下降,也是極有可能的事兒。因此從這裏能夠看出,堆內緩存組件,在高併發的壓力下,若是計算量巨大,尤爲是寫操做巨大,使其不會成爲護城的利劍,反而成了性能的幫兇,何其可懼。redis
堆外緩存的優點算法
爲了緩解在高併發,高寫入操做下,堆內緩存組件形成的頻繁GC問題,堆外緩存應運而生。從前面的描述咱們知道,堆內緩存是受JVM管控的,因此咱們沒必要擔憂垃圾回收的問題。可是堆外緩存是不受JVM管控的,因此也不受GC的影響致使的應用暫停問題。可是因爲堆外緩存的使用,是以byte數組來進行的,因此須要本身進行序列化反序列化操做。目前已知的知名開源項目中,netty4的buffer pool採用了堆外緩存實現,具體的比對信息能夠參考此處,具體的比對信息截圖以下:數據庫
帶有Direct字眼的即爲offheap堆外Buffer,x軸爲分配的內存大小,Y軸爲耗時。從上面能夠看出,小塊內存分配,JVM要稍微優秀一點;可是大塊內存分配,明顯的堆外緩存要優秀一些。因爲堆外Buffer操做不受GC影響,實際上性能更好一些。可是須要的垃圾回收管控也須要本身去作,要麻煩不少。數組
堆外緩存實現原理緩存
說到堆外緩存實現原理,不可不提到sun.misc.Unsafe這個package包。此包提供了底層的Unsafe操做方法,讓咱們能夠直接在堆外內存作數據分配操做。因爲是底層包,因此用戶層面不多用到,只是一些jdk裏面的核心類庫會用到。其實例的初始化方式以下:安全
public static Unsafe getUnsafe() { Class cc = sun.reflect.Reflection.getCallerClass(2); if (cc.getClassLoader() != null) throw new SecurityException("Unsafe"); return theUnsafe; }
能夠看出是一個單例模式。讓咱們來嘗試使用一下(下面代碼是先分配了一個100bytes的空間,獲得分配好的地址,而後在此地址裏面放入1,最後將此地址裏面的數據取出,打印出來):
long address = unsafe.allocateMemory(100); unsafe.putLong(address,1); System.out.println(unsafe.getLong(address));
可是在運行的過程當中,咱們卻遇到了以下的錯誤:
java.lang.SecurityException: Unsafe at sun.misc.Unsafe.getUnsafe(Unsafe.java:90) at UnsafeTest.testUnsafe(UnsafeTest.java:18) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) ....... Process finished with exit code -1
能夠看出,因爲安全性的緣由,咱們是沒法直接使用Unsafe的實例來進行數據操做的,主要緣由是由於cc.getClassLoader()對theUnsafe實例作了過濾限制。可是咱們能夠直接用theUnsafe來實現,因爲是private修飾,咱們能夠用反射來將private修飾改爲public修飾,讓其暴露出來供咱們使用:
Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); Unsafe unsafe = (Unsafe) f.get(null); long address = unsafe.allocateMemory(100); unsafe.putLong(address,1); System.out.println(unsafe.getLong(address));
這樣就能夠了,可以正確的獲取運行結果。從這裏咱們能夠看出,堆外內存必須本身分配地址空間,那麼對應的,本身須要控制好地址邊界,若是控制很差,經典的OOM Exception將會出現。這也是比堆內內存使用麻煩的地方。
上面的代碼展現,其實已經說明了Unsafe方法的基本使用方式。若是想查看更多的Unsafe實現方式,我的推薦能夠看看Cassandra源碼中的中的Object mapper - Caffinitas裏面關於Unsafe的實現。此類的名稱爲Uns.java,因爲類精簡,我的認爲很值得一看,我貼出部分代碼來:
static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); unsafe = (Unsafe) field.get(null); if (unsafe.addressSize() > 8) throw new RuntimeException("Address size " + unsafe.addressSize() + " not supported yet (max 8 bytes)"); if (__DEBUG_OFF_HEAP_MEMORY_ACCESS) LOGGER.warn("Degraded performance due to off-heap memory allocations and access guarded by debug code enabled via system property " + OHCacheBuilder.SYSTEM_PROPERTY_PREFIX + "debugOffHeapAccess=true"); IAllocator alloc; String allocType = __ALLOCATOR != null ? __ALLOCATOR : "jna"; switch (allocType) { case "unsafe": alloc = new UnsafeAllocator(); LOGGER.info("OHC using sun.misc.Unsafe memory allocation"); break; case "jna": default: alloc = new JNANativeAllocator(); LOGGER.info("OHC using JNA OS native malloc/free"); } allocator = alloc; } catch (Exception e) { throw new AssertionError(e); } } 。。。。。。 static long getLongFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 8 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getLong(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); } static int getIntFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 4 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getInt(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); } static short getShortFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 2 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getShort(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); }
堆外緩存實現進階
寫到這裏,原理什麼的大概都懂了,咱們準備進階一下,寫個基於Off-heap堆外緩存的Int數組,因爲On-heap Array的空間請求分配到了堆上,因此這裏天然而然的就把空間分配到了堆外。代碼以下:
public class OffheapIntArray { /** * 此list分配的地址 */ private long address; /** * 默認分配空間大小 */ private static final int defaultSize = 1024; /** * 帶參構造 * 因爲Integer類型在java中佔用4個字節,因此在分配地址的時候,一個integer,須要分配 4*8 = 32 bytes的空間 * @param size * @throws NoSuchFieldException * @throws IllegalAccessException */ public OffheapIntArray(Integer size) throws NoSuchFieldException, IllegalAccessException { if (size == null) { address = alloc(defaultSize * 4 * 8); } else { address = alloc(size * 4 * 8); } } public int get(int index) throws NoSuchFieldException, IllegalAccessException { return getUnsafe().getInt(address + index * 4 * 8); } public void set(int index, int value) throws NoSuchFieldException, IllegalAccessException { getUnsafe().putInt(address + index * 4 * 8, value); } private Unsafe getUnsafe() throws IllegalAccessException, NoSuchFieldException { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); return (Unsafe) f.get(null); } private long alloc(int size) throws NoSuchFieldException, IllegalAccessException { long address = getUnsafe().allocateMemory(size); return address; } public void free() throws NoSuchFieldException, IllegalAccessException { if (address == 0) { return; } getUnsafe().freeMemory(address); } }
咱們來簡單的測試一下:
@Test public void testOffheap() throws NoSuchFieldException, IllegalAccessException { OffheapIntArray offheapArray = new OffheapIntArray(10); offheapArray.set(0,11111); offheapArray.set(1,1112); offheapArray.set(2,1113); offheapArray.set(3,1114); System.out.println(offheapArray.get(0)); System.out.println(offheapArray.get(1)); System.out.println(offheapArray.get(2)); System.out.println(offheapArray.get(3)); offheapArray.free(); }
輸出結果以下:
11111 1112 1113 1114
能夠看到獲得了正確的輸出結果。固然我這裏只是簡單的模擬使用。具體的使用方式,推薦以下兩篇文章,能夠對堆外內存的使用有更近一步的認識:
Java Magic. Part 4: sun.misc.Unsafe
堆外緩存組件實戰
知道了堆外緩存的簡單使用後,這裏咱們要更近一步,使用現有的堆外緩存組件到項目中。
目前在市面上,有諸多的緩存組件,好比mapdb,ohc,ehcache3等,可是因爲ehcache3收費,因此這裏不作討論,主要討論mapdb和ohc這兩個。咱們先經過benchmark來篩選一下兩者的性能差別,因爲這兩個緩存組件提供的都是基於key-value模型的數據存儲,因此benchmark的指標有9個,分別是get,set方法,hget,hset方法(value存儲的是hashmap),sadd,smember方法(value存儲的是set),zadd,zrange方法(value存儲的是treeset)。
benchmark結果以下:
Benchmark Mode Cnt Score Error Units OffheapCacheBenchmark.testMapdbGet thrpt 20 69699.610 ± 4578.888 ops/s OffheapCacheBenchmark.testMapdbHGet thrpt 20 63663.523 ± 3595.413 ops/s OffheapCacheBenchmark.testMapdbHGetAll thrpt 20 64235.582 ± 4009.039 ops/s OffheapCacheBenchmark.testMapdbHSet thrpt 20 25777.077 ± 480.461 ops/s OffheapCacheBenchmark.testMapdbSAdd thrpt 20 335.973 ± 39.353 ops/s OffheapCacheBenchmark.testMapdbSet thrpt 20 39417.070 ± 830.689 ops/s OffheapCacheBenchmark.testMapdbSmember thrpt 20 67432.314 ± 2799.983 ops/s OffheapCacheBenchmark.testMapdbZAdd thrpt 20 21220.595 ± 1128.103 ops/s OffheapCacheBenchmark.testMapdbZRange thrpt 20 45425.162 ± 4533.071 ops/s Benchmark Mode Cnt Score Error Units OhcheapOHCBenchmark.testOhcGet thrpt 20 1196976.452 ± 27291.669 ops/s OhcheapOHCBenchmark.testOhcHGet thrpt 20 348383.355 ± 23304.696 ops/s OhcheapOHCBenchmark.testOhcHGetAll thrpt 20 350798.417 ± 11870.685 ops/s OhcheapOHCBenchmark.testOhcHSet thrpt 20 349370.322 ± 8619.813 ops/s OhcheapOHCBenchmark.testOhcSAdd thrpt 20 11700.160 ± 611.794 ops/s OhcheapOHCBenchmark.testOhcSet thrpt 20 538314.544 ± 132111.037 ops/s OhcheapOHCBenchmark.testOhcSmember thrpt 20 458817.772 ± 15817.159 ops/s OhcheapOHCBenchmark.testOhcZAdd thrpt 20 323979.906 ± 9842.344 ops/s OhcheapOHCBenchmark.testOhcZRange thrpt 20 192776.479 ± 12988.484 ops/s
從上面的結果能夠看出,ohc屬於性能怪獸類型,性能十倍於mapdb。並且因爲ohc自己支持entry過時,可是mapdb不支持。因此這裏綜合一下,選擇ohc做爲咱們的堆外緩存組件。須要說明一下的是,在我進行benchmark測試過程當中,堆外緩存中會進行大量的數據讀寫操做,可是這些讀寫ops總體很是平穩,從error和score的對比就能夠看出。不會出現應用暫停的狀況。說明GC對堆外緩存的影響是很是小的。
總體類結構圖以下(考慮到擴展性,暫時將mapdb加入到告終構圖中):
從總體的類組織結構圖看來,使用了策略模式+模板模式組合的方式來進行。 屏蔽不一樣cache底層接口的不一致性,用的是策略模式;爲不一樣的堆外緩存組件提供一致的操做方法用的是模板模式。組合起來使用就使得開發和擴展顯得很是容易。
部分類的封裝方式以下:
public class OhcCacheStrategy implements CacheStrategy { /** * 日誌 */ private static Logger logger = LoggerFactory.getLogger(OhcCacheStrategy.class); /** * 緩存組件 */ public OHCache<byte[], byte[]> dataCache; /** * 過時時間組件 */ public OHCache<byte[], byte[]> expireCache; /** * 緩存table最大容量 */ private long level2cacheMax = 1024000L; /** * 鎖 */ private final Object lock = new Object(); /** * 鍵過時回調 */ public ExpirekeyAction expirekeyAction; /** * db引擎初始化 */ @PostConstruct public void initOhcEngine() { try { dataCache = OHCacheBuilder.<byte[], byte[]>newBuilder() .keySerializer(new OhcSerializer()) .valueSerializer(new OhcSerializer()) .segmentCount(2 * 4) .hashTableSize((int) level2cacheMax / 102400) .capacity(2 * 1024 * 1024 * 1024L) .defaultTTLmillis(OffheapCacheConst.EXPIRE_DEFAULT_SECONDS * 1000) .timeouts(true) .timeoutsSlots(64) .timeoutsPrecision(512) .eviction(Eviction.LRU) .build(); logger.error("ohc data cache init ok..."); expireCache = OHCacheBuilder.<byte[], byte[]>newBuilder() .keySerializer(new OhcSerializer()) .valueSerializer(new OhcSerializer()) .segmentCount(1) .hashTableSize((int) level2cacheMax / 102400) .capacity(2 * 1024 * 1024 * 1024L) .defaultTTLmillis(OffheapCacheConst.EXPIRE_DEFAULT_SECONDS * 1000) .timeouts(true) .timeoutsSlots(64) .timeoutsPrecision(512) .eviction(Eviction.NONE) .build(); logger.error("ohc expire cache init ok..."); } catch (Exception ex) { logger.error(OffheapCacheConst.PACKAGE_CONTAINER_OHC + OffheapCacheConst.ENGINE_INIT_FAIL, ex); AlarmUtil.alarm(OffheapCacheConst.PACKAGE_CONTAINER_OHC + OffheapCacheConst.ENGINE_INIT_FAIL, ex.getMessage()); throw ex; } } @Override public <T> boolean putEntry(String key, T entry, long expireAt) { synchronized (lock) { byte[] entryKey = SerializationUtils.serialize(key); byte[] entryVal = SerializationUtils.serialize((Serializable) entry); //緩存數據入庫 if (dataCache.put(entryKey, entryVal, expireAt)) { //過時時間入庫 putExpire(key, expireAt); //返回執行結果 return true; } return false; } } @Override public <T> T queryEntry(String key) { byte[] result = dataCache.get(SerializationUtils.serialize(key)); if (result == null) { return null; } return SerializationUtils.deserialize(result); } @Override public long queryExpireTime(String key) { byte[] entryKey = SerializationUtils.serialize(key); return expireCache.get(entryKey) == null ? 0 : SerializationUtils.deserialize(expireCache.get(entryKey)); } @Override public boolean removeEntry(String key) { byte[] entryKey = SerializationUtils.serialize(key); if (dataCache.remove(entryKey)) { removeExpire(key); return true; } return false; } @Override public boolean removeAll() { Iterable<byte[]> dataKey = () -> dataCache.keyIterator(); dataCache.removeAll(dataKey); Iterable<byte[]> expireKey = () -> expireCache.keyIterator(); expireCache.removeAll(expireKey); return true; } @Override public List<String> queryKeys() { List<String> list = new ArrayList<>(); Iterator<byte[]> iterator = expireCache.keyIterator(); while (iterator.hasNext()) { list.add(SerializationUtils.deserialize(iterator.next())); } return list; } /** * key過時時間同步入庫 * * @param key * @param expireAt */ private void putExpire(String key, long expireAt) { try { expireCache.put(SerializationUtils.serialize(key), SerializationUtils.serialize(expireAt)); } catch (Exception ex) { logger.error("key[" + key + "]過時時間入庫失敗..."); } } /** * 同步清理過時鍵 * * @param key */ private void removeExpire(String key) { try { if (expireCache.remove(SerializationUtils.serialize(key))) { if (expirekeyAction != null) { expirekeyAction.keyExpiredNotification(key); } } } catch (Exception ex) { logger.error("key[" + key + "]過時時間清除失敗..."); } } }
上面這個類是堆外緩存的核心策略類。全部其餘的數據模型讀寫操做均可以依據此類來擴展,好比相似redis的sortedset,value能夠存儲一個Treeset便可。須要說明一下,上面代碼中,dataCache主要用於存儲數據部分,expireCache主要用於存儲鍵過時時間。以便於能夠實現鍵主動過時和被動過時功能。用戶添加刪除鍵的時候,會同步刪除expireCache中的鍵,以便於兩者可以統一。因爲ohc自己並未實現keyExpireCallback,因此這裏我實現了這個功能,只要有鍵被移除(主動刪除仍是被動刪除,都會觸發通知),就會通知用戶,用戶能夠按照以下方式使用:
@PostConstruct public void Init() { ohcCacheTemplate.registerExpireKeyAction(key -> { logger.error("key " + key + " expired..."); }); }
鍵被動過時功能,模仿了redis的鍵被動驅逐方式,實現以下:
public class OffheapCacheWorker { /** * 帶參注入 * * @param cacheStrategy */ public OffheapCacheWorker(CacheStrategy cacheStrategy) { this.cacheStrategy = cacheStrategy; this.offheapCacheHelper = new OffheapCacheHelper(); } /** * 日誌 */ private static Logger logger = LoggerFactory.getLogger(OffheapCacheWorker.class); /** * 緩存幫助類 */ private OffheapCacheHelper offheapCacheHelper; /** * 緩存構建器 */ private CacheStrategy cacheStrategy; /** * 過時key檢測線程 */ private Thread expireCheckThread; /** * 線程狀態 */ private volatile boolean started; /** * 線程開啓 * * @throws IOException */ public synchronized void start() { if (started) { return; } expireCheckThread = new Thread("expire key check thread") { @Override public void run() { logger.error("expire key check thread start..."); while (!Thread.currentThread().isInterrupted()) { try { processLoop(); } catch (RuntimeException suppress) { logger.error("Thread `" + getName() + "` occured a error, suppressed.", suppress); throw suppress; } catch (Exception exception) { logger.error("Thread `" + getName() + "` occured a error, exception.", exception); } } logger.info("Thread `{}` was stopped normally.", getName()); } }; expireCheckThread.start(); started = true; } /** * 線程中止 * * @throws IOException */ public synchronized void stop() throws IOException { started = false; if (expireCheckThread != null) { expireCheckThread.interrupt(); } } /** * 過時鍵驅逐 * 模仿的redis鍵過時機制
*/ private void processLoop() throws InterruptedException { //每次採集樣本數 int sampleCheckNumber = 20; //過時key計數 int sampleExpiredCount = 0; //抽樣次數迭代 int sampleCheckIteration = 0; //緩存的key List<String> keys = cacheStrategy.queryKeys(); //抽樣開始時間 long start = System.currentTimeMillis(); //循環開始 do { //鍵數量 long expireContainerSize = keys.size(); //默認爲鍵數量 long loopCheckNumber = expireContainerSize; //每次檢查的鍵數量,若是超過樣本數,則以樣本數爲準 if (loopCheckNumber > sampleCheckNumber) { loopCheckNumber = sampleCheckNumber; } //開始檢測 while (loopCheckNumber-- > 0) { //取隨機下標 int rndNum = offheapCacheHelper.getRandomNumber(toIntExact(expireContainerSize) + 1); //取隨機鍵 String rndKey = keys.get(rndNum); //獲取過時時間 long expireTime = cacheStrategy.queryExpireTime(rndKey); //過時時間比對 if (expireTime <= System.currentTimeMillis()) { //鍵驅逐 boolean result = cacheStrategy.removeEntry(rndKey); if (result) { expireContainerSize--; sampleExpiredCount++; } } } //抽樣次數遞增 sampleCheckIteration++; //抽樣達到16次(16的倍數,&0xf都爲0)且本批次耗時超過0.5秒,將退出,避免阻塞正常業務操做 if ((sampleCheckIteration % 16) == 0 && (System.currentTimeMillis() - start) > 300) { logger.error("清理數據庫過時鍵操做耗時過長,退出,預備從新開始..."); return; } } while (sampleExpiredCount > sampleCheckNumber / 4); Thread.sleep(1500); } }
鍵被動驅逐,會隨機抽取20個key檢測,若是過時鍵小於5個,則直接進行下一次抽樣。不然將進行鍵驅逐操做。一旦抽樣次數達到限定次數且鍵驅逐耗時過長,爲了避免影響業務,將會退出本次循環,繼續下一次循環操做。此worker在後臺運行,實測6W個過時key一塊兒過時,cpu佔用控制在10%,60w個過時key基本上一塊兒過時,cpu佔用控制在60%左右。達到預期效果。在大量的讀寫操做過程當中,能夠看到堆內內存幾乎沒有變化。
寫到最後,上面就是此次我要介紹的堆外緩存的總體內容了,從Unsafe講到原理,從實現講到ohc,但願你們可以提出更好的東西來,多謝。