In a distributed program, communication is expensive, so laying out data to minimize network traffic can greatly improve overall performance. Just as a single-node program needs to choose the right data structure for a collection of records, a Spark program can reduce communication overhead by controlling how its RDDs are partitioned.
Partitioning does not help every application: if a given RDD is only scanned once, there is no point in pre-partitioning it. Partitioning pays off only when a dataset is reused multiple times in key-based operations such as joins.
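For example, here is a minimal sketch of pre-partitioning and caching a pair RDD that is going to be reused by several key-based operations. The class name PartitionSketch, the userData name, the sample data and the partition count are all illustrative, not taken from the examples later in this post:

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public class PartitionSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PartitionSketch");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Shuffle the data once, up front, and keep the partitioned copy in memory
        // so that later key-based operations can reuse the same layout.
        JavaPairRDD<String, Integer> userData = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("Cake", 2), new Tuple2<>("Bread", 3), new Tuple2<>("Bread", 2)))
                .partitionBy(new HashPartitioner(4))
                .persist(StorageLevel.MEMORY_ONLY());

        // The HashPartitioner is now attached to the RDD.
        System.out.println(userData.partitioner());
        ctx.stop();
    }
}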
For example, sortByKey() and groupByKey() produce range-partitioned and hash-partitioned RDDs respectively. On the other hand, operations such as map() cause the new RDD to forget the parent's partitioning, because such operations could, in principle, change the key of every record.
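Continuing the sketch above, one way to observe this is to inspect partitioner() on the derived RDDs: mapValues() keeps userData's partitioner, while a mapToPair() that could rewrite keys drops it. This is an illustrative fragment that assumes the userData RDD defined in the earlier sketch:

// mapValues cannot change keys, so the partitioner is preserved.
System.out.println(userData.mapValues(v -> v + 1).partitioner());

// A map-like operation may produce new keys, so the partitioner is dropped,
// even if the function happens to return the same tuples.
JavaPairRDD<String, Integer> remapped = userData.mapToPair(
        (Tuple2<String, Integer> t) -> new Tuple2<String, Integer>(t._1(), t._2()));
System.out.println(remapped.partitioner());   // prints an empty Optional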
從分區中獲益的操做緩存
Spark 的許多操做都引入了將數據根據鍵跨節點進行混洗的過程。全部這些操做都會從數據分區中獲益。就 Spark 1.0 而言,可以從數據分區中獲益的操做有 cogroup() 、groupWith() 、 join() 、 leftOuterJoin() 、 rightOuterJoin() 、 groupByKey() 、 reduceByKey() 、combineByKey() 以及 lookup() 。網絡
For binary operations such as cogroup() and join(), pre-partitioning ensures that at least one of the RDDs (the one with a known partitioner) is not shuffled. If both RDDs use the same partitioner and are cached on the same machines (for example, one was created from the other with mapValues(), so the two RDDs share the same keys and partitioning), or if one of them has not been computed yet, then no cross-node shuffle happens at all.
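For instance, a sketch of such a join, where the events RDD is an illustrative, unpartitioned counterpart to the userData RDD from the sketch above: only events needs to be shuffled over to userData's partitions.

// A small RDD without a partitioner; it will be hashed to userData's partitions.
JavaPairRDD<String, Integer> events = ctx.parallelizePairs(Arrays.asList(
        new Tuple2<>("Bread", 10), new Tuple2<>("Egg", 20)));

// userData is already hash-partitioned and cached, so it stays where it is;
// only events is moved across the network.
JavaPairRDD<String, Tuple2<Integer, Integer>> joined = userData.join(events);
joined.foreach(x -> System.out.println(x));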
影響分區方式的操做分佈式
The operations that set a partitioner on their result RDD are: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent RDD has a partitioner), and filter() (if the parent RDD has a partitioner). All other operations produce results with no particular partitioner.
最後,對於二元操做,輸出數據的分區方式取決於父 RDD 的分區方式。默認狀況下,結果會採用哈希分區,分區的數量和操做的並行度同樣。不過,若是其中的一個父 RDD 已性能
經設置過度區方式,那麼結果就會採用那種分區方式;若是兩個父 RDD 都設置過度區方式,結果 RDD 會採用第一個父 RDD 的分區方式。this
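Continuing the sketch, the joined RDD from above can be inspected to confirm this: with only userData carrying a partitioner, the result adopts it. An illustrative fragment:

// The join result inherits the partitioner of the parent that has one,
// here userData's HashPartitioner(4).
System.out.println(joined.partitioner());
System.out.println(joined.getNumPartitions());   // expected to be userData's 4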
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

// Implements a groupByKey-style aggregation with combineByKey:
// each key's (value, 1) tuples are collected into a List.
public class CombineByKeyTest3 {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Ways to create an RDD: 1) read from external storage (cluster mode); 2) parallelize an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));

        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                        return t;
                    }
                }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        // Map each (key, value) to (key, (value, 1)).
        JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
                        return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(),
                                new Tuple2<Integer, Integer>(t._2(), 1));
                    }
                });
        mapRdd1.foreach(x -> System.out.println(x));

        // Either List or Iterable works as the combiner type; groupByKey would give the same shape:
        // JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
        JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.combineByKey(
                // createCombiner: start a new list for the first value of a key within a partition.
                new Function<Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
                    @Override
                    public List<Tuple2<Integer, Integer>> call(Tuple2<Integer, Integer> value) throws Exception {
                        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
                        list.add(value);
                        return list;
                    }
                },
                // mergeValue: append the next value to the partition-local list.
                new Function2<List<Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
                    @Override
                    public List<Tuple2<Integer, Integer>> call(List<Tuple2<Integer, Integer>> it,
                            Tuple2<Integer, Integer> value) throws Exception {
                        it.add(value);
                        return it;
                    }
                },
                // mergeCombiners: merge the lists produced by different partitions.
                new Function2<List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>>() {
                    @Override
                    public List<Tuple2<Integer, Integer>> call(List<Tuple2<Integer, Integer>> it1,
                            List<Tuple2<Integer, Integer>> it2) throws Exception {
                        it1.addAll(it2);
                        return it1;
                    }
                });
        results.foreach(x -> System.out.println(x));

        // Note: distinct() is itself implemented on top of reduceByKey.
        // mapRdd1.distinct();
        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

// Demonstrates the combineByKey overloads (number of partitions, custom partitioner,
// map-side combine, serializer) and uses combineByKey to reimplement groupByKey.
public class CombineByKeyTest2 {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Ways to create an RDD: 1) read from external storage (cluster mode); 2) parallelize an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));

        // Illustration only: how a hash of the key "Code" would map onto 4 partitions.
        int index = "Code".hashCode() % 4;

        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                        return t;
                    }
                }).partitionBy(new HashPartitioner(4)).persist(StorageLevel.MEMORY_ONLY());

        // Per-key (sum, count) with the full combineByKey signature:
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
        //         (value) -> new Tuple2<Integer, Integer>(value, 1),
        //         (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
        //         (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
        //         new HashPartitioner(2), false, null);

        // The same aggregation with aggregateByKey:
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.aggregateByKey(
        //         new Tuple2<Integer, Integer>(0, 0),
        //         (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
        //         (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));

        // Or map to (key, (value, 1)) first and then reduceByKey / foldByKey:
        // JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(
        //         t -> new Tuple2<>(t._1(), new Tuple2<>(t._2(), 1)));
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(
        //         (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        // results = mapRdd1.foldByKey(new Tuple2<>(0, 0),
        //         (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        // results.foreach(System.out::println);

        // Exercise: implement groupByKey with combineByKey.
        // mapRdd.groupByKey().foreach(System.out::println);
        Function<Integer, List<Integer>> createCombiner = new Function<Integer, List<Integer>>() {
            @Override
            public List<Integer> call(Integer arg0) throws Exception {
                List<Integer> list = new ArrayList<Integer>();
                list.add(arg0);
                return list;
            }
        };
        Function2<List<Integer>, Integer, List<Integer>> mergeValue =
                new Function2<List<Integer>, Integer, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> list, Integer value) throws Exception {
                list.add(value);
                return list;
            }
        };
        Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombiners =
                new Function2<List<Integer>, List<Integer>, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> list1, List<Integer> list2) throws Exception {
                List<Integer> list = new ArrayList<Integer>();
                list.addAll(list1);
                list.addAll(list2);
                return list;
            }
        };

        JavaPairRDD<String, List<Integer>> results = mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
        results.foreach(x -> System.out.println(x));

        JavaPairRDD<String, Integer> re = mapRdd.partitionBy(new HashPartitioner(2));
        System.out.println(re.glom().collect());

        // Fourth argument: number of partitions; glom() shows the contents of each partition.
        JavaPairRDD<String, List<Integer>> results2 = mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, 2);
        System.out.println(results2.glom().collect());
        System.out.println(results2.getNumPartitions());

        // Fourth argument: a custom partitioner.
        JavaPairRDD<String, List<Integer>> results3 = mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners,
                new HashPartitioner(3));
        System.out.println(results3.glom().collect());
        System.out.println(results3.getNumPartitions());

        // Fourth argument: custom partitioner; fifth: whether to combine on the map side;
        // sixth: serializer (null means the default serializer).
        JavaPairRDD<String, List<Integer>> results4 = mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners,
                new HashPartitioner(3), true, null);
        System.out.println(results4.glom().collect());
        System.out.println(results4.getNumPartitions());

        // Note: distinct() is itself implemented on top of reduceByKey.
        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Per-key (sum, count) computed three ways: combineByKey, aggregateByKey,
// and mapToPair + reduceByKey.
public class CombineByKeyTest {
    @SuppressWarnings("serial")
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "5435657567560");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Ways to create an RDD: 1) read from external storage (cluster mode); 2) parallelize an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));   // becomes <"Bread", <3, 1>> once mapped to (value, count)
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));

        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                        return t;
                    }
                });
        // Equivalent shortcut: JavaPairRDD<String, Integer> mapRdd = ctx.parallelizePairs(data, 2);

        mapRdd.groupByKey().foreach(x -> System.out.println(x));

        // The same combineByKey can be written with anonymous Function / Function2 classes
        // instead of the lambdas below.

        // combineByKey with lambdas: createCombiner, mergeValue, mergeCombiners,
        // plus a partitioner, map-side combine enabled, and the default serializer (null).
        JavaPairRDD<String, Tuple2<Integer, Integer>> result2s = mapRdd.combineByKey(
                (Integer value) -> new Tuple2<Integer, Integer>(value, 1),
                (Tuple2<Integer, Integer> acc, Integer value) ->
                        new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
                (Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) ->
                        new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
                new HashPartitioner(3), true, null);
        result2s.foreach(x -> System.out.println(x));

        // aggregateByKey: a zero value plus the same two merge functions.
        JavaPairRDD<String, Tuple2<Integer, Integer>> results3 = mapRdd.aggregateByKey(
                new Tuple2<Integer, Integer>(0, 0),
                (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
                (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        results3.foreach(x -> System.out.println(x));

        // mapToPair to (key, (value, 1)) and then reduceByKey.
        JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
                        return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(),
                                new Tuple2<Integer, Integer>(t._2(), 1));
                    }
                });
        JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(
                new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1,
                            Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
                    }
                });
        results.foreach(x -> System.out.println(x));
        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

// Binary key-based operations on two pre-partitioned pair RDDs:
// cogroup/groupWith, the join family, union, subtractByKey and intersection.
public class CogroupApiTest {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Ways to create an RDD: 1) read from external storage (cluster mode); 2) parallelize an in-memory collection.
        List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        data1.add(new Tuple2<>("Cake", 2));
        data1.add(new Tuple2<>("Bread", 3));
        data1.add(new Tuple2<>("Cheese", 4));
        data1.add(new Tuple2<>("Milk", 1));
        data1.add(new Tuple2<>("Toast", 2));
        data1.add(new Tuple2<>("Bread", 2));
        data1.add(new Tuple2<>("Egg", 6));
        // Shortcut: JavaPairRDD<String, Integer> mapRdd1 = ctx.parallelizePairs(data1);
        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data1, 2);
        JavaPairRDD<String, Integer> mapRdd1 = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                        return t;
                    }
                }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
        data2.add(new Tuple2<>("Cake", 2));
        data2.add(new Tuple2<>("Bread", 3));
        data2.add(new Tuple2<>("Cheese", 4));
        data2.add(new Tuple2<>("Milk", 1));
        data2.add(new Tuple2<>("Toast", 2));
        JavaRDD<Tuple2<String, Integer>> rdd2 = ctx.parallelize(data2, 2);
        JavaPairRDD<String, Integer> mapRdd2 = rdd2.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                        return t;
                    }
                }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        // cogroup (groupWith is an alias): e.g. (Bread,([3, 2],[3]))
        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd3 = mapRdd1.cogroup(mapRdd2);
        mapRdd3.foreach(x -> System.out.println(x));

        // Inner join: (Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2)), ...
        // JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd4 = mapRdd1.join(mapRdd2);
        // mapRdd4.foreach(x -> System.out.println(x));

        // rightOuterJoin: the left side is wrapped in Optional and may be empty,
        // e.g. (Bread,(Optional[3],3)), (Bread,(Optional[3],2)), (Cake,(Optional[2],2))
        // JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> mapRdd5 = mapRdd2.rightOuterJoin(mapRdd1);
        // mapRdd5.foreach(x -> System.out.println(x));

        // leftOuterJoin: the right side is wrapped in Optional and may be empty,
        // e.g. (Cheese,(4,Optional[4])), (Toast,(2,Optional[2])), (Egg,(6,Optional.empty))
        // JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> mapRdd6 = mapRdd1.leftOuterJoin(mapRdd2);
        // mapRdd6.foreach(x -> System.out.println(x));

        // fullOuterJoin: both sides are Optional and either may be empty.
        // JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> mapRdd7 = mapRdd1.fullOuterJoin(mapRdd2);
        // mapRdd7.foreach(x -> System.out.println(x));

        // groupWith: same result as cogroup.
        // JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd8 = mapRdd1.groupWith(mapRdd2);
        // mapRdd8.foreach(x -> System.out.println(x));

        // union: concatenates the two RDDs; duplicate elements are kept.
        // JavaPairRDD<String, Integer> mapRdd9 = mapRdd1.union(mapRdd2);
        // mapRdd9.foreach(x -> System.out.println(x));

        // subtractByKey: removes pairs whose key also appears in the other RDD.
        // JavaPairRDD<String, Integer> mapRdd10 = mapRdd1.subtractByKey(mapRdd2);
        // mapRdd10.foreach(x -> System.out.println(x));

        // intersection: keeps only tuples whose key and value both match.
        // JavaPairRDD<String, Integer> mapRdd11 = mapRdd1.intersection(mapRdd2);
        // mapRdd11.foreach(x -> System.out.println(x));

        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

// sortByKey on a hash-partitioned pair RDD, plus mapValues / flatMapValues,
// which keep the parent's partitioner because they cannot change keys.
public class SortByKeyApi {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Ways to create an RDD: 1) read from external storage (cluster mode); 2) parallelize an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));

        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                        return t;
                    }
                }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        // Ascending sort: mapRdd.sortByKey().foreach(System.out::println);
        // Descending sort:
        mapRdd.sortByKey(false).foreach(x -> System.out.println(x));

        // sortByKey can also take a custom (serializable) Comparator over the key type, i.e. Comparator<String>.
        // mapValues / flatMapValues transform only the values, so the partitioner is preserved:
        // mapRdd.mapValues(x -> x + 1).foreach(x -> System.out.println(x));
        // mapRdd.flatMapValues(x -> Arrays.asList(1, 1, 1)).foreach(x -> System.out.println(x));
        ctx.stop();
    }
}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// sortByKey with a custom key type: Person must be Serializable and either
// implement Comparable or be sorted with an explicit Comparator.
public class SortByKeyApiTest {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.default.parallelism", "4");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // distinct() relies on Person's hashCode/equals: the duplicate ("Greey", 21) is removed.
        List<Person> data1 = new ArrayList<Person>();
        data1.add(new Person("Cake", 32));
        data1.add(new Person("Bread", 21));
        data1.add(new Person("Smith", 32));
        data1.add(new Person("Hourse", 21));
        data1.add(new Person("Mary", 32));
        data1.add(new Person("Greey", 21));
        data1.add(new Person("Greey", 21));
        data1.add(new Person("Tom", 32));
        data1.add(new Person("Gao", 21));
        System.out.println(ctx.parallelize(data1).distinct().count());
        // .sortBy(x -> x, true, 2).foreach(x -> System.out.println(x));

        List<Tuple2<Person, Integer>> data = new ArrayList<Tuple2<Person, Integer>>();
        data.add(new Tuple2<Person, Integer>(new Person("Cake", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Bread", 21), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Smith", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Hourse", 21), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Mary", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Greey", 21), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Greey", 11), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Tom", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Gao", 21), 3));

        JavaPairRDD<Person, Integer> dataRdd = ctx.parallelizePairs(data);
        // Uses Person.compareTo (name first, then age).
        dataRdd.sortByKey().foreach(x -> System.out.println(x));

        // Alternatively, pass an explicit Comparator; note that it must be serializable
        // before an action is run on the sorted RDD.
        dataRdd.sortByKey(new Comparator<Person>() {
            @Override
            public int compare(Person o1, Person o2) {
                int res = o1.name.compareTo(o2.name);
                if (res == 0) {
                    res = o1.age - o2.age;
                }
                return res;
            }
        });

        ctx.stop();
    }
}

class Person implements Serializable, Comparable<Person> {
    private static final long serialVersionUID = 1L;

    String name;
    int age;

    public Person(String name, int age) {
        super();
        this.name = name;
        this.age = age;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + age;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        Person other = (Person) obj;
        if (age != other.age) return false;
        if (name == null) {
            if (other.name != null) return false;
        } else if (!name.equals(other.name)) return false;
        return true;
    }

    @Override
    public int compareTo(Person p) {
        int res = this.name.compareTo(p.name);
        if (res == 0) {
            res = this.age - p.age;
        }
        return res;
    }

    @Override
    public String toString() {
        return "Person [name=" + name + ", age=" + age + "]";
    }
}