media cache 對比.

對工做中用到的media player 播放領域中遇到的三個cache 實現優缺點總結:html

1,android 原生的NuCacheSource2前端

2,嵌入式播放中經常使用的ringBuffer cacheandroid

3,youtobe 開源播放器exoplayer 中用到的cache 策略git

 

I.android 原生中http + mp4 的播放模式,爲緩解網絡抖動平衡下載的時機,開發了NuCacheSource2 github

NuCacheSource2 集成與DataSource 抽象類,提供緩存讀寫統計功能緩存

 

android 的多媒體框架在Source端的模型如圖:網絡

1,DataSource 被抽象爲數據源(例如HTTP,Cache,LocalFIle,Prefetching) 抽象的是一種訪問數據的類型.能夠用IBINDER 初始化,也能夠無參初始化,很是靈活.    數據結構

40class DataSource : public RefBase {
41public:
42    enum Flags {
43        kWantsPrefetching      = 1,
44        kStreamedFromLocalHost = 2,
45        kIsCachingDataSource   = 4,
46        kIsHTTPBasedSource     = 8,
47    };
48
49    static sp<DataSource> CreateFromURI(
50            const sp<IMediaHTTPService> &httpService,
51            const char *uri,
52            const KeyedVector<String8, String8> *headers = NULL,
53            String8 *contentType = NULL,
54            HTTPBase *httpSource = NULL);
55
56    static sp<DataSource> CreateMediaHTTP(const sp<IMediaHTTPService> &httpService);
57    static sp<DataSource> CreateFromIDataSource(const sp<IDataSource> &source);
58
59    DataSource() {}
60
61    virtual status_t initCheck() const = 0;
62
63    // Returns the number of bytes read, or -1 on failure. It's not an error if
64    // this returns zero; it just means the given offset is equal to, or
65    // beyond, the end of the source.
66    virtual ssize_t readAt(off64_t offset, void *data, size_t size) = 0;
67
74    // May return ERROR_UNSUPPORTED.
75    virtual status_t getSize(off64_t *size);
76
77    virtual uint32_t flags() {
78        return 0;
79    }
80
81    virtual status_t reconnectAtOffset(off64_t offset) {
82        return ERROR_UNSUPPORTED;
83    }
84

104    virtual String8 getUri() {
105        return String8();
106    }
107
108    virtual String8 getMIMEType() const;

 

2,NuPlayer::Source 是MediaSource 的抽象表明一種多媒體源以及對多媒體源的基本的操做行爲(prepare,start,pause,resume,seek,disconnect)多線程

struct NuPlayer::Source : public AHandler {
34    enum Flags {
35        FLAG_CAN_PAUSE          = 1,
36        FLAG_CAN_SEEK_BACKWARD  = 2,  // the "10 sec back button"
37        FLAG_CAN_SEEK_FORWARD   = 4,  // the "10 sec forward button"
38        FLAG_CAN_SEEK           = 8,  // the "seek bar"
39        FLAG_DYNAMIC_DURATION   = 16,
40        FLAG_SECURE             = 32,
41        FLAG_PROTECTED          = 64,
42    };
43
44    enum {
45        kWhatPrepared,
46        kWhatFlagsChanged,
47        kWhatVideoSizeChanged,
48        kWhatBufferingUpdate,
49        kWhatBufferingStart,
50        kWhatBufferingEnd,
51        kWhatPauseOnBufferingStart,
52        kWhatResumeOnBufferingEnd,
53        kWhatCacheStats,
54        kWhatSubtitleData,
55        kWhatTimedTextData,
56        kWhatTimedMetaData,
57        kWhatQueueDecoderShutdown,
58        kWhatDrmNoLicense,
59        kWhatInstantiateSecureDecoders,
60    };
61
62    // The provides message is used to notify the player about various
63    // events.
64    Source(const sp<AMessage> &notify)
65        : mNotify(notify) {
66    }
67
68    virtual void prepareAsync() = 0;
69
70    virtual void start() = 0;
71    virtual void stop() {}
72    virtual void pause() {}
73    virtual void resume() {}
74
75    // Explicitly disconnect the underling data source
76    virtual void disconnect() {}
77
78    // Returns OK iff more data was available,
79    // an error or ERROR_END_OF_STREAM if not.
80    virtual status_t feedMoreTSData() = 0;
81
82    virtual sp<AMessage> getFormat(bool audio);
83    virtual sp<MetaData> getFormatMeta(bool /* audio */) { return NULL; }
84    virtual sp<MetaData> getFileFormatMeta() const { return NULL; }
85
86    virtual status_t dequeueAccessUnit(
87            bool audio, sp<ABuffer> *accessUnit) = 0;
88
89    virtual status_t getDuration(int64_t * /* durationUs */) {
90        return INVALID_OPERATION;
91    }
92
93    virtual size_t getTrackCount() const {
94        return 0;
95    }
96
97    virtual sp<AMessage> getTrackInfo(size_t /* trackIndex */) const {
98        return NULL;
99    }
100
101    virtual ssize_t getSelectedTrack(media_track_type /* type */) const {
102        return INVALID_OPERATION;
103    }
104
105    virtual status_t selectTrack(size_t /* trackIndex */, bool /* select */, int64_t /* timeUs*/) {
106        return INVALID_OPERATION;
107    }
108
109    virtual status_t seekTo(int64_t /* seekTimeUs */) {
110        return INVALID_OPERATION;
111    }
112
113    virtual status_t setBuffers(bool /* audio */, Vector<MediaBuffer *> &/* buffers */) {
114        return INVALID_OPERATION;
115    }
116
117    virtual bool isRealTime() const {
118        return false;
119    }
120
121    virtual bool isStreaming() const {
122        return true;
123    }

 

3,這個類重點提供兩個方法fetch,read 經過PageCache.app

30struct NuCachedSource2 : public DataSource {
31    static sp<NuCachedSource2> Create(
32            const sp<DataSource> &source,
33            const char *cacheConfig = NULL,
34            bool disconnectAtHighwatermark = false);
35
36    virtual status_t initCheck() const;
38    virtual ssize_t readAt(off64_t offset, void *data, size_t size);
40    virtual void disconnect();
42    virtual status_t getSize(off64_t *size);
43    virtual uint32_t flags();
49    virtual String8 getMIMEType() const;
50
51    ////////////////////////////////////////////////////////////////////////////
52
53    size_t cachedSize();
54    size_t approxDataRemaining(status_t *finalStatus) const;
56    void resumeFetchingIfNecessary();
58    // The following methods are supported only if the
59    // data source is HTTP-based; otherwise, ERROR_UNSUPPORTED
60    // is returned.
61    status_t getEstimatedBandwidthKbps(int32_t *kbps);
62    status_t setCacheStatCollectFreq(int32_t freqMs);
64    static void RemoveCacheSpecificHeaders(
65            KeyedVector<String8, String8> *headers,
66            String8 *cacheConfig,
67            bool *disconnectAtHighwatermark);
68
69protected:
70    virtual ~NuCachedSource2();
71
72private:
73    friend struct AHandlerReflector<NuCachedSource2>;
75    NuCachedSource2(
76            const sp<DataSource> &source,
77            const char *cacheConfig,
78            bool disconnectAtHighwatermark);
80    enum {
81        kPageSize                       = 65536,
82        kDefaultHighWaterThreshold      = 20 * 1024 * 1024,
83        kDefaultLowWaterThreshold       = 4 * 1024 * 1024,
84
85        // Read data after a 15 sec timeout whether we're actively
86        // fetching or not.
87        kDefaultKeepAliveIntervalUs     = 15000000,
88    };
90    enum {
91        kWhatFetchMore  = 'fetc',
92        kWhatRead       = 'read',
93    };
95    enum {
96        kMaxNumRetries = 10,
97    };
99    sp<DataSource> mSource;
100    sp<AHandlerReflector<NuCachedSource2> > mReflector;
101    sp<ALooper> mLooper;
102
103    Mutex mSerializer;
104    mutable Mutex mLock;
105    Condition mCondition;
106
107    PageCache *mCache;
108    off64_t mCacheOffset;
109    status_t mFinalStatus;
110    off64_t mLastAccessPos;
111    sp<AMessage> mAsyncResult;
112    bool mFetching;
113    bool mDisconnecting;
114    int64_t mLastFetchTimeUs;
115
116    int32_t mNumRetriesLeft;
117
118    size_t mHighwaterThresholdBytes;
119    size_t mLowwaterThresholdBytes;
120
121    // If the keep-alive interval is 0, keep-alives are disabled.
122    int64_t mKeepAliveIntervalUs;
123
124    bool mDisconnectAtHighwatermark;
125
126    void onMessageReceived(const sp<AMessage> &msg);
127    void onFetch();
128    void onRead(const sp<AMessage> &msg);
129
130    void fetchInternal();
131    ssize_t readInternal(off64_t offset, void *data, size_t size);
132    status_t seekInternal_l(off64_t offset);
133
134    size_t approxDataRemaining_l(status_t *finalStatus) const;
135
136    void restartPrefetcherIfNecessary_l(
137            bool ignoreLowWaterThreshold = false, bool force = false);
138
139    void updateCacheParamsFromSystemProperty();
140    void updateCacheParamsFromString(const char *s);
141
142    DISALLOW_EVIL_CONSTRUCTORS(NuCachedSource2);
143};

詳解一下NuCachedSource2,PageCache;

 PageCache 是一個數據結構,維護了兩個List <page*>Activity ,Free ;Acivity 中的是need read 的數據,Free 中是已經讀取解析須要釋放的數據.數據結構簡單有效,易於理解.肯定是沒有migration 容易產生頁碎片.

NuCacheSource2 是一個數據緩衝策略類,如低水位fetching數據,高水位中止bufferring 數據,retryFecting ,AkeepLiveCache ;以及調用HTTPBase或CURL 蒐集一些碼率帶寬cacheDuration 等統計信息.

NOTICE: PageCache 是給NuCacheSource2的成員變量DataSource (HTTPDataSource--------->OKHTTP) 填充數據用的

II , 嵌入式中經常使用的cache 數據結構是RingBuffer,如在android  中能夠直接替代PageCache;具體就是規定readPointer 永遠追趕writePointer,抽象出一個無限大的cache. 好處就是在處理高幀率高清晰度的片源時節省空間,缺點就是多線程訪問比較麻煩,每次讀以前都須要 readPointer合writePointer比較

//TODO

 

 

III ,EXOPLAYER + OKIO

    緩存分爲兩層:

                          1,okio 提供的豐富的source ,sink 訪問pipe,async;共享buffer (相似與ArrayList 的結構;循環鏈表+segments);而Buffer 擁有的全部segment 都來自於segmentPool 管理的單鏈表. segments 提供 ownership 因此能夠經過調用split方法改變ownership 以及share 減小copy;segments 提供compact 方法減小segments 碎片.

https://github.com/square/okio

Buffer is a mutable sequence of bytes. Like ArrayList, you don't need to size your buffer in advance. You read and write buffers as a queue: write data to the end and read it from the front. There's no obligation to manage positions, limits, or capacities.

Buffer is implemented as a linked list of segments. When you move data from one buffer to another, it reassigns ownershipof the segments rather than copying the data across. This approach is particularly helpful for multithreaded programs: a thread that talks to the network can exchange data with a worker thread without any copying or ceremony.

 

final class Segment {
  /** The size of all segments in bytes. */
  static final int SIZE = 8192;
   
  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
  static final int SHARE_MINIMUM = 1024;
   
  final byte[] data;
   
  /** The next byte of application data byte to read in this segment. */
  int pos;
   
  /** The first byte of available data ready to be written to. */
  int limit;
   
  /** True if other segments or byte strings use the same byte array. */
  boolean shared;
   
  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner;
   
  /** Next segment in a linked or circularly-linked list. */
  Segment next;
   
  /** Previous segment in a circularly-linked list. */
  Segment prev;

 

在segment中有幾個有意思的方法。

compact方法

/**
   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
   */
  public void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }
  • 當Segment的前一個和自身的數據量都不足一半時,會對segement進行壓縮,把自身的數據寫入到前一個Segment中,而後將自身進行回收。

split

將一個Segment的數據拆成兩個,注意,這裏有trick。若是有兩個Segment相同的字節超過了SHARE_MINIMUM (1024),那麼這兩個Segment會共享一份數據,這樣就省去了開闢內存及複製內存的開銷,達到了提升性能的目的。

public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = new Segment(this);
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }
  •  
  • SegmentPool

這是一個回收池,目前的設計是能存放64K的字節,即8個Segment。在實際使用中,建議對其進行調整。

final class SegmentPool {
  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
  static Segment next;

  /** Total bytes in this pool. */
  static long byteCount;
    ...
}
  •  

講到這裏,整個Buffer的實現原理也就呼之欲出了。

Buffer的寫操做,實際上就是不斷增長Segment的一個過程,讀操做,就是不斷消耗Segment中的數據,若是數據讀取完,則使用SegmentPool進行回收。 
當複製內存數據時,使用Segment的共享機制,多個Segment共享一份data[]。

Buffer更多的邏輯主要是跨Segment讀取數據,須要把前一個Segment的尾端和後一個Segment的前端拼接在一塊兒,所以看起來代碼量相對多,但其實開銷很是低。

TimeOut機制

在Okio中定義了一個類叫TimeOut,主要用於判斷時間是否超過閾值,超過以後就拋出中斷異常。

public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }
  • 自己是以有序鏈表的方式,按照超時的時間進行排序。在其head是一個佔位的AsyncTime,主要用於啓動WatchDog線程。這種異步超時主要能夠用在當時間到時,就能夠當即得到通知,不須要等待某阻塞方法返回時,才知道超時了。使用異步超時,timeout方法在發生超時會進行回調,須要重載timedOut()方法以處理超時事件

 

2,exo 內部是一個vector 初始化大小爲16M, 有高水位限制內存的增加

高水位30s && %80*TotalCacheSize

低水位5s  or %20*TotalCacheSize

相關文章
相關標籤/搜索