0六、action操做開發實戰

一、reduce:
二、collect:
三、count:
四、take:
五、saveAsTextFile:
六、countByKey:
七、foreach:


package sparkcore.java;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
/**
 * action操做實戰
 */
public class ActionOperation {
    public static void main(String[] args) {
        // reduce();
        // collect();
        // count();
        // take();
        // saveAsTextFile();
        countByKey();
    }
    public static void reduce() {
        // 建立SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        // 使用reduce操做對集合中的數字進行累加
        // reduce操做的原理:
        // 首先將第一個和第二個元素,傳入call()方法,進行計算,會獲取一個結果,好比1 + 2 = 3
        // 接着將該結果與下一個元素傳入call()方法,進行計算,好比3 + 3 = 6
        // 以此類推
        // 因此reduce操做的本質,就是聚合,將多個元素聚合成一個元素
        int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(sum);
        // 關閉JavaSparkContext
        sc.close();
    }
    public static void collect() {
        // 建立SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        // 使用map操做將集合中全部數字乘以2
        JavaRDD<Integer> doubleNumbers = numbers.map(
                new Function<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Integer call(Integer v1throws Exception {
                        return v1 * 2;
                    }
                });
        // collect操做,將分佈在遠程集羣上的doubleNumbers RDD的數據拉取到本地
        // 這種方式,通常不建議使用,由於若是rdd中的數據量比較大的話,好比超過1萬條
        // 那麼性能會比較差,由於要從遠程走大量的網絡傳輸,將數據獲取到本地
        // 此外,除了性能差,還可能在rdd中數據量特別大的狀況下,發生oom異常,內存溢出
        // 所以,一般,仍是推薦使用foreach action操做,來對最終的rdd元素進行處理
        List<Integer> doubleNumberList = doubleNumbers.collect();
        for (Integer num : doubleNumberList) {
            System.out.println(num);
        }
        // 關閉JavaSparkContext
        sc.close();
    }
    public static void count() {
        // 建立SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("count").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        // 對rdd使用count操做,統計它有多少個元素
        long count = numbers.count();
        System.out.println(count);
        // 關閉JavaSparkContext
        sc.close();
    }
    public static void take() {
        // 建立SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("take").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        // 對rdd使用count操做,統計它有多少個元素
        // take操做,與collect相似,也是從遠程集羣上,獲取rdd的數據
        // 可是collect是獲取rdd的全部數據,take只是獲取前n個數據
        List<Integer> top3Numbers = numbers.take(3);
        for (Integer num : top3Numbers) {
            System.out.println(num);
        }
        // 關閉JavaSparkContext
        sc.close();
    }
    public static void saveAsTextFile() {
        // 建立SparkConf和JavaSparkContext
        SparkConf conf = new SparkConf().setAppName("saveAsTextFile");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);
        // 使用map操做將集合中全部數字乘以2
        JavaRDD<Integer> doubleNumbers = numbers.map(
                new Function<Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Integer call(Integer v1throws Exception {
                        return v1 * 2;
                    }
                });
        // 直接將rdd中的數據,保存在HFDS文件中
        // 可是要注意,咱們這裏只能指定文件夾,也就是目錄
        // 那麼實際上,會保存爲目錄中的/double_number.txt/part-00000文件
        doubleNumbers.saveAsTextFile("hdfs://node1:8020/double_number.txt");
        // 關閉JavaSparkContext
        sc.close();
    }
    public static void countByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<String, String>> scoreList = Arrays.asList(new Tuple2<String, String>("class1""leo"),
                new Tuple2<String, String>("class2""jack"), new Tuple2<String, String>("class1""marry"),
                new Tuple2<String, String>("class2""tom"), new Tuple2<String, String>("class2""david"));
        // 並行化集合,建立JavaPairRDD
        JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);
        // 對rdd應用countByKey操做,統計每一個班級的學生人數,也就是統計每一個key對應的元素個數
        // 這就是countByKey的做用
        // countByKey返回的類型,直接就是Map<String, Object>
        Map<String, Long> studentCounts = students.countByKey();
        for (Map.Entry<String, Long> studentCount : studentCounts.entrySet()) {
            System.out.println(studentCount.getKey() + ": " + studentCount.getValue());
        }
        // 關閉JavaSparkContext
        sc.close();
    }
}


package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ActionOperation {
  def main(args: Array[String]) {
    // reduce()  
    // collect()  
    // count() 
    // take() 
    countByKey()
  }
  def reduce() {
    val conf = new SparkConf()
      .setAppName("reduce")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numberArray = Array(12345678910)
    val numbers = sc.parallelize(numberArray1)
    val sum = numbers.reduce(_ + _)
    println(sum)
  }
  def collect() {
    val conf = new SparkConf()
      .setAppName("collect")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numberArray = Array(12345678910)
    val numbers = sc.parallelize(numberArray1)
    val doubleNumbers = numbers.map { num => num * 2 }
    val doubleNumberArray = doubleNumbers.collect()
    for (num <- doubleNumberArray) {
      println(num)
    }
  }
  def count() {
    val conf = new SparkConf()
      .setAppName("count")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numberArray = Array(12345678910)
    val numbers = sc.parallelize(numberArray1)
    val count = numbers.count()
    println(count)
  }
  def take() {
    val conf = new SparkConf()
      .setAppName("take")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numberArray = Array(12345678910)
    val numbers = sc.parallelize(numberArray1)
    val top3Numbers = numbers.take(3)
    for (num <- top3Numbers) {
      println(num)
    }
  }
  def saveAsTextFile() {
  }
  def countByKey() {
    val conf = new SparkConf()
      .setAppName("countByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val studentList = Array(Tuple2("class1""leo"), Tuple2("class2""jack"),
      Tuple2("class1""tom"), Tuple2("class2""jen"), Tuple2("class2""marry"))
    val students = sc.parallelize(studentList1)
    val studentCounts = students.countByKey()
    println(studentCounts)
  }
}
相關文章
相關標籤/搜索