reservoir的做用是:**在不知道文件總行數的狀況下,如何從文件中隨機的抽取一行?**便是說若是最後發現文字檔共有N行,則每一行被抽取的機率均爲1/N?html
咱們能夠:定義取出的行號爲choice,第一次直接以第一行做爲取出行 choice ,然後第二次以二分之一律率決定是否用第二行替換 choice ,第三次以三分之一的機率決定是否以第三行替換 choice ……,以此類推。由上面的分析咱們能夠得出結論,在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到數據流結束,返回此數,算法結束。算法
這個問題的擴展就是:如何從未知或者很大樣本空間隨機地取k個數?亦便是說,若是檔案共有N ≥ k行,則每一行被抽取的機率爲k/N。express
根據上面(隨機取出一元素)的分析,咱們能夠把上面的1/n變爲k/n便可。思路爲:在取第n個數據的時候,咱們生成一個0到1的隨機數p,若是p小於k/n,替換池中任意一個爲第n個數。大於k/n,繼續保留前面的數。直到數據流結束,返回此k個數。可是爲了保證計算機計算分數額準確性,通常是生成一個0到n的隨機數,跟k相比,道理是同樣的。數組
從S中抽取首k項放入「水塘」中 對於每個S[j]項(j ≥ k): 隨機產生一個範圍0到j的整數r 若 r < k 則把水塘中的第r項換成S[j]項
/* S has items to sample, R will contain the result */ ReservoirSample(S[1..n], R[1..k]) // fill the reservoir array for i = 1 to k R[i] := S[i] // replace elements with gradually decreasing probability for i = k+1 to n j := random(1, i) // important: inclusive range if j <= k R[j] := S[i]
/** * Reservoir sampling implementation that also returns the input size. * * @param input:RDD的分區裏面的key組成的Iterator * @param k :抽樣大小= val sampleSize = math.min(20.0 * partitions, 1e6) val k=math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt * @param seed random seed:選取隨機數的種子 * @return (samples, input size) */ def reservoirSampleAndCount[T: ClassTag]( input: Iterator[T], k: Int, seed: Long = Random.nextLong()) : (Array[T], Long) = { val reservoir = new Array[T](k) // Put the first k elements in the reservoir. // 初始化水塘數據爲input的錢K個數,即:reservoir數組中放了RDD分區的前K個key值 var i = 0 while (i < k && input.hasNext) { val item = input.next() reservoir(i) = item i += 1 } // If we have consumed all the elements, return them. Otherwise do the replacement. // 若是當前的RDD總數小於預設值的採樣量則所有做爲採樣數據並結束採樣 if (i < k) { // If input size < k, trim the array to return only an array of input size. val trimReservoir = new Array[T](i) System.arraycopy(reservoir, 0, trimReservoir, 0, i) (trimReservoir, i) } else { // If input size > k, continue the sampling process. var l = i.toLong val rand = new XORShiftRandom(seed) // 遍歷全部的key while (input.hasNext) { val item = input.next() l += 1 // There are k elements in the reservoir, and the l-th element has been // consumed. It should be chosen with probability k/l. The expression // below is a random long chosen uniformly from [0,l) // 計算出須要替換的數組下標 // 選取第n個數的機率是:n/l; 若是隨機替換數組值的機率是p=rand.nextDouble, // 則若是p<k/l;則替換池中任意一個數,即: p*l < k 則進行替換,用p*l做爲隨機替換的下標 val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { // 替換reservoir[隨機抽取的下標]的值爲input[l]的值item reservoir(replacementIndex.toInt) = item } } (reservoir, l) } }