package sparkcore.java;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
/**
* Hands-on examples of Spark action operations
*/
public class ActionOperation {
public static void main(String[] args) {
// reduce();
// collect();
// count();
// take();
// saveAsTextFile();
countByKey();
}
public static void reduce() {
// Create the SparkConf and JavaSparkContext
SparkConf conf = new SparkConf().setAppName("reduce").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// Given a collection holding the 10 numbers 1 through 10; now sum them up
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// Use the reduce operation to sum the numbers in the collection.
// How reduce works:
// First the first and second elements are passed into call() and combined, e.g. 1 + 2 = 3.
// Then that result and the next element are passed into call() and combined, e.g. 3 + 3 = 6.
// And so on for the remaining elements.
// So reduce is essentially aggregation: it collapses many elements into a single one.
int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(sum);
// Close the JavaSparkContext
sc.close();
}
public static void collect() {
// Create the SparkConf and JavaSparkContext
SparkConf conf = new SparkConf().setAppName("collect").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// Given a collection holding the 10 numbers 1 through 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// Use the map operation to multiply every number in the collection by 2
JavaRDD<Integer> doubleNumbers = numbers.map(
new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// The collect operation pulls the data of the doubleNumbers RDD from the remote cluster back to the driver.
// This is generally not recommended: if the RDD holds a lot of data, say more than 10,000 records,
// performance suffers because all of it has to travel over the network to the driver.
// Besides the poor performance, a very large RDD can also cause an OOM (out of memory) error.
// So the foreach action is usually recommended instead for processing the final RDD's elements
// (see the sketch after the loop below).
List<Integer> doubleNumberList = doubleNumbers.collect();
for (Integer num : doubleNumberList) {
System.out.println(num);
}
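// As recommended above, foreach is usually the better way to process every element of the final RDD:
// the function runs on the executors, so nothing has to be shipped back to the driver.
// A minimal sketch (in local mode the output shows up in this console; on a real cluster it would
// appear in the executor logs instead):
doubleNumbers.foreach(new org.apache.spark.api.java.function.VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer num) throws Exception {
System.out.println(num);
}
});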
// Close the JavaSparkContext
sc.close();
}
public static void count() {
// Create the SparkConf and JavaSparkContext
SparkConf conf = new SparkConf().setAppName("count").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// Given a collection holding the 10 numbers 1 through 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// Use the count operation on the RDD to find out how many elements it contains
long count = numbers.count();
System.out.println(count);
// Close the JavaSparkContext
sc.close();
}
public static void take() {
// Create the SparkConf and JavaSparkContext
SparkConf conf = new SparkConf().setAppName("take").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// Given a collection holding the 10 numbers 1 through 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// The take operation, like collect, fetches the RDD's data from the remote cluster,
// but whereas collect returns all of the RDD's data, take returns only the first n elements.
List<Integer> top3Numbers = numbers.take(3);
for (Integer num : top3Numbers) {
System.out.println(num);
}
// Close the JavaSparkContext
sc.close();
}
public static void saveAsTextFile() {
// Create the SparkConf and JavaSparkContext (no setMaster here: this example writes to HDFS,
// so the master is expected to be supplied externally, e.g. via spark-submit)
SparkConf conf = new SparkConf().setAppName("saveAsTextFile");
JavaSparkContext sc = new JavaSparkContext(conf);
// Given a collection holding the 10 numbers 1 through 10
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);
// Use the map operation to multiply every number in the collection by 2
JavaRDD<Integer> doubleNumbers = numbers.map(
new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// Save the RDD's data directly to HDFS as text.
// Note that only a folder (a directory) can be specified here,
// so the data actually ends up as part files such as /double_number.txt/part-00000 inside that directory.
doubleNumbers.saveAsTextFile("hdfs://node1:8020/double_number.txt");
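// A small follow-up sketch: since saveAsTextFile wrote a directory of part files,
// sc.textFile can point at that same directory and read everything back as lines of text
// (this assumes the hdfs://node1:8020 path above is reachable from this application).
JavaRDD<String> reloaded = sc.textFile("hdfs://node1:8020/double_number.txt");
System.out.println("reloaded line count: " + reloaded.count());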
// Close the JavaSparkContext
sc.close();
}
public static void countByKey() {
// Create the SparkConf
SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
// Create the JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// Mock collection of (class, student) pairs
List<Tuple2<String, String>> scoreList = Arrays.asList(new Tuple2<String, String>("class1", "leo"),
new Tuple2<String, String>("class2", "jack"), new Tuple2<String, String>("class1", "marry"),
new Tuple2<String, String>("class2", "tom"), new Tuple2<String, String>("class2", "david"));
// Parallelize the collection to create a JavaPairRDD
JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);
// Apply the countByKey operation to the RDD to count the number of students in each class,
// that is, to count how many elements there are for each key; that is exactly what countByKey does.
// countByKey returns the result directly as a Map<String, Long>.
Map<String, Long> studentCounts = students.countByKey();
for (Map.Entry<String, Long> studentCount : studentCounts.entrySet()) {
System.out.println(studentCount.getKey() + ": " + studentCount.getValue());
}
// Close the JavaSparkContext
sc.close();
}
}
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ActionOperation {
def main(args: Array[String]) {
// reduce()
// collect()
// count()
// take()
countByKey()
}
def reduce() {
val conf = new SparkConf()
.setAppName("reduce")
.setMaster("local")
val sc = new SparkContext(conf)
val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numbers = sc.parallelize(numberArray, 1)
val sum = numbers.reduce(_ + _)
println(sum)
}
def collect() {
val conf = new SparkConf()
.setAppName("collect")
.setMaster("local")
val sc = new SparkContext(conf)
val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numbers = sc.parallelize(numberArray, 1)
val doubleNumbers = numbers.map { num => num * 2 }
val doubleNumberArray = doubleNumbers.collect()
for (num <- doubleNumberArray) {
println(num)
}
}
def count() {
val conf = new SparkConf()
.setAppName("count")
.setMaster("local")
val sc = new SparkContext(conf)
val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numbers = sc.parallelize(numberArray, 1)
val count = numbers.count()
println(count)
}
def take() {
val conf = new SparkConf()
.setAppName("take")
.setMaster("local")
val sc = new SparkContext(conf)
val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numbers = sc.parallelize(numberArray, 1)
val top3Numbers = numbers.take(3)
for (num <- top3Numbers) {
println(num)
}
}
def saveAsTextFile() {
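// The Scala counterpart of the Java saveAsTextFile example above; a minimal sketch that assumes
// the same hdfs://node1:8020 namenode is reachable, and that the master is supplied externally
// (e.g. via spark-submit) since setMaster is deliberately not called here.
val conf = new SparkConf()
.setAppName("saveAsTextFile")
val sc = new SparkContext(conf)
val numberArray = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numbers = sc.parallelize(numberArray, 1)
val doubleNumbers = numbers.map { num => num * 2 }
// saveAsTextFile takes a directory path; the data lands in part files inside it.
doubleNumbers.saveAsTextFile("hdfs://node1:8020/double_number.txt")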
}
def countByKey() {
val conf = new SparkConf()
.setAppName("countByKey")
.setMaster("local")
val sc = new SparkContext(conf)
val studentList = Array(Tuple2("class1", "leo"), Tuple2("class2", "jack"),
Tuple2("class1", "tom"), Tuple2("class2", "jen"), Tuple2("class2", "marry"))
val students = sc.parallelize(studentList, 1)
val studentCounts = students.countByKey()
println(studentCounts)
}
}