此文已由做者葉林寶受權網易雲社區發佈。
html
歡迎訪問網易雲社區,瞭解更多網易技術產品運營經驗。算法
問題描述數組
(1)如上圖是一張表, 第一行是列名,除第一列是string類型外, 其它列都是double類型。第二行開始,每行表明一條記錄安全
(2)需求:輸入一個數組例如[2, 4](後文中, 簡稱排位數組), 要求輸出每列(除第一列)按升序取出第二、第4位的值。 結果以下圖所示:bash
解決算法app
1、迭代計算ide
簡述:函數
簡單來講, 就是迭代每一列, 而後迭代每一行, 而後找出知足相應排名的數據ui
代碼:
spa
/**
* 篩選出每一個列知足給定的排位數組中要求的位置上的取值, 簡單粗暴方案
*
* @param dataFrame 數據源
* @param ranks 排位數組
* @return
*/
def findRankStatistics(dataFrame: DataFrame, ranks: List[Long]): Map[Int, Iterable[Double]] = {
require(ranks.forall(_ > 0)) // 獲取字段個數
val numberOfColumns = dataFrame.schema.length
var i = 0
// 保存結果, key爲列索引, value爲該列知足排序數組的值構成的列表
var result = Map[Int, Iterable[Double]]() while (i < numberOfColumns) { // 每輪迭代, 只filter出該列數據
val col = dataFrame.rdd.map(row => row.getDouble(i))
val sortedCol: RDD[(Double, Long)] = col.sortBy(v => v).zipWithIndex() // 只過濾出知足排位的數值
val ranksOnly = sortedCol.filter { case (colValue, index) => ranks.contains(index + 1)}.keys
val list = ranksOnly.collect()
result += (i -> list)
i += 1
}
result
}複製代碼
分析:
缺點:效率低, 每列排位的計算是串行的。
2、 groupByKey
簡述:
解決方案一, 循環迭代每一列帶來的效率問題, 列與列之間的計算本質上互不影響的, 因此, 方案二的改進方法是, 列索引做爲key , 每行的值做爲value,依據key的hash值shuffle到不一樣的partition, 並行計算每一個partition
代碼:
/**
* 篩選出每一個列知足給定的排位數組中要求的位置上的數值, groupByKey方案
*
* @param dataFrame 數據源
* @param ranks 排位數組
* @return
*/
def findRankStatistics(dataFrame: DataFrame, ranks: List[Long]): Map[Int, Iterable[Double]] = {
require(ranks.forall(_ > 0)) // 將源數據 依據列索引(後文簡稱field_index)和每行對應的數值(後文簡稱field_value)轉換爲pairRDD
val pairRDD: RDD[(Int, Double)] = mapToKeyValuePairs(dataFrame) // 依據field_index 聚合相應filed_value
val groupColumns: RDD[(Int, Iterable[Double])] = pairRDD.groupByKey()
// 對每一個field_index, 計算其相應的field_values, 過濾出知足排位要求的數值
groupColumns.mapValues(
iter => { // 排序field_values
val sortedIter = iter.toArray.sorted
sortedIter.toIterable.zipWithIndex.flatMap({ case (colValue, index) => if (ranks.contains(index + 1)) {
Iterator(colValue)
} else {
Iterator.empty
}
})
}).collectAsMap()
}複製代碼
分析:
這種方案只使用數數據規模較小(指的是行數較少)的數據, 對於大規模數據, groupByKey操做, 容易oom。
groupByKey執行效果以下圖所示:
groupByKey會在內存中暫存全部的<key, values>, 因此, 對於同一個key, value較多的狀況, 容易引發executor端oom
3、二次排序
簡述:
方案二, 除了容易引發executor端oom問題, 還有另一個問題, 排序操做時在shuffle後, 在executor進行的。spark sort based shuffle 支持在shuffle階段,直接對key進行排序。所以,可經過二次排序提升效率。
算法思想:
(1)將df的每行數據展開爲(field_index, field_value)格式, 再轉換爲((field_index, field_value), 1)
(2)自定義分區器, 根據((field_index, field_value), 1)中的field_index分區
(3)調用repartitionAndSortWithinPartitions函數, 依據field_index分區, 依據(field_index, field_value)排序
(4)過濾出各列知足配位需求的值
(5)轉換爲所需輸出格式
代碼:
/**
* 篩選出每一個列知足給定的排位數組中要求的位置上的數值, 二次排序方案
* @param dataFrame 數據源
* @param targetRanks 排位數組
* @param partitions 分區數
* @return
*/
def findRankStatistics(dataFrame: DataFrame, targetRanks: List[Long], partitions: Int) = { // 將df的每行數據展開爲(field_index, field_value)格式, 再轉換爲((field_index, field_value), 1)
// 即最終爲pariRdd
// 其中「1」只是pairRdd中value的一個佔位符, 對最終計算結果不產生影響
val pairRDD: RDD[((Int, Double), Int)] = mapToKeyValuePairs(dataFrame).map((_, 1)) //自定義分區器, 根據((field_index, field_value), 1)中的field_index分區
val partitioner = new ColumnIndexPartition(partitions) //根據((field_index, field_value), 1)中的(field_index, field_value)排序
val sorted = pairRDD.repartitionAndSortWithinPartitions(partitioner) //過濾出所需數據
val filterForTargetIndex: RDD[(Int, Double)] =
sorted.mapPartitions(iter => {
var currentColumnIndex = -1
var runningTotal = 0
// 過濾出各列知足配位需求的值
iter.filter({ case (((colIndex, value), _)) => // 同一個分區中的數據,可能包含多個field_index即多列數據, 當遍歷到新的field_index時, 須要重置計數器runningTotal
if (colIndex != currentColumnIndex) {
currentColumnIndex = colIndex
runningTotal = 1
} else {
runningTotal += 1
} //保留知足排位的數值
targetRanks.contains(runningTotal)
})
}.map(_._1), preservesPartitioning = true) // 轉換爲所需輸出格式
groupSorted(filterForTargetIndex.collect())
} // 隱式轉換, 對於二元數組, 先按第一排序, 再按第二個排序
implicit val ordering: Ordering[(K, S)] = Ordering.Tuple2 /**
* 將df的每行數據展開爲(field_index, field_value)格式, 再轉換爲((field_index, field_value), 1)
* @param dataFrame
* @return
*/
def mapToKeyValuePairs(dataFrame: DataFrame): RDD[(Int, Double)] = { // 獲取字段個數
val rowLength = dataFrame.schema.length
dataFrame.rdd.flatMap(
row => Range(0, rowLength).map(i => (i, row.getDouble(i)))
)
} /**
* 自定義分區器, 根據((field_index, field_value), 1)中的field_index分區
* @param numPartitions 分區個數
*/
class ColumnIndexPartition(override val numPartitions: Int) extends Partitioner {
require(numPartitions >= 0, s"Number of partitions " + s"($numPartitions) cannot be negative.") override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[(Int, Double)]
Math.abs(k._1) % numPartitions //hashcode of column index
}
} /**
* 轉換爲所需輸出格式
* 將it聚合爲map, 其中key爲field_index, value爲同一個field_index下的field_value組成的數組
* @param it 數組中的元素爲(field_index, field_value)
* @return
*/
private def groupSorted(it: Array[(Int, Double)]): Map[Int, Iterable[Double]] = {
val res = List[(Int, ArrayBuffer[Double])]()
it.foldLeft(res)((list, next) => list match { case Nil =>
val (firstKey, value) = next List((firstKey, ArrayBuffer(value))) case head :: rest =>
val (curKey, valueBuf) = head val (firstKey, value) = next if (!firstKey.equals(curKey)) {
(firstKey, ArrayBuffer(value)) :: list
} else {
valueBuf.append(value)
list
}
}).map { case (key, buf) => (key, buf.toIterable) }.toMap
}複製代碼
分析:
相比於方案二, 這種方案將數據排序下推到shuffle階段, 而後對每一個partitions的數據,迭代每條記錄過濾出所需數據, 避免了executor將全部數據加載到內存中。可是, 在shuffle階段, 也可能出現因爲數據量過大(數據自己行數較多, 且不一樣的key都打到同一個executor), 特別是重複數據特別多的狀況下, 致使在二次排序過程當中oom。
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】 Windows擴展屏開發總結
【推薦】 在Android中使用FlatBuffers(上篇)