使用DelayQueue、ConcurrentHashMap、FutureTask實現的緩存工具類。
DelayQueue是一個支持延時獲取元素的無界阻塞隊列,其內部隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久之後才能從隊列中獲取該元素;只有在延遲期滿時才能從隊列中提取元素。
DelayQueue非常有用,可以將DelayQueue運用在如下應用場景。
import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; /** * @author WChao * @date 2018/06/21 */ public class CacheBean<V> { // 緩存計算的結果 private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>(); // 延遲隊列來判斷那些緩存過時 private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>(); // 緩存時間 private final int ms; static { // 定時清理過時緩存 Thread t = new Thread() { @Override public void run() { dameonCheckOverdueKey(); } }; t.setDaemon(true); t.start(); } private final Computable<V> c; /** * @param c Computable */ public CacheBean(Computable<V> c) { this(c, 60 * 1000); } /** * @param c Computable * @param ms 緩存多少毫秒 */ public CacheBean(Computable<V> c, int ms) { this.c = c; this.ms = ms; } public V compute(final String key) throws InterruptedException { while (true) { //根據key從緩存中獲取值 Future<V> f = (Future<V>) cache.get(key); if (f == null) { Callable<V> eval = new Callable<V>() { public V call() { return (V) c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(eval); //若是緩存中存在此能夠,則返回已存在的value f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft); if (f == null) { //向delayQueue中添加key,並設置該key的存活時間 delayQueue.put(new DelayedItem<>(key, ms)); f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(key, f); } catch (ExecutionException e) { e.printStackTrace(); } } } /** * 檢查過時的key,從cache中刪除 */ private static void dameonCheckOverdueKey() { DelayedItem<String> delayedItem; while (true) { try { delayedItem = delayQueue.take(); if (delayedItem != null) { cache.remove(delayedItem.getT()); System.out.println(System.nanoTime() + " 
remove " + delayedItem.getT() + " from cache"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } class DelayedItem<T> implements Delayed { private T t; private long liveTime; private long removeTime; public DelayedItem(T t, long liveTime) { this.setT(t); this.liveTime = liveTime; this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem) { DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o; if (liveTime > tmpDelayedItem.liveTime) { return 1; } else if (liveTime == tmpDelayedItem.liveTime) { return 0; } else { return -1; } } long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return diff > 0 ? 1 : diff == 0 ? 0 : -1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.currentTimeMillis(), unit); } public T getT() { return t; } public void setT(T t) { this.t = t; } @Override public int hashCode() { return t.hashCode(); } @Override public boolean equals(Object object) { if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ? true : false; } return false; } }
/**
 * A computation that produces a value from a string key.
 *
 * <p>The interface has a single abstract method, so it can be implemented with a lambda
 * or a method reference.
 *
 * @param <V> type of the computed value
 * @author WChao
 * @date 2018/06/21
 */
@FunctionalInterface
public interface Computable<V> {

    /**
     * Computes the value associated with the given key.
     *
     * @param k key to compute the value for
     * @return the computed value
     */
    V compute(String k);
}
/** * @author WChao * @date 2018/06/21 */ public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException { // 子線程 Thread t = new Thread(() -> { CacheBean<String> cb = new CacheBean<>(k -> { try { System.out.println("模擬計算數據,計算時長2秒。key=" + k); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "你好:" + k; }, 5000); try { while (true) { System.out.println("thead2:" + cb.compute("b")); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { e.printStackTrace(); } }); t.start(); // 主線程 while (true) { CacheBean<String> cb = new CacheBean<>(k -> { try { System.out.println("模擬計算數據,計算時長2秒。key=" + k); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "你好:" + k; }, 5000); System.out.println("thead1:" + cb.compute("b")); TimeUnit.SECONDS.sleep(1); } } }
執行結果:
兩個線程同時訪問同一個key的緩存。從執行結果可以發現,每次緩存失效後,同一個key只會執行一次計算,而不是多個線程併發執行同一個計算然後再緩存。