高併發數據緩存池(基於 Ehcache)

在高併發的場景裏常常會用到本地緩存(local cache),可是一直沒有一個很好的內存管理工具。在開發的時候發現了 Ehcache 這個開源工具。它唯一的缺點就是無法對多塊數據單元進行有效的管理,而且在數據過期的時候無法提供有效的更新機制,因此這裏寫了一個數據緩存池來滿足這個需求。

下面是設計組織結構:

這裏主要是在數據實體內部封裝了數據更新器,這樣在數據過期的時候就可以調用更新器的方法。

1. Ehcache 數據緩存的具體代碼:(主要是在 get 方法內部進行數據更新,使用對象鎖的方式來進行數據過期的併發控制。缺點是在非常高的併發下可能會出現阻塞的現象,可是因為這裏大部分都是內存的運算操作,因此相對來說阻塞的影響還好)

package com.tmall.lafite.core.manager.localcache;

import java.util.List;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.concurrent.LockType;
import net.sf.ehcache.concurrent.ReadWriteLockSync;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.store.MemoryStoreEvictionPolicy;

import com.tmall.lafite.core.LafiteResult;
import com.tmall.lafite.core.ResultCode;

/**
 * A single named in-memory cache backed by an Ehcache {@link Cache}, with an
 * application-level staleness check performed on every read.
 *
 * <p>Ehcache's own time-to-live is deliberately not configured here; instead
 * {@link #get(Object)} compares the element's last-update time against
 * {@link #getExpireTime()} (milliseconds) and reports stale entries through
 * the returned {@link LafiteResult} so the caller can refresh them.
 *
 * <p>{@link #initialize()} must be called before any other operation.
 *
 * @author wangxiao
 */
public class LafiteCache {
	
	private CacheManager cacheManager = null;
	private Cache cacheImpl = null;
	
	// Only clear() acquires this lock; put/del/get do not.
	ReadWriteLockSync rwLock = new ReadWriteLockSync();
	
	public static final int DEFAULT_CAPABILITY = 30;
	public static final int DEFAULT_EXPIRETIME = 30;
	
	private int capability = DEFAULT_CAPABILITY;
	private long expireTime = DEFAULT_EXPIRETIME;
	
	private String cacheName = "Tair Local Cache";
	
	/**
	 * @param id            name under which the cache is registered with the cache manager
	 * @param capability    maximum number of entries kept on the local heap
	 * @param expireTimeMS  age in milliseconds after which an entry is reported as overdue
	 */
	public LafiteCache(String id, int capability, long expireTimeMS) {
		this.cacheName = id;
		this.capability = capability;
		this.expireTime = expireTimeMS;
	}
	
	/**
	 * Changes the staleness threshold used by {@link #get(Object)}.
	 *
	 * @param expireTimeMS new threshold in milliseconds
	 */
	public void setExpireTime(long expireTimeMS) {
		this.expireTime = expireTimeMS;
	}
	
	/**
	 * Changes the maximum number of heap entries.
	 *
	 * <p>Safe to call before {@link #initialize()}: the value is remembered and
	 * used when the cache is created. If the cache already exists, its live
	 * configuration is updated as well.
	 *
	 * @param cap new maximum number of entries on the local heap
	 */
	public void setCapacity(int cap) {
		this.capability = cap;
		if (cacheImpl != null) {
			cacheImpl.getCacheConfiguration().setMaxEntriesLocalHeap(cap);
		}
	}
	
	/**
	 * @return the staleness threshold in milliseconds
	 */
	public long getExpireTime() {
		return expireTime;
	}

	/**
	 * Creates the underlying Ehcache cache (heap only, LRU eviction, no disk
	 * persistence) and registers it with the shared {@link CacheManager}.
	 */
	@SuppressWarnings("deprecation")
	public void initialize() {
		CacheConfiguration cacheConfiguration = new CacheConfiguration();
		cacheConfiguration.name(cacheName)
						  .maxEntriesLocalHeap(capability)
						  .diskPersistent(false)
						  .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU);
		
		// CacheManager.create() hands back the process-wide singleton, so every
		// LafiteCache instance ends up registered on the same manager.
		cacheManager = CacheManager.create();
		
		cacheImpl = new Cache(cacheConfiguration);
		cacheManager.addCache(cacheImpl);
	}
	
	/**
	 * @return the number of entries currently reported by the underlying cache
	 */
	public int size() {
		return (int) cacheImpl.getSize();
	}
	
	/**
	 * Removes this cache from the cache manager, which also disposes it.
	 *
	 * <p>The manager itself is intentionally left running: it is the singleton
	 * shared by all {@code LafiteCache} instances, and shutting it down here
	 * would invalidate every other cache registered on it. Does nothing if
	 * {@link #initialize()} was never called.
	 */
	public void destroy() {
		if (cacheManager == null || cacheImpl == null) {
			return;
		}
		cacheManager.removeCache(cacheImpl.getName());
	}
	
	/**
	 * Removes every entry from the cache while holding the write lock.
	 */
	public void clear() {
		rwLock.lock(LockType.WRITE);
		try {
			cacheImpl.removeAll();
		} finally {
			rwLock.unlock(LockType.WRITE);
		}
	}
	
	/**
	 * Removes the entry stored under {@code key}, if any.
	 *
	 * @param key cache key
	 */
	public void del(Object key) {
		cacheImpl.remove(key);
	}
	
	/**
	 * Stores {@code value} under {@code key}, replacing any previous entry.
	 *
	 * @param key   cache key
	 * @param value value to cache
	 */
	public void put(Object key, Object value) {
		cacheImpl.put(new Element(key, value));
	}
	
	/**
	 * Looks up {@code key} and reports whether the entry is missing or stale.
	 *
	 * <ul>
	 *   <li>No entry: the result carries {@code ResultCode.Error.Cache.NO_DATA}
	 *       and no model.</li>
	 *   <li>Entry older than {@link #getExpireTime()}: the result carries
	 *       {@code ResultCode.Error.Cache.DATA_OVERDUE} together with the
	 *       (stale) value.</li>
	 *   <li>Otherwise: the result carries the value and no error.</li>
	 * </ul>
	 *
	 * @param key cache key
	 * @return a result holding the cached value and/or an error code
	 */
	public LafiteResult get(Object key) {
		LafiteResult lafiteResult = new LafiteResult();
		
		Element element = cacheImpl.get(key);
		if (element == null) {
			lafiteResult.setError(ResultCode.Error.Cache.NO_DATA);
			return lafiteResult;
		}
		
		// NOTE(review): confirm that the Ehcache version in use stamps
		// lastUpdateTime on the first put; if it stays 0 until the first
		// replace, a freshly inserted entry is reported overdue on first read.
		long now = System.currentTimeMillis();
		long pastTime = now - element.getLastUpdateTime();
		if (pastTime >= expireTime) {
			// Re-check under the element's monitor so that, of several threads
			// racing on the same stale element, only the first one reports
			// DATA_OVERDUE; presumably updateUpdateStatistics() refreshes the
			// last-update timestamp the others then observe.
			synchronized (element) {
				pastTime = now - element.getLastUpdateTime();
				if (pastTime >= expireTime) {
					element.updateUpdateStatistics();
					lafiteResult.setError(ResultCode.Error.Cache.DATA_OVERDUE);
				}
			}
		}
		// The value is returned even when stale so the caller can fall back to it.
		lafiteResult.setDefaultModel(element.getObjectValue());
		return lafiteResult;
	}
	
	/**
	 * @return the keys currently reported by the underlying cache
	 */
	@SuppressWarnings("unchecked")
	public List<Object> getKeys() {
		List<Object> keys = cacheImpl.getKeys();
		return keys;
	}
}

2. 數據邏輯單元定義(這裏封裝了數據容器單元,把原來的數據池方法傳入,在數據更新的時候使用 namespace 來獲取內存實體,進而獲取數據。)

(注:目前還沒有想好是否把數據的初始化放在容器當中,這裏暫時不放入。只是在容器裏面進行初始化方法的調用,真正的數據設置方式由邏輯單元獲取數據池後進行自身的 put)

package com.tmall.lafite.core.manager.localcache.util;

import org.springframework.beans.factory.annotation.Autowired;

import com.tmall.lafite.core.LafiteResult;
import com.tmall.lafite.core.manager.localcache.LafiteContainer;

/**
 * Cache logic unit: the per-namespace hook that knows how to load and refresh
 * the data held in a cache. Subclasses reach the cache pool through
 * {@link #getLafiteContainer()}.
 *
 * @author wangxiao
 */
public abstract class LogicCenter {
	@Autowired
	private LafiteContainer lafiteContainer;
	
	/**
	 * Initialization hook, invoked by {@code LafiteContainer.initialize()} for
	 * each registered cache entity.
	 *
	 * @return implementation-defined; the container ignores the return value
	 */
	public abstract Object initialize();
	
	/**
	 * Callback invoked by {@code LafiteContainer.get} when a lookup reports
	 * missing or overdue data.
	 *
	 * @param namespace    namespace of the cache the lookup was made against
	 * @param key          key that was looked up
	 * @param lafiteResult result of the lookup, carrying the error code and any stale value
	 * @return implementation-defined; the container ignores the return value
	 */
	public abstract Object callBack(String namespace, Object key, LafiteResult lafiteResult);

	public LafiteContainer getLafiteContainer() {
		return lafiteContainer;
	}
}
3. 數據池(使用 initialize 方法循環調用各內存實體裏的邏輯單元進行數據的初始化)
package com.tmall.lafite.core.manager.localcache;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tmall.lafite.core.LafiteResult;
import com.tmall.lafite.core.ResultCode;
import com.tmall.lafite.core.manager.localcache.entity.CacheEntity;
import com.tmall.lafite.core.manager.localcache.util.LogicCenter;

/**
 * cache容器
 * @author wangxiao
 *
 */
public class LafiteContainer {
	
	protected final Logger logger = LoggerFactory.getLogger(LafiteContainer.class);

	private Map<String, CacheEntity> cacheMap = new ConcurrentHashMap<String, CacheEntity>();
	
	/**
	 * 註冊緩存對象
	 * @param namespace
	 * @param key
	 * @param lafiteCache 緩存對象
	 * @param logicCenter 邏輯對象 (包含:初始化方法和callback方法)
	 * @return
	 */
//	public String register(String namespace, String key, LafiteCache lafiteCache, LogicCenter logicCenter) {
//		if(namespace != null && StringUtils.isEmpty(key) && lafiteCache != null) {
//			if(cacheMap.containsKey(namespace)) {
//				return ResultCode.Error.Cache.NAMESPACE_REPETITION;
//			}
//			
//			CacheEntity cacheEntity = new CacheEntity(lafiteCache, logicCenter);
//			try {
//				cacheEntity.initialize();
//			} catch (Exception e) {
//				logger.error("LafiteContainer.register ", e);
//			}
//			cacheMap.put(namespace, cacheEntity);
//			return null;
//		} 
//		return ResultCode.Error.Cache.COMMON_PARAM_LOST;
//	}
	
	/**
	 * 獲取cache內的數據
	 * @param namespace
	 * @param key
	 * @return
	 */
	public LafiteResult get(String namespace, Object key) {
		LafiteResult lafiteResult = new LafiteResult();
		CacheEntity cacheEntity = cacheMap.get(namespace);//獲取緩存實體
		if(cacheEntity == null) {
			lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);
		} else {
			LafiteCache lafiteCache = cacheEntity.getLafiteCache();
			LafiteResult result = lafiteCache.get(key);
			if(ResultCode.Error.Cache.NO_DATA.equals(result.getError()) 
					|| ResultCode.Error.Cache.DATA_OVERDUE.equals(result.getError())) {
				cacheEntity.getLogicCenter().callBack(namespace, key, result);//數據過時觸發callback事件
			}
			lafiteResult = result;
		}
		
		return lafiteResult;
	}
	
	/**
	 * 獲取指定命名空間的所有數據
	 * 	這裏採用的是逐條遍歷的方式
	 * @param namespace
	 * @return
	 */
	public LafiteResult getAll(String namespace) {
		LafiteResult lafiteResult = new LafiteResult();
		List<Object> objects = new ArrayList<Object>();
		
		CacheEntity cacheEntity = cacheMap.get(namespace);
		if(cacheEntity == null) {
			lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);
		} else {
			LafiteCache lafiteCache = cacheEntity.getLafiteCache();
			List<Object> keys = lafiteCache.getKeys();
			if(keys.isEmpty()) {
				lafiteResult.setError(ResultCode.Error.Cache.NO_DATA);
			} else {
				for(Object key : keys) {
					LafiteResult lr = get(namespace, key);
					objects.add(lr.getDefaultModel());
				}
			}
		}
		lafiteResult.setDefaultModel(objects);
		return lafiteResult;
	}
	
	/**
	 * 設置數據對象內容
	 * @param namespace
	 * @param key
	 * @param value
	 * @return
	 */
	public LafiteResult put(String namespace, Object key, Object value) {
		LafiteResult lafiteResult = new LafiteResult();
		CacheEntity cacheEntity = cacheMap.get(namespace);
		if(cacheEntity == null) {
			lafiteResult.setError(ResultCode.Error.Cache.NO_CACHEENTITY);
		} else {
			LafiteCache lafiteCache = cacheEntity.getLafiteCache();
			lafiteCache.put(key, value);
		}
		return lafiteResult;
	}

	public void setCacheMap(Map<String, CacheEntity> cacheMap) {
		this.cacheMap = cacheMap;
	}

	public Map<String, CacheEntity> getCacheMap() {
		return cacheMap;
	}
	
	public void initialize() {
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				for(String key : cacheMap.keySet()) {
					try {
						CacheEntity cacheEntity = cacheMap.get(key);
						if(cacheEntity != null) {
							LogicCenter logicCenter = cacheEntity.getLogicCenter();
							if(logicCenter != null) {
								logicCenter.initialize();
							}
						}
					}catch (Exception e) {
						e.printStackTrace();
					}
				}
				
			}
		}).start();
		
	}
}
4. Spring 初始化方式
<!-- Permission-role cache (disabled example). The LafiteCache constructor arguments are, in order: id, capability, expireTimeMS.
 <bean id="permitRoleCache" class="com.tmall.lafite.core.manager.localcache.LafiteCache" init-method="initialize">
 <constructor-arg value="_permit_role_cache_"/>
 <constructor-arg value="100"/>
 <constructor-arg value="2000"/>
 </bean>
 <bean id="permitRoleLogicCenter" class="com.tmall.lafite.core.manager.permit.cache.PermitRoleLogicCenter"/>
 <bean id="permitRoleCacheEntity" class="com.tmall.lafite.core.manager.localcache.entity.CacheEntity">
 <property name="logicCenter" ref="permitRoleLogicCenter" />
 <property name="lafiteCache" ref="permitRoleCache" />
 </bean>
 -->
  
 <!-- Cache container; the cacheMap property below (namespace to cache entity) is currently disabled -->
 <bean id="lafiteContainer" class="com.tmall.lafite.core.manager.localcache.LafiteContainer" init-method="initialize">
 <!-- 
 <property name="cacheMap">
    	<map>
    		<entry key="PermitCommon" value-ref="permitCommonCacheEntity" />
    		<entry key="PermitAlgorithm" value-ref="permitAlgorithmCacheEntity"/>
    		<entry key="PermitRole" value-ref="permitRoleCacheEntity"/>
    	</map>
    </property> 
    -->
 </bean>

5. 使用示例

@Autowired
	private LafiteContainer lafiteContainer;
	
	private String namespace = LafiteNameSpace.PermitCommon;

	/**
	 * Fetches every cached permit record of this component's namespace from
	 * the cache container.
	 *
	 * @return the cached records, or {@code null} when the container result
	 *         carries no model
	 */
	@SuppressWarnings("unchecked")
	@Transactional
	public List<PermitCommonDO> getPermitDOCache() {
		Object cachedModel = lafiteContainer.getAll(namespace).getDefaultModel();
		return cachedModel == null ? null : (List<PermitCommonDO>) cachedModel;
	}
相關文章
相關標籤/搜索