public interface DataSource<S, T> { /** * Load data data source as the target type. * * @return the target data. * @throws Exception */ T loadConfig() throws Exception; /** * Read original data from the data source. * * @return the original data. * @throws Exception */ S readSource() throws Exception; /** * Get {@link SentinelProperty} of the data source. * * @return the property. */ SentinelProperty<T> getProperty(); /** * Write the {@code values} to the data source. * * @param values * @throws Exception */ void writeDataSource(T values) throws Exception; /** * Close the data source. * * @throws Exception */ void close() throws Exception; }
public abstract class AbstractDataSource<S, T> implements DataSource<S, T> { protected final ConfigParser<S, T> parser; protected final SentinelProperty<T> property; public AbstractDataSource(ConfigParser<S, T> parser) { if (parser == null) { throw new IllegalArgumentException("parser can't be null"); } this.parser = parser; this.property = new DynamicSentinelProperty<T>(); } @Override public T loadConfig() throws Exception { S readValue = readSource(); T value = parser.parse(readValue); return value; } public T loadConfig(S conf) throws Exception { T value = parser.parse(conf); return value; } @Override public SentinelProperty<T> getProperty() { return property; } @Override public void writeDataSource(T values) throws Exception { throw new UnsupportedOperationException(); } }
/** * A {@link DataSource} automatically fetches the backend data. * * @param <S> source data type * @param <T> target data type * @author Carpenter Lee */ public abstract class AutoRefreshDataSource<S, T> extends AbstractDataSource<S, T> { private ScheduledExecutorService service; protected long recommendRefreshMs = 3000; public AutoRefreshDataSource(ConfigParser<S, T> configParser) { super(configParser); startTimerService(); } public AutoRefreshDataSource(ConfigParser<S, T> configParser, final long recommendRefreshMs) { super(configParser); if (recommendRefreshMs <= 0) { throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get"); } this.recommendRefreshMs = recommendRefreshMs; startTimerService(); } private void startTimerService() { service = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true)); service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { T newValue = loadConfig(); getProperty().updateValue(newValue); } catch (Throwable e) { RecordLog.info("loadConfig exception", e); } } }, recommendRefreshMs, recommendRefreshMs, TimeUnit.MILLISECONDS); } @Override public void close() throws Exception { if (service != null) { service.shutdownNow(); service = null; } } }
/** * <p> * A {@link DataSource} based on file. This class will automatically fetches the backend file every 3 seconds. * </p> * <p> * Limitations: default read buffer size is 1MB, if file size is greater than buffer size, exceeding bytes will * be ignored. Default charset is UTF8. * </p> * * @author Carpenter Lee */ public class FileRefreshableDataSource<T> extends AutoRefreshDataSource<String, T> { private static final int MAX_SIZE = 1024 * 1024 * 4; private static final long DEFAULT_REFRESH_MS = 3000; private static final int DEFAULT_BUF_SIZE = 1024 * 1024; private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8"); private byte[] buf; private Charset charset; private File file; /** * Create a file based {@link DataSource} whose read buffer size is 1MB, charset is UTF8, * and read interval is 3 seconds. * * @param file the file to read. * @param configParser the config parser. */ public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser) throws FileNotFoundException { this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); } public FileRefreshableDataSource(String fileName, ConfigParser<String, T> configParser) throws FileNotFoundException { this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); //System.out.println(file.getAbsoluteFile()); } public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, int bufSize) throws FileNotFoundException { this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET); } public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, Charset charset) throws FileNotFoundException { this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset); } public FileRefreshableDataSource(File file, ConfigParser<String, T> configParser, long recommendRefreshMs, int bufSize, Charset charset) throws FileNotFoundException { super(configParser, recommendRefreshMs); if (bufSize <= 0 || bufSize > MAX_SIZE) { throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get"); } if (file == null) { throw new IllegalArgumentException("file can't be null"); } if (charset == null) { throw new IllegalArgumentException("charset can't be null"); } this.buf = new byte[bufSize]; this.file = file; this.charset = charset; firstLoad(); } private void firstLoad() { try { T newValue = loadConfig(); getProperty().updateValue(newValue); } catch (Throwable e) { RecordLog.info("loadConfig exception", e); } } @Override public String readSource() throws Exception { FileInputStream inputStream = null; try { inputStream = new FileInputStream(file); FileChannel channel = inputStream.getChannel(); if (channel.size() > buf.length) { throw new RuntimeException(file.getAbsolutePath() + " file size=" + channel.size() + ", is bigger than bufSize=" + buf.length + ". Can't read"); } int len = inputStream.read(buf); return new String(buf, 0, len, charset); } finally { if (inputStream != null) { try { inputStream.close(); } catch (Exception ignore) { } } } } @Override public void close() throws Exception { super.close(); buf = null; } @Override public void writeDataSource(T values) throws Exception { throw new UnsupportedOperationException(); } }