(1)A list of partitions數據庫
(2)A function for computing each split緩存
(3)A list of dependencies on other RDDs微信
(4)Optionally,a Partitioner for key-value RDDs網絡
(5)Optionally,a list of preferred locations to compute each split閉包
Transformations:轉換,lazy型,不會觸發計算分佈式
Action:觸發jobide
Persist:緩存也不會觸發job,在第一次觸發job以後纔會真正進行緩存函數
RDD的計算實際上咱們能夠分爲兩大部分。性能
1)Driver端的計算this
主要是stage劃分,task的封裝,task調度執行
2)Executor端的計算
真正的計算開始,默認狀況下每一個cpu運行一個task。一個task實際上就是一個分區,咱們的方法不管是轉換算子裏封裝的,仍是action算子裏封裝的都是此時在一個task裏面計算一個分區的數據。
/** * Applies a function f to all elements of this RDD. */ def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } /** * Applies a function f to each partition of this RDD. */ def foreachPartition(f: Iterator[T] => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) }
能夠看到方法經過clean操做(清理閉包,爲序列化和網絡傳輸作準備),進行了一次匿名函數的封裝, 針對foreach方法,是咱們的方法被傳入了迭代器foreach(每一個元素遍歷執行一次函數), 而對於foreachpartition方法是迭代器被傳入了咱們的方法(每一個分區執行一次函數,咱們獲取迭代器後須要自行進行迭代處理)
def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) }
就是講上述封裝的方法進一步按照匿名函數封裝
(ctx:TaskContext,it:Iterator[T] => cleanFunc(it))
Spark的Task類型咱們用到的也就兩個
1)shuffleMapTask
2)ResultTask
Action算子的方法是在ResultTask中執行的,也即ResultTask的runTask方法。
首先反序列化獲得咱們的方法和RDD,而後執行。傳入的是迭代器
override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L func(context, rdd.iterator(partition, context)) }
RDD.foreach(foreachFunction)
RDD.foreachPatition(foreachPartitionFunction)
通過第二部分析咱們能夠理解,展開以後實際上就是
RDD的每一個分區的iterator:
iterator.foreach(foreachFunction)
foreachPartitionFunction(iterator)
這就很明顯了,假如咱們的Function中有數據庫,網絡TCP等IO鏈接,文件流等等的建立關閉操做,採用foreachPartition方法,針對每一個分區集合進行計算,更能提升咱們的性能。