自動加載緩存框架

自動加載緩存框架


代碼,請訪問github 獲取更詳情,更新的內容 QQ交流羣:429274886,版本更新會在羣裏通知,能瞭解最新動態java

0.5版本已是穩定版本了,你們能夠放心使用了。git

如今使用的緩存技術不少,好比RedisMemcacheEhCache等,甚至還有使用ConcurrentHashMapHashTable 來實現緩存。但在緩存的使用上,每一個人都有本身的實現方式,大部分是直接與業務代碼綁定,隨着業務的變化,要更換緩存方案時,很是麻煩。接下來咱們就使用AOP + Annotation 來解決這個問題,同時使用自動加載機制 來實現數據「常駐內存」。github

Spring AOP這幾年很是熱門,使用也愈來愈多,但我的建議AOP只用於處理一些輔助的功能(好比:接下來咱們要說的緩存),而不能把業務邏輯使用AOP中實現,尤爲是在須要「事務」的環境中。redis

以下圖所示: Alt 緩存框架spring

AOP攔截到請求後:數據庫

  1. 根據請求參數生成Key,後面咱們會對生成Key的規則,進一步說明;
  2. 若是是AutoLoad的,則請求相關參數,封裝到AutoLoadTO中,並放到AutoLoadHandler中。
  3. 根據Key去緩存服務器中取數據,若是取到數據,則返回數據,若是沒有取到數據,則執行DAO中的方法,獲取數據,同時將數據放到緩存中。若是是AutoLoad的,則把最後加載時間,更新到AutoLoadTO中,最後返回數據;如是AutoLoad的請求,每次請求時,都會更新AutoLoadTO中的 最後請求時間。
  4. 爲了減小併發,增長等待機制:若是多個用戶同時取一個數據,那麼先讓第一個用戶去DAO取數據,其它用戶則等待其返回後,去緩存中獲取,嘗試必定次數後,若是還沒獲取到,再去DAO中取數據。

AutoLoadHandler(自動加載處理器)主要作的事情:當緩存即將過時時,去執行DAO的方法,獲取數據,並將數據放到緩存中。爲了防止自動加載隊列過大,設置了容量限制;同時會將超過必定時間沒有用戶請求的也會從自動加載隊列中移除,把服務器資源釋放出來,給真正須要的請求。express

使用自加載的目的:緩存

  1. 避免在請求高峯時,由於緩存失效,而形成數據庫壓力沒法承受;
  2. 把一些耗時業務得以實現。
  3. 把一些使用很是頻繁的數據,使用自動加載,由於這樣的數據緩存失效時,最容易形成服務器的壓力過大。

分佈式自動加載服務器

若是將應用部署在多臺服務器上,理論上能夠認爲自動加載隊列是由這幾臺服務器共同完成自動加載任務。好比應用部署在A,B兩臺服務器上,A服務器自動加載了數據D,(由於兩臺服務器的自動加載隊列是獨立的,因此加載的順序也是同樣的),接着有用戶從B服務器請求數據D,這時會把數據D的最後加載時間更新給B服務器,這樣B服務器就不會重複加載數據D。併發

##使用方法 ###1. 實現com.jarvis.cache.CacheGeterSeter 下面舉個使用Redis作緩存服務器的例子:

package com.jarvis.example.cache;
import ... ...
/**
 * 緩存切面,用於攔截數據並調用Redis進行緩存操做
 */
@Aspect
public class CachePointCut implements CacheGeterSeter<Serializable> {

    private static final Logger logger=Logger.getLogger(CachePointCut.class);

    private AutoLoadHandler<Serializable> autoLoadHandler;

    private static List<RedisTemplate<String, Serializable>> redisTemplateList;

    public CachePointCut() {
        autoLoadHandler=new AutoLoadHandler<Serializable>(10, this, 20000);
    }

    @Pointcut(value="execution(public !void com.jarvis.example.dao..*.*(..)) && @annotation(cache)", argNames="cache")
    public void daoCachePointcut(Cache cache) {
        logger.info("----------------------init daoCachePointcut()--------------------");
    }

    @Around(value="daoCachePointcut(cache)", argNames="pjp, cache")
    public Object controllerPointCut(ProceedingJoinPoint pjp, Cache cache) throws Exception {
        return CacheUtil.proceed(pjp, cache, autoLoadHandler, this);
    }

    public static RedisTemplate<String, Serializable> getRedisTemplate(String key) {
        if(null == redisTemplateList || redisTemplateList.isEmpty()) {
            return null;
        }
        int hash=Math.abs(key.hashCode());
        Integer clientKey=hash % redisTemplateList.size();
        RedisTemplate<String, Serializable> redisTemplate=redisTemplateList.get(clientKey);
        return redisTemplate;
    }

    @Override
    public void setCache(final String cacheKey, final CacheWrapper<Serializable> result, final int expire) {
        try {
            final RedisTemplate<String, Serializable> redisTemplate=getRedisTemplate(cacheKey);
            redisTemplate.execute(new RedisCallback<Object>() {

                @Override
                public Object doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);
                    JdkSerializationRedisSerializer serializer=(JdkSerializationRedisSerializer)redisTemplate.getValueSerializer();
                    byte[] val=serializer.serialize(result);
                    connection.set(key, val);
                    connection.expire(key, expire);
                    return null;
                }
            });
        } catch(Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    @Override
    public CacheWrapper<Serializable> get(final String cacheKey) {
        CacheWrapper<Serializable> res=null;
        try {
            final RedisTemplate<String, Serializable> redisTemplate=getRedisTemplate(cacheKey);
            res=redisTemplate.execute(new RedisCallback<CacheWrapper<Serializable>>() {

                @Override
                public CacheWrapper<Serializable> doInRedis(RedisConnection connection) throws DataAccessException {
                    byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);
                    byte[] value=connection.get(key);
                    if(null != value && value.length > 0) {
                        JdkSerializationRedisSerializer serializer=
                            (JdkSerializationRedisSerializer)redisTemplate.getValueSerializer();
                        @SuppressWarnings("unchecked")
                        CacheWrapper<Serializable> res=(CacheWrapper<Serializable>)serializer.deserialize(value);
                        return res;
                    }
                    return null;
                }
            });
        } catch(Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
        return res;
    }

    /**
     * 刪除緩存
     * @param cs Class
     * @param method
     * @param arguments
     * @param subKeySpEL
     * @param deleteByPrefixKey 是否批量刪除
     */
    public static void delete(@SuppressWarnings("rawtypes") Class cs, String method, Object[] arguments, String subKeySpEL,
        boolean deleteByPrefixKey) {
        try {
            if(deleteByPrefixKey) {
                final String cacheKey=CacheUtil.getDefaultCacheKeyPrefix(cs.getName(), method, arguments, subKeySpEL) + "*";
                for(final RedisTemplate<String, Serializable> redisTemplate : redisTemplateList){
                    redisTemplate.execute(new RedisCallback<Object>() {
                        @Override
                        public Object doInRedis(RedisConnection connection) throws DataAccessException {
                            byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);
                            Set<byte[]> keys=connection.keys(key);
                            if(null != keys && keys.size() > 0) {
                                byte[][] keys2=new byte[keys.size()][];
                                keys.toArray(keys2);
                                connection.del(keys2);
                            }
                            return null;
                        }
                    });
                }

            } else {
                final String cacheKey=CacheUtil.getDefaultCacheKey(cs.getName(), method, arguments, subKeySpEL);
                final RedisTemplate<String, Serializable> redisTemplate=getRedisTemplate(cacheKey);
                redisTemplate.execute(new RedisCallback<Object>() {

                    @Override
                    public Object doInRedis(RedisConnection connection) throws DataAccessException {
                        byte[] key=redisTemplate.getStringSerializer().serialize(cacheKey);

                        connection.del(key);

                        return null;
                    }
                });
            }
        } catch(Exception ex) {
            logger.error(ex.getMessage(), ex);
        }
    }

    public AutoLoadHandler<Serializable> getAutoLoadHandler() {
        return autoLoadHandler;
    }

    public void destroy() {
        autoLoadHandler.shutdown();
        autoLoadHandler=null;
    }

    public List<RedisTemplate<String, Serializable>> getRedisTemplateList() {
        return redisTemplateList;
    }

    public void setRedisTemplateList(List<RedisTemplate<String, Serializable>> redisTemplateList) {
        CachePointCut.redisTemplateList=redisTemplateList;
    }

}

從上面的代碼能夠看出,對緩存的操做,仍是由業務系統本身來實現的,咱們只是對AOP攔截到的ProceedingJoinPoint,進行作一些處理。

java代碼實現後,接下來要在spring中進行相關的配置:

<aop:aspectj-autoproxy proxy-target-class="true"/>
<bean id="cachePointCut" class="com.jarvis.example.cache.CachePointCut" destroy-method="destroy">
    <property name="redisTemplateList">
        <list>
            <ref bean="redisTemplate1"/>
            <ref bean="redisTemplate2"/>
        </list>
    </property>
</bean>

從0.4版本開始增長了Redis的PointCut 的實現,直接在Spring 中用aop:config就可使用:

<bean id="autoLoadConfig" class="com.jarvis.cache.to.AutoLoadConfig">
    <property name="threadCnt" value="10" />
    <property name="maxElement" value="20000" />
    <property name="printSlowLog" value="true" />
    <property name="slowLoadTime" value="1000" />
</bean>
<bean id="cachePointCut" class="com.jarvis.cache.redis.CachePointCut" destroy-method="destroy">
  <constructor-arg ref="autoLoadConfig" />
  <property name="redisTemplateList">
    <list>
      <ref bean="redisTemplate100" />
      <ref bean="redisTemplate2" />
    </list>
  </property>
</bean>

<aop:config>
  <aop:aspect id="aa" ref="cachePointCut">
    <aop:pointcut id="daoCachePointcut" expression="execution(public !void com.jarvis.cache_example.dao..*.*(..)) &amp;&amp; @annotation(cache)" />
    <aop:around pointcut-ref="daoCachePointcut" method="controllerPointCut" />
  </aop:aspect>
</aop:config>

經過Spring配置,能更好地支持,不一樣的數據使用不一樣的緩存服務器的狀況。

實例代碼

Memcache例子:

<bean id="memcachedClient" class="net.spy.memcached.spring.MemcachedClientFactoryBean">
    <property name="servers" value="192.138.11.165:11211,192.138.11.166:11211" />
    <property name="protocol" value="BINARY" />
    <property name="transcoder">
        <bean class="net.spy.memcached.transcoders.SerializingTranscoder">
            <property name="compressionThreshold" value="1024" />
        </bean>
    </property>
    <property name="opTimeout" value="2000" />
    <property name="timeoutExceptionThreshold" value="1998" />
    <property name="hashAlg">
        <value type="net.spy.memcached.DefaultHashAlgorithm">KETAMA_HASH</value>
    </property>
    <property name="locatorType" value="CONSISTENT" />
    <property name="failureMode" value="Redistribute" />
    <property name="useNagleAlgorithm" value="false" />
</bean>

<bean id="cachePointCut" class="com.jarvis.cache.memcache.CachePointCut" destroy-method="destroy">
  <constructor-arg value="10" /><!-- 線程數量 -->
  <constructor-arg value="20000" /><!-- 自動加載隊列容量 -->
  <property name="memcachedClient", ref="memcachedClient" />
</bean>

###2. 將須要使用緩存的方法前增長@Cache註解

package com.jarvis.example.dao;
import ... ...
public class UserDAO {
    @Cache(expire=600, autoload=true, requestTimeout=72000)
    public List<UserTO> getUserList(... ...) {
        ... ...
    }
}

##緩存Key的生成

  1. 使用Spring EL 表達式自定義緩存Key:CacheUtil.getDefinedCacheKey(String keySpEL, Object[] arguments)

    例如: @Cache(expire=600, key="'goods'+#args[0]")

  2. 默認生成緩存Key的方法:CacheUtil.getDefaultCacheKey(String className, String method, Object[] arguments, String subKeySpEL)

  • className 類名稱

  • method 方法名稱

  • arguments 參數

  • subKeySpEL SpringEL表達式

    生成的Key格式爲:{類名稱}.{方法名稱}{.SpringEL表達式運算結果}:{參數值的Hash字符串}。

    當@Cache中不設置key值時,使用默認方式生成緩存Key

建議使用默認生成緩存Key的方法,能減小一些維護工做。

###subKeySpEL 使用說明

根據業務的須要,將緩存Key進行分組。舉個例子,商品的評論列表:

package com.jarvis.example.dao;
import ... ...
public class GoodsCommentDAO{
    @Cache(expire=600, subKeySpEL="#args[0]", autoload=true, requestTimeout=18000)
    public List<CommentTO> getCommentListByGoodsId(Long goodsId, int pageNo, int pageSize) {
        ... ...
    }
}

若是商品Id爲:100,那麼生成緩存Key格式爲:com.jarvis.example.dao.GoodsCommentDAO.getCommentListByGoodsId.100:xxxx 在Redis中,能精確刪除商品Id爲100的評論列表,執行命令便可: del com.jarvis.example.dao.GoodsCommentDAO.getCommentListByGoodsId.100:*

SpringEL表達式使用起來確實很是方便,若是須要,@Cache中的expire,requestTimeout以及autoload參數均可以用SpringEL表達式來動態設置,但使用起來就變得複雜,因此咱們沒有這樣作。

###數據實時性

上面商品評論的例子中,若是用戶發表了評論,要當即顯示該如何來處理?

比較簡單的方法就是,在發表評論成功後,當即把緩存中的數據也清除,這樣就能夠了。

package com.jarvis.example.dao;
import ... ...
public class GoodsCommentDAO{
    @Cache(expire=600, subKeySpEL="#args[0]", autoload=true, requestTimeout=18000)
    public List<CommentTO> getCommentListByGoodsId(Long goodsId, int pageNo, int pageSize) {
        ... ...
    }
    public void addComment(Long goodsId, String comment) {
        ... ...// 省略添加評論代碼
        deleteCache(goodsId);
    }
    private void deleteCache(Long goodsId) {
        Object arguments[]=new Object[]{goodsId};
        CachePointCut.delete(this.getClass(), "getCommentListByGoodsId", arguments, "#args[0]", true);
    }
}

###@Cache

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Cache {

    /**
     * 緩存的過時時間,單位:秒
     */
    int expire();

    /**
     * 自定義緩存Key,若是不設置使用系統默認生成緩存Key的方法
     * @return
     */
    String key() default "";
    
    /**
     * 是否啓用自動加載緩存
     * @return
     */
    boolean autoload() default false;

    /**
     * 當autoload爲true時,緩存數據在 requestTimeout 秒以內沒有使用了,就不進行自動加載數據,若是requestTimeout爲0時,會一直自動加載
     * @return
     */
    long requestTimeout() default 36000L;
    
    /**
     * 使用SpEL,將緩存key,根據業務須要進行二次分組
     * @return
     */
    String subKeySpEL() default "";
    /**
     * 緩存的條件,能夠爲空,使用 SpEL 編寫,返回 true 或者 false,只有爲 true 才進行緩存,例如:"#args[0]==1",當第一個參數值爲1時,才進緩存。
     * @return
     */
    String condition() default "";
}

##注意事項

###1. 當@Cache中 autoload 設置爲 ture 時,對應方法的參數必須都是Serializable的。 AutoLoadHandler中須要緩存經過深度複製後的參數。

###2. 參數中只設置必要的屬性值,在DAO中用不到的屬性值儘可能不要設置,這樣能避免生成不一樣的緩存Key,下降緩存的使用率。 例如:

public CollectionTO<AccountTO> getAccountByCriteria(AccountCriteriaTO criteria)  {
        List<AccountTO> list=null;
        PaginationTO paging=criteria.getPaging();
        if(null != paging && paging.getPageNo() > 0 && paging.getPageSize() > 0) {// 若是須要分頁查詢,先查詢總數
            criteria.setPaging(null);// 減小緩存KEY的變化,在查詢記錄總數據時,不用設置分頁相關的屬性值
            Integer recordCnt=accountDAO.getAccountCntByCriteria(criteria);
            if(recordCnt > 0) {
                criteria.setPaging(paging);
                paging.setRecordCnt(recordCnt);
                list=accountDAO.getAccountByCriteria(criteria);
            }
            return new CollectionTO<AccountTO>(list, recordCnt, criteria.getPaging().getPageSize());
        } else {
            list=accountDAO.getAccountByCriteria(criteria);
            return new CollectionTO<AccountTO>(list, null != list ? list.size() : 0, 0);
        }
    }

###3. 注意AOP失效的狀況; 例如:

TempDAO {

        public Object a() {
            return b().get(0);
        }

        @Cache(expire=600)
        public List<Object> b(){
            return ... ...;
        }
    }

經過 new TempDAO().a() 調用b方法時,AOP失效,也沒法進行緩存相關操做。

###4. 自動加載緩存時,不能在緩存方法內疊加查詢參數值; 例如:

@Cache(expire=600, autoload=true)
    public List<AccountTO> getDistinctAccountByPlayerGet(AccountCriteriaTO criteria) {
        List<AccountTO> list;
        int count=criteria.getPaging().getThreshold() ;
        // 查預設查詢數量的10倍
        criteria.getPaging().setThreshold(count * 10);
        … …
    }

由於自動加載時,AutoLoadHandler 緩存了查詢參數,執行自動加載時,每次執行時 threshold 都會乘以10,這樣threshold的值就會愈來愈大。

###5. 當方法返回值類型改變了怎麼辦?

在代碼重構時,可能會出現改方法返回值類型的狀況,而參數不變的狀況,那上線部署時,可能會從緩存中取到舊數據類型的數據,能夠經過如下方法處理:

  • 上線後,快速清理緩存中的數據;
  • 在CacheGeterSeter的實現類中統一加個version;
  • 在@Cache中加version(未實現)。

###6. 對於一些比較耗時的方法儘可能使用自動加載。

###7. 對於查詢條件變化比較劇烈的,不要使用自動加載機制。 好比,根據用戶輸入的關鍵字進行搜索數據的方法,不建議使用自動加載。

##在事務環境中,如何減小「髒讀」

  1. 不要從緩存中取數據,而後應用到修改數據的SQL語句中

  2. 在事務完成後,再刪除相關的緩存

在事務開始時,用一個ThreadLocal記錄一個HashSet,在更新數據方法執行完時,把要刪除緩存的相關參數封裝成在一個Bean中,放到這個HashSet中,在事務完成時,遍歷這個HashSet,而後刪除相關緩存。

大部分狀況,只要作到第1點就能夠了,由於保證數據庫中的數據準確纔是最重要的。由於這種「髒讀」的狀況只能減小出現的機率,不能完成解決。通常只有在很是高併發的狀況纔有可能發生。就像12306,在查詢時告訴你還有車票,但最後支付時不必定會有。

##使用規範

  1. 將調接口或數據庫中取數據,封裝在DAO層,不能什麼地方都有調接口的方法。
  2. 自動加載緩存時,不能在緩存方法內**疊加(或減)**查詢條件值,但容許設置值。
  3. DAO層內部,沒使用@Cache的方法,不能調用加了@Cache的方法,避免AOP失效。
  4. 因緩存Key是方法參數轉爲字符串得到的,爲了不生成的Key不一樣,儘可能只設置必要的參數及屬性,也便於反向定位
  5. 對於比較大的系統,要進行模塊化設計,這樣能夠將自動加載,均分到各個模塊中。

##爲何要使用自動加載機制?

首先咱們想一下系統的瓶頸在哪裏?

  1. 在高併發的狀況下數據庫性能極差,即便查詢語句的性能很高;若是沒有自動加載機制的話,在當緩存過時時,訪問洪峯到來時,很容易就使數據壓力大增。

  2. 往緩存寫數據與從緩存讀數據相比,效率也差不少,由於寫緩存時須要分配內存等操做。使用自動加載,能夠減小同時往緩存寫數據的狀況,同時也能提高緩存服務器的吞吐量。

  3. 還有一些比較耗時的業務。

##如何減小DAO層併發

  1. 使用緩存;
  2. 使用自動加載機制;「寫」數據每每比讀數據性能要差,使用自動加載也能減小寫併發。
  3. 從DAO層加載數據時,增長等待機制(拿來主義):若是有多個請求同時請求同一個數據,會先讓其中一個請求去取數據,其它的請求則等待它的數據。

##可擴展性及維護性

  1. 經過AOP實現緩存與業務邏輯的解耦;若是要實時顯示數據,仍是會有點耦合。
  2. 很是方便更換緩存服務器或緩存實現(好比:從Memcache換成Redis);
  3. 很是方便增減緩存服務器(如:增長Redis的節點數);
  4. 很是方便增長或去除緩存,方便測試期間排查問題;
相關文章
相關標籤/搜索