Java version
package com.huanfion.Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ActionJava {
    public static JavaSparkContext getsc() {
        SparkConf conf = new SparkConf().setAppName("action").setMaster("local");
        return new JavaSparkContext(conf);
    }

    public static void main(String[] args) {
        // Each helper below creates its own local SparkContext, so run one action at a time.
        // reduce();
        // collect();
        // count();
        // take();
        countByKey();
    }

    // reduce: aggregates all elements with the given function and returns the result to the driver
    public static void reduce() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        int reducevalue = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(reducevalue);
    }

    // collect: returns all elements of the RDD to the driver as a List
    public static void collect() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        System.out.println(rdd.collect());
    }

    // saveAsTextFile: writes the RDD to the given path
    public static void save() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        rdd.saveAsTextFile("hdfs://"); // the HDFS path must be reachable, and the output directory must not already exist
    }

    // count: returns the number of elements in the RDD
    public static void count() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        System.out.println(rdd.count());
    }

    // take: returns the first n elements of the RDD
    public static void take() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        List<Integer> takevalue = rdd.take(2);
        System.out.println(takevalue);
    }

    // countByKey: counts how many elements there are for each key
    public static void countByKey() {
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("class_1", 91),
                new Tuple2<>("class_2", 78),
                new Tuple2<>("class_1", 99),
                new Tuple2<>("class_2", 76),
                new Tuple2<>("class_2", 90));
        JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list);
        Map<String, Long> values = rdd.countByKey();
        values.forEach((x, y) -> System.out.println(x + ":" + y));
    }
}
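With only countByKey() enabled in main, the sample data contains two "class_1" entries and three "class_2" entries, so the program should print something like the following (key order may vary, since the result is a Map):

class_1:2
class_2:3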
Scala version
package com.huanfion.Spark

import org.apache.spark.{SparkConf, SparkContext}

object ActionScala {
  def getsc: SparkContext = {
    val sparkconf = new SparkConf().setAppName("action").setMaster("local")
    new SparkContext(sparkconf)
  }

  def main(args: Array[String]): Unit = {
    // Each helper below creates its own local SparkContext, so run one action at a time.
    // reduce
    // count
    // collect
    // take
    countByKey
  }

  // reduce: aggregates all elements and returns the result to the driver
  def reduce = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    println(rdd.reduce(_ + _))
  }

  // count: returns the number of elements in the RDD
  def count = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    println(rdd.count())
  }

  // collect: returns all elements to the driver as an Array
  def collect = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    val value = rdd.collect()
    value.foreach(x => println(x))
  }

  // take: returns the first n elements of the RDD
  def take = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    val value = rdd.take(2)
    value.foreach(x => println(x))
  }

  // countByKey: counts how many elements there are for each key
  def countByKey = {
    val list = Array(
      ("class_1", 91),
      ("class_2", 78),
      ("class_1", 99),
      ("class_2", 76),
      ("class_2", 90))
    val rdd = getsc.parallelize(list)
    val countvalue = rdd.countByKey()
    countvalue.foreach(x => println(x._1 + ":" + x._2))
  }
}
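The Java version also demonstrates saveAsTextFile, which the Scala listing omits. A minimal Scala sketch of the same action could look like the method below (the HDFS path is a placeholder, not from the original):

  // saveAsTextFile writes one part file per partition; the target directory must not already exist
  def save = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    rdd.saveAsTextFile("hdfs://namenode:9000/output/action-save") // placeholder path
  }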