如何實現一個緩存服務

  場景:咱們對於須要大量計算的場景,但願將結果緩存起來,而後咱們一塊兒來實現一個緩存服務。即對於一個相同的輸入,它的輸出是不變的(也能夠短期不變)java

實現說明:這裏實現採用GuavaCache+裝飾器模式。spring

首先設計一個緩存服務接口。json

public interface CacheableService<I, O> {

    /**
     * 計算服務
     * @param i
     * @return
     * @throws Exception 
     */
    O doService(I i) throws Exception;
}

這裏定義了一個緩存服務接口,這裏的key和Hashmap的key同樣,須要覆寫equals和hashcode方法。緩存

public class CacheableServiceWrapper<I , O> implements
        CacheableService<I, O>,
        GlobalResource {

    /**
     * 日誌
     */
    private final static Logger LOGGER = LoggerFactory
            .getLogger(CacheableServiceWrapper.class);

    /**
     * 緩存大小
     */
    private int MAX_CACHE_SIZE = 20;

    /**
     * 出現異常的時候重試,默認不重試
     */
    private boolean retryOnExp = false;

    /**
     * 重試次數,默認爲0,即不重試
     */
    private int retryTimes = 0;

    /**
     * 默認30分鐘
     */
    private long expireTimeWhenAccess = 30 * 60;

    /**
     * 緩存
     */
    private LoadingCache<I, Future<O>> cache = null;

    private CacheableService<I, O> cacheableService = null;

    /**
     * Calculate o.
     *
     * @param i the
     * @return the o
     * @throws Exception the exception
     */
    public O doService(final I i) throws Exception {

        Assert.notNull(cacheableService, "請設置好實例");

        int currentTimes = 0;
        while (currentTimes <= retryTimes) {
            try {
                Future<O> oFuture = cache.get(i);
                return oFuture.get();

            } catch (Exception e) {
                if (!retryOnExp) {
                    throw e;
                }
                currentTimes++;
                LoggerUtils.info(LOGGER, "第", currentTimes, "重試,key=", i);
            }
        }
        throw new Exception("任務執行失敗");
    }


    /**
     * 提交計算任務
     *
     * @param i
     * @return
     */
    private Future<O> createTask(final I i) {
        Assert.notNull(cacheableService, "請設置好實例");

        LoggerUtils.info(LOGGER, "提交任務,key=", i);
        LoggerUtils.info(LOGGER, "當前cache=", JSON.toJSONString(cache));

        Future<O> resultFuture = THREAD_POOL.submit(new Callable<O>() {

            public O call() throws Exception {
                return cacheableService.doService(i);
            }
        });
        return resultFuture;

    }

    /**
     * 構造函數
     */
    public CacheableServiceWrapper(CacheableService<I, O> cacheableService,
                                            int maxCacheSize, long expireTime) {
        this.cacheableService = cacheableService;
        this.MAX_CACHE_SIZE = maxCacheSize;
        this.expireTimeWhenAccess = expireTime;
        cache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE)
                .expireAfterAccess(expireTimeWhenAccess, TimeUnit.SECONDS)
                .build(new CacheLoader<I, Future<O>>() {
                    public Future<O> load(I key) throws ExecutionException {
                        LoggerUtils.warn(LOGGER, "get Element from cacheLoader");
                        return createTask(key);
                    }

                    ;
                });
    }

    /**
     * 構造函數
     */
    public CacheableServiceWrapper(CacheableService<I, O> cacheableService) {
        this.cacheableService = cacheableService;
        cache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE)
                .expireAfterAccess(expireTimeWhenAccess, TimeUnit.SECONDS)
                .build(new CacheLoader<I, Future<O>>() {
                    public Future<O> load(I key) throws ExecutionException {
                        LoggerUtils.warn(LOGGER, "get Element from cacheLoader");
                        return createTask(key);
                    }

                    ;
                });
    }

    /**
     * Setter method for property <tt>retryTimes</tt>.
     *
     * @param retryTimes value to be assigned to property retryTimes
     */
    public void setRetryTimes(int retryTimes) {
        this.retryTimes = retryTimes;
    }

    /**
     * Setter method for property <tt>retryOnExp</tt>.
     *
     * @param retryOnExp value to be assigned to property retryOnExp
     */
    public void setRetryOnExp(boolean retryOnExp) {
        this.retryOnExp = retryOnExp;
    }

}
緩存服務裝飾器

這個裝飾器就是最主要的內容了,實現了對緩存服務的輸入和輸出的緩存。這裏先說明下中間幾個重要的屬性:多線程

MAX_CACHE_SIZE :緩存空間的大小
retryOnExp :當緩存服務發生異常的時候,是否發起重試
retryTimes :當緩存服務異常須要重試的時候,從新嘗試的最大上限。
expireTimeWhenAccess : 緩存失效時間,當key多久沒有訪問的時候,淘汰數據

而後是doService採用了Guava的緩存機制,當獲取緩存爲空的時候,會自動去build緩存,這個操做是原子化的,因此不用本身去採用ConcurrentHashmap的putIfAbsent方法去作啦~~~
這裏面實現了最主要的邏輯,就是獲取緩存,而後去get數據,而後若是異常,根據配置去重試。

好啦如今我們去測試啦
public class CacheableCalculateServiceTest {

    private CacheableService<String, String> calculateService;

    @Before
    public void before() {
        CacheableServiceWrapper<String, String> wrapper = new CacheableServiceWrapper<String, String>(
            new CacheableService<String, String>() {

                public String doService(String i) throws Exception {
                    Thread.sleep(999);
                    return i + i;
                }
            });
        wrapper.setRetryOnExp(true);
        wrapper.setRetryTimes(2);
        calculateService = wrapper;
    }

    @Test
    public void test() throws Exception {
        MutiThreadRun.init(5).addTaskAndRun(300, new Callable<String>() {

            public String call() throws Exception {
                return calculateService.doService("1");
            }
        });
    }

這裏咱們爲了模擬大量計算的場景,咱們將線程暫停了999ms,而後使用5個線程,執行任務999次,結果以下:app

2016-08-24 02:00:18:848 com.zhangwei.learning.calculate.CacheableServiceWrapper get Element from cacheLoader
2016-08-24 02:00:20:119 com.zhangwei.learning.calculate.CacheableServiceWrapper 提交任務,key=1
2016-08-24 02:00:20:122 com.zhangwei.learning.calculate.CacheableServiceWrapper 當前cache={}
2016-08-24 02:00:21:106 com.zhangwei.learning.jedis.JedisPoolMonitorTask poolSize=500 borrowed=0 idle=0
2016-08-24 02:00:21:914 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間3080ms,共有300個任務,執行異常0次

能夠看到,因爲key同樣,只執行了一次計算,而後剩下299都是從緩存中獲取的。異步

如今咱們修改成5個線程,執行300000次。ide

2016-08-24 02:03:15:013 com.zhangwei.learning.calculate.CacheableServiceWrapper get Element from cacheLoader
2016-08-24 02:03:16:298 com.zhangwei.learning.calculate.CacheableServiceWrapper 提交任務,key=1
2016-08-24 02:03:16:300 com.zhangwei.learning.calculate.CacheableServiceWrapper 當前cache={}
2016-08-24 02:03:17:289 com.zhangwei.learning.jedis.JedisPoolMonitorTask poolSize=500 borrowed=0 idle=0
2016-08-24 02:03:18:312 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間3317ms,共有300000個任務,執行異常0次

發現,執行時間沒啥區別啦~~~~緩存的效果然是棒棒的~~svn

PS:個人我的svn地址:http://code.taobao.org/p/learningIT/wiki/index/   有興趣的能夠看下啦~函數

後面咱們再看基於註解去實現緩存~~~

好啦繼續更新,咱們使用註解,來實現緩存,首先咱們的前提仍是跟上面的同樣,是對方法作緩存,也就是將方法的輸入到輸出的映射作緩存。

首先來個註解:

@Target({ ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Cache {

    /**
     * 是否打印
     * @return
     */
    public boolean enabled() default true;

    /**
     * Cache type cache type.
     *
     * @return the cache type
     */
    public CacheType cacheType() default CacheType.LOCAL;

}

該註解是註解在方法上的

package com.zhangwei.learning.utils.cache;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.zhangwei.learning.model.ToString;
import com.zhangwei.learning.utils.log.LoggerUtils;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * 用於緩存的key,若是接口須要緩存,那麼複雜對象參數都須要實現這個接口
 * Created by Administrator on 2016/8/22.
 */
public class CacheKey extends ToString {

    /**
     * The A class.
     */
    private String classPath;

    /**
     * The Method.
     */
    private Method method;

    /**
     * The Input params.
     */
    private List<Object> inputParams;

    /**
     * Instantiates a new Cache key.
     *
     * @param clazz  the clazz
     * @param method the method
     * @param inputs the inputs
     */
    public CacheKey(Class clazz, Method method, Object[] inputs) {
        this.classPath = clazz.getName();
        this.method = method;
        List<Object> list = Lists.newArrayList();
        if(inputs==null || inputs.length==0){
            inputParams = list;
        }
        for(Object o : inputs){
            list.add(o);
        }
        inputParams = list;
    }

    /**
     * Equals boolean.
     *
     * @param obj the obj
     * @return the boolean
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == null || !(obj instanceof CacheKey)) {
            return false;
        }
        CacheKey key = (CacheKey) obj;
        if (classPath.equals(key.getClassPath()) && method.equals(key.getMethod())) {
            if (key.getInputParams().size() != getInputParams().size()) {
                return false;
            }
            for (int i = 0; i < inputParams.size(); i++) {
                Object param = getInputParams().get(i);
                //若是有自定義的convertor,那麼使用自定義的convertor
                ObjEqualsConvertor convertor = CacheInterceptor.getConvertors().get(param.getClass().getName());
                if(convertor !=null){
                    if(!convertor.extraEquals(param,key.getInputParams().get(i))){
                        return false;
                    }
                    continue;
                }
                if (!getInputParams().get(i).equals(key.getInputParams().get(i))) {
                    return false;
                }
            }
            return true;
        }
        return false;
    }

    /**
     * Hash code int.
     *
     * @return the int
     */
    @Override
    public int hashCode() {
        return classPath.hashCode()+method.hashCode()+inputParams.hashCode();
    }

    /**
     * Gets class path.
     *
     * @return the class path
     */
    public String getClassPath() {
        return classPath;
    }

    /**
     * Sets class path.
     *
     * @param classPath the class path
     */
    public void setClassPath(String classPath) {
        this.classPath = classPath;
    }

    /**
     * Gets method.
     *
     * @return the method
     */
    public Method getMethod() {
        return method;
    }

    /**
     * Sets method.
     *
     * @param method the method
     */
    public void setMethod(Method method) {
        this.method = method;
    }

    /**
     * Gets input params.
     *
     * @return the input params
     */
    public List<Object> getInputParams() {
        return inputParams;
    }

    /**
     * Sets input params.
     *
     * @param inputParams the input params
     */
    public void setInputParams(List<Object> inputParams) {
        this.inputParams = inputParams;
    }
}

 

咱們要作緩存,確定要有個key,這裏就是咱們定義的key,最主要的是咱們使用了一個專門的類,主要包含調用的類、方法、以及入參。這裏有下面幾個須要注意的點:

一、須要修改equals方法,這點跟hashmap自定義key同樣。

二、比較類的時候直接用class全名。若是用class的equals方法,有可能class地址不一致致使判斷有問題。這裏method的equals方法已是覆寫了,因此沒問題。

三、hashcode使用三個參數合起來的hashcode,這樣儘可能讓key散列到不一樣的捅,若是用classpath的,那麼若是這個類調用量很大,其餘的類調用不多,那麼桶分佈就很不均勻了。

四、入參都須要注意下equals方法,可是對於有些類咱們沒有辦法修改它的equals方法,這個時候咱們有個轉換map,能夠自定義對某個類的equal比較器,而後能夠在不對類的修改的狀況下,達到比較的效果。

 

上面實現了註解和緩存的key,下面來攔截器啦

/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2016 All Rights Reserved.
 */
package com.zhangwei.learning.utils.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import com.zhangwei.learning.resource.GlobalResource;
import com.zhangwei.learning.utils.log.LoggerUtils;
import org.aopalliance.intercept.Invocation;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import java.util.Map;
import java.util.concurrent.*;

/**
 * 能夠對接口作緩存的攔截器
 *
 * @author Administrator
 * @version $Id: CacheInterceptor.java, v 0.1 2016年8月22日 上午2:50:32 Administrator Exp $
 */
public class CacheInterceptor implements MethodInterceptor, InitializingBean, GlobalResource {

    /**
     * The constant logger.
     */
    private static final Logger logger = LoggerFactory
            .getLogger(CacheInterceptor.class);

    /**
     * 本地緩存大小.
     */
    private long maxCacheSize = 300;

    /**
     * The constant expireTimeWhenAccess.
     */
    private long expireTimeWhenAccess = 20;

    /**
     * The Local Cache.
     */
    private com.google.common.cache.Cache<CacheKey, FutureTask<Object>> cache = null;

    /**
     * The equal Convertors.
     */
    private static Map<String, ObjEquality> convertors = Maps.newHashMap();

    /**
     * @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation)
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Cache cacheAnnotation = invocation.getMethod().getAnnotation(Cache.class);
        if (cacheAnnotation == null || !cacheAnnotation.enabled()) {
            return invocation.proceed();
        }
        //須要cache
        CacheKey cacheKey = new CacheKey(invocation.getMethod().getDeclaringClass(), invocation.getMethod(), invocation.getArguments());
        CacheType cacheType = cacheAnnotation.cacheType();
        if (cacheType == CacheType.LOCAL) {
            Object result = getLocalCacheResult(cacheKey, invocation);
            return result;
        }
        throw new RuntimeException("not supported cacheType");
    }

    /**
     * Get local cache result object.
     *
     * @param key the key
     * @return the object
     */

    private Object getLocalCacheResult(CacheKey key, final Invocation i) throws ExecutionException, InterruptedException {
        FutureTask<Object> f = new FutureTask<Object>(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    return i.proceed();
                } catch (Throwable throwable) {
                    throw new ExecutionException(throwable);
                }
            }
        });
        FutureTask<Object> result = cache.asMap().putIfAbsent(key, f);
        if (result == null) {
            f.run();
            result = f;
            LoggerUtils.debug(logger,"提交任務,key=",key);
        }else {
            LoggerUtils.debug(logger, "從緩存獲取,key=", key);
        }
        return result.get();
    }

    /**
     * Sets expire time when access.
     *
     * @param expireTimeWhenAccess the expire time when access
     */
    public void setExpireTimeWhenAccess(long expireTimeWhenAccess) {
        this.expireTimeWhenAccess = expireTimeWhenAccess;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        cache = CacheBuilder
                .newBuilder()
                .maximumSize(maxCacheSize)
                .expireAfterAccess(
                        expireTimeWhenAccess,
                        TimeUnit.SECONDS).removalListener(new RemovalListener<CacheKey, Future<Object>>() {
                    @Override
                    public void onRemoval(RemovalNotification<CacheKey, Future<Object>> notification) {
                        LoggerUtils.info(logger, "移除key=", notification.getKey(), ",value=", notification.getValue(), ",cause=", notification.getCause());
                    }
                })
                .build();
    }

    /**
     * Sets convertors.
     *
     * @param convertors the convertors
     */
    public void setConvertors(Map<String, ObjEquality> convertors) {
        this.convertors = convertors;
    }

    /**
     * Gets convertors.
     *
     * @return the convertors
     */
    public static Map<String, ObjEquality> getConvertors() {
        return convertors;
    }

    /**
     * Sets max cache size.
     *
     * @param maxCacheSize the max cache size
     */
    public void setMaxCacheSize(long maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
    }
}
緩存攔截器

這裏咱們實現了緩存的攔截器,緩存採用Guava cache,這裏咱們在使用上主要是使用了guava的緩存自動淘汰、原子化的功能。咱們能夠看到,緩存的是CacheKey--->FutureTask<Object>的映射,這裏咱們採用了FutureTask的異步執行的功能。而且將Guava 做爲ConcurrentHashMap來使用。

 

好了咱們來配置下。

<bean id="cacheInteceptor" class="com.zhangwei.learning.utils.cache.CacheInterceptor">
        <property name="maxCacheSize" value="300"/>
        <property name="expireTimeWhenAccess" value="300"/>
        <property name="convertors">
            <map>
                <entry key="java.lang.String" value-ref="stringConvertor" />
            </map>
        </property>
    </bean>
    <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
        <property name="order" value="90"></property>        
        <property name="interceptorNames">
            <list>
                <value>digestInteceptor</value>
                <value>cacheInteceptor</value>
            </list>
        </property>
        <property name="beanNames">
            <value>*</value>
        </property>
    </bean>

上面的那個map就是配置的自定義equals比較器

上測試類

@Component
@Digest
public class TestBean {

    @Cache(cacheType = CacheType.LOCAL, enabled = true)
    public String test(String one, String two) throws Exception {
        Thread.sleep(999);
        //        throw new Exception("lalal");
        return one + two;
    }
}
public class CacheTest {

    private final static Logger LOGGER = LoggerFactory.getLogger(CacheTest.class);

    @org.junit.Test
    public void Test() {
        final TestBean client = GlobalResourceUtils.getBean(TestBean.class);
        LoggerUtils.info(LOGGER, "獲取到的client=", JSON.toJSONString(client));
        MutiThreadRun.init(5).addTaskAndRun(10, new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return client.test("aaa","bbb");
            }
        });
    }
}
public class StringConvertor extends ObjEquality {
    @Override
    public boolean extraEquals(Object a, Object b) {
        return false;
    }
}

這裏咱們就是講一個方法多線程執行了10次,該方法中間會將線程暫停1s,因此能夠看每次方法的執行時間就知道是否走緩存了。咱們這裏自定義了一個equal比較器,老是返回false,因此這裏咱們理論上每次都不會走緩存的,由於比較的時候key不一樣。

2016-08-28 23:38:09:527 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1043ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:09:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1035ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:09:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1034ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:09:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1036ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:09:534 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1033ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:10:527 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:10:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:10:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:10:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:10:534 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:38:10:534 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間2051ms,共有10個任務,執行異常0次

能夠看到 每次執行時間都超過了1s,由於沒走緩存,每次線程都暫停了1s。

而後咱們把那個String比較器刪掉。理論上此次調用的就是String的equals方法,就能走上緩存了。

2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,986ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1020ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,987ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1026ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:419 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,0ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,2ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1037ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,0ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1ms,No Exception,aaa^bbb,aaabbb)
2016-08-28 23:52:27:421 com.zhangwei.learning.run.MutiThreadRun 任務執行完畢,執行時間1043ms,共有10個任務,執行異常0次

能夠看到,除了5個結果執行時間超過1s,其餘的都很快,爲啥呢?由於方法是多線程執行的,5個線程,最開始執行,5個線程中一個線程會執行方法,而且把結果放到緩存裏面。而後5個線程一塊兒等待方法執行完成而後把結果返回,而後後面的全部的都只須要從緩存獲取就行了,這似不似很贊~~~~

相關文章
相關標籤/搜索