對工作中用到的 media player 播放領域中遇到的三個 cache 實現的優缺點總結:
1,android 原生的 NuCachedSource2
2,嵌入式播放中常用的 ringBuffer cache
3,YouTube 開源播放器 ExoPlayer 中用到的 cache 策略
I.android 原生中 http + mp4 的播放模式,爲緩解網絡抖動,平衡下載的時機,開發了 NuCachedSource2
NuCachedSource2 繼承於 DataSource 抽象類,提供緩存讀寫統計功能.
android 的多媒體框架在 Source 端的模型如圖:
1,DataSource 被抽象爲數據源(例如 HTTP,Cache,LocalFile,Prefetching),抽象的是一種訪問數據的類型.可以用 IBinder 初始化,也可以無參初始化,非常靈活.
40class DataSource : public RefBase { 41public: 42 enum Flags { 43 kWantsPrefetching = 1, 44 kStreamedFromLocalHost = 2, 45 kIsCachingDataSource = 4, 46 kIsHTTPBasedSource = 8, 47 }; 48 49 static sp<DataSource> CreateFromURI( 50 const sp<IMediaHTTPService> &httpService, 51 const char *uri, 52 const KeyedVector<String8, String8> *headers = NULL, 53 String8 *contentType = NULL, 54 HTTPBase *httpSource = NULL); 55 56 static sp<DataSource> CreateMediaHTTP(const sp<IMediaHTTPService> &httpService); 57 static sp<DataSource> CreateFromIDataSource(const sp<IDataSource> &source); 58 59 DataSource() {} 60 61 virtual status_t initCheck() const = 0; 62 63 // Returns the number of bytes read, or -1 on failure. It's not an error if 64 // this returns zero; it just means the given offset is equal to, or 65 // beyond, the end of the source. 66 virtual ssize_t readAt(off64_t offset, void *data, size_t size) = 0; 67 74 // May return ERROR_UNSUPPORTED. 75 virtual status_t getSize(off64_t *size); 76 77 virtual uint32_t flags() { 78 return 0; 79 } 80 81 virtual status_t reconnectAtOffset(off64_t offset) { 82 return ERROR_UNSUPPORTED; 83 } 84 104 virtual String8 getUri() { 105 return String8(); 106 } 107 108 virtual String8 getMIMEType() const;
2,NuPlayer::Source 是 MediaSource 的抽象,代表一種多媒體源以及對多媒體源的基本操作行爲(prepare,start,pause,resume,seek,disconnect)
struct NuPlayer::Source : public AHandler { 34 enum Flags { 35 FLAG_CAN_PAUSE = 1, 36 FLAG_CAN_SEEK_BACKWARD = 2, // the "10 sec back button" 37 FLAG_CAN_SEEK_FORWARD = 4, // the "10 sec forward button" 38 FLAG_CAN_SEEK = 8, // the "seek bar" 39 FLAG_DYNAMIC_DURATION = 16, 40 FLAG_SECURE = 32, 41 FLAG_PROTECTED = 64, 42 }; 43 44 enum { 45 kWhatPrepared, 46 kWhatFlagsChanged, 47 kWhatVideoSizeChanged, 48 kWhatBufferingUpdate, 49 kWhatBufferingStart, 50 kWhatBufferingEnd, 51 kWhatPauseOnBufferingStart, 52 kWhatResumeOnBufferingEnd, 53 kWhatCacheStats, 54 kWhatSubtitleData, 55 kWhatTimedTextData, 56 kWhatTimedMetaData, 57 kWhatQueueDecoderShutdown, 58 kWhatDrmNoLicense, 59 kWhatInstantiateSecureDecoders, 60 }; 61 62 // The provides message is used to notify the player about various 63 // events. 64 Source(const sp<AMessage> ¬ify) 65 : mNotify(notify) { 66 } 67 68 virtual void prepareAsync() = 0; 69 70 virtual void start() = 0; 71 virtual void stop() {} 72 virtual void pause() {} 73 virtual void resume() {} 74 75 // Explicitly disconnect the underling data source 76 virtual void disconnect() {} 77 78 // Returns OK iff more data was available, 79 // an error or ERROR_END_OF_STREAM if not. 
80 virtual status_t feedMoreTSData() = 0; 81 82 virtual sp<AMessage> getFormat(bool audio); 83 virtual sp<MetaData> getFormatMeta(bool /* audio */) { return NULL; } 84 virtual sp<MetaData> getFileFormatMeta() const { return NULL; } 85 86 virtual status_t dequeueAccessUnit( 87 bool audio, sp<ABuffer> *accessUnit) = 0; 88 89 virtual status_t getDuration(int64_t * /* durationUs */) { 90 return INVALID_OPERATION; 91 } 92 93 virtual size_t getTrackCount() const { 94 return 0; 95 } 96 97 virtual sp<AMessage> getTrackInfo(size_t /* trackIndex */) const { 98 return NULL; 99 } 100 101 virtual ssize_t getSelectedTrack(media_track_type /* type */) const { 102 return INVALID_OPERATION; 103 } 104 105 virtual status_t selectTrack(size_t /* trackIndex */, bool /* select */, int64_t /* timeUs*/) { 106 return INVALID_OPERATION; 107 } 108 109 virtual status_t seekTo(int64_t /* seekTimeUs */) { 110 return INVALID_OPERATION; 111 } 112 113 virtual status_t setBuffers(bool /* audio */, Vector<MediaBuffer *> &/* buffers */) { 114 return INVALID_OPERATION; 115 } 116 117 virtual bool isRealTime() const { 118 return false; 119 } 120 121 virtual bool isStreaming() const { 122 return true; 123 }
3,NuCachedSource2 這個類重點提供兩個方法 fetch 和 read,兩者都通過 PageCache 完成.
30struct NuCachedSource2 : public DataSource { 31 static sp<NuCachedSource2> Create( 32 const sp<DataSource> &source, 33 const char *cacheConfig = NULL, 34 bool disconnectAtHighwatermark = false); 35 36 virtual status_t initCheck() const; 38 virtual ssize_t readAt(off64_t offset, void *data, size_t size); 40 virtual void disconnect(); 42 virtual status_t getSize(off64_t *size); 43 virtual uint32_t flags(); 49 virtual String8 getMIMEType() const; 50 51 //////////////////////////////////////////////////////////////////////////// 52 53 size_t cachedSize(); 54 size_t approxDataRemaining(status_t *finalStatus) const; 56 void resumeFetchingIfNecessary(); 58 // The following methods are supported only if the 59 // data source is HTTP-based; otherwise, ERROR_UNSUPPORTED 60 // is returned. 61 status_t getEstimatedBandwidthKbps(int32_t *kbps); 62 status_t setCacheStatCollectFreq(int32_t freqMs); 64 static void RemoveCacheSpecificHeaders( 65 KeyedVector<String8, String8> *headers, 66 String8 *cacheConfig, 67 bool *disconnectAtHighwatermark); 68 69protected: 70 virtual ~NuCachedSource2(); 71 72private: 73 friend struct AHandlerReflector<NuCachedSource2>; 75 NuCachedSource2( 76 const sp<DataSource> &source, 77 const char *cacheConfig, 78 bool disconnectAtHighwatermark); 80 enum { 81 kPageSize = 65536, 82 kDefaultHighWaterThreshold = 20 * 1024 * 1024, 83 kDefaultLowWaterThreshold = 4 * 1024 * 1024, 84 85 // Read data after a 15 sec timeout whether we're actively 86 // fetching or not. 
87 kDefaultKeepAliveIntervalUs = 15000000, 88 }; 90 enum { 91 kWhatFetchMore = 'fetc', 92 kWhatRead = 'read', 93 }; 95 enum { 96 kMaxNumRetries = 10, 97 }; 99 sp<DataSource> mSource; 100 sp<AHandlerReflector<NuCachedSource2> > mReflector; 101 sp<ALooper> mLooper; 102 103 Mutex mSerializer; 104 mutable Mutex mLock; 105 Condition mCondition; 106 107 PageCache *mCache; 108 off64_t mCacheOffset; 109 status_t mFinalStatus; 110 off64_t mLastAccessPos; 111 sp<AMessage> mAsyncResult; 112 bool mFetching; 113 bool mDisconnecting; 114 int64_t mLastFetchTimeUs; 115 116 int32_t mNumRetriesLeft; 117 118 size_t mHighwaterThresholdBytes; 119 size_t mLowwaterThresholdBytes; 120 121 // If the keep-alive interval is 0, keep-alives are disabled. 122 int64_t mKeepAliveIntervalUs; 123 124 bool mDisconnectAtHighwatermark; 125 126 void onMessageReceived(const sp<AMessage> &msg); 127 void onFetch(); 128 void onRead(const sp<AMessage> &msg); 129 130 void fetchInternal(); 131 ssize_t readInternal(off64_t offset, void *data, size_t size); 132 status_t seekInternal_l(off64_t offset); 133 134 size_t approxDataRemaining_l(status_t *finalStatus) const; 135 136 void restartPrefetcherIfNecessary_l( 137 bool ignoreLowWaterThreshold = false, bool force = false); 138 139 void updateCacheParamsFromSystemProperty(); 140 void updateCacheParamsFromString(const char *s); 141 142 DISALLOW_EVIL_CONSTRUCTORS(NuCachedSource2); 143};
詳解一下NuCachedSource2,PageCache;
PageCache 是一個數據結構,維護了兩個 List<Page*>:Active 和 Free;Active 中的是 need read 的數據,Free 中是已經讀取解析,需要釋放的數據.數據結構簡單有效,易於理解.缺點是沒有 migration,容易產生頁碎片.
NuCachedSource2 是一個數據緩衝策略類,如低水位時 fetching 數據,高水位時停止 buffering 數據,retry fetching,keep-alive;以及調用 HTTPBase 或 CURL 蒐集一些碼率,帶寬,cacheDuration 等統計信息.
NOTICE: PageCache 是給 NuCachedSource2 的成員變量 DataSource (HTTPDataSource--------->OKHTTP) 填充數據用的
II , 嵌入式中常用的 cache 數據結構是 RingBuffer,如在 android 中可以直接替代 PageCache;具體就是規定 readPointer 永遠追趕 writePointer,抽象出一個無限大的 cache. 好處就是在處理高幀率高清晰度的片源時節省空間,缺點就是多線程訪問比較麻煩,每次讀之前都需要將 readPointer 和 writePointer 比較
//TODO
III ,EXOPLAYER + OKIO
緩存分爲兩層:
1,okio 提供的豐富的 source,sink 訪問 pipe,async;共享 buffer (類似於 ArrayList 的結構;循環鏈表+segments);而 Buffer 擁有的所有 segment 都來自於 SegmentPool 管理的單鏈表. segments 提供 ownership,因此可以通過調用 split 方法改變 ownership 以及 share,減少 copy;segments 提供 compact 方法減少 segments 碎片.
https://github.com/square/okio
Buffer is a mutable sequence of bytes. Like ArrayList, you don't need to size your buffer in advance. You read and write buffers as a queue: write data to the end and read it from the front. There's no obligation to manage positions, limits, or capacities.
Buffer is implemented as a linked list of segments. When you move data from one buffer to another, it reassigns ownership of the segments rather than copying the data across. This approach is particularly helpful for multithreaded programs: a thread that talks to the network can exchange data with a worker thread without any copying or ceremony.
final class Segment { | |
/** The size of all segments in bytes. */ | |
static final int SIZE = 8192; | |
/** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */ | |
static final int SHARE_MINIMUM = 1024; | |
final byte[] data; | |
/** The next byte of application data byte to read in this segment. */ | |
int pos; | |
/** The first byte of available data ready to be written to. */ | |
int limit; | |
/** True if other segments or byte strings use the same byte array. */ | |
boolean shared; | |
/** True if this segment owns the byte array and can append to it, extending {@code limit}. */ | |
boolean owner; | |
/** Next segment in a linked or circularly-linked list. */ | |
Segment next; | |
/** Previous segment in a circularly-linked list. */ | |
Segment prev; |
在segment中有幾個有意思的方法。
/** * Call this when the tail and its predecessor may both be less than half * full. This will copy data so that segments can be recycled. */ public void compact() { if (prev == this) throw new IllegalStateException(); if (!prev.owner) return; // Cannot compact: prev isn't writable. int byteCount = limit - pos; int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos); if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space. writeTo(prev, byteCount); pop(); SegmentPool.recycle(this); }
split 將一個 Segment 的數據拆成兩個,注意,這裏有 trick。如果拆分出的字節數不小於 SHARE_MINIMUM (1024),那麼這兩個 Segment 會共享一份數據,這樣就省去了開闢內存及複製內存的開銷,達到了提升性能的目的;否則從 SegmentPool 取一個新的 Segment 並複製數據。
public Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; // We have two competing performance goals: // - Avoid copying data. We accomplish this by sharing segments. // - Avoid short shared segments. These are bad for performance because they are readonly and // may lead to long chains of short segments. // To balance these goals we only share segments when the copy will be large. if (byteCount >= SHARE_MINIMUM) { prefix = new Segment(this); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; }
這是一個回收池,目前的設計是能存放64K的字節,即8個Segment。在實際使用中,建議對其進行調整。
final class SegmentPool { /** The maximum number of bytes to pool. */ // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments? static final long MAX_SIZE = 64 * 1024; // 64 KiB. /** Singly-linked list of segments. */ static Segment next; /** Total bytes in this pool. */ static long byteCount; ... }
講到這裏,整個Buffer的實現原理也就呼之欲出了。
Buffer 的寫操作,實際上就是不斷增加 Segment 的一個過程;讀操作,就是不斷消耗 Segment 中的數據,如果數據讀取完,則使用 SegmentPool 進行回收。
當複製內存數據時,使用Segment的共享機制,多個Segment共享一份data[]。
Buffer 更多的邏輯主要是跨 Segment 讀取數據,需要把前一個 Segment 的尾端和後一個 Segment 的前端拼接在一起,因此看起來代碼量相對多,但其實開銷非常低。
在 Okio 中定義了一個類叫 Timeout,主要用於判斷時間是否超過閾值(deadline),超過之後就拋出 InterruptedIOException;當前線程已被中斷時同樣拋出該異常。
public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }
2,exo 內部是一個vector 初始化大小爲16M, 有高水位限制內存的增加
高水位 30s && 80% * TotalCacheSize
低水位 5s or 20% * TotalCacheSize