0三、操做RDD(transformation和action案例實戰)

一、transformation和action介紹

Spark支持兩種RDD操做:transformation和action。transformation操做會針對已有的RDD建立一個新的RDD;而action則主要是對RDD進行最後的操做,好比遍歷、reduce、保存到文件等,並能夠返回結果給Driver程序。
 
例如,map就是一種transformation操做,它用於將已有RDD的每一個元素傳入一個自定義的函數,並獲取一個新的元素,而後將全部的新元素組成一個新的RDD。而reduce就是一種action操做,它用於對RDD中的全部元素進行聚合操做,並獲取一個最終的結果,而後返回給Driver程序。
 
transformation的特色就是 lazy特性。lazy特性指的是,若是一個spark應用中只定義了transformation操做,那麼即便你執行該應用,這些操做也不會執行。也就是說,transformation是不會觸發spark程序的執行的,它們只是記錄了對RDD所作的操做,可是不會自發的執行。只有當transformation以後,接着執行了一個action操做,那麼全部的transformation纔會執行。Spark經過這種lazy特性,來進行底層的spark應用執行的優化,避免產生過多中間結果。
 
action操做執行,會觸發一個spark job的運行,從而觸發這個action以前全部的transformation的執行。這是action的特性。

二、案例:統計文件字數


這裏 經過一個以前學習過的案例,統計文件字數,來說解transformation和action。
 
// 這裏經過textFile()方法,針對外部文件建立了一個RDD,lines,可是實際上,程序執行到這裏爲止,spark.txt文件的數據是不會加載到內存中的。lines,只是表明了一個指向spark.txt文件的引用。
val lines = sc.textFile("spark.txt")
 
// 這裏對lines RDD進行了map算子,獲取了一個轉換後的lineLengths RDD。可是這裏連數據都沒有,固然也不會作任何操做。lineLengths RDD也只是一個概念上的東西而已。
val lineLengths = lines.map(line => line.length)
 
// 之列,執行了一個action操做,reduce。此時就會觸發以前全部transformation操做的執行,Spark會將操做拆分紅多個task到多個機器上並行執行,每一個task會在本地執行map操做,而且進行本地的reduce聚合。最後會進行一個全局的reduce聚合,而後將結果返回給Driver程序。
val totalLength = lineLengths.reduce(_ + _)
 

 三、案例:統計文件每行出現的次數

    3.一、java

package sparkcore.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * 統計每行出現的次數,即同一行在文件裏出現的次數
 */
public class LineCount {
    public static void main(String[] args) {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 建立初始RDD,lines,每一個元素是一行文本
        JavaRDD<String> lines = sc.textFile("test.txt");
        // 對lines RDD執行mapToPair算子,將每一行映射爲(line, 1)的這種key-value對的格式
        // 而後後面才能統計每一行出現的次數
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<String, Integer> call(String tthrows Exception {
                        return new Tuple2<String, Integer>(t, 1);
                    }
                });
        // 對pairs RDD執行reduceByKey算子,統計出每一行出現的總次數
        JavaPairRDD<String, Integer> lineCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Integer call(Integer v1, Integer v2throws Exception {
                        return v1 + v2;
                    }
                });
        // 執行一個action操做,foreach,打印出每一行出現的次數
        lineCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> tthrows Exception {
                System.out.println(t._1 + " : " + t._2);
            }
        });
        // 關閉JavaSparkContext
        sc.close();
    }
}

    3.二、scala

 
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object LineCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("LineCount")
      .setMaster("local")
    val sc = new SparkContext(conf);
    val lines = sc.textFile("test.txt"1)
    val pairs = lines.map { (_, 1) }
    val lineCounts = pairs.reduceByKey { _ + _ }
    lineCounts.foreach(lineCount => println(lineCount._1 + " : " + lineCount._2 ))
  }
} 
相關文章
相關標籤/搜索