使用DelayQueue 和 FutureTask 實現java中的緩存

使用DelayQueue、ConcurrentHashMap、FutureTask實現的緩存工具類。java

DelayQueue 簡介

DelayQueue是一個支持延時獲取元素的無界阻塞隊列。DelayQueue內部隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。緩存

DelayQueue很是有用,能夠將DelayQueue運用在如下應用場景。多線程

  1. 緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢
    DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
  2. 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從
    DelayQueue中獲取到任務就開始執行,好比TimerQueue就是使用DelayQueue實現的。

ConcurrentHashMap和FutureTask,詳見如下:

  1. ConcurrentHashMap 原理解析
  2. FutureTask 源碼分析

緩存工具類實現

  1. 支持緩存多長時間,單位毫秒。
  2. 支持多線程併發。
    好比:有一個比較耗時的操做,此時緩衝中沒有此緩存值,一個線程開始計算這個耗時操做,而再次進來線程就不須要再次進行計算,只須要等上一個線程計算完成後(使用FutureTask)返回該值便可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/**
  * @author WChao
  * @date 2018/06/21
  */
public class CacheBean<V> {
    // 緩存計算的結果
    private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>();

    // 延遲隊列來判斷那些緩存過時
    private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>();

    // 緩存時間
    private final int ms;

    static {
        // 定時清理過時緩存
        Thread t = new Thread() {
            @Override
            public void run() {
                dameonCheckOverdueKey();
            }
        };
        t.setDaemon(true);
        t.start();
    }

    private final Computable<V> c;

    /**
     * @param c Computable
     */
    public CacheBean(Computable<V> c) {
        this(c, 60 * 1000);
    }

    /**
     * @param c Computable
     * @param ms 緩存多少毫秒
     */
    public CacheBean(Computable<V> c, int ms) {
        this.c = c;
        this.ms = ms;
    }

    public V compute(final String key) throws InterruptedException {

        while (true) {
            //根據key從緩存中獲取值
            Future<V> f = (Future<V>) cache.get(key);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() {
                        return (V) c.compute(key);
                    }
                };
                FutureTask<V> ft = new FutureTask<>(eval);
                //若是緩存中存在此能夠,則返回已存在的value
                f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft);
                if (f == null) {
                    //向delayQueue中添加key,並設置該key的存活時間
                    delayQueue.put(new DelayedItem<>(key, ms));
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(key, f);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 檢查過時的key,從cache中刪除
     */
    private static void dameonCheckOverdueKey() {
        DelayedItem<String> delayedItem;
        while (true) {
            try {
                delayedItem = delayQueue.take();
                if (delayedItem != null) {
                    cache.remove(delayedItem.getT());
                    System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}


class DelayedItem<T> implements Delayed {

    private T t;
    private long liveTime;
    private long removeTime;

    public DelayedItem(T t, long liveTime) {
        this.setT(t);
        this.liveTime = liveTime;
        this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        if (o == null)
            return 1;
        if (o == this)
            return 0;
        if (o instanceof DelayedItem) {
            DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
            if (liveTime > tmpDelayedItem.liveTime) {
                return 1;
            } else if (liveTime == tmpDelayedItem.liveTime) {
                return 0;
            } else {
                return -1;
            }
        }
        long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return diff > 0 ? 1 : diff == 0 ? 0 : -1;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(removeTime - System.currentTimeMillis(), unit);
    }

    public T getT() {
        return t;
    }

    public void setT(T t) {
        this.t = t;
    }

    @Override
    public int hashCode() {
        return t.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (object instanceof DelayedItem) {
            return object.hashCode() == hashCode() ? true : false;
        }
        return false;
    }

}

Computable接口

/**
  * @author WChao
  * @date 2018/06/21
  */
public interface Computable<V> {
    V compute(String k);
}

測試類

/**
  * @author WChao
  * @date 2018/06/21
  */
public class FutureTaskDemo {

    public static void main(String[] args) throws InterruptedException {
        // 子線程
        Thread t = new Thread(() -> {
            CacheBean<String> cb = new CacheBean<>(k -> {
                try {
                    System.out.println("模擬計算數據,計算時長2秒。key=" + k);
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "你好:" + k;
            }, 5000);

            try {
                while (true) {
                    System.out.println("thead2:" + cb.compute("b"));
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t.start();

        // 主線程
        while (true) {
            CacheBean<String> cb = new CacheBean<>(k -> {
                try {
                    System.out.println("模擬計算數據,計算時長2秒。key=" + k);
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "你好:" + k;
            }, 5000);

            System.out.println("thead1:" + cb.compute("b"));
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

執行結果:併發

兩個線程同時訪問同一個key的緩存。從執行結果發現,每次緩存失效後,同一個key只執行一次計算,而不是多個線程併發執行同一個計算而後緩存。ide

相關文章
相關標籤/搜索