場景:咱們對於須要大量計算的場景,但願將結果緩存起來,而後咱們一塊兒來實現一個緩存服務。即對於一個相同的輸入,它的輸出是不變的(也能夠短期不變)。java
實現說明:這裏實現採用GuavaCache+裝飾器模式。spring
首先設計一個緩存服務接口。json
public interface CacheableService<I, O> { /** * 計算服務 * @param i * @return * @throws Exception */ O doService(I i) throws Exception; }
這裏定義了一個緩存服務接口,這裏的key和Hashmap的key同樣,須要覆寫equals和hashcode方法。緩存
public class CacheableServiceWrapper<I , O> implements CacheableService<I, O>, GlobalResource { /** * 日誌 */ private final static Logger LOGGER = LoggerFactory .getLogger(CacheableServiceWrapper.class); /** * 緩存大小 */ private int MAX_CACHE_SIZE = 20; /** * 出現異常的時候重試,默認不重試 */ private boolean retryOnExp = false; /** * 重試次數,默認爲0,即不重試 */ private int retryTimes = 0; /** * 默認30分鐘 */ private long expireTimeWhenAccess = 30 * 60; /** * 緩存 */ private LoadingCache<I, Future<O>> cache = null; private CacheableService<I, O> cacheableService = null; /** * Calculate o. * * @param i the * @return the o * @throws Exception the exception */ public O doService(final I i) throws Exception { Assert.notNull(cacheableService, "請設置好實例"); int currentTimes = 0; while (currentTimes <= retryTimes) { try { Future<O> oFuture = cache.get(i); return oFuture.get(); } catch (Exception e) { if (!retryOnExp) { throw e; } currentTimes++; LoggerUtils.info(LOGGER, "第", currentTimes, "重試,key=", i); } } throw new Exception("任務執行失敗"); } /** * 提交計算任務 * * @param i * @return */ private Future<O> createTask(final I i) { Assert.notNull(cacheableService, "請設置好實例"); LoggerUtils.info(LOGGER, "提交任務,key=", i); LoggerUtils.info(LOGGER, "當前cache=", JSON.toJSONString(cache)); Future<O> resultFuture = THREAD_POOL.submit(new Callable<O>() { public O call() throws Exception { return cacheableService.doService(i); } }); return resultFuture; } /** * 構造函數 */ public CacheableServiceWrapper(CacheableService<I, O> cacheableService, int maxCacheSize, long expireTime) { this.cacheableService = cacheableService; this.MAX_CACHE_SIZE = maxCacheSize; this.expireTimeWhenAccess = expireTime; cache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(expireTimeWhenAccess, TimeUnit.SECONDS) .build(new CacheLoader<I, Future<O>>() { public Future<O> load(I key) throws ExecutionException { LoggerUtils.warn(LOGGER, "get Element from cacheLoader"); return createTask(key); } ; }); } /** * 構造函數 */ public CacheableServiceWrapper(CacheableService<I, O> cacheableService) { this.cacheableService = cacheableService; cache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(expireTimeWhenAccess, TimeUnit.SECONDS) .build(new CacheLoader<I, Future<O>>() { public Future<O> load(I key) throws ExecutionException { LoggerUtils.warn(LOGGER, "get Element from cacheLoader"); return createTask(key); } ; }); } /** * Setter method for property <tt>retryTimes</tt>. * * @param retryTimes value to be assigned to property retryTimes */ public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; } /** * Setter method for property <tt>retryOnExp</tt>. * * @param retryOnExp value to be assigned to property retryOnExp */ public void setRetryOnExp(boolean retryOnExp) { this.retryOnExp = retryOnExp; } }
這個裝飾器就是最主要的內容了,實現了對緩存服務的輸入和輸出的緩存。這裏先說明下中間幾個重要的屬性:多線程
MAX_CACHE_SIZE :緩存空間的大小
retryOnExp :當緩存服務發生異常的時候,是否發起重試
retryTimes :當緩存服務異常須要重試的時候,從新嘗試的最大上限。
expireTimeWhenAccess : 緩存失效時間,當key多久沒有訪問的時候,淘汰數據
而後是doService採用了Guava的緩存機制,當獲取緩存爲空的時候,會自動去build緩存,這個操做是原子化的,因此不用本身去採用ConcurrentHashmap的putIfAbsent方法去作啦~~~
這裏面實現了最主要的邏輯,就是獲取緩存,而後去get數據,而後若是異常,根據配置去重試。
好啦如今我們去測試啦
public class CacheableCalculateServiceTest { private CacheableService<String, String> calculateService; @Before public void before() { CacheableServiceWrapper<String, String> wrapper = new CacheableServiceWrapper<String, String>( new CacheableService<String, String>() { public String doService(String i) throws Exception { Thread.sleep(999); return i + i; } }); wrapper.setRetryOnExp(true); wrapper.setRetryTimes(2); calculateService = wrapper; } @Test public void test() throws Exception { MutiThreadRun.init(5).addTaskAndRun(300, new Callable<String>() { public String call() throws Exception { return calculateService.doService("1"); } }); }
這裏咱們爲了模擬大量計算的場景,咱們將線程暫停了999ms,而後使用5個線程,執行任務999次,結果以下:app
2016-08-24 02:00:18:848 com.zhangwei.learning.calculate.CacheableServiceWrapper get Element from cacheLoader 2016-08-24 02:00:20:119 com.zhangwei.learning.calculate.CacheableServiceWrapper 提交任務,key=1 2016-08-24 02:00:20:122 com.zhangwei.learning.calculate.CacheableServiceWrapper 當前cache={} 2016-08-24 02:00:21:106 com.zhangwei.learning.jedis.JedisPoolMonitorTask poolSize=500 borrowed=0 idle=0 2016-08-24 02:00:21:914 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間3080ms,共有300個任務,執行異常0次
能夠看到,因爲key同樣,只執行了一次計算,而後剩下299都是從緩存中獲取的。異步
如今咱們修改成5個線程,執行300000次。ide
2016-08-24 02:03:15:013 com.zhangwei.learning.calculate.CacheableServiceWrapper get Element from cacheLoader 2016-08-24 02:03:16:298 com.zhangwei.learning.calculate.CacheableServiceWrapper 提交任務,key=1 2016-08-24 02:03:16:300 com.zhangwei.learning.calculate.CacheableServiceWrapper 當前cache={} 2016-08-24 02:03:17:289 com.zhangwei.learning.jedis.JedisPoolMonitorTask poolSize=500 borrowed=0 idle=0 2016-08-24 02:03:18:312 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間3317ms,共有300000個任務,執行異常0次
發現,執行時間沒啥區別啦~~~~緩存的效果然是棒棒的~~svn
PS:個人我的svn地址:http://code.taobao.org/p/learningIT/wiki/index/ 有興趣的能夠看下啦~函數
後面咱們再看基於註解去實現緩存~~~
好啦繼續更新,咱們使用註解,來實現緩存,首先咱們的前提仍是跟上面的同樣,是對方法作緩存,也就是將方法的輸入到輸出的映射作緩存。
首先來個註解:
@Target({ ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Cache { /** * 是否打印 * @return */ public boolean enabled() default true; /** * Cache type cache type. * * @return the cache type */ public CacheType cacheType() default CacheType.LOCAL; }
該註解是註解在方法上的
package com.zhangwei.learning.utils.cache; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.zhangwei.learning.model.ToString; import com.zhangwei.learning.utils.log.LoggerUtils; import org.slf4j.LoggerFactory; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; /** * 用於緩存的key,若是接口須要緩存,那麼複雜對象參數都須要實現這個接口 * Created by Administrator on 2016/8/22. */ public class CacheKey extends ToString { /** * The A class. */ private String classPath; /** * The Method. */ private Method method; /** * The Input params. */ private List<Object> inputParams; /** * Instantiates a new Cache key. * * @param clazz the clazz * @param method the method * @param inputs the inputs */ public CacheKey(Class clazz, Method method, Object[] inputs) { this.classPath = clazz.getName(); this.method = method; List<Object> list = Lists.newArrayList(); if(inputs==null || inputs.length==0){ inputParams = list; } for(Object o : inputs){ list.add(o); } inputParams = list; } /** * Equals boolean. * * @param obj the obj * @return the boolean */ @Override public boolean equals(Object obj) { if (obj == null || !(obj instanceof CacheKey)) { return false; } CacheKey key = (CacheKey) obj; if (classPath.equals(key.getClassPath()) && method.equals(key.getMethod())) { if (key.getInputParams().size() != getInputParams().size()) { return false; } for (int i = 0; i < inputParams.size(); i++) { Object param = getInputParams().get(i); //若是有自定義的convertor,那麼使用自定義的convertor ObjEqualsConvertor convertor = CacheInterceptor.getConvertors().get(param.getClass().getName()); if(convertor !=null){ if(!convertor.extraEquals(param,key.getInputParams().get(i))){ return false; } continue; } if (!getInputParams().get(i).equals(key.getInputParams().get(i))) { return false; } } return true; } return false; } /** * Hash code int. * * @return the int */ @Override public int hashCode() { return classPath.hashCode()+method.hashCode()+inputParams.hashCode(); } /** * Gets class path. * * @return the class path */ public String getClassPath() { return classPath; } /** * Sets class path. * * @param classPath the class path */ public void setClassPath(String classPath) { this.classPath = classPath; } /** * Gets method. * * @return the method */ public Method getMethod() { return method; } /** * Sets method. * * @param method the method */ public void setMethod(Method method) { this.method = method; } /** * Gets input params. * * @return the input params */ public List<Object> getInputParams() { return inputParams; } /** * Sets input params. * * @param inputParams the input params */ public void setInputParams(List<Object> inputParams) { this.inputParams = inputParams; } }
咱們要作緩存,確定要有個key,這裏就是咱們定義的key,最主要的是咱們使用了一個專門的類,主要包含調用的類、方法、以及入參。這裏有下面幾個須要注意的點:
一、須要修改equals方法,這點跟hashmap自定義key同樣。
二、比較類的時候直接用class全名。若是用class的equals方法,有可能class地址不一致致使判斷有問題。這裏method的equals方法已是覆寫了,因此沒問題。
三、hashcode使用三個參數合起來的hashcode,這樣儘可能讓key散列到不一樣的捅,若是用classpath的,那麼若是這個類調用量很大,其餘的類調用不多,那麼桶分佈就很不均勻了。
四、入參都須要注意下equals方法,可是對於有些類咱們沒有辦法修改它的equals方法,這個時候咱們有個轉換map,能夠自定義對某個類的equal比較器,而後能夠在不對類的修改的狀況下,達到比較的效果。
上面實現了註解和緩存的key,下面來攔截器啦
/** * Alipay.com Inc. * Copyright (c) 2004-2016 All Rights Reserved. */ package com.zhangwei.learning.utils.cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Maps; import com.zhangwei.learning.resource.GlobalResource; import com.zhangwei.learning.utils.log.LoggerUtils; import org.aopalliance.intercept.Invocation; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import java.util.Map; import java.util.concurrent.*; /** * 能夠對接口作緩存的攔截器 * * @author Administrator * @version $Id: CacheInterceptor.java, v 0.1 2016年8月22日 上午2:50:32 Administrator Exp $ */ public class CacheInterceptor implements MethodInterceptor, InitializingBean, GlobalResource { /** * The constant logger. */ private static final Logger logger = LoggerFactory .getLogger(CacheInterceptor.class); /** * 本地緩存大小. */ private long maxCacheSize = 300; /** * The constant expireTimeWhenAccess. */ private long expireTimeWhenAccess = 20; /** * The Local Cache. */ private com.google.common.cache.Cache<CacheKey, FutureTask<Object>> cache = null; /** * The equal Convertors. */ private static Map<String, ObjEquality> convertors = Maps.newHashMap(); /** * @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation) */ @Override public Object invoke(MethodInvocation invocation) throws Throwable { Cache cacheAnnotation = invocation.getMethod().getAnnotation(Cache.class); if (cacheAnnotation == null || !cacheAnnotation.enabled()) { return invocation.proceed(); } //須要cache CacheKey cacheKey = new CacheKey(invocation.getMethod().getDeclaringClass(), invocation.getMethod(), invocation.getArguments()); CacheType cacheType = cacheAnnotation.cacheType(); if (cacheType == CacheType.LOCAL) { Object result = getLocalCacheResult(cacheKey, invocation); return result; } throw new RuntimeException("not supported cacheType"); } /** * Get local cache result object. * * @param key the key * @return the object */ private Object getLocalCacheResult(CacheKey key, final Invocation i) throws ExecutionException, InterruptedException { FutureTask<Object> f = new FutureTask<Object>(new Callable<Object>() { @Override public Object call() throws Exception { try { return i.proceed(); } catch (Throwable throwable) { throw new ExecutionException(throwable); } } }); FutureTask<Object> result = cache.asMap().putIfAbsent(key, f); if (result == null) { f.run(); result = f; LoggerUtils.debug(logger,"提交任務,key=",key); }else { LoggerUtils.debug(logger, "從緩存獲取,key=", key); } return result.get(); } /** * Sets expire time when access. * * @param expireTimeWhenAccess the expire time when access */ public void setExpireTimeWhenAccess(long expireTimeWhenAccess) { this.expireTimeWhenAccess = expireTimeWhenAccess; } @Override public void afterPropertiesSet() throws Exception { cache = CacheBuilder .newBuilder() .maximumSize(maxCacheSize) .expireAfterAccess( expireTimeWhenAccess, TimeUnit.SECONDS).removalListener(new RemovalListener<CacheKey, Future<Object>>() { @Override public void onRemoval(RemovalNotification<CacheKey, Future<Object>> notification) { LoggerUtils.info(logger, "移除key=", notification.getKey(), ",value=", notification.getValue(), ",cause=", notification.getCause()); } }) .build(); } /** * Sets convertors. * * @param convertors the convertors */ public void setConvertors(Map<String, ObjEquality> convertors) { this.convertors = convertors; } /** * Gets convertors. * * @return the convertors */ public static Map<String, ObjEquality> getConvertors() { return convertors; } /** * Sets max cache size. * * @param maxCacheSize the max cache size */ public void setMaxCacheSize(long maxCacheSize) { this.maxCacheSize = maxCacheSize; } }
這裏咱們實現了緩存的攔截器,緩存採用Guava cache,這裏咱們在使用上主要是使用了guava的緩存自動淘汰、原子化的功能。咱們能夠看到,緩存的是CacheKey--->FutureTask<Object>的映射,這裏咱們採用了FutureTask的異步執行的功能。而且將Guava 做爲ConcurrentHashMap來使用。
好了咱們來配置下。
<bean id="cacheInteceptor" class="com.zhangwei.learning.utils.cache.CacheInterceptor"> <property name="maxCacheSize" value="300"/> <property name="expireTimeWhenAccess" value="300"/> <property name="convertors"> <map> <entry key="java.lang.String" value-ref="stringConvertor" /> </map> </property> </bean> <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator"> <property name="order" value="90"></property> <property name="interceptorNames"> <list> <value>digestInteceptor</value> <value>cacheInteceptor</value> </list> </property> <property name="beanNames"> <value>*</value> </property> </bean>
上面的那個map就是配置的自定義equals比較器
上測試類
@Component @Digest public class TestBean { @Cache(cacheType = CacheType.LOCAL, enabled = true) public String test(String one, String two) throws Exception { Thread.sleep(999); // throw new Exception("lalal"); return one + two; } } public class CacheTest { private final static Logger LOGGER = LoggerFactory.getLogger(CacheTest.class); @org.junit.Test public void Test() { final TestBean client = GlobalResourceUtils.getBean(TestBean.class); LoggerUtils.info(LOGGER, "獲取到的client=", JSON.toJSONString(client)); MutiThreadRun.init(5).addTaskAndRun(10, new Callable<Object>() { @Override public Object call() throws Exception { return client.test("aaa","bbb"); } }); } } public class StringConvertor extends ObjEquality { @Override public boolean extraEquals(Object a, Object b) { return false; } }
這裏咱們就是講一個方法多線程執行了10次,該方法中間會將線程暫停1s,因此能夠看每次方法的執行時間就知道是否走緩存了。咱們這裏自定義了一個equal比較器,老是返回false,因此這裏咱們理論上每次都不會走緩存的,由於比較的時候key不一樣。
2016-08-28 23:38:09:527 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1043ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1035ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1034ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1036ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:534 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1033ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:527 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:534 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:534 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間2051ms,共有10個任務,執行異常0次
能夠看到 每次執行時間都超過了1s,由於沒走緩存,每次線程都暫停了1s。
而後咱們把那個String比較器刪掉。理論上此次調用的就是String的equals方法,就能走上緩存了。
2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,986ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1020ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,987ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1026ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:419 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,0ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,2ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1037ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,0ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:421 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間1043ms,共有10個任務,執行異常0次
能夠看到,除了5個結果執行時間超過1s,其餘的都很快,爲啥呢?由於方法是多線程執行的,5個線程,最開始執行,5個線程中一個線程會執行方法,而且把結果放到緩存裏面。而後5個線程一塊兒等待方法執行完成而後把結果返回,而後後面的全部的都只須要從緩存獲取就行了,這似不似很贊~~~~