算法——水塘抽樣 reservoirSample

簡介

reservoir的做用是:**在不知道文件總行數的狀況下,如何從文件中隨機的抽取一行?**便是說若是最後發現文字檔共有N行,則每一行被抽取的機率均爲1/N?html

咱們能夠:定義取出的行號爲choice,第一次直接以第一行做爲取出行 choice ,然後第二次以二分之一律率決定是否用第二行替換 choice ,第三次以三分之一的機率決定是否以第三行替換 choice ……,以此類推。由上面的分析咱們能夠得出結論,在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到數據流結束,返回此數,算法結束。算法

這個問題的擴展就是:如何從未知或者很大樣本空間隨機地取k個數?亦便是說,若是檔案共有N ≥ k行,則每一行被抽取的機率爲k/N。express

  根據上面(隨機取出一元素)的分析,咱們能夠把上面的1/n變爲k/n便可。思路爲:在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於k/n,替換池中任意一個爲第n個數。大於k/n,繼續保留前面的數。直到數據流結束,返回此k個數。可是爲了保證計算機計算分數額準確性,通常是生成一個0到n的隨機數,跟k相比,道理是同樣的。數組

僞代碼

從S中抽取首k項放入「水塘」中
對於每個S[j]項(j ≥ k):
   隨機產生一個範圍0到j的整數r
   若 r < k 則把水塘中的第r項換成S[j]項
/*
  S has items to sample, R will contain the result
*/
ReservoirSample(S[1..n], R[1..k])
  // fill the reservoir array
  for i = 1 to k
      R[i] := S[i]
 
  // replace elements with gradually decreasing probability
  for i = k+1 to n
    j := random(1, i)   // important: inclusive range
    if j <= k
        R[j] := S[i]

實現概述

  1. 獲取到須要抽樣RDD分區的樣本大小k和分區的全部KEY數組input
  2. 初始化抽樣結果集reservoir爲分區前K個KEY值
  3. 若是分區的總數小於預計樣本大小k,則將當前分區的全部數據做爲樣本數據,不然到第四步
  4. 遍歷分區裏全部Key組成的數組input
  5. 生成隨機須要替換input數組的下標,若是下標小於K則替換
  6. 返回抽取的key值數組和當前分區的總數據量: (reservoir, l)

實現源碼

/**
   * Reservoir sampling implementation that also returns the input size.
   *
   * @param input:RDD的分區裏面的key組成的Iterator
   * @param k :抽樣大小=
   		val sampleSize = math.min(20.0 * partitions, 1e6)
   		val k=math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
   * @param seed random seed:選取隨機數的種子
   * @return (samples, input size)
   */
  def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    // 初始化水塘數據爲input的錢K個數,即:reservoir數組中放了RDD分區的前K個key值
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // If we have consumed all the elements, return them. Otherwise do the replacement.
    // 若是當前的RDD總數小於預設值的採樣量則所有做爲採樣數據並結束採樣
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // If input size > k, continue the sampling process.
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      // 遍歷全部的key
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // There are k elements in the reservoir, and the l-th element has been
        // consumed. It should be chosen with probability k/l. The expression
        // below is a random long chosen uniformly from [0,l)
        // 計算出須要替換的數組下標
        // 選取第n個數的機率是:n/l; 若是隨機替換數組值的機率是p=rand.nextDouble,
        // 則若是p<k/l;則替換池中任意一個數,即: p*l < k 則進行替換,用p*l做爲隨機替換的下標
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) {
          // 替換reservoir[隨機抽取的下標]的值爲input[l]的值item
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }

參考:https://www.iteblog.com/archives/1525.htmldom

相關文章
相關標籤/搜索