給 Java 和 Android 構建一個簡單的響應式Local Cache

夕陽.JPG

一. 爲什麼要建立這個庫

首先,Local Cache 不是相似於 Redis、Couchbase、Memcached 這樣的分佈式 Cache。Local Cache 適用於在單機環境下,對訪問頻率高、更新次數少的數據進行存放。所以,Local Cache 不適合存放大量的數據。java

Local Cache 特別適合於 App,也適合在 Java 的某些場景下使用。react

咱們的 App 使用 Retrofit 做爲網絡框架,而且大量使用 RxJava,所以我考慮建立一個 RxCache 來緩存一些必要的數據。git

RxCache 地址:github.com/fengzhizi71…github

二. 如何構建 RxCache

2.1 RxCache 的基本方法

對於 Local Cache,最重要是須要有如下的這些方法:算法

<T> Record<T> get(String key, Type type);

<T> void save(String key, T value);

<T> void save(String key, T value, long expireTime);

boolean containsKey(String key);

Set<String> getAllKeys();

void remove(String key);

void clear();
複製代碼

其中,有一個 save() 方法包含了失效時間的參數expireTime,這對於 Local Cache 是比較重要的一個方法,超過這個時間,這個數據將會失效。spring

既然是 RxCache,對於獲取數據確定須要相似這樣的方法:json

<T> Observable<Record<T>> load2Observable(final String key, final Type type) ;

<T> Flowable<Record<T>> load2Flowable(final String key, final Type type);

<T> Single<Record<T>> load2Single(final String key, final Type type);

<T> Maybe<Record<T>> load2Maybe(final String key, final Type type);
複製代碼

也須要一些 Transformer 的方法,將 RxJava 的被觀察者進行轉換。在 RxCache 中,包含了一些默認的 Transformer 策略,特別是使用 Retrofit 和 RxJava 時,能夠考慮結合這些策略來緩存數據。緩存

以 CacheFirstStrategy 爲例:網絡

/** * 緩存優先的策略,緩存取不到時取接口的數據。 * Created by tony on 2018/9/30. */
public class CacheFirstStrategy implements ObservableStrategy, FlowableStrategy, MaybeStrategy {

    @Override
    public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {

        Flowable<Record<T>> cache = rxCache.<T>load2Flowable(key, type);

        Flowable<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {

                        rxCache.save(key, t);

                        return new Record<>(Source.CLOUD, key, t);
                    }
                });

        return cache.switchIfEmpty(remote);
    }

    @Override
    public <T> Maybe<Record<T>> execute(RxCache rxCache, String key, Maybe<T> source, Type type) {

        Maybe<Record<T>> cache = rxCache.<T>load2Maybe(key, type);

        Maybe<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {

                        rxCache.save(key, t);

                        return new Record<>(Source.CLOUD, key, t);
                    }
                });

        return cache.switchIfEmpty(remote);
    }

    @Override
    public <T> Observable<Record<T>> execute(RxCache rxCache, String key, Observable<T> source, Type type) {

        Observable<Record<T>> cache = rxCache.<T>load2Observable(key, type);

        Observable<Record<T>> remote = source
                .map(new Function<T, Record<T>>() {
                    @Override
                    public Record<T> apply(@NonNull T t) throws Exception {

                        rxCache.save(key, t);

                        return new Record<>(Source.CLOUD, key, t);
                    }
                });

        return cache.switchIfEmpty(remote);
    }
}
複製代碼

2.2 Memory

RxCache 包含了兩級緩存: Memory 和 Persistence 。架構

RxCache.png

Memory:

package com.safframework.rxcache.memory;

import com.safframework.rxcache.domain.Record;

import java.util.Set;

/** * Created by tony on 2018/9/29. */
public interface Memory {

    <T> Record<T> getIfPresent(String key);

    <T> void put(String key, T value);

    <T> void put(String key, T value, long expireTime);

    Set<String> keySet();

    boolean containsKey(String key);

    void evict(String key);

    void evictAll();
}
複製代碼

它的默認實現 DefaultMemoryImpl 使用 ConcurrentHashMap 來緩存數據。

在 extra 模塊還有 Guava Cache、Caffeine 的實現。它們都是成熟的 Local Cache,若是不想使用 DefaultMemoryImpl ,徹底可使用 extra 模塊成熟的替代方案。

2.3 Persistence

Persistence 的接口跟 Memory 很相似:

package com.safframework.rxcache.persistence;

import com.safframework.rxcache.domain.Record;

import java.lang.reflect.Type;
import java.util.List;

/** * Created by tony on 2018/9/28. */
public interface Persistence {

    <T> Record<T> retrieve(String key, Type type);

    <T> void save(String key, T value);

    <T> void save(String key, T value, long expireTime);

    List<String> allKeys();

    boolean containsKey(String key);

    void evict(String key);

    void evictAll();
}
複製代碼

因爲,考慮到持久層可能包括 Disk、DB。因而單獨抽象了一個 Disk 接口繼承 Persistence。

在 Disk 的實現類 DiskImpl 中,它的構造方法注入了 Converter 接口:

public class DiskImpl implements Disk {

    private File cacheDirectory;
    private Converter converter;

    public DiskImpl(File cacheDirectory,Converter converter) {

        this.cacheDirectory = cacheDirectory;
        this.converter = converter;
    }

    ......
}
複製代碼

Converter 接口用於對象儲存到文件的序列化和反序列化,目前支持 Gson 和 FastJSON。

Converter 的抽象實現類 AbstractConverter 的構造方法注入了 Encryptor 接口:

public abstract class AbstractConverter implements Converter {

    private Encryptor encryptor;

    public AbstractConverter() {
    }

    public AbstractConverter(Encryptor encryptor) {

        this.encryptor = encryptor;
    }

    ......
}
複製代碼

Encryptor 接口用於將存儲到 Disk 上的數據進行加密和解密,目前 RxCache 支持 AES128 和 DES 兩種加密方式。不使用 Encryptor 接口,則存儲到 Disk 上的數據是明文,也就是一串json字符串。

三. 支持 Java

在 example 模塊下,包括了一些常見 Java 使用的例子。

例如,最簡單的使用:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.domain.Record;
import domain.User;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;

/** * Created by tony on 2018/9/29. */
public class Test {

    public static void main(String[] args) {

        RxCache.config(new RxCache.Builder());

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {

            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
}
複製代碼

帶 ExpireTime 的緩存測試:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.domain.Record;
import domain.User;

/** * Created by tony on 2018/10/5. */
public class TestWithExpireTime {

    public static void main(String[] args) {

        RxCache.config(new RxCache.Builder());

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u,2000);

        try {
            Thread.sleep(2500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Record<User> record = rxCache.get("test", User.class);

        if (record==null) {
            System.out.println("record is null");
        }
    }
}
複製代碼

跟 Spring 整合而且 Memory 的實現使用 GuavaCacheImpl:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.extra.memory.GuavaCacheImpl;
import com.safframework.rxcache.memory.Memory;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;

/** * Created by tony on 2018/10/5. */
@Configurable
public class ConfigWithGuava {

    @Bean
    public Memory guavaCache(){
        return new GuavaCacheImpl(100);
    }

    @Bean
    public RxCache.Builder rxCacheBuilder(){
        return new RxCache.Builder().memory(guavaCache());
    }

    @Bean
    public RxCache rxCache() {

        RxCache.config(rxCacheBuilder());

        return RxCache.getRxCache();
    }
}
複製代碼

測試一下剛纔的整合:

import com.safframework.rxcache.RxCache;
import com.safframework.rxcache.domain.Record;
import domain.User;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/** * Created by tony on 2018/10/5. */
public class TestWithGuava {

    public static void main(String[] args) {

        ApplicationContext ctx = new AnnotationConfigApplicationContext(ConfigWithGuava.class);

        RxCache rxCache = ctx.getBean(RxCache.class);

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {
            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
}
複製代碼

四. 支持 Android

爲了更好地支持 Android,我還單首創建了一個項目 RxCache4a: github.com/fengzhizi71…

它包含了一個基於 LruCache 的 Memory 實現,以及一個基於 MMKV(騰訊開源的key -value存儲框架) 的 Persistence 實現。

咱們目前 App 採用了以下的 MVVM 架構來傳輸數據:

MVVM.png

將來,但願可以經過 RxCache 來整合 Repository 這一層。

五. 總結

目前,RxCache 完成了大致的框架,初步可用,接下來打算增長一些 Annotation,方便其使用。

RxCache 系列的相關文章:

  1. ReentrantReadWriteLock讀寫鎖及其在 RxCache 中的使用
  2. 堆外內存及其在 RxCache 中的使用
  3. Retrofit 風格的 RxCache及其多種緩存替換算法
  4. RxCache 整合 Android 的持久層框架 greenDAO、Room

Java與Android技術棧:每週更新推送原創技術文章,歡迎掃描下方的公衆號二維碼並關注,期待與您的共同成長和進步。

相關文章
相關標籤/搜索