Spark 常用 Action 算子

Java版本
 1 package com.huanfion.Spark;  2 
 3 import org.apache.spark.SparkConf;  4 import org.apache.spark.api.java.JavaPairRDD;  5 import org.apache.spark.api.java.JavaRDD;  6 import org.apache.spark.api.java.JavaSparkContext;  7 import org.apache.spark.api.java.function.Function2;  8 import scala.Tuple2;  9 
10 import java.util.Arrays; 11 import java.util.List; 12 import java.util.Map; 13 
14 public class ActionJava { 15     public static JavaSparkContext getsc() { 16         SparkConf conf = new SparkConf().setAppName("action").setMaster("local"); 17         return new JavaSparkContext(conf); 18  } 19 
20     public static void main(String[] args) { 21 // reduce(); 22 // collect(); 23 // count(); 24 // take();
25  countByKey(); 26  } 27 
28     public static void reduce() { 29         List list = Arrays.asList(1, 2, 3, 4); 30         JavaRDD<Integer> rdd = getsc().parallelize(list); 31         int reducevalue = rdd.reduce(new Function2<Integer, Integer, Integer>() { 32  @Override 33             public Integer call(Integer v1, Integer v2) throws Exception { 34                 return v1 + v2; 35  } 36  }); 37  System.out.println(reducevalue); 38  } 39 
40     public static void collect() { 41         List list = Arrays.asList(1, 2, 3, 4); 42         JavaRDD<Integer> rdd = getsc().parallelize(list); 43  System.out.println(rdd.collect()); 44  } 45 
46     public static void save() { 47         List list = Arrays.asList(1, 2, 3, 4); 48         JavaRDD<Integer> rdd = getsc().parallelize(list); 49         rdd.saveAsTextFile("hdfs://");//此處的hdfs目錄路徑必須存在
50  } 51 
52     public static void count() { 53         List list = Arrays.asList(1, 2, 3, 4); 54         JavaRDD<Integer> rdd = getsc().parallelize(list); 55  System.out.println(rdd.count()); 56  } 57 
58     public static void take() { 59         List list = Arrays.asList(1, 2, 3, 4); 60         JavaRDD<Integer> rdd = getsc().parallelize(list); 61         List<Integer> takevalue = rdd.take(2); 62  System.out.println(takevalue); 63  } 64 
65     public static void countByKey() { 66         List list = Arrays.asList(new Tuple2<>("class_1", 91), 67                 new Tuple2<>("class_2", 78), 68                 new Tuple2<>("class_1", 99), 69                 new Tuple2<>("class_2", 76), 70                 new Tuple2<>("class_2", 90)); 71         JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list); 72         Map<String, Long> values = rdd.countByKey(); 73         values.forEach((x, y) -> System.out.println(x + ":" + y)); 74  } 75 }

 

 
Scala版本
package com.huanfion.Spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Small runnable demos of commonly used Spark RDD actions
 * (reduce, count, collect, take, countByKey) using the Scala API.
 *
 * Each demo obtains its own local-mode context and stops it when finished,
 * so demos can be run back to back without leaving a context running.
 */
object ActionScala {

  /** Creates a new SparkContext with app name "action" and master "local". */
  def getsc: SparkContext = {
    val sparkconf = new SparkConf().setAppName("action").setMaster("local")
    new SparkContext(sparkconf)
  }

  /** Runs the countByKey demo; swap in a commented-out call to run another. */
  def main(args: Array[String]): Unit = {
    // reduce
    // count
    // collect
    // take
    countByKey
  }

  /** Lends a fresh context to `body` and always stops it afterwards. */
  private def withContext[T](body: SparkContext => T): T = {
    val sc = getsc
    try body(sc)
    finally sc.stop()
  }

  /** Sums the elements 1..4 with `reduce` and prints the result. */
  def reduce: Unit = withContext { sc =>
    val rdd = sc.parallelize(Array(1, 2, 3, 4))
    System.out.println(rdd.reduce(_ + _))
  }

  /** Prints the number of elements in the RDD. */
  def count: Unit = withContext { sc =>
    val rdd = sc.parallelize(Array(1, 2, 3, 4))
    System.out.println(rdd.count())
  }

  /** Collects all elements to the driver and prints one per line. */
  def collect: Unit = withContext { sc =>
    val rdd = sc.parallelize(Array(1, 2, 3, 4))
    val value = rdd.collect()
    value.foreach(x => System.out.println(x))
  }

  /** Takes the first two elements and prints one per line. */
  def take: Unit = withContext { sc =>
    val rdd = sc.parallelize(Array(1, 2, 3, 4))
    val value = rdd.take(2)
    value.foreach(x => System.out.println(x))
  }

  /**
   * Counts how many records exist per key in an RDD of (class name, score)
   * pairs and prints each `key:count` line.
   */
  def countByKey: Unit = withContext { sc =>
    val list = Array(
      ("class_1", 91),
      ("class_2", 78),
      ("class_1", 99),
      ("class_2", 76),
      ("class_2", 90))
    val rdd = sc.parallelize(list)
    val countvalue = rdd.countByKey()
    countvalue.foreach(x => System.out.println(x._1 + ":" + x._2))
  }
}
相關文章
相關標籤/搜索