spark中要將計算結果取回driver,有兩種方式:collect和take,這兩種方式有什麼差異?來看代碼:apache
org.apache.spark.rdd.RDDide
/** * Return an array that contains all of the elements in this RDD. * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. */ def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } /** * Take the first num elements of the RDD. It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy * the limit. * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. * * @note Due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ def take(num: Int): Array[T] = withScope { val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2) if (num == 0) { new Array[T](0) } else { val buf = new ArrayBuffer[T] val totalParts = this.partitions.length var partsScanned = 0 while (buf.size < num && partsScanned < totalParts) { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry = 1L if (partsScanned > 0) { // If we didn't find any rows after the previous iteration, quadruple and retry. // Otherwise, interpolate the number of partitions we need to try, but overestimate // it by 50%. We also cap the estimation in the end. if (buf.isEmpty) { numPartsToTry = partsScanned * scaleUpFactor } else { // the left side of max is >=1 whenever partsScanned >= 2 numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1) numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor) } } val left = num - buf.size val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt) val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p) res.foreach(buf ++= _.take(num - buf.size)) partsScanned += p.size } buf.toArray } }
可見collect是直接計算全部結果,而後將每一個partition的結果變成array,而後再合併成一個array;this
而take的實現就要複雜一些,它會首先計算1個partition,而後根據結果的數量推斷出還須要計算幾個分區,而後再計算這幾個分區,而後再看結果夠不夠,這是一個迭代的過程,計算越簡單或者take數量越少,越有可能在前邊的迭代中知足條件返回;spa