import com.ithzk.common.PropertiesUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.apache.log4j.Logger; import org.apache.zookeeper.data.Stat; import java.util.List; /** * zookeeper客戶端工具類 * @author hzk * @date 2018/3/15 */ public class ZkClientUtils { private static final Logger logger = Logger.getLogger(ZkClientUtils.class); private CuratorFramework client; private String ZK_ADDRESS; public ZkClientUtils() { ZK_ADDRESS = PropertiesUtil.getValueByBundle("config","zk.address"); this.client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000)); this.client.start(); } public CuratorFramework getClient(){ return this.client; } /** * 建立節點 * @param zkPath 路徑 * @param data 數據 * @return */ public boolean create(String zkPath,String data){ try { // 若是父節點不存在,則在建立節點的同時建立父節點 client.create().creatingParentsIfNeeded().forPath(zkPath, data.getBytes("utf-8")); return true; } catch (Exception e) { e.printStackTrace(); logger.error("----- create zk node error:"+e.getMessage()); } return false; } /** * 獲取節點值 * @param zkPath 路徑 * @return */ public String getZnodeData(String zkPath){ try { byte[] datas = client.getData().forPath(zkPath); return new String(datas,"utf-8"); } catch (Exception e) { e.printStackTrace(); logger.error("----- get zk node data error:"+e.getMessage()); } return null; } /** * 設置節點值 * @param zkPath 路徑 * @param data 數據 * @return */ public boolean set(String zkPath,String data){ try { if(isExists(zkPath)){ client.setData().forPath(zkPath, data.getBytes("utf-8")); }else { create(zkPath, data); } return true; } catch (Exception e) { e.printStackTrace(); logger.error("----- set zk node data error:"+e.getMessage()); } return false; } /** * 檢驗節點是否存在 * @param zkPath 路徑 * @return * @throws Exception */ public boolean isExists(String zkPath) throws Exception{ Stat stat = client.checkExists().forPath(zkPath); return null != stat; } /** * 獲取子節點列表 * @param zkPath 路徑 * @return */ public List<String> list(String zkPath){ try { return client.getChildren().forPath(zkPath); } catch (Exception e) { e.printStackTrace(); logger.error("----- get zk children node list error:"+e.getMessage()); } return null; } public void close(){ client.close(); } }
import java.util.concurrent.*; public class WorkerPool { /** * 線程池執行任務 * @param command */ public void exec(Runnable command){ ThreadFactory threadFactory = Executors.defaultThreadFactory(); ArrayBlockingQueue queue = new ArrayBlockingQueue<Runnable>(2); ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, queue, threadFactory); executorPool.execute(command); } }
/** * @author hzk * @date 2018/3/15 */ public interface IUpdateLocalCacheData { void change(Node node); }
/** * 緩存重試統計類 * @author hzk * @date 2018/3/16 */ public class RetryRecord { private String lastRecordTime; private Integer retryCount; public RetryRecord(String lastRecordTime, Integer retryCount) { this.lastRecordTime = lastRecordTime; this.retryCount = retryCount; } public String getLastRecordTime() { return lastRecordTime; } public void setLastRecordTime(String lastRecordTime) { this.lastRecordTime = lastRecordTime; } public Integer getRetryCount() { return retryCount; } public void setRetryCount(Integer retryCount) { this.retryCount = retryCount; } }
package com.ithzk.common.listener; import com.ithzk.common.zk.CacheDataSynServerHandle; import org.apache.log4j.Logger; import org.springframework.beans.factory.InitializingBean; import org.springframework.web.context.ServletContextAware; import javax.servlet.ServletContext; /** * @author hzk * @date 2018/3/15 */ public class InitDataListener implements InitializingBean,ServletContextAware { private static final Logger logger = Logger.getLogger(InitDataListener.class); public static ServletContext servletContext; @Override public void setServletContext(ServletContext arg0) { InitDataListener.servletContext = arg0; } @Override public void afterPropertiesSet() throws Exception { //初始化數據 加載數據到本地緩存 logger.info("----------------- init cache data.. -----------------------"); new CacheDataSynServerHandle().loadLocal(); //註冊zk監聽節點變化 同步數據 new CacheDataSynServerHandle().start(); logger.info("----------------- init cache data complete -----------------------"); } }
import com.ithzk.common.BeanFactoryUtil; import com.ithzk.common.Constants; import com.ithzk.common.JsonUtil; import com.ithzk.common.PropertiesUtil; import com.ithzk.common.dic.LoadData; import com.ithzk.common.zk.ifc.IUpdateLocalCacheData; import com.ithzk.dto.out.BaseResponse; import com.ithzk.dto.out.DictionaryResponse; import com.ithzk.dto.out.PartnerResponse; import com.ithzk.ifc.DictionaryIfc; import com.ithzk.ifc.PartnerIfc; import com.ithzk.common.Detect; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 服務端初始化緩存數據 同步緩存數據 * @author hzk * @date 2018/3/15 */ @Component public class CacheDataSynServerHandle { private static final Logger logger = Logger.getLogger(CacheDataSynServerHandle.class); private String ZK_ADDRESS; /** * 初始化CacheDataSynServerHandle */ public void start(){ new WorkerPool().exec(new Runnable() { @Override public void run() { new CacheDataSynServerHandle().init(); } }); } /** * 註冊監聽 */ public void init() { ZK_ADDRESS = PropertiesUtil.getValueByBundle("config","zk.address"); logger.info("------register watch path:"+ZK_ADDRESS); //添加監聽 ZkWatcherUtils zkWatcherUtils = new ZkWatcherUtils(); try { UpdateLocalCacheData change = new UpdateLocalCacheData(); zkWatcherUtils.addWatch(Constants.AbstractZkNodePath.ZK_DICTIONRAY_PATH,change); } catch (Exception e) { e.printStackTrace(); logger.error("------register watch error :"+e.getMessage()); } loadZKData(); } /** * 檢查zk中是否有數據 * 沒有則初始化zk字典數據 */ private void loadZKData(){ ZkClientUtils zkClientUtils = new ZkClientUtils(); try { if(!zkClientUtils.isExists(Constants.AbstractZkNodePath.ZK_DICTIONRAY_PATH)){ List<DictionaryResponse> dicData = getDicData(); String data = JsonUtil.list2json(dicData); zkClientUtils.create(Constants.AbstractZkNodePath.ZK_DICTIONRAY_PATH, data); logger.info("------loadZKData success!"); } } catch (Exception e) { e.printStackTrace(); logger.error("------loadZKData fail msg:"+e.getMessage()); }finally{ zkClientUtils.close(); } } /** * 加載本地緩存 */ public void loadLocal(){ try { logger.info("------loadLocal dictionary data--"); List<DictionaryResponse> dataDic = getDicData(); List<PartnerResponse> partnerDic = getPartnerData(); new UpdateLocalCacheData().convert(dataDic,partnerDic); } catch (Exception e) { e.printStackTrace(); logger.error("-------------- loadLocal error :"+e.getMessage()); } } /** * 從新加載本地緩存 */ public void reloadLocal() { loadLocal(); } /** * 得到字典數據 * @return */ private static List<DictionaryResponse> getDicData() { DictionaryIfc dictionaryIfc = BeanFactoryUtil.getInstance("dictionaryIfc"); Map<String, Object> conditions = new HashMap<String, Object>(); BaseResponse<List<DictionaryResponse>> listBaseResponseDic = dictionaryIfc.selectByExample(conditions); List<DictionaryResponse> dicList = listBaseResponseDic.getDatat(); return dicList; } /** * 得到合做方數據 * @return */ private static List<PartnerResponse> getPartnerData() { PartnerIfc partnerIfc = BeanFactoryUtil.getInstance("partnerIfc"); Map<String, Object> conditions = new HashMap<String, Object>(); BaseResponse<List<PartnerResponse>> listBaseResponsePartner = partnerIfc.selectByExample(conditions); List<PartnerResponse> partnerList = listBaseResponsePartner.getDatat(); return partnerList; } static class UpdateLocalCacheData implements IUpdateLocalCacheData { @Override public void change(Node node) { try { logger.info("-------------- zookeeper node change -------------"); List<DictionaryResponse> dataDic = getDicData(); List<PartnerResponse> partnerDic = getPartnerData(); convert(dataDic,partnerDic); } catch (Exception e) { e.printStackTrace(); logger.error("-------------- zookeeper node change error :"+e.getMessage()); } } public void convert(List<DictionaryResponse> dataDic,List<PartnerResponse> partnerDic){ if(!Detect.notEmpty(dataDic)){ return; } try { LoadData.buildLocalCache(dataDic,partnerDic); logger.info("-------------- server start update local data.... ---------------------"); } catch (Exception e) { e.printStackTrace(); logger.error("-------------- loadLocal error :"+e.getMessage()); } } } }
import com.ithzk.common.PropertiesUtil; import com.ithzk.common.zk.ifc.IUpdateLocalCacheData; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.RetryNTimes; import org.apache.log4j.Logger; /** * zookeeper監聽工具類 * @author hzk * @date 2018/3/15 */ public class ZkWatcherUtils { private static final Logger logger = Logger.getLogger(ZkWatcherUtils.class); private String ZK_ADDRESS; /** * 添加監聽事件 * @param zkPath 路徑 * @param updateLocalCacheData 回調 * @throws Exception */ public void addWatch(String zkPath,final IUpdateLocalCacheData updateLocalCacheData) throws Exception { // 鏈接zk ZK_ADDRESS = PropertiesUtil.getValueByBundle("config","zk.address"); CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); logger.info("----- zk client start success ------"); //註冊監聽 final NodeCache watcher = new NodeCache(client, zkPath); watcher.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { if(null != watcher.getCurrentData()){ if(null != updateLocalCacheData){ Node node = new Node(watcher.getCurrentData().getPath(),new String(watcher.getCurrentData().getData(),"utf-8")); updateLocalCacheData.change(node); } logger.info("----- zk node changed, data is: "+ new String(watcher.getCurrentData().getData(),"utf-8")); }else{ logger.error("----- Node changed, data is null -----"); } } }); watcher.start(true); logger.error("----- Register zk watcher success! -----"); } }
package com.blog.cas.common.dic; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.blog.cas.common.constants.Constants; import com.blog.cas.common.utils.BeanFactoryUtils; import com.blog.cas.common.utils.Detect; import com.blog.cas.common.utils.json.JsonUtils; import com.blog.cas.common.zk.CacheDataSynServerHandle; import com.blog.cas.common.zk.RetryRecord; import com.blog.cas.common.zk.ZkClientUtils; import com.blog.cas.repository.entity.BlogDictionary; import com.blog.cas.service.IDictionaryService; import org.apache.log4j.Logger; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 本地緩存操做類 * @author hzk * @date 2018/11/14 */ public class LoadData { private static final Logger logger = Logger.getLogger(LoadData.class); /** * itemKey value */ private static Map<String, BlogDictionary> dicInfoMap = new HashMap<>(); /** * Id value */ private static Map<Integer, BlogDictionary> dicInfoIdMap = new HashMap<>(); private static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private static Lock r = rwl.readLock(); private static Lock w = rwl.writeLock(); /*------------------------------------------ mysql select-------------------------------------------*/ /** * 經過itemKey從數據庫中查詢數據 * @param key * @return {@link BlogDictionary} */ private static BlogDictionary getDicByItemKeyWithIfc(String key){ IDictionaryService blogDictionaryService = BeanFactoryUtils.getInstance("dictionaryService"); return blogDictionaryService.selectByItemKey(key); } /** * 經過id從數據庫中查詢數據 * @param id * @return {@link BlogDictionary} */ private static BlogDictionary getDicByIdWithIfc(Integer id){ IDictionaryService blogDictionaryService= BeanFactoryUtils.getInstance("dictionaryService"); return blogDictionaryService.selectByPrimaryKey(id); } /*------------------------------------------by itemKey-------------------------------------------*/ /** * 經過itemKey查詢字典 * @param key * @return {@link BlogDictionary} */ public static BlogDictionary getDicByItemKey(String key) { r.lock(); try { BlogDictionary dictionary = dicInfoMap.get(key); if (dictionary != null) { return dictionary; } retryBuildLocalCache(dicInfoMap); BlogDictionary dicByItemKeyWithIfc = getDicByItemKeyWithIfc(key); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc; } logger.warn("======Not Found Dic key:" + key); return null; }finally { r.unlock(); } } /** * 經過itemKey查詢字典value * @param key 字典key * @param check 是否驗證啓用 * @return */ public static String getDicValueByItemKey(String key,boolean check) { r.lock(); try { BlogDictionary dictionary = dicInfoMap.get(key); if (dictionary != null) { if(check){ if(Detect.equal(LoadData.getDicIdByItemKey(Constants.AbstractPublicStatus.ENABLE),dictionary.getFrState())){ return dictionary.getItemValue(); }else{ return null; } }else{ return dictionary.getItemValue(); } } retryBuildLocalCache(dicInfoMap); BlogDictionary dicByItemKeyWithIfc = getDicByItemKeyWithIfc(key); if(null != dicByItemKeyWithIfc){ if(check){ if(Detect.equal(LoadData.getDicIdByItemKey(Constants.AbstractPublicStatus.ENABLE),dicByItemKeyWithIfc.getFrState())){ return dicByItemKeyWithIfc.getItemValue(); }else{ return null; } }else{ return dicByItemKeyWithIfc.getItemValue(); } } logger.warn("======Not Found Dic key:" + key); return null; }finally { r.unlock(); } } /** * 經過itemKey查詢字典value * @param key 字典key * @param defaultValue 不存在時設置默認值 * @return */ public static String getDicValueByItemKey(String key,String defaultValue){ r.lock(); try { BlogDictionary dictionary = dicInfoMap.get(key); if (dictionary != null) { return dictionary.getItemValue(); } retryBuildLocalCache(dicInfoMap); BlogDictionary dicByItemKeyWithIfc = getDicByItemKeyWithIfc(key); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getItemValue(); } logger.warn("======Not Found Dic key:" + key); return defaultValue; } finally { r.unlock(); } } /** * 經過itemKey查詢字典ID * @param key 字典key * @return */ public static int getDicIdByItemKey(String key) { r.lock(); try { BlogDictionary dictionary = dicInfoMap.get(key); if (dictionary != null) { return Detect.asPrimitiveInt(dictionary.getId()); } retryBuildLocalCache(dicInfoMap); BlogDictionary dicByItemKeyWithIfc = getDicByItemKeyWithIfc(key); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getId(); } logger.warn("======Not Found Dic key:" + key); return 0; }finally { r.unlock(); } } /** * 經過itemKey查詢字典中文名稱 * @param key 字典key * @return */ public static String getDicNameByItemKey(String key) { r.lock(); try { BlogDictionary dictionary = dicInfoMap.get(key); if (dictionary != null) { return Detect.asString(dictionary.getItemNamecn()); } retryBuildLocalCache(dicInfoMap); BlogDictionary dicByItemKeyWithIfc = getDicByItemKeyWithIfc(key); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getItemNamecn(); } logger.warn("======Not Found Dic key:" + key); return null; }finally { r.unlock(); } } /*------------------------------------------by id-------------------------------------------*/ /** * 經過ID查詢字典 * @param id 字典ID * @return {@link BlogDictionary} */ public static BlogDictionary getDicById(Integer id) { r.lock(); try { BlogDictionary dictionary = dicInfoIdMap.get(id); if (dictionary != null) { return dictionary; } else{ retryBuildLocalCache(dicInfoIdMap); BlogDictionary dicByItemKeyWithIfc = getDicByIdWithIfc(id); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc; } logger.warn("======Not Found Dic id:" + id); return null; } }finally { r.unlock(); } } /** * 經過id查詢字典value * @param id 字典ID * @return */ public static String getDicValueById(Integer id) { r.lock(); try { BlogDictionary dictionary = dicInfoIdMap.get(id); if (dictionary != null) { return dictionary.getItemValue(); } else{ retryBuildLocalCache(dicInfoIdMap); BlogDictionary dicByItemKeyWithIfc = getDicByIdWithIfc(id); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getItemValue(); } logger.warn("======Not Found Dic id:" + id); return null; } }finally { r.unlock(); } } /** * 經過ID查詢字典itemKey * @param id 字典ID * @return */ public static String getDicItemKeyById(Integer id) { r.lock(); try { BlogDictionary dictionary = dicInfoIdMap.get(id); if (dictionary != null) { return Detect.asString(dictionary.getItemKey()); } else { retryBuildLocalCache(dicInfoIdMap); BlogDictionary dicByItemKeyWithIfc = getDicByIdWithIfc(id); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getItemKey(); } logger.warn("======Not Found Dic id:" + id); return null; } }finally { r.unlock(); } } /** * 經過ID查詢字典parentId * @param id 字典ID * @return */ public static Integer getDicParentIdById(Integer id) { r.lock(); try { BlogDictionary dictionary = dicInfoIdMap.get(id); if (dictionary != null) { return dictionary.getParentId(); } else { retryBuildLocalCache(dicInfoIdMap); BlogDictionary dicByItemKeyWithIfc = getDicByIdWithIfc(id); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getParentId(); } logger.warn("======Not Found Dic id:" + id); return null; } }finally { r.unlock(); } } /** * 經過ID查詢字典中文名稱 * @param id 字典ID * @return */ public static String getDicNameById(Integer id) { r.lock(); try { BlogDictionary dictionary = dicInfoIdMap.get(id); if (dictionary != null) { return Detect.asString(dictionary.getItemNamecn()); } else { retryBuildLocalCache(dicInfoIdMap); BlogDictionary dicByItemKeyWithIfc = getDicByIdWithIfc(id); if(null != dicByItemKeyWithIfc){ return dicByItemKeyWithIfc.getItemNamecn(); } logger.warn("======Not Found Dic id:" + id); return null; } }finally { r.unlock(); } } /*------------------------------------------init data-------------------------------------------*/ /** * 初始化內存數據 * 字典/合做方 */ public static void buildLocalCache(List<BlogDictionary> dataDic){ w.lock(); try { //初始化數據字典 logger.warn("-------------------init dictionary start...---------------------"); for (BlogDictionary blogDictionary : dataDic) { dicInfoMap.put(Detect.trim(blogDictionary.getItemKey()), blogDictionary); dicInfoIdMap.put(blogDictionary.getId(), blogDictionary); } logger.warn("-------------------init dictionary complete---------------------"); } catch (Exception e){ e.printStackTrace(); logger.warn("-------------------init LoadData error---------------------"+e.getMessage()); } finally{ w.unlock(); } } /** * 重試機制 * 若屢次發現本地緩存爲空 則從新加載本地數據 */ private static void retryBuildLocalCache(Map<?,BlogDictionary> dataDic){ // 內存中數據爲null時 記錄獲取次數 到達必定次數嘗試重試加載緩存 if(!Detect.notEmpty(dataDic)){ logger.warn("------ local cache dic is null -------"); ZkClientUtils zkClientUtils = new ZkClientUtils(); Date currentDate = new Date(System.currentTimeMillis()); try { if(!zkClientUtils.isExists(Constants.AbstractZkNodePath.ZK_RETRY_RECORD_PATH)){ logger.info("------ init retry record start... -------"); RetryRecord retryRecord = new RetryRecord(Detect.asString(currentDate.getTime()),0); String data = JsonUtils.object2json(retryRecord); zkClientUtils.create(Constants.AbstractZkNodePath.ZK_RETRY_RECORD_PATH, data); logger.info("------ init retry record success -------"); } String znodeData = zkClientUtils.getZnodeData(Constants.AbstractZkNodePath.ZK_RETRY_RECORD_PATH); RetryRecord retryRecord = JSONObject.toJavaObject(JSON.parseObject(znodeData), RetryRecord.class); if(null != retryRecord){ Integer retryCount = retryRecord.getRetryCount(); if(null != retryCount){ if(retryCount % Constants.AbstractZkNodePath.ZK_RETRY_COUNT_REGEX == 1){ // 嘗試從新加載本地緩存 new CacheDataSynServerHandle().reloadLocal(); } retryRecord.setLastRecordTime(Detect.asString(currentDate.getTime())); retryRecord.setRetryCount(retryCount+1); String data = JsonUtils.object2json(retryRecord); zkClientUtils.set(Constants.AbstractZkNodePath.ZK_RETRY_RECORD_PATH, data); } } } catch (Exception e) { e.printStackTrace(); logger.error("------ retry build local cache fail msg:"+e.getMessage()); }finally{ zkClientUtils.close(); } } } /** * 刷新zk字典節點緩存 * @return */ public static int renovateZkDicCache(){ ZkClientUtils zkClientUtils = new ZkClientUtils(); int success = 1; w.lock(); try { List<BlogDictionary> blogDictionary = CacheDataSynServerHandle.getDicData(); String data = JsonUtils.list2json(blogDictionary); if(!zkClientUtils.isExists(Constants.AbstractZkNodePath.ZK_DICTIONRAY_PATH)){ zkClientUtils.create(Constants.AbstractZkNodePath.ZK_DICTIONRAY_PATH, data); logger.info("------renovateZKData success!"); }else{ zkClientUtils.set(Constants.AbstractZkNodePath.ZK_DICTIONRAY_PATH, data); logger.info("------renovateZKData success!"); } } catch (Exception e) { e.printStackTrace(); success = 0; logger.error("------renovateZKData fail msg:"+e.getMessage()); }finally{ zkClientUtils.close(); w.unlock(); } return success; } }
<!-- spring系統啓動之後,將laze-init設爲false,啓動即會加載。會先加載該類 -->
<bean id="beanFactoryUtil" class="com.ithzk.common.BeanFactoryUtil" lazy-init="false"/>
<bean id="initDataListener" class="com.ithzk.common.listener.InitDataListener" lazy-init="false"/>java