大數據算法:排位問題

此文已由做者葉林寶受權網易雲社區發佈。
html

歡迎訪問網易雲社區,瞭解更多網易技術產品運營經驗。算法


問題描述數組

(1)如上圖是一張表, 第一行是列名,除第一列是string類型外, 其它列都是double類型。第二行開始,每行表明一條記錄安全

(2)需求:輸入一個數組例如[2, 4](後文中, 簡稱排位數組), 要求輸出每列(除第一列)按升序取出第二、第4位的值。 結果以下圖所示:bash

解決算法app

1、迭代計算ide

簡述:函數

簡單來講, 就是迭代每一列, 而後迭代每一行, 而後找出知足相應排名的數據ui

代碼:
spa

/**
    * 篩選出每一個列知足給定的排位數組中要求的位置上的取值, 簡單粗暴方案
    *
    * @param dataFrame 數據源
    * @param ranks 排位數組
    * @return
    */
  def findRankStatistics(dataFrame: DataFrame,  ranks: List[Long]): Map[Int, Iterable[Double]] = {
    require(ranks.forall(_ > 0))    // 獲取字段個數
    val numberOfColumns = dataFrame.schema.length
    var i = 0
    // 保存結果, key爲列索引, value爲該列知足排序數組的值構成的列表
    var result = Map[Int, Iterable[Double]]()    while (i < numberOfColumns) {      // 每輪迭代, 只filter出該列數據
      val col = dataFrame.rdd.map(row => row.getDouble(i))
      val sortedCol: RDD[(Double, Long)] = col.sortBy(v => v).zipWithIndex()      // 只過濾出知足排位的數值
      val ranksOnly = sortedCol.filter { case (colValue, index) => ranks.contains(index + 1)}.keys
      val list = ranksOnly.collect()
      result += (i -> list)
      i += 1
    }
    result
  }複製代碼

分析:

缺點:效率低, 每列排位的計算是串行的。

2、 groupByKey

簡述:

解決方案一, 循環迭代每一列帶來的效率問題, 列與列之間的計算本質上互不影響的, 因此, 方案二的改進方法是, 列索引做爲key , 每行的值做爲value,依據key的hash值shuffle到不一樣的partition, 並行計算每一個partition

代碼:

/**
    * 篩選出每一個列知足給定的排位數組中要求的位置上的數值, groupByKey方案
    *
    * @param dataFrame 數據源
    * @param ranks  排位數組
    * @return
    */
  def findRankStatistics(dataFrame: DataFrame, ranks: List[Long]): Map[Int, Iterable[Double]] = {
    require(ranks.forall(_ > 0))    // 將源數據 依據列索引(後文簡稱field_index)和每行對應的數值(後文簡稱field_value)轉換爲pairRDD
    val pairRDD: RDD[(Int, Double)] = mapToKeyValuePairs(dataFrame)    // 依據field_index 聚合相應filed_value
    val groupColumns: RDD[(Int, Iterable[Double])] = pairRDD.groupByKey()    
    // 對每一個field_index, 計算其相應的field_values, 過濾出知足排位要求的數值
    groupColumns.mapValues(
      iter => {        // 排序field_values
        val sortedIter = iter.toArray.sorted
        sortedIter.toIterable.zipWithIndex.flatMap({          case (colValue, index) =>            if (ranks.contains(index + 1)) {
              Iterator(colValue)
            } else {
              Iterator.empty
            }
        })
      }).collectAsMap()
  }複製代碼

分析:

這種方案只使用數數據規模較小(指的是行數較少)的數據, 對於大規模數據, groupByKey操做, 容易oom。

groupByKey執行效果以下圖所示:

groupByKey會在內存中暫存全部的<key, values>, 因此, 對於同一個key, value較多的狀況, 容易引發executor端oom

3、二次排序

簡述:

方案二, 除了容易引發executor端oom問題, 還有另一個問題, 排序操做時在shuffle後, 在executor進行的。spark sort based shuffle 支持在shuffle階段,直接對key進行排序。所以,可經過二次排序提升效率。

算法思想:

(1)將df的每行數據展開爲(field_index, field_value)格式, 再轉換爲((field_index, field_value), 1)

(2)自定義分區器, 根據((field_index, field_value), 1)中的field_index分區

(3)調用repartitionAndSortWithinPartitions函數, 依據field_index分區, 依據(field_index, field_value)排序

(4)過濾出各列知足配位需求的值

(5)轉換爲所需輸出格式

代碼:

/**
    * 篩選出每一個列知足給定的排位數組中要求的位置上的數值, 二次排序方案
    * @param dataFrame 數據源
    * @param targetRanks 排位數組
    * @param partitions 分區數
    * @return
    */
  def findRankStatistics(dataFrame: DataFrame, targetRanks: List[Long], partitions: Int) = {    // 將df的每行數據展開爲(field_index, field_value)格式, 再轉換爲((field_index, field_value), 1)
    // 即最終爲pariRdd
    // 其中「1」只是pairRdd中value的一個佔位符, 對最終計算結果不產生影響
    val pairRDD: RDD[((Int, Double), Int)] = mapToKeyValuePairs(dataFrame).map((_, 1))    //自定義分區器, 根據((field_index, field_value), 1)中的field_index分區
    val partitioner = new ColumnIndexPartition(partitions)    //根據((field_index, field_value), 1)中的(field_index, field_value)排序
    val sorted = pairRDD.repartitionAndSortWithinPartitions(partitioner)    //過濾出所需數據
    val filterForTargetIndex: RDD[(Int, Double)] =
      sorted.mapPartitions(iter => {
        var currentColumnIndex = -1
        var runningTotal = 0
         // 過濾出各列知足配位需求的值
        iter.filter({          case (((colIndex, value), _)) =>            // 同一個分區中的數據,可能包含多個field_index即多列數據, 當遍歷到新的field_index時, 須要重置計數器runningTotal
            if (colIndex != currentColumnIndex) {
              currentColumnIndex = colIndex
              runningTotal = 1
            } else {
              runningTotal += 1
            }            //保留知足排位的數值
            targetRanks.contains(runningTotal)
        })
      }.map(_._1), preservesPartitioning = true)    // 轉換爲所需輸出格式
    groupSorted(filterForTargetIndex.collect())
  }  // 隱式轉換, 對於二元數組, 先按第一排序, 再按第二個排序
  implicit val ordering: Ordering[(K, S)] = Ordering.Tuple2  /**
    * 將df的每行數據展開爲(field_index, field_value)格式, 再轉換爲((field_index, field_value), 1)
    * @param dataFrame
    * @return
    */
  def mapToKeyValuePairs(dataFrame: DataFrame): RDD[(Int, Double)] = {    // 獲取字段個數
    val rowLength = dataFrame.schema.length
    dataFrame.rdd.flatMap(
      row => Range(0, rowLength).map(i => (i, row.getDouble(i)))
    )
  }  /**
    * 自定義分區器, 根據((field_index, field_value), 1)中的field_index分區
    * @param numPartitions 分區個數
    */
  class ColumnIndexPartition(override val numPartitions: Int) extends Partitioner {
    require(numPartitions >= 0, s"Number of partitions " + s"($numPartitions) cannot be negative.")    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[(Int, Double)]
      Math.abs(k._1) % numPartitions //hashcode of column index
    }
  }  /**
    * 轉換爲所需輸出格式
    * 將it聚合爲map, 其中key爲field_index, value爲同一個field_index下的field_value組成的數組
    * @param it 數組中的元素爲(field_index, field_value)
    * @return
    */
  private def groupSorted(it: Array[(Int, Double)]): Map[Int, Iterable[Double]] = {
    val res = List[(Int, ArrayBuffer[Double])]()
    it.foldLeft(res)((list, next) => list match {      case Nil =>
        val (firstKey, value) = next        List((firstKey, ArrayBuffer(value)))      case head :: rest =>
        val (curKey, valueBuf) = head        val (firstKey, value) = next        if (!firstKey.equals(curKey)) {
          (firstKey, ArrayBuffer(value)) :: list
        } else {
          valueBuf.append(value)
          list
        }
    }).map { case (key, buf) => (key, buf.toIterable) }.toMap
  }複製代碼


分析:

相比於方案二, 這種方案將數據排序下推到shuffle階段, 而後對每一個partitions的數據,迭代每條記錄過濾出所需數據, 避免了executor將全部數據加載到內存中。可是, 在shuffle階段, 也可能出現因爲數據量過大(數據自己行數較多, 且不一樣的key都打到同一個executor), 特別是重複數據特別多的狀況下, 致使在二次排序過程當中oom。



免費體驗雲安全(易盾)內容安全、驗證碼等服務

更多網易技術、產品、運營經驗分享請點擊


相關文章:
【推薦】 Windows擴展屏開發總結
【推薦】 在Android中使用FlatBuffers(上篇)

相關文章
相關標籤/搜索