Task粒度的緩存聚合排序結構AppendOnlyMap詳細剖析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法

Spark商業環境實戰及調優進階系列

1. AppendOnlyMap 何許人也?

Spark提供了AppendOnlyMap數據結構來對任務執行結果進行聚合運算。可謂一件利器,爲何這樣說呢?由於Spark是基於內存運算的大數據計算引擎,即能基於內存作數據存儲,也能基於內存進行插入,更新,聚合,排序等操做。由此能夠看出,Spark真正把內存的使用技術發揮到了極致。數組

* A simple open hash table optimized for the append-only use case, where keys
    * are never removed, but the value for each key may be changed.
    *
    * This implementation uses quadratic probing with a power-of-2 hash table
    * size, which is guaranteed to explore all spaces for each key (see
    * http://en.wikipedia.org/wiki/Quadratic_probing).
    * The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
複製代碼

1.1 AppendOnlyMap 內部成員及特殊使命

  • 提供對null值得緩存緩存

  • initialCapacity : 主構造函數傳入 class AppendOnlyMap[K, V](initialCapacity: Int = 64)數據結構

  • capacity :容量取值爲:nextPowerOf2(initialCapacity),具體就是補零對比,相同爲原值,不相同則左移加一位。架構

  • data : 用於保存key和聚合值得數組。new Array[AnyRef](2 * capacity)app

    * Holds keys and values in the same array for memory locality;
      * specifically, the order of elements is key0, value0, key1, value1, 
      * key2, value2, etc. 
    複製代碼
  • LOAD_FACTOR :默認爲0.7框架

  • growThreshold : (LOAD_FACTOR * capacity).toIntide

1.2 AppendOnlyMap 經常使用方法:

  • growTable :擴容容量爲原先的兩倍,對key進行re-hash放入新數組。Double the table's size and re-hash everything。函數

  • update :key和value的更新。三種狀況:1:rehash(key.hashCode) & mask對應位置沒有值,直接插入。2:對應位置有值且等於原先key,直接更新。3:對應位置有值且 不等於原先key,向後挪動一位。oop

    def update(key: K, value: V): Unit = {
       assert(!destroyed, destructionMessage)
       val k = key.asInstanceOf[AnyRef]
       if (k.eq(null)) {
         if (!haveNullValue) {
           incrementSize()
         }
         nullValue = value
         haveNullValue = true
         return
        }
        var pos = rehash(key.hashCode) & mask
        var i = 1
        while (true) {
         val curKey = data(2 * pos)
         if (curKey.eq(null)) {
           data(2 * pos) = k
           data(2 * pos + 1) = value.asInstanceOf[AnyRef]
           incrementSize()  // Since we added a new key
           return
         } else if (k.eq(curKey) || k.equals(curKey)) {
           data(2 * pos + 1) = value.asInstanceOf[AnyRef]
           return
         } else {
           val delta = i
           pos = (pos + delta) & mask
           i += 1
         }
       }
     }
    複製代碼
  • changeValue :緩存聚合算法,根據指定函數進行值的聚合操做,updateFunc爲匿名函數。三種狀況:1:rehash(key.hashCode) & mask對應位置沒有值,與NULL值聚合。2:對應位置有值且等於原先key,直接聚合。3:對應位置有值,且不等於原先key,向後挪動一位插入。

    def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
       assert(!destroyed, destructionMessage)
       val k = key.asInstanceOf[AnyRef]
       if (k.eq(null)) {
         if (!haveNullValue) {
           incrementSize()
         }
         nullValue = updateFunc(haveNullValue, nullValue)
         haveNullValue = true
         return nullValue
       }
       var pos = rehash(k.hashCode) & mask
       var i = 1
       while (true) {
         val curKey = data(2 * pos)
         if (curKey.eq(null)) {
           val newValue = updateFunc(false, null.asInstanceOf[V])
           data(2 * pos) = k
           data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
           incrementSize()
           return newValue
         } else if (k.eq(curKey) || k.equals(curKey)) {
           val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
           data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
           return newValue
         } else {
           val delta = i
           pos = (pos + delta) & mask
           i += 1
         }
       }
       null.asInstanceOf[V] // Never reached but needed to keep compiler happy
     }
    複製代碼
  • destructiveSortedIterator:在不犧牲額外內存和不犧牲AppendOnlyMap的有效性的前提下,對AppendOnlyMap的data數組中的數據進行排序實現。這裏使用了優化版的TimSort,英文解釋以下:

    * return an iterator of the map in sorted order. This provides a way to sort the
      * map without using additional memory, at the expense of destroying the validity
      * of the map.
    複製代碼

    代碼片斷以下:

    def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
          destroyed = true
          // Pack KV pairs into the front of the underlying array
           var keyIndex, newIndex = 0
           while (keyIndex < capacity) {
            if (data(2 * keyIndex) != null) {
              data(2 * newIndex) = data(2 * keyIndex)
              data(2 * newIndex + 1) = data(2 * keyIndex + 1)
              newIndex += 1
            }
            keyIndex += 1
          }
          assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
          new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
          new Iterator[(K, V)] {
            var i = 0
            var nullValueReady = haveNullValue
            def hasNext: Boolean = (i < newIndex || nullValueReady)
            def next(): (K, V) = {
              if (nullValueReady) {
                nullValueReady = false
                (null.asInstanceOf[K], nullValue)
              } else {
                val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
                i += 1
                item
              }
            }
          }
        }   
    複製代碼

2. AppendOnlyMap 孩子的延伸特性?

  • SizeTrackingAppendOnlyMap :以自身的大小進行樣本採集和大小估算。

    An append-only map that keeps track of its estimated size in bytes.
    複製代碼

    SizeTrackingAppendOnlyMap的代碼段,好短啊:

    private[spark] class SizeTrackingAppendOnlyMap[K, V]
        extends AppendOnlyMap[K, V] with SizeTracker
      {
        override def update(key: K, value: V): Unit = {
          super.update(key, value)
          super.afterUpdate()
        }
      
        override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
          val newValue = super.changeValue(key, updateFunc)
          super.afterUpdate()
          newValue
        }
      
        override protected def growTable(): Unit = {
          super.growTable()
          resetSamples()
        }
      }
    複製代碼
  • PartitionedAppendOnlyMap :增長了partitionedDestructiveSortedIterator,調用了AppendOnlyMap的destructiveSortedIterator對底層數組進行整理和排序後得到迭代器。

    (1)主要做用是根據指定的key比較器,返回對集合中的數據按照分區Id 順序進行迭代的迭代器。
    * Implementation of WritablePartitionedPairCollection that wraps a map in which the
      * keys are tuples of (partition ID, K)
    
      private[spark] class PartitionedAppendOnlyMap[K, V]
        extends SizeTrackingAppendOnlyMap[(Int, K), V] with
        WritablePartitionedPairCollection[K, V] {
      
            (WritablePartitionedPairCollection定義的接口,未實現)
        def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
          : Iterator[((Int, K), V)] = {
          val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
         
         (AppendOnlyMap內部方法,對底層的data數組進行整理和排序後得到迭代器)
            destructiveSortedIterator(comparator)
        }
    複製代碼
    (2) 同時對插入的鍵值進行了擴展,增長了分區和key的鍵值對元祖(partition, key)類型,以下:update((partition, key), value)。
    def insert(partition: Int, key: K, value: V): Unit = {
          update((partition, key), value)
        }
      }
    複製代碼

3 原創總結AppendOnlyMap牛在哪裏?

  • 1 大大減小了數據佔用內存的大小?
  • 2 來對中間結果進行聚合,Tim sort 優化排序算法
  • 3 Spark的Map任務逐條輸出計算結果,而不是一次性輸出到內存,並經過使用AppendOnlyMap緩存及其聚合算法來對中間結果進行聚合,這樣大大減小了中間結果所佔用的內存。
  • 4 Spark的reduce任務對拉取到的map任務中間結果逐條讀取,而不是一次性讀入內存,並在內存中進行聚合和排序, 本質上讀入內存操做都是通過AppendOnlyMap,大大減小了數據佔用內存的大小。
  • 5 經過優化的SizeTrackingAppendOnlyMap,SizeTrackingPairBuff及Tungsten的page進行溢出判斷,當超過限制時,會把數據寫入磁盤,放着內存溢出。

4 Spark shuffle 的流程剖析

- 1: Spark的Map任務在輸出時會根據分區進行計算,並輸出數據文件和索引文件。
- 2:Spark的shuffle過程會伴隨着緩存,排序,聚合,溢出,合併操做。固然遠端拉取Block的操做必不可少。
複製代碼

5 最後

秦凱新 於深圳

相關文章
相關標籤/搜索