視頻緩存AndroidVideoCache攻略

很久沒有更新博客了,一個是由於太忙,業務是在太多,另一個就是也比較懶,因此拖着就有接近兩個月的時間沒有寫博客了,實在是罪過。今天分享一個開源庫 AndroidVideoCache 。這個庫主要是作視頻緩存管理功能,支持邊下邊播,離線播放,緩存管理等。用過MediaPlayer的小夥伴都知道,能夠支持在線播放和播放本地資源,可是不支持緩存,這樣很消耗用戶流量,這個時候AndroidVideoCache就派上用場了。java

1.基本原理

AndroidVideoCache 經過代理的策略將咱們的網絡請求代理到本地服務,本地服務再決定是從本地緩存拿仍是發起網絡請求,若是須要發起網絡請求就先向本地寫入數據,再從本地提供數據給視頻播放器。這樣就作到了數據的複用。git

借用一張AndroidVideoCache - 視頻邊播放邊緩存的代理策略裏面的圖片看的比較清楚: github

原理圖

在視頻播放器,好比VideoView發起一個urlA,經過HttpProxyCacheServer轉成一個本地host和端口的urlB,這樣視頻播放器發起請求就是向HttpProxyCacheServer請求,返回視頻播放器的Socket,Server再創建一個HttpProxyCacheServerClients來發起網絡請求處理緩存等工做,而後把數據經過前面的Socket返回給視頻播放器。算法

瞭解了基本原理,再看下代碼結構。數據庫

2.代碼結構

整個代碼結構仍是比較清晰,涉及到的類比較多,這裏只畫出了一些主要的相關類,看下個人手繪圖😢: 緩存

WechatIMG2.jpeg

HttpProxyCacheServer是庫對外的接口,經過這個和視頻播放器聯繫,判斷本地是否有緩存,有的話直接返回本地文件;沒有就創建一個和url對應的HttpProxyCacheServerClients處理本次請求,請求工做是交給Source接口,緩存工做是經過Cache接口。文件緩存是用LRU算法實現,能夠根據文件大小或者文件個數管理緩存。bash

CacheListener是緩存本地成功後回調接口,能夠用於更新視頻進度條等UI需求。服務器

上面總體介紹了下原理和代碼結構,接下來是時候看下使用方法了,暴露出來的接口比較少,因此使用起來也簡單。網絡

3. 使用

首先是導包,截止到寫這邊博客,最新的版本是2.7.1:session

dependencies {
   compile 'com.danikula:videocache:2.7.1'
}
複製代碼

而後在全局初始化一個本地代理服務器,這裏選擇在 Application 的實現類中

public class App extends Application {

   private HttpProxyCacheServer proxy;

   public static HttpProxyCacheServer getProxy(Context context) {
       App app = (App) context.getApplicationContext();
       return app.proxy == null ? (app.proxy = app.newProxy()) : app.proxy;
   }

   private HttpProxyCacheServer newProxy() {
       return new HttpProxyCacheServer(this);
   }
}
複製代碼

有了代理服務器,咱們就可使用了,把本身的網絡視頻 url 用提供的方法替換成另外一個 URL

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);

   HttpProxyCacheServer proxy = getProxy();
   String proxyUrl = proxy.getProxyUrl(VIDEO_URL);
   videoView.setVideoPath(proxyUrl);
}
複製代碼

提供了更多的能夠自定義的地方,好比緩存的文件最大大小,以及文件個數,緩存採起的是 LruCache 的方法,對於老文件在達到上限後會自動清理。

private HttpProxyCacheServer newProxy() {
    return new HttpProxyCacheServer.Builder(this)
            .maxCacheSize(1024 * 1024 * 1024)       // 1 Gb for cache
            .build();
}

private HttpProxyCacheServer newProxy() {
    return new HttpProxyCacheServer.Builder(this)
            .maxCacheFilesCount(20)
            .build();
}
複製代碼

除了這個,還有一個就是生成的文件名,默認是使用的 MD5 方式生成 key,考慮到一些業務邏輯,咱們也能夠繼承一個 FileNameGenerator 來實現本身的策略

public class MyFileNameGenerator implements FileNameGenerator {

    // Urls contain mutable parts (parameter 'sessionToken') and stable video's id (parameter 'videoId'). // e. g. http://example.com?videoId=abcqaz&sessionToken=xyz987 public String generate(String url) { Uri uri = Uri.parse(url); String videoId = uri.getQueryParameter("videoId"); return videoId + ".mp4"; } } ... HttpProxyCacheServer proxy = HttpProxyCacheServer.Builder(context) .fileNameGenerator(new MyFileNameGenerator()) .build() 複製代碼

很明顯,構造Server是經過建造者的模式,看下Builder的代碼就知道支持哪些配置和默認配置是什麼了。

private static final long DEFAULT_MAX_SIZE = 512 * 1024 * 1024;

        private File cacheRoot;
        private FileNameGenerator fileNameGenerator;
        private DiskUsage diskUsage;
        private SourceInfoStorage sourceInfoStorage;
        private HeaderInjector headerInjector;

        public Builder(Context context) {
            this.sourceInfoStorage = SourceInfoStorageFactory.newSourceInfoStorage(context);
            this.cacheRoot = StorageUtils.getIndividualCacheDirectory(context);
            this.diskUsage = new TotalSizeLruDiskUsage(DEFAULT_MAX_SIZE);
            this.fileNameGenerator = new Md5FileNameGenerator();
            this.headerInjector = new EmptyHeadersInjector();
        }
複製代碼
  • cacheRoot就是緩存默認的文件夾,若是有sd卡而且申請了權限,會放到下面的目錄
<i>("/Android/data/[app_package_name]/cache")</i> 
複製代碼

不然放到手機的內部存儲

cacheDirPath = "/data/data/" + context.getPackageName() + "/cache/";
複製代碼
  • FileNameGenerator用於生成文件名,默認是 Md5FileNameGenerator,生成MD5串做爲文件名。

  • DiskUsage是用於管理本地緩存,默認是經過文件大小進行管理,大小默認是512M

private static final long DEFAULT_MAX_SIZE = 512 * 1024 * 1024;
this.diskUsage = new TotalSizeLruDiskUsage(DEFAULT_MAX_SIZE);
複製代碼
  • SourceInfoStorage是用於存儲SourInfo,默認是數據庫存儲
this.sourceInfoStorage = SourceInfoStorageFactory.newSourceInfoStorage(context);

public static SourceInfoStorage newSourceInfoStorage(Context context) {
        return new DatabaseSourceInfoStorage(context);
}
複製代碼

SourInfo是什麼?主要用於存儲http請求源的一些信息,好比url,數據長度length,請求資源的類型mime:

public final String url;
    public final long length;
    public final String mime;
複製代碼
  • HeaderInjector主要用於添加一些自定義的頭部字段,默認是空
this.headerInjector = new EmptyHeadersInjector();
複製代碼

最後把這些字段構形成Config,構造HttpProxyCacheServer須要,後面會再傳給HttpProxyCacheServerClients用於發起請求(url,length,mime)等,和本地緩存(DiskUsage,SourceInfoStorage,cacheRoot)等。

/**
         * Builds new instance of {@link HttpProxyCacheServer}.
         *
         * @return proxy cache. Only single instance should be used across whole app.
         */
        public HttpProxyCacheServer build() {
            Config config = buildConfig();
            return new HttpProxyCacheServer(config);
        }

        private Config buildConfig() {
            return new Config(cacheRoot, fileNameGenerator, diskUsage, sourceInfoStorage, headerInjector);
        }
複製代碼

4. 源碼分析

從上面分析知道入口是HttpProxyCacheServer,因此咱們先看下它:

HttpProxyCacheServer.java

    private static final Logger LOG = LoggerFactory.getLogger("HttpProxyCacheServer");
    private static final String PROXY_HOST = "127.0.0.1";

    private final Object clientsLock = new Object();
    private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
    private final Map<String, HttpProxyCacheServerClients> clientsMap = new ConcurrentHashMap<>();
    private final ServerSocket serverSocket;
    private final int port;
    private final Thread waitConnectionThread;
    private final Config config;
    private final Pinger pinger;

    public HttpProxyCacheServer(Context context) {
        this(new Builder(context).buildConfig());
    }

    private HttpProxyCacheServer(Config config) {
        this.config = checkNotNull(config);
        try {
            InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
            this.serverSocket = new ServerSocket(0, 8, inetAddress);
            this.port = serverSocket.getLocalPort();
            IgnoreHostProxySelector.install(PROXY_HOST, port);
            CountDownLatch startSignal = new CountDownLatch(1);
            this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
            this.waitConnectionThread.start();
            startSignal.await(); // freeze thread, wait for server starts
            this.pinger = new Pinger(PROXY_HOST, port);
            LOG.info("Proxy cache server started. Is it alive? " + isAlive());
        } catch (IOException | InterruptedException e) {
            socketProcessor.shutdown();
            throw new IllegalStateException("Error starting local proxy server", e);
        }
    }
複製代碼

首先是構造一個本地127.0.0.1ServerSocker,隨機分配了一個端口,而後啓動一個線程去執行WaitRequestsRunnable,在這裏面執行 waitForRequest,經過 accept() 方法監聽這個服務器 socket 的入站鏈接,accept() 方法會一直阻塞,直到有一個客戶端嘗試創建鏈接。

private void waitForRequest() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Socket socket = serverSocket.accept();
                LOG.debug("Accept new socket " + socket);
                socketProcessor.submit(new SocketProcessorRunnable(socket));
            }
        } catch (IOException e) {
            onError(new ProxyCacheException("Error during waiting connection", e));
        }
    }
複製代碼

再回到前面的構造函數中,有個信號量用來保證Server啓動後再走往下的流程,Server啓動後會構造一個pinger,用來看服務是否可用。

CountDownLatch startSignal = new CountDownLatch(1);
this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
this.waitConnectionThread.start();
startSignal.await(); // freeze thread, wait for server starts
this.pinger = new Pinger(PROXY_HOST, port);
複製代碼

經過上面幾步,HttpProxyCacheServer就已經啓動起來了,在等待客戶端的鏈接,那客戶端怎麼鏈接到服務?再看下第三章節使用裏面提到的另一個方法getProxyUrl,看下官方解釋,若是本地有緩存那麼會返回本地地址的 Uri,file:// uri,不然返回一個代理的url。

/**
     * Returns url that wrap original url and should be used for client (MediaPlayer, ExoPlayer, etc).
     * <p>
     * If parameter {@code allowCachedFileUri} is {@code true} and file for this url is fully cached
     * (it means method {@link #isCached(String)} returns {@code true}) then file:// uri to cached file will be returned.
     *
     * @param url                a url to file that should be cached.
     * @param allowCachedFileUri {@code true} if allow to return file:// uri if url is fully cached
     * @return a wrapped by proxy url if file is not fully cached or url pointed to cache file otherwise (if {@code allowCachedFileUri} is {@code true}).
     */
複製代碼

再看下代碼就很簡單了, 若是本地已經緩存了,就直接拿本地地址的 Uri,而且 touch 一下文件,把時間更新後最新,由於後面 LruCache 是根據文件被訪問的時間進行排序的。

public String getProxyUrl(String url, boolean allowCachedFileUri) {
        if (allowCachedFileUri && isCached(url)) {
            File cacheFile = getCacheFile(url);
            touchFileSafely(cacheFile);
            return Uri.fromFile(cacheFile).toString();
        }
        return isAlive() ? appendToProxyUrl(url) : url;
    }
複製代碼

若是文件沒有被緩存那麼就會先走一下 isAlive() 方法,這裏會ping一下Server,確保是通的。若是不通就直接返回原url,通的話就返回代理url:

private static final String PROXY_HOST = "127.0.0.1";
    private String appendToProxyUrl(String url) {
    return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));
    }
複製代碼

因此在視頻播放器拿着這個代理url發起請求會和Server進行鏈接,而後前面提到的waitForRequest會返回一個客戶端的Socket,用於和客戶端通訊。而後會用線程池處理這個請求,能夠看到最多支持8個併發鏈接。

private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
socketProcessor.submit(new SocketProcessorRunnable(socket));
複製代碼

SocketProcessorRunnable請求會經過processSocket進行處理,前面 ping 的過程其實也被會這個 socket 監聽而且走進來這一段。資源請求會走到else邏輯裏面。

private void processSocket(Socket socket) {
        try {
            GetRequest request = GetRequest.read(socket.getInputStream());
            LOG.debug("Request to cache proxy:" + request);
            String url = ProxyCacheUtils.decode(request.uri);
            if (pinger.isPingRequest(url)) {
                pinger.responseToPing(socket);
            } else {
                HttpProxyCacheServerClients clients = getClients(url);
                clients.processRequest(request, socket);
            }
        } catch (SocketException e) {
            // There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
            // So just to prevent log flooding don't log stacktrace LOG.debug("Closing socket… Socket is closed by client."); } catch (ProxyCacheException | IOException e) { onError(new ProxyCacheException("Error processing request", e)); } finally { releaseSocket(socket); LOG.debug("Opened connections: " + getClientsCount()); } } 複製代碼

首先在內存緩存,其實就是ConcurrentHashMap,看看有沒有url對應的HttpProxyCacheServerClients,沒有的話構造一個。HttpProxyCacheServerClients就是用來處理一個請求url對應的工做。

public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
        startProcessRequest();
        try {
            clientsCount.incrementAndGet();
            proxyCache.processRequest(request, socket);
        } finally {
            finishProcessRequest();
        }
    }
複製代碼

經過startProcessRequest()構造HttpProxyCache:

private synchronized void startProcessRequest() throws ProxyCacheException {
        proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
    }

    private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
        HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage, config.headerInjector);
        FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
        HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
        httpProxyCache.registerCacheListener(uiCacheListener);
        return httpProxyCache;
    }
複製代碼

在前面第二章節代碼結構中能夠看到無論網絡請求HttpUrlSource仍是緩存FileCache都是經過HttpProxyCache管理。而後註冊一個回調CacheListener,在HttpProxyCache緩存可用的時候會回調通知HttpProxyCacheServerClients,Clients就能夠通知監聽者:

httpProxyCache.registerCacheListener(uiCacheListener);
this.uiCacheListener = new UiListenerHandler(url, listeners);

    private static final class UiListenerHandler extends Handler implements CacheListener {

        private final String url;
        private final List<CacheListener> listeners;

        public UiListenerHandler(String url, List<CacheListener> listeners) {
            super(Looper.getMainLooper());
            this.url = url;
            this.listeners = listeners;
        }

        @Override
        public void onCacheAvailable(File file, String url, int percentsAvailable) {
            Message message = obtainMessage();
            message.arg1 = percentsAvailable;
            message.obj = file;
            sendMessage(message);
        }

        @Override
        public void handleMessage(Message msg) {
            for (CacheListener cacheListener : listeners) {
                cacheListener.onCacheAvailable((File) msg.obj, url, msg.arg1);
            }
        }
    }
複製代碼

再回到HttpProxyCacheServerClients構造函數中,接下來會調用proxyCache.processRequest(request, socket):

public void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
        OutputStream out = new BufferedOutputStream(socket.getOutputStream());
        String responseHeaders = newResponseHeaders(request);
        out.write(responseHeaders.getBytes("UTF-8"));

        long offset = request.rangeOffset;
        if (isUseCache(request)) {
            responseWithCache(out, offset);
        } else {
            responseWithoutCache(out, offset);
        }
    }
複製代碼

首先經過Socket回消息給視頻播放器頭部信息,接下來判斷是否須要走緩存,不走緩存就直接經過HttpUrlSource發起HttpURLConnection,讀取數據經過Socket返回給播放器。若是須要走緩存,會走下面代碼,先調用read讀取8k的數據,讀取成功經過Socket先返回給播放器,再重複讀直到完成。

HttpProxyCache.java

    static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

    private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        int readBytes;
        while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
            out.write(buffer, 0, readBytes);
            offset += readBytes;
        }
        out.flush();
    }
複製代碼

read方法是調用的父類ProxyCache的read方法:

public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);

        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            readSourceAsync();
            waitForSourceData();
            checkReadSourceErrorsCount();
        }
        int read = cache.read(buffer, offset, length);
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }
複製代碼

經過循環不斷讀取數據,直到下面其中一個條件知足:

  • 文件讀取完成
  • 或者讀取的數據已經達到length的要求,默認是8k
  • Clients已經調用shutdown

讀取數據會啓動一個新的線程去讀取:

private synchronized void readSourceAsync() throws ProxyCacheException {
        boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
        if (!stopped && !cache.isCompleted() && !readingInProgress) {
            sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
            sourceReaderThread.start();
        }
    }
複製代碼

SourceReaderRunnable中主要就是調用readSource,這裏主要是經過HttpUrlSource.read讀取網絡數據,而後經過FileCache寫入到本地緩存,在緩存結束後一樣也會發送一個通知通知本身已經緩存完了,回調由外界控制。

private void readSource() {
        long sourceAvailable = -1;
        long offset = 0;
        try {
            offset = cache.available();
            source.open(offset);
            sourceAvailable = source.length();
            byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
            int readBytes;
            while ((readBytes = source.read(buffer)) != -1) {
                synchronized (stopLock) {
                    if (isStopped()) {
                        return;
                    }
                    cache.append(buffer, readBytes);
                }
                offset += readBytes;
                notifyNewCacheDataAvailable(offset, sourceAvailable);
            }
            tryComplete();
            onSourceRead();
        } catch (Throwable e) {
            readSourceErrorsCount.incrementAndGet();
            onError(e);
        } finally {
            closeSource();
            notifyNewCacheDataAvailable(offset, sourceAvailable);
        }
    }
複製代碼

同時調用ProxyCache.read的線程如今在作什麼?在看下read方法裏面的代碼:

public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);

        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            readSourceAsync();
            waitForSourceData();
            checkReadSourceErrorsCount();
        }
        int read = cache.read(buffer, offset, length);
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }
複製代碼

readSourceAsync啓動另一個線程(爲了方便這裏簡稱爲ThreadB)後,本線程(爲了方便這裏簡稱爲ThreadA)會接下來執行 waitForSourceData, 先得到wc這個鎖,而後調用ThreadA會掛起1s的時間或者ThreadB已經寫完緩存,經過notifyAll通知。

private void waitForSourceData() throws ProxyCacheException {
        synchronized (wc) {
            try {
                wc.wait(1000);
            } catch (InterruptedException e) {
                throw new ProxyCacheException("Waiting source data is interrupted!", e);
            }
        }
    }

    private void notifyNewCacheDataAvailable(long cacheAvailable, long sourceAvailable) {
        onCacheAvailable(cacheAvailable, sourceAvailable);

        synchronized (wc) {
            wc.notifyAll();
        }
    }
複製代碼

接下來ThreadA會繼續執行checkReadSourceErrorsCount方法,若是ThreadB在readSource出現異常,會增長一次錯誤次數,而後會拋出異常。

ProxyCache.java

private static final int MAX_READ_SOURCE_ATTEMPTS = 1;

    private void checkReadSourceErrorsCount() throws ProxyCacheException {
        int errorsCount = readSourceErrorsCount.get();
        if (errorsCount >= MAX_READ_SOURCE_ATTEMPTS) {
            readSourceErrorsCount.set(0);
            throw new ProxyCacheException("Error reading source " + errorsCount + " times");
        }
    }
複製代碼

線程ThreadA會在while循環中繼續判斷條件,若是知足會跳出,而後從FileCache中讀取length字節的數據返回到HttpProxyCacheresponseWithCache方法中,經過Socket寫回給播放器。

到此整個讀取數據,緩存數據的流程就結束了。

5. 總結

寫的比較長,先介紹了下AndroidVideoCache的基本原理,而後手繪了張代碼框架圖,方便全局瞭解,而後看了下使用方法,最後分析了主要流程的源碼。簡單提及來就是經過代理策略,攔截網絡請求,從本地拿出數據給到播放器。後面若是有時間能夠再簡單說下本地緩存的一些代碼。

若是本文對你有幫助,歡迎關注哈。

感謝@右傾傾,但願你能少點痛苦,平平安安,快快樂樂。

下車了,提早祝你們新年快樂!

相關文章
相關標籤/搜索