// 這裏經過textFile()方法,針對外部文件建立了一個RDD,lines,可是實際上,程序執行到這裏爲止,spark.txt文件的數據是不會加載到內存中的。lines,只是表明了一個指向spark.txt文件的引用。
// 這裏對lines RDD進行了map算子,獲取了一個轉換後的lineLengths RDD。可是這裏連數據都沒有,固然也不會作任何操做。lineLengths RDD也只是一個概念上的東西而已。
// 之列,執行了一個action操做,reduce。此時就會觸發以前全部transformation操做的執行,Spark會將操做拆分紅多個task到多個機器上並行執行,每一個task會在本地執行map操做,而且進行本地的reduce聚合。最後會進行一個全局的reduce聚合,而後將結果返回給Driver程序。
package sparkcore.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
* 統計每行出現的次數,即同一行在文件裏出現的次數
*/
public class LineCount {
public static void main(String[] args) {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 建立初始RDD,lines,每一個元素是一行文本
JavaRDD<String> lines = sc.textFile("test.txt");
// 對lines RDD執行mapToPair算子,將每一行映射爲(line, 1)的這種key-value對的格式
// 而後後面才能統計每一行出現的次數
JavaPairRDD<String, Integer> pairs = lines.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
});
// 對pairs RDD執行reduceByKey算子,統計出每一行出現的總次數
JavaPairRDD<String, Integer> lineCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 執行一個action操做,foreach,打印出每一行出現的次數
lineCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + " : " + t._2);
}
});
// 關閉JavaSparkContext
sc.close();
}
}
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object LineCount {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("LineCount")
.setMaster("local")
val sc = new SparkContext(conf);
val lines = sc.textFile("test.txt", 1)
val pairs = lines.map { (_, 1) }
val lineCounts = pairs.reduceByKey { _ + _ }
lineCounts.foreach(lineCount => println(lineCount._1 + " : " + lineCount._2 ))
}
}