在Java應用系統中,我們常常需要使用配置文件來定製系統行爲,這些配置文件可能包括:類路徑下的文件和文件夾、非類路徑下以絕對路徑或相對路徑指定的文件和文件夾;在分佈式環境中,還需要通過HTTP從統一集中的Web服務器獲取配置信息。如何對這些配置信息進行自動加載並實時檢測變化呢?
Java分佈式中文分詞組件 - word分詞已經實現了這個功能,我們看看它是如何實現的:
package org.apdplat.word.util; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.MalformedURLException; import java.net.URL; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisPubSub; /** * 資源變化自動檢測 * @author 楊尚川 */ public class AutoDetector { private static final Logger LOGGER = LoggerFactory.getLogger(AutoDetector.class); //已經被監控的文件 private static final Set<String> fileWatchers = new HashSet<>(); private static final Set<String> httpWatchers = new HashSet<>(); private static final Map<DirectoryWatcher, String> resources = new HashMap<>(); private static final Map<DirectoryWatcher, ResourceLoader> resourceLoaders = new HashMap<>(); private static final Map<DirectoryWatcher.WatcherCallback, DirectoryWatcher> watcherCallbacks = new HashMap<>(); /** * 加載資源並自動檢測資源變化 * 當資源發生變化的時候從新自動加載 * @param resourceLoader 資源加載邏輯 * @param resourcePaths 多個資源路徑,用逗號分隔 */ public static void loadAndWatch(ResourceLoader resourceLoader, String resourcePaths) { LOGGER.info("開始加載資源"); LOGGER.info(resourcePaths); long start = System.currentTimeMillis(); List<String> result = new ArrayList<>(); for(String resource : resourcePaths.split("[,,]")){ try{ resource = resource.trim(); if(resource.startsWith("classpath:")){ //處理類路徑資源 
result.addAll(loadClasspathResource(resource.replace("classpath:", ""), resourceLoader, resourcePaths)); }else if(resource.startsWith("http:")){ //處理HTTP資源 result.addAll(loadHttpResource(resource, resourceLoader)); }else{ //處理非類路徑資源 result.addAll(loadNoneClasspathResource(resource, resourceLoader, resourcePaths)); } }catch(Exception e){ LOGGER.error("加載資源失敗:"+resource, e); } } LOGGER.info("加載資源 "+result.size()+" 行"); //調用自定義加載邏輯 resourceLoader.clear(); resourceLoader.load(result); long cost = System.currentTimeMillis() - start; LOGGER.info("完成加載資源,耗時"+cost+" 毫秒"); } /** * 加載類路徑資源 * @param resource 資源名稱 * @param resourceLoader 資源自定義加載邏輯 * @param resourcePaths 資源的全部路徑,用於資源監控 * @return 資源內容 * @throws IOException */ private static List<String> loadClasspathResource(String resource, ResourceLoader resourceLoader, String resourcePaths) throws IOException{ List<String> result = new ArrayList<>(); LOGGER.info("類路徑資源:"+resource); Enumeration<URL> ps = AutoDetector.class.getClassLoader().getResources(resource); while(ps.hasMoreElements()) { URL url=ps.nextElement(); LOGGER.info("類路徑資源URL:"+url); if(url.getFile().contains(".jar!")){ //加載jar資源 result.addAll(load("classpath:"+resource)); continue; } File file=new File(url.getFile()); boolean dir = file.isDirectory(); if(dir){ //處理目錄 result.addAll(loadAndWatchDir(file.toPath(), resourceLoader, resourcePaths)); }else{ //處理文件 result.addAll(load(file.getAbsolutePath())); //監控文件 watchFile(file, resourceLoader, resourcePaths); } } return result; } /** * 加載HTTP資源 * @param resource 資源URL * @param resourceLoader 資源自定義加載邏輯 * @return 資源內容 */ private static List<String> loadHttpResource(String resource, ResourceLoader resourceLoader) throws MalformedURLException, IOException { List<String> result = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(new URL(resource).openConnection().getInputStream(), "utf-8"))) { String line = null; while((line = reader.readLine()) != null){ line = line.trim(); 
if("".equals(line) || line.startsWith("#")){ continue; } result.add(line); } } watchHttp(resource, resourceLoader); return result; } private static void watchHttp(String resource, final ResourceLoader resourceLoader){ String[] attrs = resource.split("/"); final String channel = attrs[attrs.length-1]; if(httpWatchers.contains(channel)){ return; } httpWatchers.add(channel); Thread thread = new Thread(new Runnable() { @Override public void run() { String host = WordConfTools.get("redis.host", "localhost"); int port = WordConfTools.getInt("redis.port", 6379); String channel_add = channel+".add"; String channel_remove = channel+".remove"; LOGGER.info("redis服務器配置信息 host:" + host + ",port:" + port + ",channels:[" + channel_add + "," + channel_remove+"]"); while(true){ try{ JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), host, port); final Jedis jedis = jedisPool.getResource(); LOGGER.info("redis守護線程啓動"); jedis.subscribe(new HttpResourceChangeRedisListener(resourceLoader), new String[]{channel_add, channel_remove}); jedisPool.returnResource(jedis); LOGGER.info("redis守護線程結束"); break; }catch(Exception e){ LOGGER.info("redis未啓動,暫停一分鐘後從新鏈接"); try { Thread.sleep(60000); } catch (InterruptedException ex) { LOGGER.error(ex.getMessage(), ex); } } } } }); thread.setDaemon(true); thread.setName("redis守護線程,用於動態監控資源:"+channel); thread.start(); } private static final class HttpResourceChangeRedisListener extends JedisPubSub { private ResourceLoader resourceLoader; public HttpResourceChangeRedisListener(ResourceLoader resourceLoader){ this.resourceLoader = resourceLoader; } @Override public void onMessage(String channel, String message) { LOGGER.debug("onMessage channel:" + channel + " and message:" + message); if(channel.endsWith(".add")){ this.resourceLoader.add(message); }else if(channel.endsWith(".remove")){ this.resourceLoader.remove(message); } } @Override public void onPMessage(String pattern, String channel, String message) { LOGGER.debug("pattern:" + pattern + " and 
channel:" + channel + " and message:" + message); onMessage(channel, message); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { LOGGER.debug("psubscribe pattern:" + pattern + " and subscribedChannels:" + subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { LOGGER.debug("punsubscribe pattern:" + pattern + " and subscribedChannels:" + subscribedChannels); } @Override public void onSubscribe(String channel, int subscribedChannels) { LOGGER.debug("subscribe channel:" + channel + " and subscribedChannels:" + subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { LOGGER.debug("unsubscribe channel:" + channel + " and subscribedChannels:" + subscribedChannels); } } /** * 加載非類路徑資源 * @param resource 資源路徑 * @param resourceLoader 資源自定義加載邏輯 * @param resourcePaths 資源的全部路徑,用於資源監控 * @return 資源內容 * @throws IOException */ private static List<String> loadNoneClasspathResource(String resource, ResourceLoader resourceLoader, String resourcePaths) throws IOException { List<String> result = new ArrayList<>(); Path path = Paths.get(resource); boolean exist = Files.exists(path); if(!exist){ LOGGER.error("資源不存在:"+resource); return result; } boolean isDir = Files.isDirectory(path); if(isDir){ //處理目錄 result.addAll(loadAndWatchDir(path, resourceLoader, resourcePaths)); }else{ //處理文件 result.addAll(load(resource)); //監控文件 watchFile(path.toFile(), resourceLoader, resourcePaths); } return result; } /** * 遞歸加載目錄下面的全部資源 * 並監控目錄變化 * @param path 目錄路徑 * @param resourceLoader 資源自定義加載邏輯 * @param resourcePaths 資源的全部路徑,用於資源監控 * @return 目錄全部資源內容 */ private static List<String> loadAndWatchDir(Path path, ResourceLoader resourceLoader, String resourcePaths) { final List<String> result = new ArrayList<>(); try { Files.walkFileTree(path, new SimpleFileVisitor<Path>() { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { 
result.addAll(load(file.toAbsolutePath().toString())); return FileVisitResult.CONTINUE; } }); } catch (IOException ex) { LOGGER.error("加載資源失敗:"+path, ex); } if(fileWatchers.contains(path.toString())){ //以前已經註冊過監控服務,這次忽略 return result; } fileWatchers.add(path.toString()); DirectoryWatcher.WatcherCallback watcherCallback = new DirectoryWatcher.WatcherCallback(){ private long lastExecute = System.currentTimeMillis(); @Override public void execute(WatchEvent.Kind<?> kind, String path) { //一秒內發生的多個相同事件認定爲一次,防止短期內屢次加載資源 if(System.currentTimeMillis() - lastExecute > 1000){ lastExecute = System.currentTimeMillis(); LOGGER.info("事件:"+kind.name()+" ,路徑:"+path); synchronized(AutoDetector.class){ DirectoryWatcher dw = watcherCallbacks.get(this); String paths = resources.get(dw); ResourceLoader loader = resourceLoaders.get(dw); LOGGER.info("從新加載數據"); loadAndWatch(loader, paths); } } } }; DirectoryWatcher directoryWatcher = DirectoryWatcher.getDirectoryWatcher(watcherCallback, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE); directoryWatcher.watchDirectoryTree(path); watcherCallbacks.put(watcherCallback, directoryWatcher); resources.put(directoryWatcher, resourcePaths); resourceLoaders.put(directoryWatcher, resourceLoader); return result; } /** * 加載文件資源 * @param path 文件路徑 * @return 文件內容 */ private static List<String> load(String path) { List<String> result = new ArrayList<>(); try{ InputStream in = null; LOGGER.info("加載資源:"+path); if(path.startsWith("classpath:")){ in = AutoDetector.class.getClassLoader().getResourceAsStream(path.replace("classpath:", "")); }else{ in = new FileInputStream(path); } try(BufferedReader reader = new BufferedReader(new InputStreamReader(in,"utf-8"))){ String line; while((line = reader.readLine()) != null){ line = line.trim(); if("".equals(line) || line.startsWith("#")){ continue; } result.add(line); } } }catch(Exception e){ LOGGER.error("加載資源失敗:"+path, e); } return result; } /** * 
監控文件變化 * @param file 文件 */ private static void watchFile(final File file, ResourceLoader resourceLoader, String resourcePaths) { if(fileWatchers.contains(file.toString())){ //以前已經註冊過監控服務,這次忽略 return; } fileWatchers.add(file.toString()); LOGGER.info("監控文件:"+file.toString()); DirectoryWatcher.WatcherCallback watcherCallback = new DirectoryWatcher.WatcherCallback(){ private long lastExecute = System.currentTimeMillis(); @Override public void execute(WatchEvent.Kind<?> kind, String path) { if(System.currentTimeMillis() - lastExecute > 1000){ lastExecute = System.currentTimeMillis(); if(!path.equals(file.toString())){ return; } LOGGER.info("事件:"+kind.name()+" ,路徑:"+path); synchronized(AutoDetector.class){ DirectoryWatcher dw = watcherCallbacks.get(this); String paths = resources.get(dw); ResourceLoader loader = resourceLoaders.get(dw); LOGGER.info("從新加載數據"); loadAndWatch(loader, paths); } } } }; DirectoryWatcher fileWatcher = DirectoryWatcher.getDirectoryWatcher(watcherCallback, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE); fileWatcher.watchDirectory(file.getParent()); watcherCallbacks.put(watcherCallback, fileWatcher); resources.put(fileWatcher, resourcePaths); resourceLoaders.put(fileWatcher, resourceLoader); } public static void main(String[] args){ AutoDetector.loadAndWatch(new ResourceLoader(){ @Override public void clear() { System.out.println("清空資源"); } @Override public void load(List<String> lines) { for(String line : lines){ System.out.println(line); } } @Override public void add(String line) { System.out.println("add:"+line); } @Override public void remove(String line) { System.out.println("remove:"+line); } }, "d:/DIC, d:/DIC2, d:/dic.txt, classpath:dic2.txt,classpath:dic"); } }
代碼地址:GitHub