JetCache
是一個基於Java的緩存系統封裝,提供統一的API和註解來簡化緩存的使用。 JetCache提供了比SpringCache更增強大的註解,能夠原生的支持TTL、兩級緩存、分佈式自動刷新,還提供了Cache
接口用於手工緩存操做。 當前有四個實現:RedisCache
、RedisLettuceCache
、CaffeineCache
、LinkedHashMapCache
。html
特性:java
經過統一的API訪問Cache系統react
經過註解實現聲明式的方法緩存,支持TTL和兩級緩存git
經過註解建立並配置Cache
實例github
針對全部Cache
實例和方法緩存的自動統計redis
Key的生成策略和Value的序列化策略支持自定義配置spring
分佈式緩存自動刷新,分佈式鎖數據庫
異步Cache API (使用Redis的Lettuce客戶端時)express
緩存類型:編程
本地
LinkedHashMap
:使用LinkedHashMap作LUR方式淘汰
Caffeine
:基於Java8開發的提供了近乎最佳命中率的高性能的緩存庫
遠程(訪問Redis的客戶端)
Redis
:使用Jedis客戶端,Redis官方首選的Java客戶端
RedisSpringData
:使用SpringData訪問Redis(官網未做介紹)
RedisLettuce
:使用Lettuce客戶端,一個高性能基於Java的Redis驅動框架,支持線程安全的同步、異步操做,底層集成了Project Reactor,提供反應式編程,參考:Redis高級客戶端Lettuce詳解
在高併發、大流量等場景下,下降系統延遲,緩解數據庫壓力,提升系統總體的性能,讓用戶有更好的體驗。
讀多寫少、不追求強一致性、請求入參不易變化
選擇了遠程緩存請設置keyPrefix,保證存放至Redis的緩存key規範化,避免與其餘系統出現衝突,例如這樣設計:系統簡稱:所屬名字:
,這樣存儲到Redis的緩存key爲:系統簡稱:所屬名字:緩存key
選擇了本地緩存請設置limit,全局默認設置了100,本地緩存的數據存放於內存,減輕內存的損耗,若是使用了Caffeine,緩存的key過多可能致使內存溢出
請勿濫用緩存註解,對於非必要添加緩存的方法咱們儘可能不使用緩存
說明:如下使用方式是基於SpringBoot引入JetCache
緩存框架的,若是不是SpringBoot工程,請參考JetCache
官網使用
<dependencies> <!-- 使用 jedis 客戶端添加如下依賴 --> <dependency> <groupId>com.alicp.jetcache</groupId> <artifactId>jetcache-starter-redis</artifactId> <version>${version}</version> </dependency> <!-- 使用 lettuce 客戶端添加如下依賴 --> <dependency> <groupId>com.alicp.jetcache</groupId> <artifactId>jetcache-starter-redis-lettuce</artifactId> <version>${version}</version> </dependency> </dependencies>
jetcache: statIntervalMinutes: 60 areaInCacheName: false penetrationProtect: false enableMethodCache: true hiddenPackages: com.xxx.xxx,com.xxx.xxx local: default: type: caffeine # 支持的類型:linkedhashmap、caffeine limit: 100 keyConvertor: fastjson # 支持的類型:fastjson,可自定義轉換器函數 expireAfterWriteInMillis: 600000 expireAfterAccessInMillis: 300000 remote: default: type: redis.lettuce # 支持的類型:redis、redis.lettuce keyPrefix: '系統簡稱:所屬名字:' keyConvertor: fastjson valueEncoder: java # 支持的類型:kryo、java,可自定義編碼器 valueDecoder: java # 支持的類型:kryo、java,可自定義解碼器 expireAfterWriteInMillis: 3600000 #readFrom: slavePreferred # 優先從Slave節點中讀取 uri: redis-sentinel://host1:26379,host2:26379,host3:26379/?sentinelMasterId=mymaster # 哨兵模式 #uri: redis://127.0.0.1:6379/ # 單節點模式 #mode: masterslave # 設置爲主從模式 #uri: # 集羣模式 #- redis://127.0.0.1:7000 #- redis://127.0.0.1:7001 #- redis://127.0.0.1:7002 example: keyPrefix: '系統簡稱:所屬名字:' type: redis keyConvertor: fastjson valueEncoder: java valueDecoder: java expireAfterWriteInMillis: 3600000 poolConfig: minIdle: 10 maxIdle: 20 maxTotal: 50 #password: xxx # 鏈接密碼 #timeout: 2000 # 鏈接的超時時間,讀取數據的超時時間 #database: 0 # 鏈接的數據庫 #clientName: null # 客戶端名稱 #ssl: 是否使用SSL host: ${redis.host} port: ${redis.port} #sentinel: host1:26379,host2:26379,host3:26379 # 哨兵模式 #masterName: mymaster
jetcache的全局配置
屬性 | 默認值 | 說明 |
---|---|---|
jetcache.statIntervalMinutes | 0 | 用於統計緩存調用相關信息的統計間隔(分鐘),0表示不統計。 |
jetcache.areaInCacheName | true | 緩存實例名稱cacheName會做爲緩存key的前綴,2.4.3之前的版本老是把areaName加在cacheName中,所以areaName也出如今key前綴中。咱們通常設置爲false。 |
jetcache.penetrationProtect | false | 當緩存訪問未命中的狀況下,對併發進行的加載行爲進行保護。 當前版本實現的是單JVM內的保護,即同一個JVM中同一個key只有一個線程去加載,其它線程等待結果。這是全局配置,若是緩存實例沒有指定則使用全局配置。 |
jetcache.enableMethodCache | true | 是否使用jetcache緩存。 |
jetcache.hiddenPackages | 無 | 自動生成緩存實例名稱時,爲了避免讓名稱太長,hiddenPackages指定的包名前綴會被截掉,多個包名使用逗號分隔。咱們通常會指定每一個緩存實例的名稱。 |
本地緩存的全局配置
屬性 | 默認值 | 說明 |
---|---|---|
jetcache.local.${area}.type | 無 | 本地緩存類型,支持 linkedhashmap、caffeine。 |
jetcache.local.${area}.limit | 100 | 每一個緩存實例存儲的緩存數量的全局配置,僅本地緩存須要配置,若是緩存實例沒有指定則使用全局配置,請結合實例的業務場景進行配置該參數。 |
jetcache.local.${area}.keyConvertor | 無 | 緩存key轉換器的全局配置,支持的類型:fastjson 。僅當使用@CreateCache且緩存類型爲LOCAL時能夠指定爲none ,此時經過equals方法來識別key。方法緩存必須指定keyConvertor。支持自定義轉換器函數,可設置爲:bean:beanName ,而後會從spring容器中獲取該bean。 |
jetcache.local.${area}.expireAfterWriteInMillis | 無窮大 | 本地緩存超時時間的全局配置(毫秒)。 |
jetcache.local.${area}.expireAfterAccessInMillis | 0 | 多長時間沒訪問就讓緩存失效的全局配置(毫秒),僅支持本地緩存。0表示不使用這個功能。 |
遠程緩存的全局配置
屬性 | 默認值 | 說明 |
---|---|---|
jetcache.remote.${area}.type | 無 | 鏈接Redis的客戶端類型,支持 redis 、redis.lettuce 、redis.springdata 。 |
jetcache.remote.${area}.keyPrefix | 無 | 保存至遠程緩存key的前綴,請規範使用。 |
jetcache.remote.${area}.keyConvertor | 無 | 參考上述說明。 |
jetcache.remote.${area}.valueEncoder | java | 保存至遠程緩存value的編碼函數,支持:java 、kryo 。支持自定義編碼函數,可設置爲:bean:beanName ,而後會從spring容器中獲取該bean。 |
jetcache.remote.${area}.valueDecoder | java | 保存至遠程緩存value的解碼函數,支持:java 、kryo 。支持自定義解碼函數,可設置爲:bean:beanName ,而後會從spring容器中獲取該bean。 |
jetcache.remote.${area}.expireAfterWriteInMillis | 無窮大 | 遠程緩存超時時間的全局配置(毫秒)。 |
jetcache.remote.${area}.uri | 無 | redis節點信息。 |
上表中${area}對應@Cached和@CreateCache的area屬性,若是註解上沒有指定area,默認值是"default"。
關於緩存的超時時間:
若是須要使用jetcache
緩存,啓動類添加兩個註解:@EnableCreateCacheAnnotation
、@EnableMethodCache
開啓可經過@CreateCache註解建立Cache實例功能。
開啓可經過@Cached註解建立Cache實例功能,初始化spring aop,註解說明:
屬性 | 默認值 | 說明 |
---|---|---|
basePackages | 無 | jetcache須要攔截的包名,只有這些包名下的Cache實例纔會生效 |
order | Ordered.LOWEST_PRECEDENCE | 指定AOP切面執行過程的順序,默認最低優先級 |
mode | AdviceMode.PROXY | Spring AOP的模式,目前就提供默認值讓你修改 |
proxyTargetClass | false | 無 |
爲一個方法添加緩存,建立對應的緩存實例,註解能夠添加在接口或者類的方法上面,該類必須是spring bean,註解說明:
屬性 | 默認值 | 說明 |
---|---|---|
area | "default" | 若是在配置中配置了多個緩存area,在這裏指定使用哪一個area。 |
name | 未定義 | 指定緩存實例名稱,若是沒有指定,會根據類名+方法名自動生成。name會被用於遠程緩存的key前綴。另外在統計中,一個簡短有意義的名字會提升可讀性。 |
enabled | true | 是否激活緩存。 |
timeUnit | TimeUnit.SECONDS | 指定expire的單位。 |
expire | 未定義 | 超時時間。若是註解上沒有定義,會使用全局配置,若是此時全局配置也沒有定義,則爲無窮大。 |
localExpire | 未定義 | 僅當cacheType爲BOTH時適用,爲本地緩存指定一個不同的超時時間,一般應該小於expire。若是沒有設置localExpire且cacheType爲BOTH,那麼本地緩存的超時時間和遠程緩存保持一致。 |
cacheType | CacheType.REMOTE | 緩存的類型,支持:REMOTE 、LOCAL 、BOTH ,若是定義爲BOTH,會使用LOCAL和REMOTE組合成兩級緩存。 |
localLimit | 未定義 | 若是cacheType爲LOCAL或BOTH,這個參數指定本地緩存的最大元素數量,以控制內存佔用。若是註解上沒有定義,會使用全局配置,若是此時你沒有定義全局配置,則使用默認的全局配置100。請結合實際業務場景進行設置該值。 |
serialPolicy | 未定義 | 指定遠程緩存VALUE的序列化方式,支持SerialPolicy.JAVA 、SerialPolicy.KRYO 。若是註解上沒有定義,會使用全局配置,若是你沒有定義全局配置,則使用默認的全局配置SerialPolicy.JAVA。 |
keyConvertor | 未定義 | 指定KEY的轉換方式,用於將複雜的KEY類型轉換爲緩存實現能夠接受的類型,支持:KeyConvertor.FASTJSON 、KeyConvertor.NONE 。NONE表示不轉換,FASTJSON能夠將複雜對象KEY轉換成String。若是註解上沒有定義,會使用全局配置。 |
key | 未定義 | 使用SpEL指定緩存key,若是沒有指定會根據入參自動生成。 |
cacheNullValue | false | 當方法返回值爲null的時候是否要緩存。 |
condition | 未定義 | 使用SpEL指定條件,若是表達式返回true的時候纔去緩存中查詢。 |
postCondition | 未定義 | 使用SpEL指定條件,若是表達式返回true的時候才更新緩存,該評估在方法執行後進行,所以能夠訪問到#result。 |
用於移除緩存,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
area | "default" | 若是在配置中配置了多個緩存area,在這裏指定使用哪一個area。 |
name | 無 | 指定緩存的惟一名稱,通常指向對應的@Cached定義的name。 |
key | 未定義 | 使用SpEL指定key,若是沒有指定會根據入參自動生成。 |
condition | 未定義 | 使用SpEL指定條件,若是表達式返回true才執行刪除,可訪問方法結果#result。刪除緩存實例中key的元素。 |
multi | false | 若是根據SpEL指定的key是一個集合,是否從緩存實例中刪除對應的每一個緩存。若是設置爲true,可是key不是集合,則不會刪除緩存。 |
用於更新緩存,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
area | "default" | 若是在配置中配置了多個緩存area,在這裏指定使用哪一個area。 |
name | 無 | 指定緩存的惟一名稱,通常指向對應的@Cached定義的name。 |
key | 未定義 | 使用SpEL指定key,若是沒有指定會根據入參自動生成。 |
value | 無 | 使用SpEL指定value。 |
condition | 未定義 | 使用SpEL指定條件,若是表達式返回true才執行更新,可訪問方法結果#result。更新緩存實例中key的元素。 |
multi | false | 若是根據SpEL指定key和value都是集合而且元素的個數相同,則是否更新緩存實例中的對應的每一個元素。若是設置爲true,可是key不是集合或者value不是集合或者它們的元素的個數不相同,也不會更新緩存。 |
用於自定刷新緩存,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
refresh | 無 | 刷新間隔 |
stopRefreshAfterLastAccess | 未定義 | 指定該key多長時間沒有訪問就中止刷新,若是不指定會一直刷新。 |
refreshLockTimeout | 60秒 | 類型爲BOTH/REMOTE的緩存刷新時,同時只會有一臺服務器在刷新,這臺服務器會在遠程緩存放置一個分佈式鎖,此配置指定該鎖的超時時間。 |
timeUnit | TimeUnit.SECONDS | 指定refresh時間單位。 |
當緩存訪問未命中的狀況下,對併發進行的加載行爲進行保護。 當前版本實現的是單JVM內的保護,即同一個JVM中同一個key只有一個線程去加載,其它線程等待結果,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
value | true | 是否開啓保護模式。 |
timeout | 未定義 | 其餘線程的等待超時時間,若是超時則本身執行方法直接返回結果。 |
timeUnit | TimeUnit.SECONDS | 指定timeout時間單位。 |
在Spring Bean中使用該註解可建立一個Cache實例,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
area | "default" | 若是在配置中配置了多個緩存area,在這裏指定使用哪一個area。 |
name | 未定義 | 指定緩存實例名稱,若是沒有指定,會根據類名+方法名自動生成。name會被用於遠程緩存的key前綴。另外在統計中,一個簡短有意義的名字會提升可讀性。 |
timeUnit | TimeUnit.SECONDS | 指定expire的單位。 |
expire | 未定義 | 超時時間。若是註解上沒有定義,會使用全局配置,若是此時全局配置也沒有定義,則爲無窮大。 |
localExpire | 未定義 | 僅當cacheType爲BOTH時適用,爲本地緩存指定一個不同的超時時間,一般應該小於expire。若是沒有設置localExpire且cacheType爲BOTH,那麼本地緩存的超時時間和遠程緩存保持一致。 |
cacheType | CacheType.REMOTE | 緩存的類型,支持:REMOTE 、LOCAL 、BOTH ,若是定義爲BOTH,會使用LOCAL和REMOTE組合成兩級緩存。 |
localLimit | 未定義 | 若是cacheType爲LOCAL或BOTH,這個參數指定本地緩存的最大元素數量,以控制內存佔用。若是註解上沒有定義,會使用全局配置,若是此時你沒有定義全局配置,則使用默認的全局配置100。請結合實際業務場景進行設置該值。 |
serialPolicy | 未定義 | 指定遠程緩存VALUE的序列化方式,支持SerialPolicy.JAVA 、SerialPolicy.KRYO 。若是註解上沒有定義,會使用全局配置,若是你沒有定義全局配置,則使用默認的全局配置SerialPolicy.JAVA。 |
keyConvertor | 未定義 | 指定KEY的轉換方式,用於將複雜的KEY類型轉換爲緩存實現能夠接受的類型,支持:KeyConvertor.FASTJSON 、KeyConvertor.NONE 。NONE表示不轉換,FASTJSON能夠將複雜對象KEY轉換成String。若是註解上沒有定義,會使用全局配置。 |
/** * 啓動類 */ @SpringBootApplication @EnableCreateCacheAnnotation @EnableMethodCache(basePackages = "com.xxx.xxx") public class Application { public static void main(String[] args){ SpringApplication.run(Application.class, args); } } /** * 接口 */ public interface JetCacheExampleService { User getValue(long userId); void updateValue(User user); void deleteValue(User user); } /** * 實現類 */ @Service public class JetCacheExampleServiceImpl implements JetCacheExampleService { @CreateCache(name = "JetCacheExampleServiceImpl.exampleCache" , localLimit = 50 ,cacheType = CacheType.LOCAL) @CachePenetrationProtect private Cache<Long, User> exampleCache; @Override @Cached(name = "JetCacheExampleService.getValue", expire = 3600 * 6, localLimit = 50, cacheType = CacheType.BOTH) @CacheRefresh(refresh = 3600, stopRefreshAfterLastAccess = 3600 * 2) @CachePenetrationProtect public User getValue(long userId){ String result = new User(); // ... 處理邏輯 return result; } @Override @CacheUpdate(name = "JetCacheExampleService.getValue", key="#user.userId", value="#user") public void updateValue(User user){ // 處理邏輯 } @Override @CacheInvalidate(name = "JetCacheExampleService.getValue", key="#user.userId") public void deleteValue(User user){ // 處理邏輯 } }
如上述所示
getValue方法會建立一個緩存實例,經過@Cached
註解能夠看到緩存實例名稱cacheName
爲'JetCacheExampleService.getValue',緩存的有效時長爲6小時,本地緩存的數量最多爲50,緩存類型爲BOTH
(優先從本地緩存獲取);經過@CacheRefresh
註解能夠看到會爲該緩存實例設置一個刷新策略,刷新間隔爲1小時,2個小時沒訪問後再也不刷新,須要刷新的緩存實例會爲其每個緩存數據建立一個RefreshTask
週期性任務;@CachePenetrationProtect
註解表示該緩存實例開啓保護模式,當緩存未命中,同一個JVM中同一個key只有一個線程去加載數據,其它線程等待結果。
updateValue方法能夠更新緩存,經過@CacheUpdate
註解能夠看到會更新緩存實例'JetCacheExampleService.getValue'中緩存key爲#user.userId的緩存value爲#user。
deleteValue方法能夠刪除緩存,經過@CacheInvalidate
註解能夠看到會刪除緩存實例'JetCacheExampleService.getValue'中緩存key爲#user.userId緩存數據。
exampleCache字段會做爲一個緩存實例對象,經過@CreateCache
註解能夠看到,會將該字段做爲cacheName
爲'JetCacheExampleService.getValue'緩存實例對象,本地緩存的數量最多爲50,緩存類型爲LOCAL
,@CachePenetrationProtect
註解表示該緩存實例開啓保護模式。
個人業務場景是使用上述的getValue方法建立緩存實例便可。
注意:
@Cached
註解不能和@CacheUpdate
或者@CacheInvalidate
同時使用@CacheInvalidate
能夠多個同時使用另外經過@CreateCache註解建立緩存實例也能夠這樣初始化:
@Service public class JetCacheExampleServiceImpl implements JetCacheExampleService { @CreateCache(name = "JetCacheExampleServiceImpl.exampleCache" , localLimit = 50 ,cacheType = CacheType.LOCAL) private Cache<Long, User> exampleCache; @PostConstruct public exampleCacheInit(){ RefreshPolicy policy = RefreshPolicy.newPolicy(60, TimeUnit.MINUTES) .stopRefreshAfterLastAccess(120, TimeUnit.MINUTES); exampleCache.config().setLoader(this::loadFromDatabase); exampleCache.config().setRefreshPolicy(policy); } }
更加詳細的使用方法請參考JetCache
官方地址。
參考本人Git倉庫中的JetCache
項目,已作詳細的註釋。
簡單歸納:利用Spring AOP功能,在調用須要緩存的方法前,經過解析註解獲取緩存配置,根據這些配置建立不一樣的實例對象,進行緩存等操做。
JetCache
分爲兩部分,一部分是Cache API以及實現,另外一部分是註解支持。
jetcache-anno-api:定義JetCache
註解和常量。
jetcache-core:核心API,Cache接口的實現,提供各類緩存實例的操做,不依賴於Spring。
jetcache-autoconfigure:完成初始化,解析application.yml配置文件中的相關配置,以提供不一樣緩存實例的CacheBuilder
構造器
jetcache-anno:基於Spring提供@Cached
和@CreateCache
註解支持,初始化Spring AOP以及JetCache
註解等配置。
jetcache-redis:使用Jedis提供Redis支持。
jetcache-redis-lettuce:使用Lettuce提供Redis支持,實現了JetCache
異步訪問緩存的的接口。
jetcache-redis-springdata:使用Spring Data提供Redis支持。
jetcache-starter-redis:提供pom文件,Spring Boot方式的Starter,基於Jedis。
jetcache-starter-redis-lettuce:提供pom文件,Spring Boot方式的Starter,基於Lettuce。
jetcache-starter-redis-springdata:提供pom文件,Spring Boot方式的Starter,基於Spring Data。
jetcache-test:提供相關測試。
在jetcache-anno-api模塊中定義了須要用的緩存註解與常量,在上述已經詳細的講述過,其中@CacheInvalidateContainer
註解定義value爲@CacheInvalidate
數組,而後經過jdk8新增的@Repeatable
註解,在@CacheInvalidate
註解上面添加@Repeatable(CacheInvalidateContainer.class)
,便可支持同一個地方可使用多個@CacheInvalidate
註解。
主要查看jetcache-core子模塊,提供各類Cache
緩存,以支持不一樣的緩存類型
Cache接口的子關係,結構以下圖:
主要對象描述:
com.alicp.jetcache.Cache
接口,定義了緩存實例的操做方法(部分有默認實現),以及獲取分佈式鎖(非嚴格,用於刷新遠程緩存)的實現,由於繼承了java.io.Closeable
接口,因此也提供了close方法的默認實現,空方法,交由不一樣緩存實例的實現去實現該方法用於釋放資源,在com.alicp.jetcache.anno.support.ConfigProvider.doShutdown()
方法中會調用每一個緩存實例對象的close方法進行資源釋放。主要代碼以下:
public interface Cache<K, V> extends Closeable { Logger logger = LoggerFactory.getLogger(Cache.class); //-----------------------------JSR 107 style API------------------------------------------------ default V get(K key) throws CacheInvokeException { CacheGetResult<V> result = GET(key); if (result.isSuccess()) { return result.getValue(); } else { return null; } } default Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException { MultiGetResult<K, V> cacheGetResults = GET_ALL(keys); return cacheGetResults.unwrapValues(); } default void put(K key, V value) { PUT(key, value); } default void putAll(Map<? extends K, ? extends V> map) { PUT_ALL(map); } default boolean putIfAbsent(K key, V value) { // 多級緩存MultiLevelCache不支持此方法 CacheResult result = PUT_IF_ABSENT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS); return result.getResultCode() == CacheResultCode.SUCCESS; } default boolean remove(K key) { return REMOVE(key).isSuccess(); } default void removeAll(Set<? extends K> keys) { REMOVE_ALL(keys); } <T> T unwrap(Class<T> clazz); @Override default void close() { } //--------------------------JetCache API--------------------------------------------- CacheConfig<K, V> config(); default AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) { if (key == null) { return null; } // 隨機生成一個值 final String uuid = UUID.randomUUID().toString(); // 過時時間 final long expireTimestamp = System.currentTimeMillis() + timeUnit.toMillis(expire); final CacheConfig config = config(); AutoReleaseLock lock = () -> { // 建立一把會自動釋放資源的鎖,實現其 close() 方法 int unlockCount = 0; while (unlockCount++ < config.getTryLockUnlockCount()) { if(System.currentTimeMillis() < expireTimestamp) { // 這把鎖尚未過時,則刪除 // 刪除對應的 Key 值 // 出現的結果:成功,失敗,Key 不存在 CacheResult unlockResult = REMOVE(key); if (unlockResult.getResultCode() == CacheResultCode.FAIL || unlockResult.getResultCode() == CacheResultCode.PART_SUCCESS) { // 刪除對應的 Key 值過程當中出現了異常,則重試 logger.info("[tryLock] [{} of {}] [{}] unlock failed. Key={}, msg = {}", unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getMessage()); // retry } else if (unlockResult.isSuccess()) { // 釋放成功 logger.debug("[tryLock] [{} of {}] [{}] successfully release the lock. Key={}", unlockCount, config.getTryLockUnlockCount(), uuid, key); return; } else { // 鎖已經被釋放了 logger.warn("[tryLock] [{} of {}] [{}] unexpected unlock result: Key={}, result={}", unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getResultCode()); return; } } else { // 該鎖已失效 logger.info("[tryLock] [{} of {}] [{}] lock already expired: Key={}", unlockCount, config.getTryLockUnlockCount(), uuid, key); return; } } }; int lockCount = 0; Cache cache = this; while (lockCount++ < config.getTryLockLockCount()) { // 往 Redis(或者本地) 中存放 Key 值(_#RL#結尾的Key) // 返回的結果:成功、已存在、失敗 CacheResult lockResult = cache.PUT_IF_ABSENT(key, uuid, expire, timeUnit); if (lockResult.isSuccess()) { // 成功獲取到鎖 logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock. Key={}", lockCount, config.getTryLockLockCount(), uuid, key); return lock; } else if (lockResult.getResultCode() == CacheResultCode.FAIL || lockResult.getResultCode() == CacheResultCode.PART_SUCCESS) { logger.info("[tryLock] [{} of {}] [{}] cache access failed during get lock, will inquiry {} times. Key={}, msg={}", lockCount, config.getTryLockLockCount(), uuid, config.getTryLockInquiryCount(), key, lockResult.getMessage()); // 嘗試獲取鎖的過程當中失敗了,也就是往 Redis 中存放 Key 值出現異常 // 這個時候可能 Key 值已經存儲了,可是因爲其餘緣由致使返回的結果表示執行失敗 int inquiryCount = 0; while (inquiryCount++ < config.getTryLockInquiryCount()) { CacheGetResult inquiryResult = cache.GET(key); if (inquiryResult.isSuccess()) { if (uuid.equals(inquiryResult.getValue())) { logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock after inquiry. Key={}", inquiryCount, config.getTryLockInquiryCount(), uuid, key); return lock; } else { logger.debug("[tryLock] [{} of {}] [{}] not the owner of the lock, return null. Key={}", inquiryCount, config.getTryLockInquiryCount(), uuid, key); return null; } } else { logger.info("[tryLock] [{} of {}] [{}] inquiry failed. Key={}, msg={}", inquiryCount, config.getTryLockInquiryCount(), uuid, key, inquiryResult.getMessage()); // retry inquiry } } } else { // 已存在表示該鎖被其餘人佔有 // others holds the lock logger.debug("[tryLock] [{} of {}] [{}] others holds the lock, return null. Key={}", lockCount, config.getTryLockLockCount(), uuid, key); return null; } } logger.debug("[tryLock] [{}] return null after {} attempts. Key={}", uuid, config.getTryLockLockCount(), key); return null; } default boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action){ // Release the lock use Java 7 try-with-resources. try (AutoReleaseLock lock = tryLock(key, expire, timeUnit)) { // 嘗試獲取鎖 if (lock != null) { // 獲取到鎖則執行下面的任務 action.run(); return true; } else { return false; } // 執行完鎖的操做後會進行資源釋放,調用 AutoCloseable 的 close() 方法 } } CacheGetResult<V> GET(K key); MultiGetResult<K, V> GET_ALL(Set<? extends K> keys); default V computeIfAbsent(K key, Function<K, V> loader) { return computeIfAbsent(key, loader, config().isCacheNullValue()); } V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull); V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit); default void put(K key, V value, long expireAfterWrite, TimeUnit timeUnit) { PUT(key, value, expireAfterWrite, timeUnit); } default CacheResult PUT(K key, V value) { if (key == null) { return CacheResult.FAIL_ILLEGAL_ARGUMENT; } return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS); } CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit); default void putAll(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) { PUT_ALL(map, expireAfterWrite, timeUnit); } default CacheResult PUT_ALL(Map<? extends K, ? extends V> map) { if (map == null) { return CacheResult.FAIL_ILLEGAL_ARGUMENT; } return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS); } CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit); CacheResult REMOVE(K key); CacheResult REMOVE_ALL(Set<? extends K> keys); CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit); }
com.alicp.jetcache.Cache
定義的方法大都是關於緩存的獲取、刪除和存放操做
其中大寫的方法返回JetCache
自定義的CacheResult(完整的返回值,能夠清晰的知道執行結果,例如get返回null的時候,沒法判定是對應的key不存在,仍是訪問緩存發生了異常)
小寫的方法默認實現就是調用大寫的方法
computeIfAbsent
方法最爲核心,交由子類去實現
tryLockAndRun
方法會非堵塞的嘗試獲取一把AutoReleaseLock分佈式鎖(非嚴格),獲取過程:
key_#RL#
,value爲UUID
,並設置這個鍵值對的過時時間爲60秒(默認)com.alicp.jetcache.AbstractCache
抽象類,實現了Cache接口,主要代碼以下:
public abstract class AbstractCache<K, V> implements Cache<K, V> { /** * 當緩存未命中時,併發狀況同一個Key是否只容許一個線程去加載,其餘線程等待結果(能夠設置timeout,超時則本身加載並直接返回) * 若是是的話則由獲取到Key對應的 LoaderLock.signal(採用了 CountDownLatch)的線程進行加載 * loaderMap臨時保存 Key 對應的 LoaderLock 對象 */ private volatile ConcurrentHashMap<Object, LoaderLock> loaderMap; ConcurrentHashMap<Object, LoaderLock> initOrGetLoaderMap() { if (loaderMap == null) { synchronized (this) { if (loaderMap == null) { loaderMap = new ConcurrentHashMap<>(); } } } return loaderMap; } @Override public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) { return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull, 0, null, this); } @Override public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit) { return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull, expireAfterWrite, timeUnit, this); } private static <K, V> boolean needUpdate(V loadedValue, boolean cacheNullWhenLoaderReturnNull, Function<K, V> loader) { if (loadedValue == null && !cacheNullWhenLoaderReturnNull) { return false; } if (loader instanceof CacheLoader && ((CacheLoader<K, V>) loader).vetoCacheUpdate()) { return false; } return true; } static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) { // 獲取內部的 Cache 對象 AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache); // 封裝 loader 函數成一個 ProxyLoader 對象,主要在從新加載緩存後發出一個 CacheLoadEvent 到 CacheMonitor CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify); CacheGetResult<V> r; if (cache instanceof RefreshCache) { // 該緩存實例須要刷新 RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache); /* * 從緩存中獲取數據 * 若是是多級緩存(先從本地緩存獲取,獲取不到則從遠程緩存獲取) * 若是緩存數據是從遠程緩存獲取到的數據則會更新至本地緩存,而且若是本地緩存沒有設置 localExpire 則使用遠程緩存的到期時間做爲本身的到期時間 * 我通常不設置 localExpire ,由於可能致使本地緩存的有效時間比遠程緩存的有效時間更長 * 若是設置 localExpire 了記得設置 expireAfterAccessInMillis */ r = refreshCache.GET(key); // 添加/更新當前 RefreshCache 的刷新緩存任務,存放於 RefreshCache 的 taskMap 中 refreshCache.addOrUpdateRefreshTask(key, newLoader); } else { // 從緩存中獲取數據 r = cache.GET(key); } if (r.isSuccess()) { // 緩存命中 return r.getValue(); } else { // 緩存未命中 // 建立當緩存未命中去更新緩存的函數 Consumer<V> cacheUpdater = (loadedValue) -> { if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) { /* * 未在緩存註解中配置 key 的生成方式則默認取入參做爲緩存 key * 在進入當前方法時是否能夠考慮爲 key 建立一個副本???? * 由於緩存未命中而後經過 loader 從新加載方法時,若是方法內部對入參進行了修改,那麼生成的緩存 key 也會被修改 * 從而致使相同的 key 進入該方法時一直與緩存中的 key 不相同,一直出現緩存未命中 */ if (timeUnit != null) { cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult(); } else { cache.PUT(key, loadedValue).waitForResult(); } } }; V loadedValue; if (cache.config().isCachePenetrationProtect()) { // 添加了 @CachePenetrationProtect 註解 // 一個JVM只容許一個線程執行 loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater); } else { // 執行方法 loadedValue = newLoader.apply(key); // 將新的結果異步緩存 cacheUpdater.accept(loadedValue); } return loadedValue; } } static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache, K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) { ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap(); Object lockKey = buildLoaderLockKey(abstractCache, key); while (true) { // 爲何加一個 create[] 數組 疑問?? boolean create[] = new boolean[1]; LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> { create[0] = true; LoaderLock loaderLock = new LoaderLock(); loaderLock.signal = new CountDownLatch(1); loaderLock.loaderThread = Thread.currentThread(); return loaderLock; }); if (create[0] || ll.loaderThread == Thread.currentThread()) { try { // 加載該 Key 實例的方法 V loadedValue = newLoader.apply(key); ll.success = true; ll.value = loadedValue; // 將從新加載的數據更新至緩存 cacheUpdater.accept(loadedValue); return loadedValue; } finally { // 標記已完成 ll.signal.countDown(); if (create[0]) { loaderMap.remove(lockKey); } } } else { // 等待其餘線程加載,若是出現異常或者超時則本身加載返回數據,可是不更新緩存 try { Duration timeout = config.getPenetrationProtectTimeout(); if (timeout == null) { ll.signal.await(); } else { boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS); if(!ok) { logger.info("loader wait timeout:" + timeout); return newLoader.apply(key); } } } catch (InterruptedException e) { logger.warn("loader wait interrupted"); return newLoader.apply(key); } if (ll.success) { return (V) ll.value; } else { continue; } } } } private static Object buildLoaderLockKey(Cache c, Object key) { if (c instanceof AbstractEmbeddedCache) { return ((AbstractEmbeddedCache) c).buildKey(key); } else if (c instanceof AbstractExternalCache) { byte bytes[] = ((AbstractExternalCache) c).buildKey(key); return ByteBuffer.wrap(bytes); } else if (c instanceof MultiLevelCache) { c = ((MultiLevelCache) c).caches()[0]; return buildLoaderLockKey(c, key); } else if(c instanceof ProxyCache) { c = ((ProxyCache) c).getTargetCache(); return buildLoaderLockKey(c, key); } else { throw new CacheException("impossible"); } } /** * 從新加載數據鎖 */ static class LoaderLock { /** * 柵欄 */ CountDownLatch signal; /** * 持有的線程 */ Thread loaderThread; /** * 是否加載成功 */ boolean success; /** * 加載出來的數據 */, Object value; } }
com.alicp.jetcache.AbstractCache
實現了Cache
接口的大寫方法,內部調用本身定義的抽象方法(以DO_
開頭,交由不一樣的子類實現),操做緩存後發送相應的事件CacheEvent
,也就是調用本身定義的notify方法,遍歷每一個CacheMonitor
對該事件進行後置操做,用於統計信息。
computeIfAbsentImpl
方法實現了Cache
接口的核心方法,從緩存實例中根據緩存key獲取緩存value,邏輯以下:
獲取cache的targetCache,由於咱們經過@CreateCache
註解建立的緩存實例將生成LazyInitCache
對象,須要調用其getTargetCache方法纔會完成緩存實例的初始化
loader函數是對加載原有方法的封裝,這裏再進行一層封裝,封裝成ProxyLoader
類型,目的是在加載原有方法後將發送CacheLoadEvent
事件
從緩存實例中獲取對應的緩存value,若是緩存實例對象是RefreshCache
類型(在com.alicp.jetcache.anno.support.CacheContext.buildCache
方法中會將cache包裝成CacheHandlerRefreshCache
),則調用RefreshCache.addOrUpdateRefreshTask
方法,判斷是否應該爲它添加一個定時的刷新任務
若是緩存未命中,則執行loader函數,若是開啓了保護模式,則調用自定義的synchronizedLoad方法,大體邏輯:根據緩存key從本身的loaderMap(線程安全)遍歷中嘗試獲取(不存在則建立)LoaderLock
加載鎖,獲取到這把加載鎖才能夠執行loader函數,若是已被其餘線程佔有則進行等待(沒有設置超時時間則一直等待),經過CountDownLatch
計數器實現
com.alicp.jetcache.embedded.AbstractEmbeddedCache
抽象類繼承AbstractCache抽象類,定義了本地緩存的存放緩存數據的對象爲com.alicp.jetcache.embedded.InnerMap
接口和一個初始化該接口的createAreaCache抽象方法,基於InnerMap接口實現以DO_
開頭的方法,完成緩存實例各類操做的具體實現,主要代碼以下:
public abstract class AbstractEmbeddedCache<K, V> extends AbstractCache<K, V> { protected EmbeddedCacheConfig<K, V> config; /** * 本地緩存的 Map */ protected InnerMap innerMap; protected abstract InnerMap createAreaCache(); public AbstractEmbeddedCache(EmbeddedCacheConfig<K, V> config) { this.config = config; innerMap = createAreaCache(); } @Override public CacheConfig<K, V> config() { return config; } public Object buildKey(K key) { Object newKey = key; Function<K, Object> keyConvertor = config.getKeyConvertor(); if (keyConvertor != null) { newKey = keyConvertor.apply(key); } return newKey; } @Override protected CacheGetResult<V> do_GET(K key) { Object newKey = buildKey(key); CacheValueHolder<V> holder = (CacheValueHolder<V>) innerMap.getValue(newKey); return parseHolderResult(holder); } protected CacheGetResult<V> parseHolderResult(CacheValueHolder<V> holder) { long now = System.currentTimeMillis(); if (holder == null) { return CacheGetResult.NOT_EXISTS_WITHOUT_MSG; } else if (now >= holder.getExpireTime()) { return CacheGetResult.EXPIRED_WITHOUT_MSG; } else { synchronized (holder) { long accessTime = holder.getAccessTime(); if (config.isExpireAfterAccess()) { long expireAfterAccess = config.getExpireAfterAccessInMillis(); if (now >= accessTime + expireAfterAccess) { return CacheGetResult.EXPIRED_WITHOUT_MSG; } } // 設置該緩存數據的最後一次訪問時間 holder.setAccessTime(now); } return new CacheGetResult(CacheResultCode.SUCCESS, null, holder); } } @Override protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) { ArrayList<K> keyList = new ArrayList<K>(keys.size()); ArrayList<Object> newKeyList = new ArrayList<Object>(keys.size()); keys.stream().forEach((k) -> { Object newKey = buildKey(k); keyList.add(k); newKeyList.add(newKey); }); Map<Object, CacheValueHolder<V>> innerResultMap = innerMap.getAllValues(newKeyList); Map<K, CacheGetResult<V>> resultMap = new HashMap<>(); for (int i = 0; i < keyList.size(); i++) { K key = keyList.get(i); Object newKey = newKeyList.get(i); CacheValueHolder<V> holder = innerResultMap.get(newKey); resultMap.put(key, parseHolderResult(holder)); } MultiGetResult<K, V> result = new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap); return result; } @Override protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) { CacheValueHolder<V> cacheObject = new CacheValueHolder(value ,timeUnit.toMillis(expireAfterWrite)); innerMap.putValue(buildKey(key), cacheObject); return CacheResult.SUCCESS_WITHOUT_MSG; } @Override protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) { HashMap newKeyMap = new HashMap(); for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) { CacheValueHolder<V> cacheObject = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite)); newKeyMap.put(buildKey(en.getKey()), cacheObject); } innerMap.putAllValues(newKeyMap); final HashMap resultMap = new HashMap(); map.keySet().forEach((k) -> resultMap.put(k, CacheResultCode.SUCCESS)); return CacheResult.SUCCESS_WITHOUT_MSG; } @Override protected CacheResult do_REMOVE(K key) { innerMap.removeValue(buildKey(key)); return CacheResult.SUCCESS_WITHOUT_MSG; } @Override protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) { Set newKeys = keys.stream().map((key) -> buildKey(key)).collect(Collectors.toSet()); innerMap.removeAllValues(newKeys); final HashMap resultMap = new HashMap(); keys.forEach((k) -> resultMap.put(k, CacheResultCode.SUCCESS)); return CacheResult.SUCCESS_WITHOUT_MSG; } @Override protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) { CacheValueHolder<V> cacheObject = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite)); if (innerMap.putIfAbsentValue(buildKey(key), cacheObject)) { return CacheResult.SUCCESS_WITHOUT_MSG; } else { return CacheResult.EXISTS_WITHOUT_MSG; } } }
com.alicp.jetcache.embedded.AbstractEmbeddedCache
抽象類實現了操做本地緩存的相關方法
定義了緩存實例對象本地緩存的配置信息EmbeddedCacheConfig
對象
定義了緩存實例對象本地緩存基於內存操做緩存數據的InnerMap
對象,它的初始化過程交由不一樣的內存緩存實例(LinkedHashMapCache和CaffeineCache)
com.alicp.jetcache.embedded.LinkedHashMapCache
基於LinkedHashMap完成緩存實例對象本地緩存基於內存操做緩存數據的InnerMap
對象的初始化工做,主要代碼以下:
public class LinkedHashMapCache<K, V> extends AbstractEmbeddedCache<K, V> { private static Logger logger = LoggerFactory.getLogger(LinkedHashMapCache.class); public LinkedHashMapCache(EmbeddedCacheConfig<K, V> config) { super(config); // 將緩存實例添加至 Cleaner addToCleaner(); } protected void addToCleaner() { Cleaner.add(this); } @Override protected InnerMap createAreaCache() { return new LRUMap(config.getLimit(), this); } public void cleanExpiredEntry() { ((LRUMap) innerMap).cleanExpiredEntry(); } /** * 用於本地緩存類型爲 linkedhashmap 緩存實例存儲緩存數據 */ final class LRUMap extends LinkedHashMap implements InnerMap { /** * 容許的最大緩存數量 */ private final int max; /** * 緩存實例鎖 */ private Object lock; public LRUMap(int max, Object lock) { super((int) (max * 1.4f), 0.75f, true); this.max = max; this.lock = lock; } /** * 當元素大於最大值時移除最老的元素 * * @param eldest 最老的元素 * @return 是否刪除 */ @Override protected boolean removeEldestEntry(Map.Entry eldest) { return size() > max; } /** * 清理過時的元素 */ void cleanExpiredEntry() { synchronized (lock) { // 佔有當前緩存實例這把鎖 for (Iterator it = entrySet().iterator(); it.hasNext();) { Map.Entry en = (Map.Entry) it.next(); Object value = en.getValue(); if (value != null && value instanceof CacheValueHolder) { CacheValueHolder h = (CacheValueHolder) value; /* * 緩存的數據已經失效了則刪除 * 爲何不對 expireAfterAccess 進行判斷,取最小值,疑問???? */ if (System.currentTimeMillis() >= h.getExpireTime()) { it.remove(); } } else { // assert false if (value == null) { logger.error("key " + en.getKey() + " is null"); } else { logger.error("value of key " + en.getKey() + " is not a CacheValueHolder. type=" + value.getClass()); } } } } } @Override public Object getValue(Object key) { synchronized (lock) { return get(key); } } @Override public Map getAllValues(Collection keys) { Map values = new HashMap(); synchronized (lock) { for (Object key : keys) { Object v = get(key); if (v != null) { values.put(key, v); } } } return values; } @Override public void putValue(Object key, Object value) { synchronized (lock) { put(key, value); } } @Override public void putAllValues(Map map) { synchronized (lock) { Set<Map.Entry> set = map.entrySet(); for (Map.Entry en : set) { put(en.getKey(), en.getValue()); } } } @Override public boolean removeValue(Object key) { synchronized (lock) { return remove(key) != null; } } @Override public void removeAllValues(Collection keys) { synchronized (lock) { for (Object k : keys) { remove(k); } } } @Override @SuppressWarnings("unchecked") public boolean putIfAbsentValue(Object key, Object value) { /* * 若是緩存 key 不存在,或者對應的 value 已經失效則放入,不然返回 false */ synchronized (lock) { CacheValueHolder h = (CacheValueHolder) get(key); if (h == null || parseHolderResult(h).getResultCode() == CacheResultCode.EXPIRED) { put(key, value); return true; } else { return false; } } } } }
com.alicp.jetcache.embedded.LinkedHashMapCache
自定義LRUMap
繼承LinkedHashMap並實現InnerMap接口
自定義max
字段,存儲元素個數的最大值,並設置初始容量爲(max * 1.4f)
自定義lock
字段,每一個緩存實例的鎖,經過synchronized關鍵詞保證線程安全,因此性能相對來講很差
覆蓋LinkedHashMap的removeEldestEntry
方法,當元素大於最大值時移除最老的元素
自定義cleanExpiredEntry
方法,遍歷Map,根據緩存value(被封裝成的com.alicp.jetcache.CacheValueHolder
對象,包含緩存數據、失效時間戳和第一次訪問的時間),清理過時的元素
該對象初始化時會被添加至com.alicp.jetcache.embedded.Cleaner
清理器中,Cleaner會週期性(每隔60秒)遍歷LinkedHashMapCache緩存實例,調用其cleanExpiredEntry方法
com.alicp.jetcache.embedded.Cleaner
用於清理緩存類型爲LinkedHashMapCache的緩存數據,請查看相應註釋,代碼以下:
/** * 執行任務:定時清理(每分鐘) LinkedHashMapCache 緩存實例中過時的緩存數據 */ class Cleaner { /** * 存放弱引用對象,以防內存溢出 * 若是被弱引用的對象只被當前弱引用對象關聯時,gc 時被弱引用的對象則會被回收(取決於被弱引用的對象是否還與其餘強引用對象關聯) * * 我的理解:當某個 LinkedHashMapCache 強引用對象沒有被其餘對象(除了這裏)引用時,咱們應該讓這個對象被回收, * 可是因爲這裏使用的也是強引用,這個對象被其餘強引用對象關聯了,不可能被回收,存在內存溢出的危險, * 因此這裏使用了弱引用對象,若是被弱引用的對象沒有被其餘對象(除了這裏)引用時,這個對象會被回收 * * 舉個例子:若是咱們往一個 Map<Object, Object> 中存放一個key-value鍵值對 * 假設對應的鍵已經再也不使用被回收了,那咱們沒法再獲取到對應的值,也沒法被回收,佔有必定的內存,存在風險 */ static LinkedList<WeakReference<LinkedHashMapCache>> linkedHashMapCaches = new LinkedList<>(); static { // 建立一個線程池,1個核心線程 ScheduledExecutorService executorService = JetCacheExecutor.defaultExecutor(); // 起一個循環任務一直清理 linkedHashMapCaches 過時的數據(每隔60秒) executorService.scheduleWithFixedDelay(() -> run(), 60, 60, TimeUnit.SECONDS); } static void add(LinkedHashMapCache cache) { synchronized (linkedHashMapCaches) { // 建立一個弱引用對象,並添加到清理對象中 linkedHashMapCaches.add(new WeakReference<>(cache)); } } static void run() { synchronized (linkedHashMapCaches) { Iterator<WeakReference<LinkedHashMapCache>> it = linkedHashMapCaches.iterator(); while (it.hasNext()) { WeakReference<LinkedHashMapCache> ref = it.next(); // 獲取被弱引用的對象(強引用) LinkedHashMapCache c = ref.get(); if (c == null) { // 表示被弱引用的對象被標記成了垃圾,則移除 it.remove(); } else { c.cleanExpiredEntry(); } } } } }
com.alicp.jetcache.embedded.CaffeineCache
基於Caffeine完成緩存實例對象本地緩存基於內存操做緩存數據的InnerMap
對象的初始化工做,主要代碼以下:
public class CaffeineCache<K, V> extends AbstractEmbeddedCache<K, V> { /** * 緩存實例對象 */ private com.github.benmanes.caffeine.cache.Cache cache; public CaffeineCache(EmbeddedCacheConfig<K, V> config) { super(config); } /** * 初始化本地緩存的容器 * * @return Map對象 */ @Override @SuppressWarnings("unchecked") protected InnerMap createAreaCache() { Caffeine<Object, Object> builder = Caffeine.newBuilder(); // 設置緩存實例的最大緩存數量 builder.maximumSize(config.getLimit()); final boolean isExpireAfterAccess = config.isExpireAfterAccess(); final long expireAfterAccess = config.getExpireAfterAccessInMillis(); // 設置緩存實例的緩存數據的失效策略 builder.expireAfter(new Expiry<Object, CacheValueHolder>() { /** * 獲取緩存的有效時間 * * @param value 緩存數據 * @return 有效時間 */ private long getRestTimeInNanos(CacheValueHolder value) { long now = System.currentTimeMillis(); long ttl = value.getExpireTime() - now; /* * 若是本地緩存設置了多長時間沒訪問緩存則失效 */ if(isExpireAfterAccess){ // 設置緩存的失效時間 // 多長時間沒訪問緩存則失效 and 緩存的有效時長取 min ttl = Math.min(ttl, expireAfterAccess); } return TimeUnit.MILLISECONDS.toNanos(ttl); } @Override public long expireAfterCreate(Object key, CacheValueHolder value, long currentTime) { return getRestTimeInNanos(value); } @Override public long expireAfterUpdate(Object key, CacheValueHolder value, long currentTime, long currentDuration) { return currentDuration; } @Override public long expireAfterRead(Object key, CacheValueHolder value, long currentTime, long currentDuration) { return getRestTimeInNanos(value); } }); // 構建 Cache 緩存實例 cache = builder.build(); return new InnerMap() { @Override public Object getValue(Object key) { return cache.getIfPresent(key); } @Override public Map getAllValues(Collection keys) { return cache.getAllPresent(keys); } @Override public void putValue(Object key, Object value) { cache.put(key, value); } @Override public void putAllValues(Map map) { cache.putAll(map); } @Override public boolean removeValue(Object key) { return cache.asMap().remove(key) != null; } @Override public void removeAllValues(Collection keys) { cache.invalidateAll(keys); } @Override public boolean putIfAbsentValue(Object key, Object value) { return cache.asMap().putIfAbsent(key, value) == null; } }; } }
com.alicp.jetcache.embedded.CaffeineCache
經過Caffeine構建一個com.github.benmanes.caffeine.cache.Cache
緩存對象,而後實現InnerMap接口,調用這個緩存對象的相關方法
構建時設置每一個元素的過時時間,也就是根據每一個元素(com.alicp.jetcache.CacheValueHolder
)的失效時間戳來設置,底層如何實現的能夠參考Caffeine官方地址
調用com.github.benmanes.caffeine.cache.Cache
的put方法我有遇到過'unable to create native thread'內存溢出的問題,因此請結合實際業務場景合理的設置緩存相關配置
com.alicp.jetcache.embedded.AbstractExternalCache
抽象類繼承AbstractCache抽象類,定義了緩存實例對象遠程緩存的配置信息ExternalCacheConfig
對象,提供了將緩存key轉換成字節數組的方法,代碼比較簡單。
com.alicp.jetcache.redis.RedisCache
使用Jedis鏈接Redis,對遠程的緩存數據進行操做,代碼沒有很複雜,可查看個人註釋
定義了com.alicp.jetcache.redis.RedisCacheConfig
配置對象,包含Redis鏈接池的相關信息
實現了以DO_
開頭的方法,也就是經過Jedis操做緩存數據
com.alicp.jetcache.redis.lettuce.RedisLettuceCache
使用Lettuce鏈接Redis,對遠程的緩存數據進行操做,代碼沒有很複雜,可查看個人註釋
定義了com.alicp.jetcache.redis.lettuce.RedisLettuceCacheConfig
配置對象,包含Redis客戶端、與Redis創建的安全鏈接等信息,由於底層是基於Netty實現的,因此無需配置線程池
使用com.alicp.jetcache.redis.lettuce.LettuceConnectionManager
自定義管理器將與Redis鏈接的相關信息封裝成LettuceObjects
對象,並管理RedisClient與LettuceObjects對應關係
相比Jedis更加安全高效
對Lettuce不瞭解的能夠參考我寫的測試類com.alicp.jetcache.test.external.LettuceTest
當你設置了緩存類型爲BOTH兩級緩存,那麼建立的實例對象會被封裝成com.alicp.jetcache.MultiLevelCache
對象
定義了caches
字段類型爲Cache[],用於保存AbstractEmbeddedCache本地緩存實例和AbstractExternalCache遠程緩存實例,本地緩存存放於遠程緩存前面
實現了do_GET
方法,遍歷caches數組,也就是先從本地緩存獲取,若是獲取緩存不成功則從遠程緩存獲取,成功獲取到緩存後會調用checkResultAndFillUpperCache方法
從checkResultAndFillUpperCache
方法的邏輯能夠看到,將獲取到的緩存數據更新至更底層的緩存中,也就是說若是緩存數據是從遠程獲取到的,那麼進入這個方法後會將獲取到的緩存數據更新到本地緩存中去,這樣下次請求能夠直接從本地緩存獲取,避免與Redis之間的網絡消耗
實現了do_PUT
方法,遍歷caches數組,經過CompletableFuture
進行異步編程,將全部的操做綁定在一條鏈上執行。
實現的了PUT(K key, V value)
方法,會先判斷是否單獨配置了本地緩存時間localExipre,配置了則單獨爲本地緩存設置過時時間,沒有配置則到期時間和遠程緩存的同樣
覆蓋tryLock
方法,調用caches[caches.length-1].tryLock方法,也就是隻會調用最頂層遠程緩存的這個方法
主要代碼以下:
public class MultiLevelCache<K, V> extends AbstractCache<K, V> { private Cache[] caches; private MultiLevelCacheConfig<K, V> config; @SuppressWarnings("unchecked") @Deprecated public MultiLevelCache(Cache... caches) throws CacheConfigException { this.caches = caches; checkCaches(); CacheConfig lastConfig = caches[caches.length - 1].config(); config = new MultiLevelCacheConfig<>(); config.setCaches(Arrays.asList(caches)); config.setExpireAfterWriteInMillis(lastConfig.getExpireAfterWriteInMillis()); config.setCacheNullValue(lastConfig.isCacheNullValue()); } @SuppressWarnings("unchecked") public MultiLevelCache(MultiLevelCacheConfig<K, V> cacheConfig) throws CacheConfigException { this.config = cacheConfig; this.caches = cacheConfig.getCaches().toArray(new Cache[]{}); checkCaches(); } private void checkCaches() { if (caches == null || caches.length == 0) { throw new IllegalArgumentException(); } for (Cache c : caches) { if (c.config().getLoader() != null) { throw new CacheConfigException("Loader on sub cache is not allowed, set the loader into MultiLevelCache."); } } } public Cache[] caches() { return caches; } @Override public MultiLevelCacheConfig<K, V> config() { return config; } @Override public CacheResult PUT(K key, V value) { if (config.isUseExpireOfSubCache()) { // 本地緩存使用本身的失效時間 // 設置了TimeUnit爲null,本地緩存則使用本身的到期時間 return PUT(key, value, 0, null); } else { return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS); } } @Override public CacheResult PUT_ALL(Map<? extends K, ? extends V> map) { if (config.isUseExpireOfSubCache()) { return PUT_ALL(map, 0, null); } else { return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS); } } @Override protected CacheGetResult<V> do_GET(K key) { // 遍歷多級緩存(遠程緩存排在後面) for (int i = 0; i < caches.length; i++) { Cache cache = caches[i]; CacheGetResult result = cache.GET(key); if (result.isSuccess()) { CacheValueHolder<V> holder = unwrapHolder(result.getHolder()); /* * 這個遍歷是從低層的緩存開始獲取,獲取成功則將該值設置到更低層的緩存中 * 情景: * 本地沒有獲取到緩存,遠程獲取到了緩存,這裏會將遠程的緩存數據設置到本地中, * 這樣下次請求則直接從本次獲取,減小了遠程獲取的時間 */ checkResultAndFillUpperCache(key, i, holder); return new CacheGetResult(CacheResultCode.SUCCESS, null, holder); } } return CacheGetResult.NOT_EXISTS_WITHOUT_MSG; } private CacheValueHolder<V> unwrapHolder(CacheValueHolder<V> h) { // if @Cached or @CacheCache change type from REMOTE to BOTH (or from BOTH to REMOTE), // during the dev/publish process, the value type which different application server put into cache server will be different // (CacheValueHolder<V> and CacheValueHolder<CacheValueHolder<V>>, respectively). // So we need correct the problem at here and in CacheGetResult. Objects.requireNonNull(h); if (h.getValue() instanceof CacheValueHolder) { return (CacheValueHolder<V>) h.getValue(); } else { return h; } } private void checkResultAndFillUpperCache(K key, int i, CacheValueHolder<V> h) { Objects.requireNonNull(h); long currentExpire = h.getExpireTime(); long now = System.currentTimeMillis(); if (now <= currentExpire) { if(config.isUseExpireOfSubCache()){ // 若是使用本地本身的緩存過時時間 // 使用本地緩存本身的過時時間 PUT_caches(i, key, h.getValue(), 0, null); } else { // 使用遠程緩存的過時時間 long restTtl = currentExpire - now; if (restTtl > 0) { // 遠程緩存數據還未失效,則從新設置本地的緩存 PUT_caches(i, key, h.getValue(), restTtl, TimeUnit.MILLISECONDS); } } } } @Override protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) { HashMap<K, CacheGetResult<V>> resultMap = new HashMap<>(); Set<K> restKeys = new HashSet<>(keys); for (int i = 0; i < caches.length; i++) { if (restKeys.size() == 0) { break; } Cache<K, CacheValueHolder<V>> c = caches[i]; MultiGetResult<K, CacheValueHolder<V>> allResult = c.GET_ALL(restKeys); if (allResult.isSuccess() && allResult.getValues() != null) { for (Map.Entry<K, CacheGetResult<CacheValueHolder<V>>> en : allResult.getValues().entrySet()) { K key = en.getKey(); CacheGetResult result = en.getValue(); if (result.isSuccess()) { CacheValueHolder<V> holder = unwrapHolder(result.getHolder()); checkResultAndFillUpperCache(key, i, holder); resultMap.put(key, new CacheGetResult(CacheResultCode.SUCCESS, null, holder)); restKeys.remove(key); } } } } for (K k : restKeys) { resultMap.put(k, CacheGetResult.NOT_EXISTS_WITHOUT_MSG); } return new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap); } @Override protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) { return PUT_caches(caches.length, key, value, expireAfterWrite, timeUnit); } @Override protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) { CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null); for (Cache c : caches) { CacheResult r; if(timeUnit == null) { r = c.PUT_ALL(map); } else { r = c.PUT_ALL(map, expireAfterWrite, timeUnit); } future = combine(future, r); } return new CacheResult(future); } private CacheResult PUT_caches(int lastIndex, K key, V value, long expire, TimeUnit timeUnit) { CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null); for (int i = 0; i < lastIndex; i++) { Cache cache = caches[i]; CacheResult r; if (timeUnit == null) { // 表示本地緩存使用本身過時時間 r = cache.PUT(key, value); } else { r = cache.PUT(key, value, expire, timeUnit); } // 將多個 PUT 操做放在一條鏈上 future = combine(future, r); } return new CacheResult(future); } private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) { return future.thenCombine(result.future(), (d1, d2) -> { if (d1 == null) { return d2; } if (d1.getResultCode() != d2.getResultCode()) { return new ResultData(CacheResultCode.PART_SUCCESS, null, null); } return d1; }); } @Override protected CacheResult do_REMOVE(K key) { CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null); for (Cache cache : caches) { CacheResult r = cache.REMOVE(key); future = combine(future, r); } return new CacheResult(future); } @Override protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) { CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null); for (Cache cache : caches) { CacheResult r = cache.REMOVE_ALL(keys); future = combine(future, r); } return new CacheResult(future); } @Override public <T> T unwrap(Class<T> clazz) { Objects.requireNonNull(clazz); for (Cache cache : caches) { try { T obj = (T) cache.unwrap(clazz); if (obj != null) { return obj; } } catch (IllegalArgumentException e) { // ignore } } throw new IllegalArgumentException(clazz.getName()); } @Override public AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) { if (key == null) { return null; } return caches[caches.length - 1].tryLock(key, expire, timeUnit); } @Override public boolean putIfAbsent(K key, V value) { throw new UnsupportedOperationException("putIfAbsent is not supported by MultiLevelCache"); } @Override protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) { throw new UnsupportedOperationException("PUT_IF_ABSENT is not supported by MultiLevelCache"); } @Override public void close() { for (Cache c : caches) { c.close(); } } }
com.alicp.jetcache.RefreshCache
爲緩存實例添加刷新任務,前面在AbstractCache抽象類中講到了,在com.alicp.jetcache.anno.support.CacheContext.buildCache
方法中會將cache包裝成CacheHandlerRefreshCache
,因此說每一個緩存實例都會調用一下addOrUpdateRefreshTask
方法,代碼以下:
public class RefreshCache<K, V> extends LoadingCache<K, V> { protected CacheConfig<K, V> config; /** * 用於保存刷新任務 */ private ConcurrentHashMap<Object, RefreshTask> taskMap = new ConcurrentHashMap<>(); protected void addOrUpdateRefreshTask(K key, CacheLoader<K, V> loader) { // 獲取緩存刷新策略 RefreshPolicy refreshPolicy = config.getRefreshPolicy(); if (refreshPolicy == null) { // 沒有則不進行刷新 return; } // 獲取刷新時間間隔 long refreshMillis = refreshPolicy.getRefreshMillis(); if (refreshMillis > 0) { // 獲取線程任務的ID Object taskId = getTaskId(key); // 獲取對應的RefreshTask,不存在則建立一個 RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> { logger.debug("add refresh task. interval={}, key={}", refreshMillis, key); RefreshTask task = new RefreshTask(taskId, key, loader); task.lastAccessTime = System.currentTimeMillis(); /* * 獲取 ScheduledExecutorService 週期/延遲線程池,10個核心線程,建立的線程都是守護線程 * scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit) * 運行的任務task、多久延遲後開始執行、後續執行的週期間隔多長,時間單位 * 經過其建立一個循環任務,用於刷新緩存數據 */ ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS); task.future = future; return task; }); // 設置最後一次訪問時間 refreshTask.lastAccessTime = System.currentTimeMillis(); } } }
若是緩存實例配置了刷新策略而且刷新間隔大於0,則會從taskMap
(線程安全)中嘗試獲取對應的刷新任務RefreshTask
,若是不存在則建立一個任務放入線程池週期性的執行
com.alicp.jetcache.RefreshCache.RefreshTask
代碼以下:
public class RefreshCache<K, V> extends LoadingCache<K, V> { protected Cache concreteCache() { Cache c = getTargetCache(); while (true) { if (c instanceof ProxyCache) { c = ((ProxyCache) c).getTargetCache(); } else if (c instanceof MultiLevelCache) { Cache[] caches = ((MultiLevelCache) c).caches(); // 若是是兩級緩存則返回遠程緩存 c = caches[caches.length - 1]; } else { return c; } } } class RefreshTask implements Runnable { /** * 惟一標誌符,也就是Key轉換後的值 */ private Object taskId; /** * 緩存的Key */ private K key; /** * 執行方法的CacheLoader對象 */ private CacheLoader<K, V> loader; /** * 最後一次訪問時間 */ private long lastAccessTime; /** * 該 Task 的執行策略 */ private ScheduledFuture future; RefreshTask(Object taskId, K key, CacheLoader<K, V> loader) { this.taskId = taskId; this.key = key; this.loader = loader; } private void cancel() { logger.debug("cancel refresh: {}", key); // 嘗試中斷當前任務 future.cancel(false); // 從任務列表中刪除 taskMap.remove(taskId); } /** * 從新加載數據 * * @throws Throwable 異常 */ private void load() throws Throwable { CacheLoader<K, V> l = loader == null ? config.getLoader() : loader; if (l != null) { // 封裝 CacheLoader 成 ProxyLoader,加載後會發起 Load 事件 l = CacheUtil.createProxyLoader(cache, l, eventConsumer); // 加載 V v = l.load(key); if (needUpdate(v, l)) { // 將從新加載的數據放入緩存 cache.PUT(key, v); } } } /** * 遠程加載數據 * * @param concreteCache 緩存對象 * @param currentTime 當前時間 * @throws Throwable 異常 */ private void externalLoad(final Cache concreteCache, final long currentTime) throws Throwable { // 獲取 Key 轉換後的值 byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key); // 建立分佈式鎖對應的Key byte[] lockKey = combine(newKey, "_#RL#".getBytes()); // 分佈式鎖的存在時間 long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis(); // 刷新間隔 long refreshMillis = config.getRefreshPolicy().getRefreshMillis(); // Key對應的時間戳Key(用於存放上次刷新時間) byte[] timestampKey = combine(newKey, "_#TS#".getBytes()); // AbstractExternalCache buildKey method will not convert byte[] // 獲取Key上一次刷新時間 CacheGetResult refreshTimeResult = concreteCache.GET(timestampKey); boolean shouldLoad = false; // 是否須要從新加載 if (refreshTimeResult.isSuccess()) { // 當前時間與上一次刷新的時間間隔是否大於或等於刷新間隔 shouldLoad = currentTime >= Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis; } else if (refreshTimeResult.getResultCode() == CacheResultCode.NOT_EXISTS) { // 無緩存 shouldLoad = true; } if (!shouldLoad) { if (multiLevelCache) { // 將頂層的緩存數據更新至低層的緩存中,例如將遠程的緩存數據放入本地緩存 // 由於若是是多級緩存,建立刷新任務後,咱們只需更新遠程的緩存,而後從遠程緩存獲取緩存數據更新低層的緩存,保證緩存一致 refreshUpperCaches(key); } return; } // 從新加載 Runnable r = () -> { try { load(); // AbstractExternalCache buildKey method will not convert byte[] // 保存一個key-value至redis,其中的信息爲該value的生成時間,刷新緩存 concreteCache.put(timestampKey, String.valueOf(System.currentTimeMillis())); } catch (Throwable e) { throw new CacheException("refresh error", e); } }; // AbstractExternalCache buildKey method will not convert byte[] // 分佈式緩存沒有一個全局分配的功能,這裏嘗試獲取一把非嚴格的分佈式鎖,獲取鎖的超時時間默認60秒,也就是獲取到這把鎖最多能夠擁有60秒 // 只有獲取Key對應的這把分佈式鎖,才執行從新加載的操做 boolean lockSuccess = concreteCache.tryLockAndRun(lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r); if (!lockSuccess && multiLevelCache) { // 沒有獲取到鎖而且是多級緩存 // 這個時候應該有其餘實例在刷新緩存,因此這裏設置過一會直接獲取遠程的緩存數據更新到本地 // 建立一個延遲任務(1/5刷新間隔後),將最頂層的緩存數據更新至每一層 JetCacheExecutor.heavyIOExecutor().schedule(() -> refreshUpperCaches(key), (long) (0.2 * refreshMillis), TimeUnit.MILLISECONDS); } } private void refreshUpperCaches(K key) { MultiLevelCache<K, V> targetCache = (MultiLevelCache<K, V>) getTargetCache(); Cache[] caches = targetCache.caches(); int len = caches.length; // 獲取多級緩存中頂層的緩存數據 CacheGetResult cacheGetResult = caches[len - 1].GET(key); if (!cacheGetResult.isSuccess()) { return; } // 將緩存數據從新放入低層緩存 for (int i = 0; i < len - 1; i++) { caches[i].PUT(key, cacheGetResult.getValue()); } } /** * 刷新任務的具體執行 */ @Override public void run() { try { if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) { // 取消執行 cancel(); return; } long now = System.currentTimeMillis(); long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis(); if (stopRefreshAfterLastAccessMillis > 0) { // 最後一次訪問到如今時間的間隔超過了設置的 stopRefreshAfterLastAccessMillis,則取消當前任務執行 if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) { logger.debug("cancel refresh: {}", key); cancel(); return; } } logger.debug("refresh key: {}", key); // 獲取緩存實例對象,若是是多層則返回頂層,也就是遠程緩存 Cache concreteCache = concreteCache(); if (concreteCache instanceof AbstractExternalCache) { // 遠程緩存刷新 externalLoad(concreteCache, now); } else { // 本地緩存刷新 load(); } } catch (Throwable e) { logger.error("refresh error: key=" + key, e); } } } }
刷新邏輯:
判斷是否須要中止刷新了,須要的話調用其future
的cancel方法取消執行,並從taskMap
中刪除
獲取緩存實例對象,若是是多層則返回頂層,也就是遠程緩存實例對象
若是是本地緩存,則調用load
方法,也就是執行loader函數加載原有方法,將獲取到的數據更新至緩存實例中(若是是多級緩存,則每級緩存都會更新)
若是是遠程緩存對象,則調用externalLoad
方法,刷新後會往Redis中存放一個鍵值對,key爲key_#TS#
,value爲上一次刷新時間
先從Redis中獲取上一次刷新時間的鍵值對,根據上一次刷新的時間判斷是否大於刷新間隔,大於(或者沒有上一次刷新時間)表示須要從新加載數據,不然不須要從新加載數據
若是不須要從新加載數據,可是又是多級緩存,則獲取遠程緩存數據更新至本地緩存,保證兩級緩存的一致性
若是須要從新加載數據,則調用tryLockAndRun
方法,嘗試獲取分佈式鎖,執行刷新任務(調用load
方法,並往Redis中從新設置上一次的刷新時間),若是沒有獲取到分佈式鎖,則建立一個延遲任務(1/5刷新間隔後)將最頂層的緩存數據更新至每一層
主要查看jetcache-autoconfigure子模塊,解析application.yml中jetcache相關配置,初始化不一樣緩存類型的CacheBuilder
構造器,用於生產緩存實例,也初始化如下對象:
com.alicp.jetcache.anno.support.ConfigProvider
:緩存管理器,注入了全局配置GlobalCacheConfig、緩存實例管理器SimpleCacheManager、緩存上下文CacheContext等大量信息
com.alicp.jetcache.autoconfigure.AutoConfigureBeans
:存儲CacheBuilder
構造器以及Redis的相關信息
com.alicp.jetcache.anno.support.GlobalCacheConfig
:全局配置類,保存了一些全局信息
經過@Conditional
註解將須要使用到的緩存類型對應的構造器初始化類注入到Spring容器並執行初始化過程,也就是建立CacheBuilder構造器
初始化構造器類的類型結構以下圖所示:
主要對象描述:
AbstractCacheAutoInit:抽象類,實現Spring的InitializingBean接口,注入至Spring容器時完成初始化
EmbeddedCacheAutoInit:抽象類,繼承AbstractCacheAutoInit,解析本地緩存獨有的配置
LinkedHashMapAutoConfiguration:初始化LinkedHashMapCacheBuilder構造器
CaffeineAutoConfiguration:初始化CaffeineCacheBuilder構造器
ExternalCacheAutoInit:抽象類,繼承AbstractCacheAutoInit,解析遠程緩存獨有的配置
RedisAutoInit:初始化RedisCacheBuilder構造器
RedisLettuceAutoInit:初始化RedisLettuceCacheBuilder構造器
com.alicp.jetcache.autoconfigure.AbstractCacheAutoInit
抽象類主要實現了Spring的InitializingBean接口,在注入Spring容器時,Spring會調用其afterPropertiesSet方法,完成本地緩存類型和遠程緩存類型CacheBuilder
構造器的初始化,主要代碼以下:
public abstract class AbstractCacheAutoInit implements InitializingBean { @Autowired protected ConfigurableEnvironment environment; @Autowired protected AutoConfigureBeans autoConfigureBeans; @Autowired protected ConfigProvider configProvider; protected String[] typeNames; private boolean inited = false; public AbstractCacheAutoInit(String... cacheTypes) { Objects.requireNonNull(cacheTypes,"cacheTypes can't be null"); Assert.isTrue(cacheTypes.length > 0, "cacheTypes length is 0"); this.typeNames = cacheTypes; } /** * 初始化方法 */ @Override public void afterPropertiesSet() { if (!inited) { synchronized (this) { if (!inited) { // 這裏咱們有兩個指定前綴 'jetcache.local' 'jetcache.remote' process("jetcache.local.", autoConfigureBeans.getLocalCacheBuilders(), true); process("jetcache.remote.", autoConfigureBeans.getRemoteCacheBuilders(), false); inited = true; } } } } private void process(String prefix, Map cacheBuilders, boolean local) { // 建立一個配置對象(本地或者遠程) ConfigTree resolver = new ConfigTree(environment, prefix); // 獲取本地或者遠程的配置項 Map<String, Object> m = resolver.getProperties(); // 獲取本地或者遠程的 area ,這裏我通常只有默認的 default Set<String> cacheAreaNames = resolver.directChildrenKeys(); for (String cacheArea : cacheAreaNames) { // 獲取本地或者遠程存儲類型,例如 caffeine,redis.lettuce final Object configType = m.get(cacheArea + ".type"); // 緩存類型是否和當前 CacheAutoInit 的某一個 typeName 匹配(不一樣的 CacheAutoInit 會設置一個或者多個 typename) boolean match = Arrays.stream(typeNames).anyMatch((tn) -> tn.equals(configType)); /* * 由於有不少 CacheAutoInit 繼承者,都會執行這個方法,不一樣的繼承者解析不一樣的配置 * 例如 CaffeineAutoConfiguration 只解析 jetcache.local.default.type=caffeine 便可 * RedisLettuceAutoInit 只解析 jetcache.remote.default.type=redis.lettuce 便可 */ if (!match) { continue; } // 獲取本地或者遠程的 area 的子配置項 ConfigTree ct = resolver.subTree(cacheArea + "."); logger.info("init cache area {} , type= {}", cacheArea, typeNames[0]); // 根據配置信息構建本地或者遠程緩存的 CacheBuilder 構造器 CacheBuilder c = initCache(ct, local ? "local." + cacheArea : "remote." + cacheArea); // 將 CacheBuilder 構造器存放至 AutoConfigureBeans cacheBuilders.put(cacheArea, c); } } /** * 設置公共的配置到 CacheBuilder 構造器中 * * @param builder 構造器 * @param ct 配置信息 */ protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) { AbstractCacheBuilder acb = (AbstractCacheBuilder) builder; // 設置 Key 的轉換函數 acb.keyConvertor(configProvider.parseKeyConvertor(ct.getProperty("keyConvertor"))); // 設置超時時間 String expireAfterWriteInMillis = ct.getProperty("expireAfterWriteInMillis"); if (expireAfterWriteInMillis == null) { // compatible with 2.1 兼容老版本 expireAfterWriteInMillis = ct.getProperty("defaultExpireInMillis"); } if (expireAfterWriteInMillis != null) { acb.setExpireAfterWriteInMillis(Long.parseLong(expireAfterWriteInMillis)); } // 多長時間沒有訪問就讓緩存失效,0表示不使用該功能(注意:只支持本地緩存) String expireAfterAccessInMillis = ct.getProperty("expireAfterAccessInMillis"); if (expireAfterAccessInMillis != null) { acb.setExpireAfterAccessInMillis(Long.parseLong(expireAfterAccessInMillis)); } } /** * 初始化 CacheBuilder 構造器交由子類去實現 * * @param ct 配置信息 * @param cacheAreaWithPrefix 配置前綴 * @return CacheBuilder 構造器 */ protected abstract CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix); }
afterPropertiesSet()
方法中能夠看到會調用process
方法分別初始化本地緩存和遠程緩存的構造器process
方法:
initCache
交由子類實現AutoConfigureBeans
對象中去parseGeneralConfig
方法解析本地緩存和遠程緩存都有的配置至CacheBuilder構造器中com.alicp.jetcache.autoconfigure.EmbeddedCacheAutoInit
抽象類繼承了AbstractCacheAutoInit
,主要是覆蓋父類的parseGeneralConfig
,解析本地緩存單有的配置limit
,代碼以下:
public abstract class EmbeddedCacheAutoInit extends AbstractCacheAutoInit { public EmbeddedCacheAutoInit(String... cacheTypes) { super(cacheTypes); } @Override protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) { super.parseGeneralConfig(builder, ct); EmbeddedCacheBuilder ecb = (EmbeddedCacheBuilder) builder; // 設置本地緩存每一個緩存實例的緩存數量個數限制(默認100) ecb.limit(Integer.parseInt(ct.getProperty("limit", String.valueOf(CacheConsts.DEFAULT_LOCAL_LIMIT)))); } }
com.alicp.jetcache.autoconfigure.LinkedHashMapAutoConfiguration
繼承了EmbeddedCacheAutoInit
,實現了initCache
方法,先經過LinkedHashMapCacheBuilder建立一個默認實現類,而後解析相關配置至構造器中完成初始化,代碼以下:
@Component @Conditional(LinkedHashMapAutoConfiguration.LinkedHashMapCondition.class) public class LinkedHashMapAutoConfiguration extends EmbeddedCacheAutoInit { public LinkedHashMapAutoConfiguration() { super("linkedhashmap"); } @Override protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) { // 建立一個 LinkedHashMapCacheBuilder 構造器 LinkedHashMapCacheBuilder builder = LinkedHashMapCacheBuilder.createLinkedHashMapCacheBuilder(); // 解析相關配置至 LinkedHashMapCacheBuilder 的 CacheConfig 中 parseGeneralConfig(builder, ct); return builder; } public static class LinkedHashMapCondition extends JetCacheCondition { // 配置了緩存類型爲 linkedhashmap 當前類纔會被注入 Spring 容器 public LinkedHashMapCondition() { super("linkedhashmap"); } } }
這裏咱們注意到@Conditional
註解,這個註解的做用是:知足SpringBootCondition
條件這個Bean纔會被Spring容器管理
他的條件是LinkedHashMapCondition
,繼承了JetCacheCondition
,也就是說配置文件中配置了緩存類型爲linkedhashmap
時這個類纔會被Spring容器管理,纔會完成LinkedHashMapCacheBuilder構造器的初始化
JetCacheCondition
邏輯並不複雜,可自行查看
com.alicp.jetcache.autoconfigure.CaffeineAutoConfiguration
繼承了EmbeddedCacheAutoInit
,實現了initCache
方法,先經過CaffeineCacheBuilder建立一個默認實現類,而後解析相關配置至構造器中完成初始化,代碼以下:
@Component @Conditional(CaffeineAutoConfiguration.CaffeineCondition.class) public class CaffeineAutoConfiguration extends EmbeddedCacheAutoInit { public CaffeineAutoConfiguration() { super("caffeine"); } @Override protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) { // 建立一個 CaffeineCacheBuilder 構造器 CaffeineCacheBuilder builder = CaffeineCacheBuilder.createCaffeineCacheBuilder(); // 解析相關配置至 CaffeineCacheBuilder 的 CacheConfig 中 parseGeneralConfig(builder, ct); return builder; } public static class CaffeineCondition extends JetCacheCondition { // 配置了緩存類型爲 caffeine 當前類纔會被注入 Spring 容器 public CaffeineCondition() { super("caffeine"); } } }
一樣使用了@Conditional
註解,這個註解的做用是:知足SpringBootCondition
條件這個Bean纔會被Spring容器管理
他的條件是CaffeineCondition
,繼承了JetCacheCondition
,也就是說配置文件中配置了緩存類型爲caffeine
時這個類纔會被Spring容器管理,纔會完成LinkedHashMapCacheBuilder構造器的初始化
com.alicp.jetcache.autoconfigure.ExternalCacheAutoInit
抽象類繼承了AbstractCacheAutoInit
,主要是覆蓋父類的parseGeneralConfig
,解析遠程緩存單有的配置keyPrefix
、valueEncoder
和valueDecoder
,代碼以下:
public abstract class ExternalCacheAutoInit extends AbstractCacheAutoInit { public ExternalCacheAutoInit(String... cacheTypes) { super(cacheTypes); } /** * 設置遠程緩存 CacheBuilder 構造器的相關配置 * * @param builder 構造器 * @param ct 配置信息 */ @Override protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) { super.parseGeneralConfig(builder, ct); ExternalCacheBuilder ecb = (ExternalCacheBuilder) builder; // 設置遠程緩存 key 的前綴 ecb.setKeyPrefix(ct.getProperty("keyPrefix")); /* * 根據配置建立緩存數據的編碼函數和解碼函數 */ ecb.setValueEncoder(configProvider.parseValueEncoder(ct.getProperty("valueEncoder", CacheConsts.DEFAULT_SERIAL_POLICY))); ecb.setValueDecoder(configProvider.parseValueDecoder(ct.getProperty("valueDecoder", CacheConsts.DEFAULT_SERIAL_POLICY))); } }
com.alicp.jetcache.autoconfigure.RedisAutoInit
繼承了ExternalCacheAutoInit
,實現initCache
方法,完成了經過Jedis鏈接Redis的初始化操做,主要代碼以下:
@Configuration @Conditional(RedisAutoConfiguration.RedisCondition.class) public class RedisAutoConfiguration { public static final String AUTO_INIT_BEAN_NAME = "redisAutoInit"; @Bean(name = AUTO_INIT_BEAN_NAME) public RedisAutoInit redisAutoInit() { return new RedisAutoInit(); } public static class RedisCondition extends JetCacheCondition { // 配置了緩存類型爲 redis 當前類纔會被注入 Spring 容器 public RedisCondition() { super("redis"); } } public static class RedisAutoInit extends ExternalCacheAutoInit { public RedisAutoInit() { // 設置緩存類型 super("redis"); } @Autowired private AutoConfigureBeans autoConfigureBeans; @Override protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) { Pool jedisPool = parsePool(ct); Pool[] slavesPool = null; int[] slavesPoolWeights = null; // 是否只從 Redis 的從節點讀取數據 boolean readFromSlave = Boolean.parseBoolean(ct.getProperty("readFromSlave", "False")); // 獲取從節點的配置信息 ConfigTree slaves = ct.subTree("slaves."); Set<String> slaveNames = slaves.directChildrenKeys(); // 依次建立每一個從節點的鏈接池 if (slaveNames.size() > 0) { slavesPool = new Pool[slaveNames.size()]; slavesPoolWeights = new int[slaveNames.size()]; int i = 0; for (String slaveName: slaveNames) { ConfigTree slaveConfig = slaves.subTree(slaveName + "."); slavesPool[i] = parsePool(slaveConfig); slavesPoolWeights[i] = Integer.parseInt(slaveConfig.getProperty("weight","100")); i++; } } // 建立一個 RedisCacheBuilder 構造器 ExternalCacheBuilder externalCacheBuilder = RedisCacheBuilder.createRedisCacheBuilder() .jedisPool(jedisPool) .readFromSlave(readFromSlave) .jedisSlavePools(slavesPool) .slaveReadWeights(slavesPoolWeights); // 解析相關配置至 RedisCacheBuilder 的 CacheConfig 中 parseGeneralConfig(externalCacheBuilder, ct); // eg: "jedisPool.remote.default" autoConfigureBeans.getCustomContainer().put("jedisPool." + cacheAreaWithPrefix, jedisPool); return externalCacheBuilder; } /** * 建立 Redis 鏈接池 * * @param ct 配置信息 * @return 鏈接池 */ private Pool<Jedis> parsePool(ConfigTree ct) { // 建立鏈接池配置對象 GenericObjectPoolConfig poolConfig = parsePoolConfig(ct); String host = ct.getProperty("host", (String) null); int port = Integer.parseInt(ct.getProperty("port", "0")); int timeout = Integer.parseInt(ct.getProperty("timeout", String.valueOf(Protocol.DEFAULT_TIMEOUT))); String password = ct.getProperty("password", (String) null); int database = Integer.parseInt(ct.getProperty("database", String.valueOf(Protocol.DEFAULT_DATABASE))); String clientName = ct.getProperty("clientName", (String) null); boolean ssl = Boolean.parseBoolean(ct.getProperty("ssl", "false")); String masterName = ct.getProperty("masterName", (String) null); String sentinels = ct.getProperty("sentinels", (String) null);//ip1:port,ip2:port Pool<Jedis> jedisPool; if (sentinels == null) { Objects.requireNonNull(host, "host/port or sentinels/masterName is required"); if (port == 0) { throw new IllegalStateException("host/port or sentinels/masterName is required"); } // 建立一個 Jedis 鏈接池 jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database, clientName, ssl); } else { Objects.requireNonNull(masterName, "host/port or sentinels/masterName is required"); String[] strings = sentinels.split(","); HashSet<String> sentinelsSet = new HashSet<>(); for (String s : strings) { if (s != null && !s.trim().equals("")) { sentinelsSet.add(s.trim()); } } // 建立一個 Jedis Sentine 鏈接池 jedisPool = new JedisSentinelPool(masterName, sentinelsSet, poolConfig, timeout, password, database, clientName); } return jedisPool; } } }
com.alicp.jetcache.autoconfigure.RedisAutoInit
是com.alicp.jetcache.autoconfigure.RedisAutoConfiguration
內部的靜態類,在RedisAutoConfiguration內經過redisAutoInit()
方法定義RedisAutoInit做爲Spring Bean
一樣RedisAutoConfiguration使用了@Conditional
註解,知足SpringBootCondition
條件這個Bean纔會被Spring容器管理,內部的RedisAutoInit也不會被管理,也就是說配置文件中配置了緩存類型爲redis
時RedisLettuceAutoInit纔會被Spring容器管理,纔會完成RedisLettuceCacheBuilder構造器的初始化
實現了initCache
方法
AutoConfigureBeans
中com.alicp.jetcache.autoconfigure.RedisLettuceAutoInit
繼承了ExternalCacheAutoInit
,實現initCache
方法,完成了經過Lettuce鏈接Redis的初始化操做,主要代碼以下:
@Configuration @Conditional(RedisLettuceAutoConfiguration.RedisLettuceCondition.class) public class RedisLettuceAutoConfiguration { public static final String AUTO_INIT_BEAN_NAME = "redisLettuceAutoInit"; /** * 注入 spring 容器的條件 */ public static class RedisLettuceCondition extends JetCacheCondition { // 配置了緩存類型爲 redis.lettuce 當前類纔會被注入 Spring 容器 public RedisLettuceCondition() { super("redis.lettuce"); } } @Bean(name = {AUTO_INIT_BEAN_NAME}) public RedisLettuceAutoInit redisLettuceAutoInit() { return new RedisLettuceAutoInit(); } public static class RedisLettuceAutoInit extends ExternalCacheAutoInit { public RedisLettuceAutoInit() { // 設置緩存類型 super("redis.lettuce"); } /** * 初始化 RedisLettuceCacheBuilder 構造器 * * @param ct 配置信息 * @param cacheAreaWithPrefix 配置前綴 * @return 構造器 */ @Override protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) { Map<String, Object> map = ct.subTree("uri"/*there is no dot*/).getProperties(); // 數據節點偏好設置 String readFromStr = ct.getProperty("readFrom"); // 集羣模式 String mode = ct.getProperty("mode"); // 異步獲取結果的超時時間,默認1s long asyncResultTimeoutInMillis = Long.parseLong( ct.getProperty("asyncResultTimeoutInMillis", Long.toString(CacheConsts.ASYNC_RESULT_TIMEOUT.toMillis()))); ReadFrom readFrom = null; if (readFromStr != null) { /* * MASTER:只從Master節點中讀取。 * MASTER_PREFERRED:優先從Master節點中讀取。 * SLAVE_PREFERRED:優先從Slave節點中讀取。 * SLAVE:只從Slave節點中讀取。 * NEAREST:使用最近一次鏈接的Redis實例讀取。 */ readFrom = ReadFrom.valueOf(readFromStr.trim()); } AbstractRedisClient client; StatefulConnection connection = null; if (map == null || map.size() == 0) { throw new CacheConfigException("lettuce uri is required"); } else { // 建立對應的 RedisURI List<RedisURI> uriList = map.values().stream().map((k) -> RedisURI.create(URI.create(k.toString()))) .collect(Collectors.toList()); if (uriList.size() == 1) { // 只有一個 URI,集羣模式只給一個域名怎麼辦 TODO 疑問?? RedisURI uri = uriList.get(0); if (readFrom == null) { // 建立一個 Redis 客戶端 client = RedisClient.create(uri); // 設置失去鏈接時的行爲,拒絕命令,默認爲 DEFAULT ((RedisClient) client).setOptions(ClientOptions.builder(). disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build()); } else { // 建立一個 Redis 客戶端 client = RedisClient.create(); ((RedisClient) client).setOptions(ClientOptions.builder(). disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build()); // 建立一個安全鏈接並設置數據節點偏好 StatefulRedisMasterSlaveConnection c = MasterSlave.connect( (RedisClient) client, new JetCacheCodec(), uri); c.setReadFrom(readFrom); connection = c; } } else { // 多個 URI,集羣模式 if (mode != null && mode.equalsIgnoreCase("MasterSlave")) { client = RedisClient.create(); ((RedisClient) client).setOptions(ClientOptions.builder(). disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build()); StatefulRedisMasterSlaveConnection c = MasterSlave.connect( (RedisClient) client, new JetCacheCodec(), uriList); if (readFrom != null) { c.setReadFrom(readFrom); } connection = c; } else { // 建立一個 Redis 客戶端 client = RedisClusterClient.create(uriList); ((RedisClusterClient) client).setOptions(ClusterClientOptions.builder(). disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build()); if (readFrom != null) { StatefulRedisClusterConnection c = ((RedisClusterClient) client).connect(new JetCacheCodec()); c.setReadFrom(readFrom); connection = c; } } } } // 建立一個 RedisLettuceCacheBuilder 構造器 ExternalCacheBuilder externalCacheBuilder = RedisLettuceCacheBuilder.createRedisLettuceCacheBuilder() .connection(connection) .redisClient(client) .asyncResultTimeoutInMillis(asyncResultTimeoutInMillis); // 解析相關配置至 RedisLettuceCacheBuilder 的 CacheConfig 中 parseGeneralConfig(externalCacheBuilder, ct); // eg: "remote.default.client" autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".client", client); // 開始將 Redis 客戶端和安全鏈接保存至 LettuceConnectionManager 管理器中 LettuceConnectionManager m = LettuceConnectionManager.defaultManager(); // 初始化 Lettuce 鏈接 Redis m.init(client, connection); // 初始化 Redis 鏈接的相關信息保存至 LettuceObjects 中,並將相關信息保存至 AutoConfigureBeans.customContainer autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".connection", m.connection(client)); autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".commands", m.commands(client)); autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".asyncCommands", m.asyncCommands(client)); autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".reactiveCommands", m.reactiveCommands(client)); return externalCacheBuilder; } } }
com.alicp.jetcache.autoconfigure.RedisLettuceAutoInit
是com.alicp.jetcache.autoconfigure.RedisLettuceAutoConfiguration
內部的靜態類,在RedisLettuceAutoConfiguration內經過redisLettuceAutoInit()
方法定義RedisLettuceAutoInit做爲Spring Bean
一樣RedisLettuceAutoConfiguration使用了@Conditional
註解,知足SpringBootCondition
條件這個Bean纔會被Spring容器管理,內部的RedisLettuceAutoInit也不會被管理,也就是說配置文件中配置了緩存類型爲redis.lettuce
時RedisLettuceAutoInit纔會被Spring容器管理,纔會完成RedisLettuceCacheBuilder構造器的初始化
實現了initCache
方法
LettuceConnectionManager
管理器,將經過Lettuce建立Redis客戶端和與Redis的鏈接保存AutoConfigureBeans
中上面的初始化構造器的類須要被Spring容器管理,就需被掃描到,咱們通常會設置掃描路徑,可是別人引入JetCache確定是做爲其餘包不可以被掃描到的,這些Bean也就不會被Spring管理,這裏咱們查看jetcache-autoconfigure
模塊下src/main/resources/META-INF/spring.factories
文件,內容以下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.alicp.jetcache.autoconfigure.JetCacheAutoConfiguration
這應該是一種SPI
機制,這樣這個項目之外的JetCache包裏面的com.alicp.jetcache.autoconfigure.JetCacheAutoConfiguration
就會被Spring容器掃描到,咱們來看看他的代碼:
/** * 該 Bean 將會被 Spring 容器注入,依次注入下面幾個 Bean * SpringConfigProvider -> AutoConfigureBeans -> BeanDependencyManager(爲 GlobalCacheConfig 添加 CacheAutoInit 依賴) -> GlobalCacheConfig * 由此會完成初始化配置操做,緩存實例構造器 CacheBuilder 也會被注入容器 * * Created on 2016/11/17. * * @author <a href="mailto:areyouok@gmail.com">huangli</a> */ @Configuration @ConditionalOnClass(GlobalCacheConfig.class) @ConditionalOnMissingBean(GlobalCacheConfig.class) @EnableConfigurationProperties(JetCacheProperties.class) @Import({RedisAutoConfiguration.class, CaffeineAutoConfiguration.class, MockRemoteCacheAutoConfiguration.class, LinkedHashMapAutoConfiguration.class, RedisLettuceAutoConfiguration.class, RedisSpringDataAutoConfiguration.class}) public class JetCacheAutoConfiguration { public static final String GLOBAL_CACHE_CONFIG_NAME = "globalCacheConfig"; private SpringConfigProvider _springConfigProvider = new SpringConfigProvider(); private AutoConfigureBeans _autoConfigureBeans = new AutoConfigureBeans(); private GlobalCacheConfig _globalCacheConfig; @Bean @ConditionalOnMissingBean public SpringConfigProvider springConfigProvider() { return _springConfigProvider; } @Bean public AutoConfigureBeans autoConfigureBeans() { return _autoConfigureBeans; } @Bean public static BeanDependencyManager beanDependencyManager(){ return new BeanDependencyManager(); } @Bean(name = GLOBAL_CACHE_CONFIG_NAME) public GlobalCacheConfig globalCacheConfig(SpringConfigProvider configProvider, AutoConfigureBeans autoConfigureBeans, JetCacheProperties props) { if (_globalCacheConfig != null) { return _globalCacheConfig; } _globalCacheConfig = new GlobalCacheConfig(); _globalCacheConfig.setHiddenPackages(props.getHiddenPackages()); _globalCacheConfig.setStatIntervalMinutes(props.getStatIntervalMinutes()); _globalCacheConfig.setAreaInCacheName(props.isAreaInCacheName()); _globalCacheConfig.setPenetrationProtect(props.isPenetrationProtect()); _globalCacheConfig.setEnableMethodCache(props.isEnableMethodCache()); _globalCacheConfig.setLocalCacheBuilders(autoConfigureBeans.getLocalCacheBuilders()); _globalCacheConfig.setRemoteCacheBuilders(autoConfigureBeans.getRemoteCacheBuilders()); return _globalCacheConfig; } }
能夠看到經過@Import
註解,初始化構造器的那些類會被加入到Spring容器,加上@Condotional
註解,只有咱們配置過的緩存類型的構造器纔會被加入,而後保存至AutoConfigureBeans對象中
注意到這裏咱們注入的是SpringConfigProvider
對象,加上@ConditionalOnMissingBean
註解,沒法再次註冊該對象至Spring容器,相比ConfigProvider
對象,它的區別是設置了EncoderParser爲DefaultSpringEncoderParser,設置了KeyConvertorParser爲DefaultSpringKeyConvertorParser,目的是支持兩個解析器可以解析自定義bean
在BeanDependencyManager
中能夠看到它是一個BeanFactoryPostProcessor
,用於BeanFactory容器初始後執行操做,目的是往JetCacheAutoConfiguration的BeanDefinition的依賴中添加幾個AbstractCacheAutoInit類型的beanName,保證幾個CacheBuilder構造器已經初始化
globalCacheConfig
方法中設置全局的相關配置並添加已經初始化的CacheBuilder構造器,而後返回GlobalCacheConfig讓Spring容器管理,這樣一來就完成了JetCache的解析配置並初始化的功能
構造器的做用就是根據配置構建一個對應類型的緩存實例
CacheBuilder的子類結構以下:
根據類名就能夠知道其做用
CacheBuilder接口只定義了一個buildCache()
方法,用於構建緩存實例,交由不一樣的實現類
AbstractCacheBuilder抽象類實現了buildCache()
方法,主要代碼以下:
public abstract class AbstractCacheBuilder<T extends AbstractCacheBuilder<T>> implements CacheBuilder, Cloneable { /** * 該緩存實例的配置 */ protected CacheConfig config; /** * 建立緩存實例函數 */ private Function<CacheConfig, Cache> buildFunc; public abstract CacheConfig getConfig(); protected T self() { return (T) this; } public T buildFunc(Function<CacheConfig, Cache> buildFunc) { this.buildFunc = buildFunc; return self(); } protected void beforeBuild() { } @Deprecated public final <K, V> Cache<K, V> build() { return buildCache(); } @Override public final <K, V> Cache<K, V> buildCache() { if (buildFunc == null) { throw new CacheConfigException("no buildFunc"); } beforeBuild(); // 克隆一份配置信息,由於這裏獲取到的是全局配置信息,以防後續被修改 CacheConfig c = getConfig().clone(); // 經過構建函數建立一個緩存實例 Cache<K, V> cache = buildFunc.apply(c); /* * 目前發現 c.getLoader() 都是 null,後續都會把 cache 封裝成 CacheHandlerRefreshCache * TODO 疑問???? */ if (c.getLoader() != null) { if (c.getRefreshPolicy() == null) { cache = new LoadingCache<>(cache); } else { cache = new RefreshCache<>(cache); } } return cache; } @Override public Object clone() { AbstractCacheBuilder copy = null; try { copy = (AbstractCacheBuilder) super.clone(); copy.config = getConfig().clone(); return copy; } catch (CloneNotSupportedException e) { throw new CacheException(e); } } }
實現了java.lang.Cloneable
的clone方法,支持克隆該對象,由於每一個緩存實例的配置不必定相同,這個構造器中保存的是全局的一些配置,因此須要克隆一個構造器出來爲每一個緩存實例設置其本身的配置而不影響這個最初始的構造器
定義CacheConfig對象存放緩存配置,構建緩存實例須要根據這些配置
定義的buildFunc
函數用於構建緩存實例,咱們在初始化構造器中能夠看到,不一樣的構造器設置的該函數都是new一個緩存實例並傳入配置信息,例如:
// 設置構建 CaffeineCache 緩存實例的函數 buildFunc((c) -> new CaffeineCache((EmbeddedCacheConfig) c)); // 進入CaffeineCache的構造器你就能夠看到會根據配置完成緩存實例的初始化
不一樣類型的構造器區別在於CacheConfig類型不一樣,由於遠程和本地的配置是有所區別的,還有就是設置的buildFunc
函數不一樣,由於須要構建不一樣的緩存實例,和上面的例子差很少,都是new一個緩存實例並傳入配置信息,這裏就不一一講述了
主要查看jetcache-anno子模塊,提供AOP功能
JetCache能夠經過@EnableMethodCache和@EnableCreateCacheAnnotation註解完成AOP的初始化工做,咱們在Spring Boot工程中的啓動類上面添加這兩個註解便可啓用JetCache緩存。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import({CommonConfiguration.class, ConfigSelector.class}) public @interface EnableMethodCache { boolean proxyTargetClass() default false; AdviceMode mode() default AdviceMode.PROXY; int order() default Ordered.LOWEST_PRECEDENCE; String[] basePackages(); }
註解的相關配置在上面的'如何使用'中已經講過了,這裏咱們關注@Import
註解中的CommonConfiguration
和ConfigSelector
兩個類,將會被Spring容器管理
com.alicp.jetcache.anno.config.CommonConfiguration
上面有@Configuration註解,因此會被做爲一個Spring Bean,裏面定義了一個Bean爲ConfigMap
,因此這個Bean也會被Spring容器管理,com.alicp.jetcache.anno.support.ConfigMap
中保存方法與緩存註解配置信息的映射關係
com.alicp.jetcache.anno.config.ConfigSelector
繼承了AdviceModeImportSelector,經過@Import
註解他的selectImports
方法會被調用,根據不一樣的AdviceMode導入不一樣的配置類,能夠看到會返回一個JetCacheProxyConfiguration類名稱,那麼它也會被注入
com.alicp.jetcache.anno.config.JetCacheProxyConfiguration
是配置AOP的配置類,代碼以下:
@Configuration public class JetCacheProxyConfiguration implements ImportAware, ApplicationContextAware { protected AnnotationAttributes enableMethodCache; private ApplicationContext applicationContext; @Override public void setImportMetadata(AnnotationMetadata importMetadata) { // 獲取 @EnableMethodCache 註解信息 this.enableMethodCache = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableMethodCache.class.getName(), false)); if (this.enableMethodCache == null) { throw new IllegalArgumentException( "@EnableMethodCache is not present on importing class " + importMetadata.getClassName()); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Bean(name = CacheAdvisor.CACHE_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public CacheAdvisor jetcacheAdvisor(JetCacheInterceptor jetCacheInterceptor) { CacheAdvisor advisor = new CacheAdvisor(); // bean的名稱:jetcache2.internalCacheAdvisor advisor.setAdviceBeanName(CacheAdvisor.CACHE_ADVISOR_BEAN_NAME); // 設置緩存攔截器爲 JetCacheInterceptor advisor.setAdvice(jetCacheInterceptor); // 設置須要掃描的包 advisor.setBasePackages(this.enableMethodCache.getStringArray("basePackages")); // 設置優先級,默認 Integer 的最大值,最低優先級 advisor.setOrder(this.enableMethodCache.<Integer>getNumber("order")); return advisor; } /** * 注入一個 JetCacheInterceptor 攔截器,設置爲框架內部的角色 * * @return JetCacheInterceptor */ @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public JetCacheInterceptor jetCacheInterceptor() { return new JetCacheInterceptor(); } }
由於JetCacheProxyConfiguration是經過@Import
註解注入的而且實現了ImportAware
接口,當被注入Bean的時候會先調用其setImportMetadata
方法(這裏好像必須添加@Configuration註解,否則沒法被Spring識別出來)獲取到@EnableMethodCache
註解的元信息
其中定義了兩個Bean:
com.alicp.jetcache.anno.aop.JetCacheInterceptor
:實現了aop中的MethodInterceptor方法攔截器,可用於aop攔截方法後執行相關處理
com.alicp.jetcache.anno.aop.CacheAdvisor
:
繼承了org.springframework.aop.support.AbstractBeanFactoryPointcutAdvisor
,將會做爲一個AOP切面
設置了通知advice爲JetCacheInterceptor,也就是說被攔截的方法都會進入JetCacheInterceptor,JetCacheInterceptor就做爲JetCache的入口了
根據註解設置了須要掃描的包路徑以及優先級,默認是最低優先級
CacheAdvisor實現了org.springframework.aopPointcutAdvisor
接口的getPointcut()
方法,設置這個切面的切入點爲com.alicp.jetcache.anno.aop.CachePointcut
從CachePointcut做爲切入點
實現了org.springframework.aop.ClassFilter
接口,用於判斷哪些類須要被攔截
實現了org.springframework.aop.MethodMatcher
接口,用於判斷哪些類中的哪些方法會被攔截
在判斷方法是否須要進入JetCache的JetCacheInterceptor過程當中,會解析方法上面的JetCache相關緩存註解,將配置信息封裝com.alicp.jetcache.anno.methodCacheInvokeConfig
對象中,並把它保存至com.alicp.jetcache.anno.support.ConfigMap
對象中
總結:@EnableMethodCache註解主要就是生成一個AOP切面用於攔截帶有緩存註解的方法
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import({CommonConfiguration.class, CreateCacheAnnotationBeanPostProcessor.class}) public @interface EnableCreateCacheAnnotation { }
相比@EnableMethodCache註解,沒有相關屬性,一樣會導入CommonConfiguration類
不一樣的是將導入com.alicp.jetcache.anno.field.CreateCacheAnnotationBeanPostProcessor
類,它繼承了org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor
做爲一個BeanPostProcessor,用於在Spring初始化bean的時候作一些操做
從代碼中能夠看到他的做用是:若是這個bean內部存在添加了帶有@CreateCache
註解的字段(沒有添加static),會將這個字段做爲須要注入的對象,解析成 com.alicp.jetcache.anno.field.LazyInitCache
緩存實例
LazyInitCache的主要代碼以下:
class LazyInitCache implements ProxyCache { /** * 是否初始化,用於懶加載 */ private boolean inited; /** * 緩存實例 */ private Cache cache; /** * 所處上下文 */ private ConfigurableListableBeanFactory beanFactory; /** * CreateCache 註解元信息 */ private CreateCache ann; /** * 字段 */ private Field field; /** * 刷新策略 */ private RefreshPolicy refreshPolicy; /** * 保護策略 */ private PenetrationProtectConfig protectConfig; public LazyInitCache(ConfigurableListableBeanFactory beanFactory, CreateCache ann, Field field) { this.beanFactory = beanFactory; this.ann = ann; this.field = field; CacheRefresh cr = field.getAnnotation(CacheRefresh.class); if (cr != null) { refreshPolicy = CacheConfigUtil.parseRefreshPolicy(cr); } CachePenetrationProtect penetrateProtect = field.getAnnotation(CachePenetrationProtect.class); if (penetrateProtect != null) { protectConfig = CacheConfigUtil.parsePenetrationProtectConfig(penetrateProtect); } } private void checkInit() { if (!inited) { synchronized (this) { if (!inited) { init(); inited = true; } } } } /** * 獲取緩存實例,不存在則新建 * * @return 緩存實例 */ @Override public Cache getTargetCache() { checkInit(); return cache; } private void init() { if (inited) { throw new IllegalStateException(); } // 從 spring 的容器中獲取全局緩存配置 GlobalCacheConfig 對象 GlobalCacheConfig globalCacheConfig = beanFactory.getBean(GlobalCacheConfig.class); ConfigProvider configProvider = beanFactory.getBean(ConfigProvider.class); // 將註解信息封裝到 CachedAnnoConfig 對象中 CachedAnnoConfig cac = new CachedAnnoConfig(); cac.setArea(ann.area()); cac.setName(ann.name()); cac.setTimeUnit(ann.timeUnit()); cac.setExpire(ann.expire()); cac.setLocalExpire(ann.localExpire()); cac.setCacheType(ann.cacheType()); cac.setLocalLimit(ann.localLimit()); cac.setSerialPolicy(ann.serialPolicy()); cac.setKeyConvertor(ann.keyConvertor()); cac.setRefreshPolicy(refreshPolicy); cac.setPenetrationProtectConfig(protectConfig); String cacheName = cac.getName(); if (CacheConsts.isUndefined(cacheName)) { String[] hiddenPackages = globalCacheConfig.getHiddenPackages(); CacheNameGenerator g = configProvider.createCacheNameGenerator(hiddenPackages); cacheName = g.generateCacheName(field); } // 從緩存實例管理器中獲取或者建立對應的緩存實例 cache = configProvider.getCacheContext().__createOrGetCache(cac, ann.area(), cacheName); } }
能夠看到經過@CreateCache
建立的緩存實例也能夠添加@CacheRefresh
和@CachePenetrationProtect
註解
在AbstractCache抽象類的computeIfAbsentImpl方法中咱們有講到,若是緩存實例是ProxyCache類型,則會先調用其getTargetCache()
方法獲取緩存實例對象,因此LazyInitCache在第一次訪問的時候才進行初始化,並根據緩存註解配置信息建立(存在則直接獲取)一個緩存實例
總結:@EnableCreateCacheAnnotation註解主要是支持@CreateCache可以建立緩存實例
經過@EnableMethodCache
和@EnableCreateCacheAnnotation
兩個註解,加上前面的解析配置過程
,已經完成的JetCache的解析與初始化過程,那麼接下來咱們來看看JetCache如何處理被攔截的方法。
從com.alicp.jetcache.anno.aop.CachePointcut
切入點判斷方法是否須要攔截的邏輯:
方法所在的類對象是否匹配,除去以"java"、"org.springframework"開頭和包含"$$EnhancerBySpringCGLIB$$"、"$$FastClassBySpringCGLIB$$"的類,該類是否在咱們經過@EnableMethodCache
註解配置的basePackages中
從ConfigMap
獲取方法對應的CacheInvokeConfig
對象,也就是獲取緩存配置信息
com.alicp.jetcache.anno.support.ConfigProvide
是一個配置提供者對象,包含了JetCache的全局配置、緩存實例管理器、緩存value轉換器、緩存key轉換器、上下文和監控指標相關信息,主要代碼以下:
public class ConfigProvider extends AbstractLifecycle { /** * 緩存的全局配置 */ @Resource protected GlobalCacheConfig globalCacheConfig; /** * 緩存實例管理器 */ protected SimpleCacheManager cacheManager; /** * 根據不一樣類型生成緩存數據轉換函數的轉換器 */ protected EncoderParser encoderParser; /** * 根據不一樣類型生成緩存 Key 轉換函數的轉換器 */ protected KeyConvertorParser keyConvertorParser; /** * 緩存監控指標管理器 */ protected CacheMonitorManager cacheMonitorManager; /** * 打印緩存各項指標的函數 */ private Consumer<StatInfo> metricsCallback = new StatInfoLogger(false); /** * 緩存更新事件(REMOVE OR PUT)消息接收者,無實現類 * 咱們能夠本身實現 CacheMessagePublisher 用於統計一些緩存的命中信息 */ private CacheMessagePublisher cacheMessagePublisher; /** * 默認的緩存監控指標管理器 */ private CacheMonitorManager defaultCacheMonitorManager = new DefaultCacheMonitorManager(); /** * 緩存上下文 */ private CacheContext cacheContext; public ConfigProvider() { cacheManager = SimpleCacheManager.defaultManager; encoderParser = new DefaultEncoderParser(); keyConvertorParser = new DefaultKeyConvertorParser(); cacheMonitorManager = defaultCacheMonitorManager; } @Override public void doInit() { // 啓動緩存指標監控器,週期性打印各項指標 initDefaultCacheMonitorInstaller(); // 初始化緩存上下文 cacheContext = newContext(); } protected void initDefaultCacheMonitorInstaller() { if (cacheMonitorManager == defaultCacheMonitorManager) { DefaultCacheMonitorManager installer = (DefaultCacheMonitorManager) cacheMonitorManager; installer.setGlobalCacheConfig(globalCacheConfig); installer.setMetricsCallback(metricsCallback); if (cacheMessagePublisher != null) { installer.setCacheMessagePublisher(cacheMessagePublisher); } // 啓動緩存指標監控器 installer.init(); } } @Override public void doShutdown() { shutdownDefaultCacheMonitorInstaller(); cacheManager.rebuild(); } protected void shutdownDefaultCacheMonitorInstaller() { if (cacheMonitorManager == defaultCacheMonitorManager) { ((DefaultCacheMonitorManager) cacheMonitorManager).shutdown(); } } /** * 根據編碼類型經過緩存value轉換器生成編碼函數 * * @param valueEncoder 編碼類型 * @return 編碼函數 */ public Function<Object, byte[]> parseValueEncoder(String valueEncoder) { return encoderParser.parseEncoder(valueEncoder); } /** * 根據解碼類型經過緩存value轉換器生成解碼函數 * * @param valueDecoder 解碼類型 * @return 解碼函數 */ public Function<byte[], Object> parseValueDecoder(String valueDecoder) { return encoderParser.parseDecoder(valueDecoder); } /** * 根據轉換類型經過緩存key轉換器生成轉換函數 * * @param convertor 轉換類型 * @return 轉換函數 */ public Function<Object, Object> parseKeyConvertor(String convertor) { return keyConvertorParser.parseKeyConvertor(convertor); } public CacheNameGenerator createCacheNameGenerator(String[] hiddenPackages) { return new DefaultCacheNameGenerator(hiddenPackages); } protected CacheContext newContext() { return new CacheContext(this, globalCacheConfig); } }
繼承了com.alicp.jetcache.anno.support.AbstractLifecycle
,查看其代碼能夠看到有兩個方法,分別爲init()
初始化方法和shutdown()
銷燬方法,由於分別添加了@PostConstruct
註解和@PreDestroy
註解,因此在Spring初始化時會調用init(),在Spring容器銷燬時會調用shutdown()方法,內部分別調用doInit()和doShutdown(),這兩個方法交由子類實現
在doInit()方法中先啓動緩存指標監控器,用於週期性打印各項緩存指標,而後初始化CacheContext緩存上下文,SpringConfigProvider返回的是SpringConfigContext
在doShutdown()方法中關閉緩存指標監控器,清除緩存實例
com.alicp.jetcache.anno.support.CacheContext
緩存上下文主要爲每個被攔截的請求建立緩存上下文,構建對應的緩存實例,主要代碼以下:
public class CacheContext { private static Logger logger = LoggerFactory.getLogger(CacheContext.class); private static ThreadLocal<CacheThreadLocal> cacheThreadLocal = new ThreadLocal<CacheThreadLocal>() { @Override protected CacheThreadLocal initialValue() { return new CacheThreadLocal(); } }; /** * JetCache 緩存的管理器(包含不少信息) */ private ConfigProvider configProvider; /** * 緩存的全局配置 */ private GlobalCacheConfig globalCacheConfig; /** * 緩存實例管理器 */ protected SimpleCacheManager cacheManager; public CacheContext(ConfigProvider configProvider, GlobalCacheConfig globalCacheConfig) { this.globalCacheConfig = globalCacheConfig; this.configProvider = configProvider; cacheManager = configProvider.getCacheManager(); } public CacheInvokeContext createCacheInvokeContext(ConfigMap configMap) { // 建立一個本次調用的上下文 CacheInvokeContext c = newCacheInvokeContext(); // 添加一個函數,後續用於獲取緩存實例 // 根據註解配置信息獲取緩存實例對象,不存在則建立並設置到緩存註解配置類中 c.setCacheFunction((invokeContext, cacheAnnoConfig) -> { Cache cache = cacheAnnoConfig.getCache(); if (cache == null) { if (cacheAnnoConfig instanceof CachedAnnoConfig) { // 緩存註解 // 根據配置建立一個緩存實例對象,經過 CacheBuilder cache = createCacheByCachedConfig((CachedAnnoConfig) cacheAnnoConfig, invokeContext); } else if ((cacheAnnoConfig instanceof CacheInvalidateAnnoConfig) || (cacheAnnoConfig instanceof CacheUpdateAnnoConfig)) { // 更新/使失效緩存註解 CacheInvokeConfig cacheDefineConfig = configMap.getByCacheName(cacheAnnoConfig.getArea(), cacheAnnoConfig.getName()); if (cacheDefineConfig == null) { String message = "can't find @Cached definition with area=" + cacheAnnoConfig.getArea() + " name=" + cacheAnnoConfig.getName() + ", specified in " + cacheAnnoConfig.getDefineMethod(); CacheConfigException e = new CacheConfigException(message); logger.error("Cache operation aborted because can't find @Cached definition", e); return null; } cache = createCacheByCachedConfig(cacheDefineConfig.getCachedAnnoConfig(), invokeContext); } cacheAnnoConfig.setCache(cache); } return cache; }); return c; } private Cache createCacheByCachedConfig(CachedAnnoConfig ac, CacheInvokeContext invokeContext) { // 緩存區域 String area = ac.getArea(); // 緩存實例名稱 String cacheName = ac.getName(); if (CacheConsts.isUndefined(cacheName)) { // 沒有定義緩存實例名稱 // 生成緩存實例名稱:類名+方法名+(參數類型) cacheName = configProvider.createCacheNameGenerator(invokeContext.getHiddenPackages()) .generateCacheName(invokeContext.getMethod(), invokeContext.getTargetObject()); } // 建立緩存實例對象 Cache cache = __createOrGetCache(ac, area, cacheName); return cache; } @Deprecated public <K, V> Cache<K, V> getCache(String cacheName) { return getCache(CacheConsts.DEFAULT_AREA, cacheName); } @Deprecated public <K, V> Cache<K, V> getCache(String area, String cacheName) { Cache cache = cacheManager.getCacheWithoutCreate(area, cacheName); return cache; } public Cache __createOrGetCache(CachedAnnoConfig cachedAnnoConfig, String area, String cacheName) { // 緩存名稱拼接 String fullCacheName = area + "_" + cacheName; // 從緩存實例管理器中根據緩存區域和緩存實例名稱獲取緩存實例 Cache cache = cacheManager.getCacheWithoutCreate(area, cacheName); if (cache == null) { synchronized (this) { // 加鎖 // 再次確認 cache = cacheManager.getCacheWithoutCreate(area, cacheName); if (cache == null) { /* * 緩存區域的名稱是否做爲緩存 key 名稱前綴,默認爲 true ,我通常設置爲 false */ if (globalCacheConfig.isAreaInCacheName()) { // for compatible reason, if we use default configuration, the prefix should same to that version <=2.4.3 cache = buildCache(cachedAnnoConfig, area, fullCacheName); } else { // 構建一個緩存實例 cache = buildCache(cachedAnnoConfig, area, cacheName); } cacheManager.putCache(area, cacheName, cache); } } } return cache; } protected Cache buildCache(CachedAnnoConfig cachedAnnoConfig, String area, String cacheName) { Cache cache; if (cachedAnnoConfig.getCacheType() == CacheType.LOCAL) { // 本地緩存 cache = buildLocal(cachedAnnoConfig, area); } else if (cachedAnnoConfig.getCacheType() == CacheType.REMOTE) { // 遠程緩存 cache = buildRemote(cachedAnnoConfig, area, cacheName); } else { // 兩級緩存 // 構建本地緩存實例 Cache local = buildLocal(cachedAnnoConfig, area); // 構建遠程緩存實例 Cache remote = buildRemote(cachedAnnoConfig, area, cacheName); // 兩級緩存時是否單獨設置了本地緩存失效時間 localExpire boolean useExpireOfSubCache = cachedAnnoConfig.getLocalExpire() > 0; // 建立一個兩級緩存CacheBuilder cache = MultiLevelCacheBuilder.createMultiLevelCacheBuilder() .expireAfterWrite(remote.config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS) .addCache(local, remote) .useExpireOfSubCache(useExpireOfSubCache) .cacheNullValue(cachedAnnoConfig.isCacheNullValue()) .buildCache(); } // 設置緩存刷新策略 cache.config().setRefreshPolicy(cachedAnnoConfig.getRefreshPolicy()); // 將 cache 封裝成 CacheHandlerRefreshCache,也就是 RefreshCache 類型 // 後續添加刷新任務時會判斷是否爲 RefreshCache 類型,而後決定是否執行 addOrUpdateRefreshTask 方法,添加刷新任務,沒有刷新策略不會添加 cache = new CacheHandler.CacheHandlerRefreshCache(cache); // 設置緩存未命中時,JVM是否只容許一個線程執行方法,其餘線程等待,全局配置默認爲false cache.config().setCachePenetrationProtect(globalCacheConfig.isPenetrationProtect()); PenetrationProtectConfig protectConfig = cachedAnnoConfig.getPenetrationProtectConfig(); if (protectConfig != null) { // 方法的@CachePenetrationProtect註解 cache.config().setCachePenetrationProtect(protectConfig.isPenetrationProtect()); cache.config().setPenetrationProtectTimeout(protectConfig.getPenetrationProtectTimeout()); } if (configProvider.getCacheMonitorManager() != null) { // 添加監控統計配置 configProvider.getCacheMonitorManager().addMonitors(area, cacheName, cache); } return cache; } protected Cache buildRemote(CachedAnnoConfig cachedAnnoConfig, String area, String cacheName) { // 獲取緩存區域對應的 CacheBuilder 構造器 ExternalCacheBuilder cacheBuilder = (ExternalCacheBuilder) globalCacheConfig.getRemoteCacheBuilders().get(area); if (cacheBuilder == null) { throw new CacheConfigException("no remote cache builder: " + area); } // 克隆一個 CacheBuilder 構造器,由於不一樣緩存實例有不一樣的配置 cacheBuilder = (ExternalCacheBuilder) cacheBuilder.clone(); if (cachedAnnoConfig.getExpire() > 0 ) { // 設置失效時間 cacheBuilder.expireAfterWrite(cachedAnnoConfig.getExpire(), cachedAnnoConfig.getTimeUnit()); } // 設置緩存 key 的前綴 if (cacheBuilder.getConfig().getKeyPrefix() != null) { // 配置文件中配置了 prefix,則設置爲 prefix+cacheName cacheBuilder.setKeyPrefix(cacheBuilder.getConfig().getKeyPrefix() + cacheName); } else { // 設置爲 cacheName cacheBuilder.setKeyPrefix(cacheName); } if (!CacheConsts.isUndefined(cachedAnnoConfig.getKeyConvertor())) { // 若是註解中設置了Key的轉換方式則替換,不然仍是使用全局的 // 設置 key 的轉換器,只支持 FASTJSON cacheBuilder.setKeyConvertor(configProvider.parseKeyConvertor(cachedAnnoConfig.getKeyConvertor())); } if (!CacheConsts.isUndefined(cachedAnnoConfig.getSerialPolicy())) { // 緩存數據保存至遠程須要進行編碼和解碼,因此這裏設置其編碼和解碼方式,KRYO 和 JAVA 可選擇 cacheBuilder.setValueEncoder(configProvider.parseValueEncoder(cachedAnnoConfig.getSerialPolicy())); cacheBuilder.setValueDecoder(configProvider.parseValueDecoder(cachedAnnoConfig.getSerialPolicy())); } // 設置是否緩存 null 值 cacheBuilder.setCacheNullValue(cachedAnnoConfig.isCacheNullValue()); return cacheBuilder.buildCache(); } protected Cache buildLocal(CachedAnnoConfig cachedAnnoConfig, String area) { // 獲取緩存區域對應的 CacheBuilder 構造器 EmbeddedCacheBuilder cacheBuilder = (EmbeddedCacheBuilder) globalCacheConfig.getLocalCacheBuilders().get(area); if (cacheBuilder == null) { throw new CacheConfigException("no local cache builder: " + area); } // 克隆一個 CacheBuilder 構造器,由於不一樣緩存實例有不一樣的配置 cacheBuilder = (EmbeddedCacheBuilder) cacheBuilder.clone(); if (cachedAnnoConfig.getLocalLimit() != CacheConsts.UNDEFINED_INT) { // 本地緩存數量限制 cacheBuilder.setLimit(cachedAnnoConfig.getLocalLimit()); } if (cachedAnnoConfig.getCacheType() == CacheType.BOTH && cachedAnnoConfig.getLocalExpire() > 0) { // 設置本地緩存失效時間,前提是多級緩存,通常和遠程緩存保持一致不設置 cacheBuilder.expireAfterWrite(cachedAnnoConfig.getLocalExpire(), cachedAnnoConfig.getTimeUnit()); } else if (cachedAnnoConfig.getExpire() > 0) { // 設置失效時間 cacheBuilder.expireAfterWrite(cachedAnnoConfig.getExpire(), cachedAnnoConfig.getTimeUnit()); } if (!CacheConsts.isUndefined(cachedAnnoConfig.getKeyConvertor())) { cacheBuilder.setKeyConvertor(configProvider.parseKeyConvertor(cachedAnnoConfig.getKeyConvertor())); } // 設置是否緩存 null 值 cacheBuilder.setCacheNullValue(cachedAnnoConfig.isCacheNullValue()); // 構建一個緩存實例 return cacheBuilder.buildCache(); } protected CacheInvokeContext newCacheInvokeContext() { return new CacheInvokeContext(); } }
createCacheInvokeContext
方法返回一個本次調用的上下文CacheInvokeContext,爲這個上下文設置緩存函數,用於獲取或者構建緩存實例,這個函數在CacheHandler中會被調用,咱們來看看這個函數的處理邏輯:有兩個入參,分別爲本次調用的上下文和緩存註解的配置信息
首先從緩存註解的配置信息中獲取緩存實例,若是不爲null則直接返回,不然調用createCacheByCachedConfig
方法,根據配置經過CacheBuilder構造器建立一個緩存實例對象
createCacheByCachedConfig
方法:
若是沒有定義緩存實例名稱(@Cached註解中的name配置),則生成類名+方法名+(參數類型)
做爲緩存實例名稱
而後調用__createOrGetCache
方法
__createOrGetCache
方法:
經過緩存實例管理器SimpleCacheManager根據緩存區域area和緩存實例名稱cacheName獲取緩存實例對象,若是不爲null則直接返回,判斷緩存實例對象是否爲null爲進行兩次確認,第二次會給當前CacheContext加鎖進行判斷,避免線程不安全
緩存實例對象仍是爲null的話,先判斷緩存區域area是否添加至緩存實例名稱中,是的話"area_cacheName"爲緩存實例名稱,而後調用buildCache
方法建立一個緩存實例對象
buildCache
方法:根據緩存實例類型構建不一樣的緩存實例對象,處理邏輯以下:
LOCAL
則調用buildLocal
方法:1.1. 從GlobalCacheConfig全局配置的localCacheBuilders(保存本地緩存CacheBuilder構造器的集合)中的獲取本地緩存該緩存區域的構造器,在以前講到的'JetCacheAutoConfiguration自動配置'中有說到過,會將初始化好的構造器從AutoConfigureBeans中添加至GlobalCacheConfig中 1.2. 克隆一個 CacheBuilder 構造器,由於不一樣緩存實例有不一樣的配置 1.3. 將緩存註解的配置信息設置到構造器中,有如下配置: - 若是配置了localLimit,則設置本地緩存最大數量limit的值 - 若是CacheType爲BOTH而且配置了localExpire(大於0),則設置有效時間expireAfterWrite的值爲localExpire,不然若是配置的expire大於0,則設置其值爲expire - 若是配置了keyConvertor,則根據該值生成一個轉換函數,沒有配置的話在初始化構造器的時候根據全局配置可能已經生成了一個轉換函數(我通常在全局配置中設置) - 設置是否緩存null值 1.4. 經過調用構造器的buildCache()方法構建一個緩存實例對象,該方法在以前講到的'CacheBuilder構造器'中有分析過
REMOTE
則調用buildRemote
方法:1.1. 從GlobalCacheConfig全局配置的remoteCacheBuilders(保存遠程緩存CacheBuilder構造器的集合)中的獲取遠程緩存該緩存區域的構造器 1.2. 克隆一個 CacheBuilder 構造器,由於不一樣緩存實例有不一樣的配置 1.3. 將緩存註解的配置信息設置到構造器中,有如下配置: - 若是配置了expire,則設置遠程緩存有效時間expireAfterWrite的值 - 若是全局設置遠程緩存的緩存key的前綴keyPrefix,則設置緩存key的前綴爲"keyPrefix+cacheName",不然我爲"cacheName" - 若是配置了keyConvertor,則根據該值生成一個轉換函數,沒有配置的話在初始化構造器的時候根據全局配置可能已經生成了一個轉換函數(我通常在全局配置中設置) - 若是設置了serialPolicy,則根據該值生成編碼和解碼函數,沒有配置的話在初始化構造器的時候根據全局配置可能已經生成了編碼函數和解碼函數(我通常在全局配置中設置) - 設置是否緩存null值 1.4. 經過調用構造器的buildCache()方法構建一個緩存實例對象
BOTH
則調用buildLocal
方法構建本地緩存實例,調用buildRemote
方法構建遠程緩存實例:1.1. 建立一個MultiLevelCacheBuilder構造器 1.2. 設置有效時間爲遠程緩存的有效時間、添加local和remote緩存實例、設置是否單獨配置了本地緩存的失效時間(是否有配置localExpire)、設置是否緩存null值 1.3. 經過調用構造器的buildCache()方法構建一個緩存實例對象
設置刷新策略RefreshPolicy,沒有的話爲null
將緩存實例對象封裝成CacheHandlerRefreshCache對象,用於後續的添加刷新任務,在以前的'AbstractCache抽象類'有講到
設置是否開啓緩存未命中時加載方法的保護模式,全局默認爲false
將緩存實例添加至監控管理器中
被攔截後的處理在com.alicp.jetcache.anno.aop.JetCacheInterceptor
中,代碼以下:
public class JetCacheInterceptor implements MethodInterceptor, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(JetCacheInterceptor.class); /** * 緩存實例註解信息 */ @Autowired private ConfigMap cacheConfigMap; /** * Spring 上下文 */ private ApplicationContext applicationContext; /** * 緩存的全局配置 */ private GlobalCacheConfig globalCacheConfig; /** * JetCache 緩存的管理器(包含不少信息) */ ConfigProvider configProvider; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public Object invoke(final MethodInvocation invocation) throws Throwable { if (configProvider == null) { /** * 這裏會獲取到 SpringConfigProvider 可查看 {@link com.alicp.jetcache.autoconfigure.JetCacheAutoConfiguration} */ configProvider = applicationContext.getBean(ConfigProvider.class); } if (configProvider != null && globalCacheConfig == null) { globalCacheConfig = configProvider.getGlobalCacheConfig(); } if (globalCacheConfig == null || !globalCacheConfig.isEnableMethodCache()) { return invocation.proceed(); } // 獲取被攔截的方法 Method method = invocation.getMethod(); // 獲取被攔截的對象 Object obj = invocation.getThis(); CacheInvokeConfig cac = null; if (obj != null) { // 獲取改方法的Key(方法所在類名+方法名+(參數類型)+方法返回類型+_被攔截的類名) String key = CachePointcut.getKey(method, obj.getClass()); // 獲取該方法的緩存註解信息,在 Pointcut 中已經對註解進行解析並放入 ConfigMap 中 cac = cacheConfigMap.getByMethodInfo(key); } if(logger.isTraceEnabled()){ logger.trace("JetCacheInterceptor invoke. foundJetCacheConfig={}, method={}.{}(), targetClass={}", cac != null, method.getDeclaringClass().getName(), method.getName(), invocation.getThis() == null ? null : invocation.getThis().getClass().getName()); } // 無緩存相關注解配置信息代表無須緩存,直接執行該方法 if (cac == null || cac == CacheInvokeConfig.getNoCacheInvokeConfigInstance()) { return invocation.proceed(); } // 爲本次調用建立一個上下文對象,包含對應的緩存實例 CacheInvokeContext context = configProvider.getCacheContext().createCacheInvokeContext(cacheConfigMap); context.setTargetObject(invocation.getThis()); context.setInvoker(invocation::proceed); context.setMethod(method); context.setArgs(invocation.getArguments()); context.setCacheInvokeConfig(cac); context.setHiddenPackages(globalCacheConfig.getHiddenPackages()); // 繼續往下執行 return CacheHandler.invoke(context); } public void setCacheConfigMap(ConfigMap cacheConfigMap) { this.cacheConfigMap = cacheConfigMap; } }
從ConfigMap
中獲取被攔截的方法對象的緩存配置信息,若是沒有則直接執行該方法,不然繼續往下執行
根據CacheContext
對象(SpringCacheContext,由於在以前講到的'JetCacheAutoConfiguration自動配置'中有說到注入的是SpringConfigProvider對象,在其初始化方法中調用newContext()方法生成SpringCacheContext)調用其createCacheInvokeContext
方法爲本次調用建立一個上下文CacheInvokeContext
,並設置獲取緩存實例函數,具體實現邏輯查看上面講到的CacheContext
設置本次調用上下文的targetObject爲被攔截對象,invoker爲被攔截對象的調用器,method爲被攔截方法,args爲方法入參,cacheInvokeConfig爲緩存配置信息,hiddenPackages爲緩存實例名稱須要截斷的包名
經過CacheHandler的invoke方法繼續往下執行
com.alicp.jetcache.anno.method.CacheHandler
用於JetCache處理被攔截的方法,部分代碼以下:
public class CacheHandler implements InvocationHandler { public static Object invoke(CacheInvokeContext context) throws Throwable { if (context.getCacheInvokeConfig().isEnableCacheContext()) { try { CacheContextSupport._enable(); return doInvoke(context); } finally { CacheContextSupport._disable(); } } else { return doInvoke(context); } } private static Object doInvoke(CacheInvokeContext context) throws Throwable { // 獲取緩存實例配置 CacheInvokeConfig cic = context.getCacheInvokeConfig(); // 獲取註解配置信息 CachedAnnoConfig cachedConfig = cic.getCachedAnnoConfig(); if (cachedConfig != null && (cachedConfig.isEnabled() || CacheContextSupport._isEnabled())) { // 通過緩存中獲取結果 return invokeWithCached(context); } else if (cic.getInvalidateAnnoConfigs() != null || cic.getUpdateAnnoConfig() != null) { // 根據結果刪除或者更新緩存 return invokeWithInvalidateOrUpdate(context); } else { // 執行該方法 return invokeOrigin(context); } } private static Object invokeWithCached(CacheInvokeContext context) throws Throwable { // 獲取本地調用的上下文 CacheInvokeConfig cic = context.getCacheInvokeConfig(); // 獲取註解配置信息 CachedAnnoConfig cac = cic.getCachedAnnoConfig(); // 獲取緩存實例對象(不存在則會建立並設置到 cac 中) // 可在 JetCacheInterceptor 建立本次調用的上下文時,調用 createCacheInvokeContext(cacheConfigMap) 方法中查看詳情 Cache cache = context.getCacheFunction().apply(context, cac); if (cache == null) { logger.error("no cache with name: " + context.getMethod()); // 無緩存實例對象,執行原有方法 return invokeOrigin(context); } // 生成緩存 Key 對象(註解中沒有配置的話就是入參,沒有入參則爲 "_$JETCACHE_NULL_KEY$_" ) Object key = ExpressionUtil.evalKey(context, cic.getCachedAnnoConfig()); if (key == null) { // 生成緩存 Key 失敗則執行原方法,並記錄 CacheLoadEvent 事件 return loadAndCount(context, cache, key); } /* * 根據配置的 condition 來決定是否走緩存 * 緩存註解中沒有配置 condition 表示全部請求都走緩存 * 配置了 condition 表示知足條件的才走緩存 */ if (!ExpressionUtil.evalCondition(context, cic.getCachedAnnoConfig())) { // 不知足 condition 則直接執行原方法,並記錄 CacheLoadEvent 事件 return loadAndCount(context, cache, key); } try { // 建立一個執行原有方法的函數 CacheLoader loader = new CacheLoader() { @Override public Object load(Object k) throws Throwable { Object result = invokeOrigin(context); context.setResult(result); return result; } @Override public boolean vetoCacheUpdate() { // 本次執行原方法後是否須要更新緩存 return !ExpressionUtil.evalPostCondition(context, cic.getCachedAnnoConfig()); } }; // 獲取結果 Object result = cache.computeIfAbsent(key, loader); return result; } catch (CacheInvokeException e) { throw e.getCause(); } } private static Object loadAndCount(CacheInvokeContext context, Cache cache, Object key) throws Throwable { long t = System.currentTimeMillis(); Object v = null; boolean success = false; try { // 調用原有方法 v = invokeOrigin(context); success = true; } finally { t = System.currentTimeMillis() - t; // 發送 CacheLoadEvent 事件 CacheLoadEvent event = new CacheLoadEvent(cache, t, key, v, success); while (cache instanceof ProxyCache) { cache = ((ProxyCache) cache).getTargetCache(); } if (cache instanceof AbstractCache) { ((AbstractCache) cache).notify(event); } } return v; } private static Object invokeOrigin(CacheInvokeContext context) throws Throwable { // 執行被攔截的方法 return context.getInvoker().invoke(); } }
直接查看invokeWithCached
方法:
獲取緩存註解信息
根據本地調用的上下文CacheInvokeContext獲取緩存實例對象(調用其cacheFunction函數),在CacheContext中有講到
若是緩存實例不存在則直接調用invokeOrigin方法,執行被攔截的對象的調用器
根據本次調用的上下文CacheInvokeContext生成緩存key,根據配置的緩存key的SpEL表達式生成,若是沒有配置則返回入參對象,若是沒有對象則返回"_ $JETCACHE_NULL_KEY$_"
根據配置condition表達式判斷是否須要走緩存
建立一個CacheLoader
對象,用於執行被攔截的對象的調用器,也就是加載原有方法
調用緩存實例的computeIfAbsent(key, loader)
方法獲取結果,這個方法的處理過程可查看'緩存API'這一小節
至此結束!!!😄😄😄