Shuffle 過程
上一章裏討論了 job 的物理執行圖,也討論了流入 RDD 中的 records 是怎麼被 compute() 後流到後續 RDD 的,同時也分析了 task 是怎麼產生 result,以及 result 怎麼被收集後計算出最終結果的。然而,咱們尚未討論
數據是怎麼經過 ShuffleDependency 流向下一個 stage 的?
對比 Hadoop MapReduce 和 Spark 的 Shuffle 過程
若是熟悉 Hadoop MapReduce 中的 shuffle 過程,可能會按照 MapReduce 的思路去想象 Spark 的 shuffle 過程。然而,它們之間有一些區別和聯繫。
從 high-level 的角度來看,二者並無大的差異。 都是將 mapper(Spark 裏是 ShuffleMapTask)的輸出進行 partition,不一樣的 partition 送到不一樣的 reducer(Spark 裏 reducer 多是下一個 stage 裏的 ShuffleMapTask,也多是 ResultTask)。Reducer 之內存做緩衝區,邊 shuffle 邊 aggregate 數據,等到數據 aggregate 好之後進行 reduce() (Spark 裏多是後續的一系列操做)。
從 low-level 的角度來看,二者差異不小。 Hadoop MapReduce 是 sort-based,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在於 combine/reduce() 能夠處理大規模的數據,由於其輸入數據能夠經過
外排獲得(mapper 對每段數據先作排序,reducer 的 shuffle 對排好序的每段數據作歸併)。目前的 Spark 默認選擇的是 hash-based,一般使用 HashMap 來對 shuffle 來的數據進行 aggregate,不會對數據進行提早排序。若是用戶須要通過排序的數據,那麼須要本身調用相似 sortByKey() 的操做;若是你是Spark 1.1的用戶,能夠將spark.shuffle.manager設置爲sort,則會對數據進行排序。在Spark 1.2中,sort將做爲默認的Shuffle實現。
從實現角度來看,二者也有很多差異。 Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等。每一個階段各司其職,能夠按照過程式的編程思想來逐一實現每一個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不一樣的 stage 和一系列的 transformation(),因此 spill, merge, aggregate 等操做須要蘊含在 transformation() 中。 若是咱們將 map 端劃分數據、持久化數據的過程稱爲 shuffle write,而將 reducer 讀入數據、aggregate 數據的過程稱爲 shuffle read。那麼在 Spark 中,
問題就變爲怎麼在 job 的邏輯或者物理執行圖中加入 shuffle write 和 shuffle read 的處理邏輯?以及兩個處理邏輯應該怎麼高效實現?
Shuffle write
因爲不要求數據有序,shuffle write 的任務很簡單:將數據 partition 好,並持久化。之因此要持久化,一方面是要減小內存存儲空間壓力,另外一方面也是爲了 fault-tolerance。 shuffle write 的任務很簡單,那麼實現也很簡單:將 shuffle write 的處理邏輯加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最後,該 stage 的 final RDD 每輸出一個 record 就將其 partition 並持久化。圖示以下:

上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行,CPU core 數爲 2,能夠同時運行兩個 task。每一個 task 的執行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每一個 task 包含 R 個緩衝區,R = reducer 個數(也就是下一個 stage 中 task 的個數),緩衝區被稱爲 bucket,其大小爲
spark.shuffle.file.buffer.kb
,默認是 32KB(Spark 1.1 版本之前是 100KB)。
其實 bucket 是一個廣義的概念,表明 ShuffleMapTask 輸出結果通過 partition 後要存放的地方,這裏爲了細化數據存放位置和數據名稱,僅僅用 bucket 表示緩衝區。
ShuffleMapTask 的執行過程很簡單:先利用 pipeline 計算獲得 finalRDD 中對應 partition 的 records。每獲得一個 record 就將其送到對應的 bucket 裏,具體是哪一個 bucket 由
partitioner.partition(record.getKey()))
決定。每一個 bucket 裏面的數據會不斷被寫到本地磁盤上,造成一個 ShuffleBlockFile,或者簡稱
FileSegment。以後的 reducer 會去 fetch 屬於本身的 FileSegment,進入 shuffle read 階段。 這樣的實現很簡單,但有幾個問題:
- 產生的 FileSegment 過多。每一個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。通常 Spark job 的 M 和 R 都很大,所以磁盤上會存在大量的數據文件。
- 緩衝區佔用內存空間大。每一個 ShuffleMapTask 須要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 MR 個 bucket。雖然一個 ShuffleMapTask 結束後,對應的緩衝區能夠被回收,但一個 worker node 上同時存在的 bucket 個數能夠達到 cores R 個(通常 worker 同時能夠運行 cores 個 ShuffleMapTask),佔用的內存空間也就達到了
cores * R * 32 KB
。對於 8 核 1000 個 reducer 來講,佔用內存就是 256MB。
目前來看,第二個問題尚未好的方法解決,由於寫磁盤終究是要開緩衝區的,緩衝區過小會影響 IO 速度。但第一個問題有一些方法去解決,下面介紹已經在 Spark 裏面實現的 FileConsolidation 方法。先上圖:

能夠明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 能夠共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 造成 ShuffleBlock i,後執行的 ShuffleMapTask 能夠將輸出數據直接追加到 ShuffleBlock i 後面,造成 ShuffleBlock i',每一個 ShuffleBlock 被稱爲
FileSegment。下一個 stage 的 reducer 只須要 fetch 整個 ShuffleFile 就好了。這樣,每一個 worker 持有的文件數降爲 cores * R。FileConsolidation 功能能夠經過
spark.shuffle.consolidateFiles=true
來開啓。
Shuffle read
先看一張包含 ShuffleDependency 的物理執行圖,來自 reduceByKey:

很天然地,要計算 ShuffleRDD 中的數據,必須先把 MapPartitionsRDD 中的數據 fetch 過來。那麼問題就來了:
- 在何時 fetch,parent stage 中的一個 ShuffleMapTask 執行完仍是等所有 ShuffleMapTasks 執行完?
- 邊 fetch 邊處理仍是一次性 fetch 完再處理?
- fetch 來的數據存放到哪裏?
- 怎麼得到要 fetch 的數據的存放位置?
解決問題:
- 在何時 fetch?當 parent stage 的全部 ShuffleMapTasks 結束後再 fetch。理論上講,一個 ShuffleMapTask 結束後就能夠 fetch,可是爲了迎合 stage 的概念(即一個 stage 若是其 parent stages 沒有執行完,本身是不能被提交執行的),仍是選擇所有 ShuffleMapTasks 執行完再去 fetch。由於 fetch 來的 FileSegments 要先在內存作緩衝,因此一次 fetch 的 FileSegments 總大小不能太大。Spark 規定這個緩衝界限不能超過
spark.reducer.maxMbInFlight
,這裏用 softBuffer 表示,默認大小爲 48MB。一個 softBuffer 裏面通常包含多個 FileSegment,但若是某個 FileSegment 特別大的話,這一個就能夠填滿甚至超過 softBuffer 的界限。
- 邊 fetch 邊處理仍是一次性 fetch 完再處理?邊 fetch 邊處理。本質上,MapReduce shuffle 階段就是邊 fetch 邊使用 combine() 進行處理,只是 combine() 處理的是部分數據。MapReduce 爲了讓進入 reduce() 的 records 有序,必須等到所有數據都 shuffle-sort 後再開始 reduce()。由於 Spark 不要求 shuffle 後的數據全局有序,所以不必等到所有數據 shuffle 完成後再處理。那麼如何實現邊 shuffle 邊處理,並且流入的 records 是無序的?答案是使用能夠 aggregate 的數據結構,好比 HashMap。每 shuffle 獲得(從緩衝的 FileSegment 中 deserialize 出來)一個 \<key, value\=""> record,直接將其放進 HashMap 裏面。若是該 HashMap 已經存在相應的 Key,那麼直接進行 aggregate 也就是
func(hashMap.get(Key), Value)
,好比上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value
,並將 func 的結果從新 put(key) 到 HashMap 中去。這個 func 功能上至關於 reduce(),但實際處理數據的方式與 MapReduce reduce() 有差異,差異至關於下面兩段程序的差異。 // MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}
MapReduce 能夠在 process 函數裏面能夠定義任何數據結構,也能夠將部分或所有的 values 都 cache 後再進行處理,很是靈活。而 Spark 中的 func 的輸入參數是固定的,一個是上一個 record 的處理結果,另外一個是當前讀入的 record,它們通過 func 處理後的結果被下一個 record 處理時使用。所以一些算法好比求平均數,在 process 裏面很好實現,直接sum(values)/values.length
,而在 Spark 中 func 能夠實現sum(values)
,但很差實現/values.length
。更多的 func 將會在下面的章節細緻分析。
- fetch 來的數據存放到哪裏?剛 fetch 來的 FileSegment 存放在 softBuffer 緩衝區,通過處理後的數據放在內存 + 磁盤上。這裏咱們主要討論處理後的數據,能夠靈活設置這些數據是「只用內存」仍是「內存+磁盤」。若是
spark.shuffle.spill = false
就只用內存。內存使用的是AppendOnlyMap
,相似 Java 的HashMap
,內存+磁盤使用的是ExternalAppendOnlyMap
,若是內存空間不足時,ExternalAppendOnlyMap
能夠將 \<k, v\=""> records 進行 sort 後 spill 到磁盤上,等到須要它們的時候再進行歸併,後面會詳解。使用「內存+磁盤」的一個主要問題就是如何在二者之間取得平衡?在 Hadoop MapReduce 中,默認將 reducer 的 70% 的內存空間用於存放 shuffle 來的數據,等到這個空間利用率達到 66% 的時候就開始 merge-combine()-spill。在 Spark 中,也適用一樣的策略,一旦 ExternalAppendOnlyMap 達到一個閾值就開始 spill,具體細節下面會討論。
- 怎麼得到要 fetch 的數據的存放位置?在上一章討論物理執行圖中的 stage 劃分的時候,咱們強調 「一個 ShuffleMapStage 造成後,會將該 stage 最後一個 final RDD 註冊到
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
,這一步很重要,由於 shuffle 過程須要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數據的位置」。所以,reducer 在 shuffle 的時候是要去 driver 裏面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數據位置的。每一個 ShuffleMapTask 完成時會將 FileSegment 的存儲位置信息彙報給 MapOutputTrackerMaster。
至此,咱們已經討論了 shuffle write 和 shuffle read 設計的核心思想、算法及某些實現。接下來,咱們深刻一些細節來討論。
典型 transformation() 的 shuffle read
1. reduceByKey(func)
上面初步介紹了 reduceByKey() 是如何實現邊 fetch 邊 reduce() 的。須要注意的是雖然 Example(WordCount) 中給出了各個 RDD 的內容,但一個 partition 裏面的 records 並非同時存在的。好比在 ShuffledRDD 中,每 fetch 來一個 record 就當即進入了 func 進行處理。MapPartitionsRDD 中的數據是 func 在所有 records 上的處理結果。從 record 粒度上來看,reduce() 能夠表示以下:

能夠看到,fetch 來的 records 被逐個 aggreagte 到 HashMap 中,等到全部 records 都進入 HashMap,就獲得最後的處理結果。惟一要求是 func 必須是 commulative 的(參見上面的 Spark 的 reduce() 的代碼)。 ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操做。 爲了減小數據傳輸量,MapReduce 能夠在 map 端先進行 combine(),其實在 Spark 也能夠實現,只須要將上圖 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也進行一次便可,好比 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。
對比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():
- map 端的區別:map() 沒有區別。對於 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上進行 combine()。
- reduce 端區別:MapReduce 的 shuffle 階段先 fetch 數據,數據量到達必定規模後 combine(),再將剩餘數據 merge-sort 後 reduce(),reduce() 很是靈活。Spark 邊 fetch 邊 reduce()(在 HashMap 上執行 func),所以要求 func 符合 commulative 的特性。
從內存利用上來對比:
- map 端區別:MapReduce 須要開一個大型環形緩衝區來暫存和排序 map() 的部分輸出結果,但 combine() 不須要額外空間(除非用戶本身定義)。 Spark 須要 HashMap 內存數據結構來進行 combine(),同時輸出 records 到磁盤上時也須要一個小的 buffer(bucket)。
- reduce 端區別:MapReduce 須要一部份內存空間來存儲 shuffle 過來的數據,combine() 和 reduce() 不須要額外空間,由於它們的輸入數據分段有序,只需歸併一下就能夠獲得。在 Spark 中,fetch 時須要 softBuffer,處理數據時若是隻使用內存,那麼須要 HashMap 來持有處理後的結果。若是使用內存+磁盤,那麼在 HashMap 存放一部分處理後的數據。
2. groupByKey(numPartitions)

與 reduceByKey() 流程同樣,只是 func 變成
result = result ++ record.value
,功能是將每一個 key 對應的全部 values 連接在一塊兒。result 來自 hashMap.get(record.key),計算後的 result 會再次被 put 到 hashMap 中。與 reduceByKey() 的區別就是 groupByKey() 沒有 map 端的 combine()。對於 groupByKey() 來講 map 端的 combine() 只是減小了重複 Key 佔用的空間,若是 key 重複率不高,不必 combine(),不然,最好可以 combine()。
3. distinct(numPartitions)

與 reduceByKey() 流程同樣,只是 func 變成
result = result == null? record.value : result
,若是 HashMap 中沒有該 record 就將其放入,不然捨棄。與 reduceByKey() 相同,在map 端存在 combine()。
4. cogroup(otherRDD, numPartitions)

CoGroupedRDD 可能有 0 個、1 個或者多個 ShuffleDependency。但並非要爲每個 ShuffleDependency 創建一個 HashMap,而是全部的 Dependency 共用一個 HashMap。與 reduceByKey() 不一樣的是,HashMap 在 CoGroupedRDD 的 compute() 中創建,而不是在 mapPartitionsWithContext() 中創建。 粗線表示的 task 首先 new 出一個 Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的個數與參與 cogroup 的 RDD 個數相同。func 的邏輯是這樣的:每當從 RDD a 中 shuffle 過來一個 \<key, value\=""> record 就將其添加到 hashmap.get(Key) 對應的 Array 中的第一個 ArrayBuffer() 中,每當從 RDD b 中 shuffle 過來一個 record,就將其添加到對應的 Array 中的第二個 ArrayBuffer()。 CoGroupedRDD => MappedValuesRDD 對應 mapValues() 操做,就是將 [ArrayBuffer(), ArrayBuffer()] 變成 [Iterable[V], Iterable[W]]。
5. intersection(otherRDD) 和 join(otherRDD, numPartitions)

這兩個操做中均使用了 cogroup,因此 shuffle 的處理方式與 cogroup 同樣。
6. sortByKey(ascending, numPartitions)

sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯與 reduceByKey() 不太同樣,沒有使用 HashMap 和 func 來處理 fetch 過來的 records。 sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯是:將 shuffle 過來的一個個 record 存放到一個 Array 裏,而後按照 Key 來對 Array 中的 records 進行 sort。
7. coalesce(numPartitions, shuffle = true)

coalesce() 雖然有 ShuffleDependency,但不須要對 shuffle 過來的 records 進行 aggregate,因此沒有創建 HashMap。每 shuffle 一個 record,就直接流向 CoalescedRDD,進而流向 MappedRDD 中。
Shuffle read 中的 HashMap
HashMap 是 Spark shuffle read 過程當中頻繁使用的、用於 aggregate 的數據結構。Spark 設計了兩種:一種是全內存的 AppendOnlyMap,另外一種是內存+磁盤的 ExternalAppendOnlyMap。下面咱們來分析一下
二者特性及內存使用狀況。
1. AppendOnlyMap
AppendOnlyMap 的官方介紹是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是相似 HashMap,但沒有
remove(key)
方法。其實現原理很簡單,開一個大 Object 數組,藍色部分存儲 Key,白色部分存儲 Value。以下圖:

當要 put(K, V) 時,先 hash(K) 找存放位置,
若是存放位置已經被佔用,就使用 Quadratic probing 探測方法來找下一個空閒位置。對於圖中的 K6 來講,第三次查找找到 K4 後面的空閒位置,放進去便可。get(K6) 的時候相似,找三次找到 K6,取出緊挨着的 V6,與先來的 value 作 func,結果從新放到 V6 的位置。 迭代 AppendOnlyMap 中的元素的時候,從前到後掃描輸出。 若是 Array 的利用率達到 70%,那麼就擴張一倍,並對全部 key 進行 rehash 後,從新排列每一個 key 的位置。 AppendOnlyMap 還有一個
destructiveSortedIterator(): Iterator[(K, V)]
方法,能夠返回 Array 中排序後的 (K, V) pairs。實現方法很簡單:先將全部 (K, V) pairs compact 到 Array 的前端,並使得每一個 (K, V) 佔一個位置(原來佔兩個),以後直接調用 Array.sort() 排序,不過這樣作會破壞數組(key 的位置變化了)。
2. ExternalAppendOnlyMap

相比 AppendOnlyMap,ExternalAppendOnlyMap 的實現略複雜,但邏輯其實很簡單,相似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 過程: ExternalAppendOnlyMap 持有一個 AppendOnlyMap,shuffle 來的一個個 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 如出一轍。
若是 AppendOnlyMap 快被裝滿時檢查一下內存剩餘空間是否能夠夠擴展,夠就直接在內存中擴展,不夠就 sort 一下 AppendOnlyMap,將其內部全部 records 都 spill 到磁盤上。圖中 spill 了 4 次,每次 spill 完在磁盤上生成一個 spilledMap 文件,而後從新 new 出來一個 AppendOnlyMap。最後一個 (K, V) record insert 到 AppendOnlyMap 後,表示全部 shuffle 來的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經被處理完,由於每次 insert 的時候,新來的 record 只與 AppendOnlyMap 中的 records 進行 aggregate,並非與全部的 records 進行 aggregate(一些 records 已經被 spill 到磁盤上了)。所以當須要 aggregate 的最終結果時,須要對 AppendOnlyMap 和全部的 spilledMaps 進行全局 merge-aggregate。
全局 merge-aggregate 的流程也很簡單:先將 AppendOnlyMap 中的 records 進行 sort,造成 sortedMap。而後利用 DestructiveSortedIterator 和 DiskMapIterator 分別從 sortedMap 和各個 spilledMap 讀出一部分數據(StreamBuffer)放到 mergeHeap 裏面。StreamBuffer 裏面包含的 records 須要具備相同的 hash(key),因此圖中第一個 spilledMap 只讀出前三個 records 進入 StreamBuffer。mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,並將其一個個放入 mergeBuffers 中,放入的時候與已經存在於 mergeBuffers 中的 StreamBuffer 進行 merge-combine,第一個被放入 mergeBuffers 的 StreamBuffer 被稱爲 minBuffer,那麼 minKey 就是 minBuffer 中第一個 record 的 key。當 merge-combine 的時候,與 minKey 相同的 records 被 aggregate 一塊兒,而後輸出。整個 merge-combine 在 mergeBuffers 中結束後,StreamBuffer 剩餘的 records 隨着 StreamBuffer 從新進入 mergeHeap。一旦某個 StreamBuffer 在 merge-combine 後變爲空(裏面的 records 都被輸出了),那麼會使用 DestructiveSortedIterator 或 DiskMapIterator 從新裝填 hash(key) 相同的 records,而後再從新進入 mergeHeap。 整個 insert-merge-aggregate 的過程有三點須要進一步探討一下:
- 內存剩餘空間檢測 與 Hadoop MapReduce 規定 reducer 中 70% 的空間可用於 shuffle-sort 相似,Spark 也規定 executor 中
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
的空間(默認是0.3 * 0.8
)可用於 ExternalOnlyAppendMap。Spark 略保守是否是?更保守的是這 24% 的空間不是徹底用於一個 ExternalOnlyAppendMap 的,而是由在 executor 上同時運行的全部 reducer 共享的。爲此,exectuor 專門持有一個 ShuffleMemroyMap: HashMap[threadId, occupiedMemory]
來監控每一個 reducer 中 ExternalOnlyAppendMap 佔用的內存量。每當 AppendOnlyMap 要擴展時,都會計算ShuffleMemroyMap 持有的全部 reducer 中的 AppendOnlyMap 已佔用的內存 + 擴展後的內存 是會否會大於內存限制,大於就會將 AppendOnlyMap spill 到磁盤。有一點須要注意的是前 1000 個 records 進入 AppendOnlyMap 的時候不會啓動是否要 spill 的檢查,須要擴展時就直接在內存中擴展。
- AppendOnlyMap 大小估計 爲了獲知 AppendOnlyMap 佔用的內存空間,能夠在每次擴展時都將 AppendOnlyMap reference 的全部 objects 大小都算一遍,而後加和,但這樣作很是耗時。因此 Spark 設計了粗略的估算算法,算法時間複雜度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 後 result 的大小變化及一共 insert 的 records 的個數來估算大小,具體見
SizeTrackingAppendOnlyMap
和SizeEstimator
。
- Spill 過程 與 shuffle write 同樣,在 spill records 到磁盤上的時候,會創建一個 buffer 緩衝區,大小仍爲
spark.shuffle.file.buffer.kb
,默認是 32KB。另外,因爲 serializer 也會分配緩衝區用於序列化和反序列化,因此若是一次 serialize 的 records 過多的話緩衝區會變得很大。Spark 限制每次 serialize 的 records 個數爲 spark.shuffle.spill.batchSize
,默認是 10000。
Discussion
經過本章的介紹能夠發現,相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加靈活,會根據不一樣的 transformation() 的語義去設計不一樣的 shuffle-aggregate 策略,再加上不一樣的內存數據結構來混搭出合理的執行流程。 這章主要討論了 Spark 是怎麼在不排序 records 的狀況下完成 shuffle write 和 shuffle read,以及怎麼將 shuffle 過程融入 RDD computing chain 中的。附帶討論了內存與磁盤的平衡以及與 Hadoop MapReduce shuffle 的異同。下一章將從部署圖以及進程通訊角度來描述 job 執行的整個流程,也會涉及 shuffle write 和 shuffle read 中的數據位置獲取問題。 另外,Jerry Shao 寫的
詳細探究Spark的shuffle實現 很贊,裏面還介紹了 shuffle 過程在 Spark 中的進化史。目前 sort-based 的 shuffle 也在實現當中,stay tuned。