For key-value (pair) RDDs, if the key is a custom type (for example, a Person class), you must override its hashCode and equals methods.
This matters because intersection is implemented on top of a cogroup (a groupByKey-style, key-based shuffle), and subtract is implemented via subtractByKey, so both rely on keys hashing and comparing correctly.
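As a minimal, illustrative sketch (the Person class below is hypothetical, not taken from the code that follows), a value-based equals/hashCode is all such a key needs so that HashPartitioner and key-based shuffles treat equal Persons as the same key:

import java.io.Serializable;
import java.util.Objects;

// Hypothetical custom key type for a pair RDD. Without hashCode/equals,
// partitioning and key-based operations would fall back to object identity,
// and two equal Persons could end up treated as different keys.
public class Person implements Serializable {
    private final String name;
    private final int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Person)) return false;
        Person p = (Person) o;
        return age == p.age && Objects.equals(name, p.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age);
    }
}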
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class IntersectionDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        List<String> lines1 = new ArrayList<String>();
        lines1.add("Hello");
        lines1.add("How");
        lines1.add("Moon");

        // Build a pair RDD and repartition it with a HashPartitioner of 3 partitions.
        JavaPairRDD<String, Integer> rdd1 = ctx.parallelize(lines1, 2)
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).partitionBy(new HashPartitioner(3));
        System.out.println("rdd1:" + rdd1.partitioner());
        // rdd1.foreach(x -> {
        //     int index = x.hashCode() % 2;
        //     System.out.println("element: " + x + " hash index: " + index);
        // });
        // System.out.println(rdd1.glom().collect());

        List<String> lines2 = new ArrayList<String>();
        lines2.add("Hello");
        lines2.add("How");
        lines2.add("Good");

        // Second pair RDD, partitioned with a HashPartitioner of 2 partitions.
        JavaPairRDD<String, Integer> rdd2 = ctx.parallelize(lines2, 2)
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).partitionBy(new HashPartitioner(2));
        System.out.println("rdd2:" + rdd2.partitioner());

        // Under the hood these set operations go through a key-based shuffle
        // (cogroup / subtractByKey), combining the two sides much like
        // HashMap/HashSet lookups, which is why keys need hashCode/equals.
        // JavaPairRDD<String, Integer> rdd3 = rdd1.intersection(rdd2);
        JavaPairRDD<String, Integer> rdd3 = rdd1.subtract(rdd2);
        // JavaPairRDD<String, Integer> rdd3 = rdd1.union(rdd2);
        System.out.println("rdd3:" + rdd3.partitioner());
        System.out.println("rdd3:" + rdd3.getNumPartitions());
        rdd3.foreach(x -> System.out.println(x));
    }
}
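To make the key-based shuffle point concrete, here is a rough, illustrative sketch (not Spark's actual source) of how the intersection of rdd1 and rdd2 from the demo above could be expressed with cogroup inside the same main method: a key is kept only when both sides contributed at least one value.

// Illustrative only: an intersection-like result built from cogroup,
// reusing the rdd1 and rdd2 defined in IntersectionDemo above.
JavaPairRDD<String, Integer> manualIntersection = rdd1.cogroup(rdd2)
        .filter(t -> t._2()._1().iterator().hasNext() && t._2()._2().iterator().hasNext())
        .mapToPair(t -> new Tuple2<String, Integer>(t._1(), t._2()._1().iterator().next()));
manualIntersection.foreach(x -> System.out.println(x)); // expected: (Hello,1) and (How,1)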
操做:父RDD分區對子RDD的影響apache
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class UnionDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Two ways to create an RDD: 1) read from external storage (typical in a cluster)
        // 2) parallelize an in-memory collection, as done here.
        List<Tuple2<String, Integer>> urls = new ArrayList<Tuple2<String, Integer>>();
        urls.add(new Tuple2<String, Integer>("http://www.baidu.com/about.html", 3));
        urls.add(new Tuple2<String, Integer>("http://www.ali.com/index.html", 2));
        urls.add(new Tuple2<String, Integer>("http://www.sina.com/first.html", 4));
        urls.add(new Tuple2<String, Integer>("http://www.sohu.com/index.html", 3));
        urls.add(new Tuple2<String, Integer>("http://www.baidu.com/index.jsp", 7));
        urls.add(new Tuple2<String, Integer>("http://www.sina.com/help.html", 1));
        JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls, 2);
        // JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls)
        //         .partitionBy(new HashPartitioner(2));
        System.out.println("urlRdd1:" + urlRdd1.partitioner());
        System.out.println("urlRdd1:" + urlRdd1.glom().collect());

        List<Tuple2<String, Integer>> anotherUrls = new ArrayList<Tuple2<String, Integer>>();
        anotherUrls.add(new Tuple2<String, Integer>("http://www.163.com/about.html", 3));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.taobao.com/index.html", 2));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.sina.com/first.html", 4));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.csdn.com/index.html", 3));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.facebook.com/index.jsp", 7));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.sina.com/help.html", 1));
        JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls, 2);
        // JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls)
        //         .partitionBy(new HashPartitioner(3));
        System.out.println("urlRdd2:" + urlRdd2.partitioner());
        System.out.println("urlRdd2:" + urlRdd2.glom().collect());

        // When both parents have the same partitioner and the same number of partitions,
        // the union keeps that partitioning.
        // If no partitioner is set, the result's partition count is the sum of the two
        // parents' partition counts, even when those counts match.
        JavaPairRDD<String, Integer> rdd3 = urlRdd1.union(urlRdd2);
        System.out.println("rdd3:" + rdd3.partitioner());
        System.out.println("rdd3:" + rdd3.getNumPartitions());
        System.out.println("urlRdd3:" + rdd3.glom().collect());
        rdd3.foreach(x -> System.out.println(x));
    }
}
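To see the first case from the comments above (both parents share the same partitioner and partition count), a variant along these lines can be dropped into the demo's main method; it reuses ctx, urls and anotherUrls from UnionDemo, and the expectations in the comments are what the partitioner-aware union path should produce.

// Give both parents the same HashPartitioner(2). With matching partitioners,
// union can keep that partitioning instead of concatenating partitions.
JavaPairRDD<String, Integer> partitionedRdd1 =
        ctx.parallelizePairs(urls).partitionBy(new HashPartitioner(2));
JavaPairRDD<String, Integer> partitionedRdd2 =
        ctx.parallelizePairs(anotherUrls).partitionBy(new HashPartitioner(2));
JavaPairRDD<String, Integer> unioned = partitionedRdd1.union(partitionedRdd2);
System.out.println("unioned: " + unioned.partitioner());      // expected: a defined HashPartitioner
System.out.println("unioned: " + unioned.getNumPartitions()); // expected: 2, not 2 + 2 = 4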
(zip's underlying implementation is just zipPartitions)
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ZipDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        List<String> lines1 = new ArrayList<String>();
        lines1.add("Hello");
        lines1.add("How");
        lines1.add("Moon");
        // lines1.add("Hope");
        // lines1.add("Dog");
        // lines1.add("House");
        JavaRDD<String> rdd1 = ctx.parallelize(lines1, 2);
        System.out.println(rdd1.glom().collect());

        List<String> lines2 = new ArrayList<String>();
        lines2.add("1");
        lines2.add("2");
        lines2.add("3");
        JavaRDD<String> rdd2 = ctx.parallelize(lines2, 2);
        System.out.println(rdd2.glom().collect());

        // zip requires both RDDs to have the same number of elements and the same
        // number of partitions, otherwise it fails at runtime.
        // JavaPairRDD<String, String> rdd3 = rdd1.zip(rdd2);
        // rdd3.foreach(x -> System.out.println(x));
        // (Hello,1)
        // (How,2)
        // (Moon,3)

        // zipPartitions hands the function one iterator per RDD for each pair of
        // co-located partitions. The lambda below only prints both sides and returns
        // an empty iterator; it could also be written as an anonymous
        // FlatMapFunction2 implementation.
        JavaRDD<HashMap<String, String>> rdd3 = rdd1.zipPartitions(rdd2, (x, y) -> {
            System.out.println("*****************");
            List<HashMap<String, String>> lines = new ArrayList<HashMap<String, String>>();
            while (x.hasNext()) {
                System.out.println(x.next());
            }
            while (y.hasNext()) {
                System.out.println(y.next());
            }
            return lines.iterator();
        });
        rdd3.foreach(x -> System.out.println(x));
        // *****************
        // Hello
        // 1
        // *****************
        // How
        // Moon
        // 2
        // 3
    }
}
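Since zip is just a constrained special case of zipPartitions (as noted above), the pairing it produces can be reproduced inside a zipPartitions lambda. The sketch below reuses rdd1 and rdd2 from ZipDemo and assumes, as zip itself does, that the two RDDs have the same number of partitions and matching element counts per partition.

// Illustrative: rebuild zip's pairing with zipPartitions, using the rdd1 and
// rdd2 from ZipDemo above. Each co-located pair of partitions is walked in
// lockstep and turned into (left, right) tuples.
JavaRDD<Tuple2<String, String>> zipped = rdd1.zipPartitions(rdd2, (x, y) -> {
    List<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();
    while (x.hasNext() && y.hasNext()) {
        pairs.add(new Tuple2<String, String>(x.next(), y.next()));
    }
    return pairs.iterator();
});
zipped.foreach(x -> System.out.println(x)); // expected: (Hello,1), (How,2), (Moon,3)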