This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and hands-on guidance; please keep following the series. Copyright notice: this Spark source-code reading and production-practice series belongs entirely to its author (秦凱新). Reproduction is prohibited; readers are welcome to learn from it.
Spark provides the AppendOnlyMap data structure for aggregating task results, and it is quite a powerful tool. Why? Because Spark is an in-memory big-data engine: it not only keeps data in memory, it also inserts, updates, aggregates and sorts it there. AppendOnlyMap is a good example of how far Spark pushes this use of memory.
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
*
* This implementation uses quadratic probing with a power-of-2 hash table
* size, which is guaranteed to explore all spaces for each key (see
* http://en.wikipedia.org/wiki/Quadratic_probing).
* The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
Caches the value for a null key in a dedicated slot (the haveNullValue / nullValue fields seen in the code below).
initialCapacity: passed to the primary constructor, class AppendOnlyMap[K, V](initialCapacity: Int = 64).
capacity: set to nextPowerOf2(initialCapacity), i.e. initialCapacity rounded up to the next power of two; if the highest one-bit already equals the value it is kept, otherwise that bit is shifted left by one position (see the sketch below the field list).
data: the array that stores keys and aggregated values, new Array[AnyRef](2 * capacity).
* Holds keys and values in the same array for memory locality;
* specifically, the order of elements is key0, value0, key1, value1,
* key2, value2, etc.
LOAD_FACTOR: defaults to 0.7.
growThreshold: (LOAD_FACTOR * capacity).toInt.
growTable: doubles the capacity and re-hashes every key into the new array ("Double the table's size and re-hash everything").
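To make the capacity bookkeeping concrete, here is a small standalone sketch of how nextPowerOf2, mask and growThreshold fit together (the object name and the main method are only for illustration; the rounding logic follows the description above):

```scala
// Sketch: how capacity, mask and growThreshold relate to each other.
object CapacitySketch {
  private val LOAD_FACTOR = 0.7

  // Round n up to the next power of two: keep n if it already is one,
  // otherwise shift its highest one-bit left by one position.
  def nextPowerOf2(n: Int): Int = {
    val highBit = Integer.highestOneBit(n)
    if (highBit == n) n else highBit << 1
  }

  def main(args: Array[String]): Unit = {
    val initialCapacity = 64
    val capacity = nextPowerOf2(initialCapacity)        // 64
    val mask = capacity - 1                             // used as pos = hash & mask
    val growThreshold = (LOAD_FACTOR * capacity).toInt  // 44: grow once curSize exceeds this
    println(s"capacity=$capacity, mask=$mask, growThreshold=$growThreshold")
    println(nextPowerOf2(100))                          // 128
  }
}
```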
update: inserts or overwrites a key/value pair. Three cases: 1) the slot at rehash(key.hashCode) & mask is empty, so the pair is inserted directly; 2) the slot holds the same key, so the value is overwritten; 3) the slot holds a different key, so probing moves on to the next position (a standalone probe-sequence demo follows the source below).
def update(key: K, value: V): Unit = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = value
    haveNullValue = true
    return
  }
  var pos = rehash(key.hashCode) & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      data(2 * pos) = k
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      incrementSize()  // Since we added a new key
      return
    } else if (k.eq(curKey) || k.equals(curKey)) {
      data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      return
    } else {
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
}
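The probe step pos = (pos + delta) & mask, together with a power-of-two capacity, is what guarantees that every slot is eventually examined (quadratic probing with triangular increments). The standalone sketch below (not Spark source; names are illustrative) prints the slot sequence update would walk through for one hash value:

```scala
// Standalone illustration of the probe sequence used by update/changeValue:
// starting from hash & mask, the i-th collision moves the position forward
// by i slots, wrapping around via the mask.
object ProbeSequenceDemo {
  def probeSequence(hash: Int, capacity: Int, steps: Int): List[Int] = {
    val mask = capacity - 1
    var pos = hash & mask
    var i = 1
    val visited = scala.collection.mutable.ListBuffer(pos)
    while (i <= steps) {
      pos = (pos + i) & mask // the same step as `pos = (pos + delta) & mask`
      visited += pos
      i += 1
    }
    visited.toList
  }

  def main(args: Array[String]): Unit = {
    // With capacity 8 and a hash of 5, the slots tried are 5, 6, 0, 3, 7, 4, 2,
    // all distinct, so the whole table is eventually covered.
    println(probeSequence(hash = 5, capacity = 8, steps = 6))
  }
}
```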
changeValue: the aggregation primitive. It merges a value into the map with the supplied function; updateFunc is a (hadValue, oldValue) => newValue closure. Three cases: 1) the slot at rehash(key.hashCode) & mask is empty, so updateFunc is called with hadValue = false and the result is inserted; 2) the slot holds the same key, so the new value is aggregated with the existing one; 3) the slot holds a different key, so probing moves on to the next position (a usage sketch follows the source below).
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  if (k.eq(null)) {
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = updateFunc(haveNullValue, nullValue)
    haveNullValue = true
    return nullValue
  }
  var pos = rehash(k.hashCode) & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      val newValue = updateFunc(false, null.asInstanceOf[V])
      data(2 * pos) = k
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      incrementSize()
      return newValue
    } else if (k.eq(curKey) || k.equals(curKey)) {
      val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      return newValue
    } else {
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
  null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
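A typical aggregation built on changeValue looks like the sketch below. Note that AppendOnlyMap is private[spark], so the snippet only compiles inside an org.apache.spark package (for example in Spark's own test sources); it illustrates the (hadValue, oldValue) => newValue contract rather than real application code, and the package and object names are made up for the example:

```scala
package org.apache.spark.sketches // any package under org.apache.spark satisfies private[spark]

import org.apache.spark.util.collection.AppendOnlyMap

// Word-count style aggregation with changeValue (illustrative only).
object ChangeValueSketch {
  def main(args: Array[String]): Unit = {
    val counts = new AppendOnlyMap[String, Int]()

    // hadValue tells the closure whether the key already has an aggregate:
    // start from 1 on the first sighting, otherwise add 1 to the old value.
    val addOne = (hadValue: Boolean, oldValue: Int) =>
      if (hadValue) oldValue + 1 else 1

    Seq("spark", "shuffle", "spark").foreach(w => counts.changeValue(w, addOne))

    counts.iterator.foreach { case (k, v) => println(s"$k -> $v") }
    // spark -> 2, shuffle -> 1 (iteration order is not sorted)
  }
}
```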
destructiveSortedIterator: sorts the entries inside AppendOnlyMap's data array in place, without allocating extra memory, at the cost of destroying the validity of the map. Sorting is done with Spark's Sorter, an optimized TimSort. The source comment reads:
* return an iterator of the map in sorted order. This provides a way to sort the
* map without using additional memory, at the expense of destroying the validity
* of the map.
The code is as follows:
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
  destroyed = true
  // Pack KV pairs into the front of the underlying array
  var keyIndex, newIndex = 0
  while (keyIndex < capacity) {
    if (data(2 * keyIndex) != null) {
      data(2 * newIndex) = data(2 * keyIndex)
      data(2 * newIndex + 1) = data(2 * keyIndex + 1)
      newIndex += 1
    }
    keyIndex += 1
  }
  assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

  new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

  new Iterator[(K, V)] {
    var i = 0
    var nullValueReady = haveNullValue
    def hasNext: Boolean = (i < newIndex || nullValueReady)
    def next(): (K, V) = {
      if (nullValueReady) {
        nullValueReady = false
        (null.asInstanceOf[K], nullValue)
      } else {
        val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
        i += 1
        item
      }
    }
  }
}
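Continuing the word-count sketch above (with the same private[spark] caveat), destructiveSortedIterator hands the entries back in key order; after the call the map must not be used again:

```scala
// Sorting the sketch map in place; `counts` is invalid after this call.
import java.util.Comparator

val byKey = new Comparator[String] {
  override def compare(a: String, b: String): Int = a.compareTo(b)
}

counts.destructiveSortedIterator(byKey).foreach { case (k, v) =>
  println(s"$k -> $v") // shuffle -> 1, then spark -> 2, now in key order
}
```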
SizeTrackingAppendOnlyMap: samples its own size as it grows and keeps an estimate of it in bytes (a simplified sketch of the sampling idea follows the class).
An append-only map that keeps track of its estimated size in bytes.
The SizeTrackingAppendOnlyMap code is remarkably short:
private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker {

  override def update(key: K, value: V): Unit = {
    super.update(key, value)
    super.afterUpdate()
  }

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

  override protected def growTable(): Unit = {
    super.growTable()
    resetSamples()
  }
}
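SizeTracker itself is not listed in this post. Its idea is to avoid running the expensive size estimator on every update: it measures the collection at exponentially spaced update counts and extrapolates in between, which is also why growTable calls resetSamples(), since a rehash changes the memory layout all at once. The sketch below only illustrates that sampling idea; the growth rate, field names and extrapolation formula are simplifications, not the actual SizeTracker code:

```scala
// Simplified sketch of sampling-based size estimation (illustrative only).
// `measure` stands in for an expensive full measurement such as walking the
// object graph; it is called only at exponentially spaced update counts.
class SamplingSizeTracker(measure: () => Long) {
  private val SAMPLE_GROWTH_RATE = 1.1 // assumed spacing factor between samples
  private var numUpdates = 0L
  private var nextSampleNum = 1L
  private var lastSampleSize = 0L
  private var lastSampleUpdates = 0L
  private var bytesPerUpdate = 0.0

  def afterUpdate(): Unit = {
    numUpdates += 1
    if (numUpdates == nextSampleNum) {
      val size = measure()
      if (numUpdates > lastSampleUpdates) {
        bytesPerUpdate = (size - lastSampleSize).toDouble / (numUpdates - lastSampleUpdates)
      }
      lastSampleSize = size
      lastSampleUpdates = numUpdates
      nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
    }
  }

  // Discard samples, e.g. after a rehash invalidates the old layout.
  def resetSamples(): Unit = {
    numUpdates = 0L
    nextSampleNum = 1L
    lastSampleSize = 0L
    lastSampleUpdates = 0L
    bytesPerUpdate = 0.0
  }

  // Between samples, extrapolate from the last measurement instead of re-measuring.
  def estimateSize(): Long =
    (lastSampleSize + bytesPerUpdate * (numUpdates - lastSampleUpdates)).toLong
}
```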
PartitionedAppendOnlyMap: adds partitionedDestructiveSortedIterator, which calls AppendOnlyMap's destructiveSortedIterator to compact and sort the underlying array and then returns the resulting iterator (a sketch of the two comparators it chooses between follows the class).
/**
 * Implementation of WritablePartitionedPairCollection that wraps a map in which the
 * keys are tuples of (partition ID, K)
 */
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V]
  with WritablePartitionedPairCollection[K, V] {

  // Interface declared by WritablePartitionedPairCollection (not implemented there)
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    // Inner method of AppendOnlyMap: compacts and sorts the underlying data array
    // before handing back the iterator
    destructiveSortedIterator(comparator)
  }

  def insert(partition: Int, key: K, value: V): Unit = {
    update((partition, key), value)
  }
}
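The comparators chosen in partitionedDestructiveSortedIterator come from WritablePartitionedPairCollection: partitionComparator orders entries by partition ID only, while partitionKeyComparator orders by partition ID first and then by the key within each partition. A simplified sketch of what they amount to (illustrative; not the exact companion-object code):

```scala
import java.util.Comparator

// Sketch of the two orderings used by partitionedDestructiveSortedIterator.
object PartitionOrderings {
  // Order entries by partition ID only (no key comparator supplied).
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = a._1 - b._1
  }

  // Order entries by partition ID first, then by the key within a partition.
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) partitionDiff
        else keyComparator.compare(a._2, b._2)
      }
    }
}
```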
- 1: When a Spark map task writes its output, it computes results per partition and produces a data file plus an index file.
- 2: Spark's shuffle is accompanied by caching, sorting, aggregation, spilling and merging; fetching blocks from remote nodes is, of course, also indispensable.
秦凱新, Shenzhen