intersection & union & zip

Note: for a key-value (pair) RDD, if the key is a custom type (e.g. Person), you must override its hashCode and equals methods, since partitioning and key comparison rely on them.
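
A minimal sketch of such a key class (this Person is hypothetical, not part of the demos below):

public class Person {
    private final String name;
    private final int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    // HashPartitioner uses hashCode to route a key to a partition.
    @Override
    public int hashCode() {
        return 31 * name.hashCode() + age;
    }

    // equals decides which keys count as the same key within a partition.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Person)) return false;
        Person p = (Person) o;
        return age == p.age && name.equals(p.name);
    }
}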

1. intersection

Under the hood, intersection is implemented with a keyed grouping of both RDDs (a cogroup/groupByKey-style shuffle); subtract is built on subtractByKey.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;


public class IntersectionDemo {
    public static void main(String[] xx){
    	SparkConf conf = new SparkConf();
    	conf.setMaster("local");
    	conf.setAppName("WordCounter");
    	conf.set("spark.testing.memory", "2147480000");
    	JavaSparkContext ctx = new JavaSparkContext(conf);

    	List<String> lines1 = new ArrayList<String>();
    	lines1.add("Hello");
    	lines1.add("How");
    	lines1.add("Moon");
    	
//    	JavaRDD<String> rd1=ctx.parallelize(lines1);
    	
    	JavaPairRDD<String, Integer> rdd1 = ctx.parallelize(lines1, 2).
    			mapToPair(new PairFunction<String, String, Integer>() {  
    	            @Override  
    	            public Tuple2<String, Integer> call(String s) throws Exception {  
    	                return new Tuple2<String, Integer>(s, 1);  
    	            }  
    	        }).partitionBy(new HashPartitioner(3));
   	
    	System.out.println("rdd1:" + rdd1.partitioner());
//    	rdd1.foreach(x -> {
//    		int index = x.hashCode() % 2;
//    		System.out.println("current element: " + x + ", its hash index: " + index);
//    	});
//    	System.out.println(rdd1.glom().collect());
    	
    	
    	List<String> lines2 = new ArrayList<String>();
    	lines2.add("Hello");
    	lines2.add("How");
    	lines2.add("Good");

    	JavaRDD<String> rd2 = ctx.parallelize(lines2);
    	
    	JavaPairRDD<String, Integer> rdd2 = ctx.parallelize(lines2, 2).
    			mapToPair(new PairFunction<String, String, Integer>() {  
    	            @Override  
    	            public Tuple2<String, Integer> call(String s) throws Exception {  
    	                return new Tuple2<String, Integer>(s, 1);  
    	            }  
    	        }).partitionBy(new HashPartitioner(2));
    	System.out.println("rdd2:" + rdd2.partitioner());
    	
    	//Implemented by grouping matching keys from both RDDs, combining HashMap and HashSet internally (code reuse)
//    	JavaPairRDD<String, Integer> rdd3 = rdd1.intersection(rdd2);
    	
    	JavaPairRDD<String, Integer> rdd3 = rdd1.subtract(rdd2);
//    	JavaPairRDD<String, Integer> rdd3 = rdd1.union(rdd2);
    	System.out.println("rdd3:" + rdd3.partitioner());
    	System.out.println("rdd3:" + rdd3.getNumPartitions());
    	rdd3.foreach(x->System.out.println(x));
    }
}
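
To make the grouping claim above concrete, here is a minimal sketch of a key-based intersection written by hand with cogroup. It assumes the Spark 2.x Java API and that it is appended inside the same main method, after rdd1 and rdd2 are built; the name manualIntersection is illustrative:

    	// Hand-rolled intersection: cogroup both RDDs by key, then keep a key
    	// only when both sides contributed at least one value for it
    	// (this mirrors the built-in behavior, modulo implementation details).
    	JavaPairRDD<String, Integer> manualIntersection = rdd1.cogroup(rdd2)
    			.filter(t -> t._2()._1().iterator().hasNext()
    					&& t._2()._2().iterator().hasNext())
    			.mapToPair(t -> new Tuple2<String, Integer>(
    					t._1(), t._2()._1().iterator().next()));
    	manualIntersection.foreach(x -> System.out.println(x));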


2. union

操做:父RDD分區對子RDD的影響apache

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;


public class UnionDemo {
    public static void main(String[] xx){
    	SparkConf conf = new SparkConf();
    	conf.setMaster("local");
    	conf.set("spark.testing.memory", "2147480000");
    	conf.setAppName("WordCounter");

    	JavaSparkContext ctx = new JavaSparkContext(conf);
    	//Two ways to create an RDD: 1) by reading external storage (the usual route on a cluster) 2) from an in-memory collection

    	List<Tuple2<String,Integer>> urls = new ArrayList<Tuple2<String,Integer>>();
    	urls.add(new Tuple2<String,Integer>("http://www.baidu.com/about.html", 3));
    	urls.add(new Tuple2<String,Integer>("http://www.ali.com/index.html", 2));
    	urls.add(new Tuple2<String,Integer>("http://www.sina.com/first.html", 4));
    	urls.add(new Tuple2<String,Integer>("http://www.sohu.com/index.html", 3));
    	urls.add(new Tuple2<String,Integer>("http://www.baidu.com/index.jsp",7));
    	urls.add(new Tuple2<String,Integer>("http://www.sina.com/help.html",1));
    	
    	JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls,2);
//    	JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls).
//    			                         partitionBy(new HashPartitioner(2));
    	
    	
    	System.out.println("urlRdd1:" + urlRdd1.partitioner());
    	System.out.println("urlRdd1:" + urlRdd1.glom().collect());

    	List<Tuple2<String,Integer>> anotherUrls = new ArrayList<Tuple2<String,Integer>>();
    	anotherUrls.add(new Tuple2<String,Integer>("http://www.163.com/about.html", 3));
    	anotherUrls.add(new Tuple2<String,Integer>("http://www.taobao.com/index.html", 2));
    	anotherUrls.add(new Tuple2<String,Integer>("http://www.sina.com/first.html", 4));
    	anotherUrls.add(new Tuple2<String,Integer>("http://www.csdn.com/index.html", 3));
    	anotherUrls.add(new Tuple2<String,Integer>("http://www.facebook.com/index.jsp",7));
    	anotherUrls.add(new Tuple2<String,Integer>("http://www.sina.com/help.html",1));
    	
    	JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls,2);
//    	JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls).
//    			                      partitionBy(new HashPartitioner(3));
    	System.out.println("urlRdd2:" + urlRdd2.partitioner());
    	System.out.println("urlRdd2:" + urlRdd2.glom().collect());
    	
    	//If both RDDs have the same partitioner (and thus the same partition count), the union preserves that partitioning
    	//If no partitioner is set, then even with equal partition counts the union ends up with the sum of the two partition counts
    	
    	JavaPairRDD<String, Integer> rdd3 = urlRdd1.union(urlRdd2);
    	System.out.println("rdd3:" + rdd3.partitioner());
    	System.out.println("rdd3:" + rdd3.getNumPartitions());
    	System.out.println("urlRdd3:" + rdd3.glom().collect());
    	
    	rdd3.foreach(x->System.out.println(x));
    	
    }
}
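
A quick sketch of the partitioner-preserving case described in the comments inside main (same urls and anotherUrls lists, appended inside the same main method; variable names are illustrative). When both inputs share the same partitioner, Spark uses a partitioner-aware union and keeps the partition count instead of summing it:

    	// Both sides share HashPartitioner(2), so the union keeps that
    	// partitioner and still has 2 partitions rather than 2 + 2 = 4.
    	JavaPairRDD<String, Integer> partitioned1 =
    			ctx.parallelizePairs(urls).partitionBy(new HashPartitioner(2));
    	JavaPairRDD<String, Integer> partitioned2 =
    			ctx.parallelizePairs(anotherUrls).partitionBy(new HashPartitioner(2));
    	JavaPairRDD<String, Integer> unionKeepsPartitioner = partitioned1.union(partitioned2);
    	System.out.println("partitioner: " + unionKeepsPartitioner.partitioner()); // a HashPartitioner
    	System.out.println("numPartitions: " + unionKeepsPartitioner.getNumPartitions()); // 2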

3. zip and zipPartitions

(zip is implemented on top of zipPartitions)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction2;

import scala.Tuple2;


public class ZipDemo {
    public static void main(String[] xx){
    	SparkConf conf = new SparkConf();
    	conf.setMaster("local");
    	conf.set("spark.testing.memory", "2147480000");
    	conf.setAppName("WordCounter");
    	JavaSparkContext ctx = new JavaSparkContext(conf);

    	List<String> lines1 = new ArrayList<String>();
    	lines1.add("Hello");
    	lines1.add("How");
    	lines1.add("Moon");
//    	lines1.add("Hope");
//    	lines1.add("Dog");
//    	lines1.add("House");
    	JavaRDD<String> rdd1 = ctx.parallelize(lines1, 2);
    	System.out.println(rdd1.glom().collect());

    	List<String> lines2 = new ArrayList<String>();
    	lines2.add("1");
    	lines2.add("2");
    	lines2.add("3");
    	JavaRDD<String> rdd2 = ctx.parallelize(lines2, 2);
    	System.out.println(rdd2.glom().collect());
    	
    	
    	//zip requires both RDDs to have the same number of elements and the same number of partitions; otherwise it throws at runtime
    	
//    	JavaPairRDD<String, String> rdd3 = rdd1.zip(rdd2);
//    	rdd3.foreach(x->System.out.println(x));
//    	(Hello,1)
//    	(How,2)
//    	(Moon,3)
    	
    	JavaRDD<HashMap<String, String>> rdd3 = rdd1.zipPartitions(rdd2,
    			(x, y)-> {
    		         System.out.println("*****************");
    				 List<HashMap<String, String>> lines = new ArrayList<HashMap<String, String>>();
//    		         List<String> list1 = new ArrayList<String>(); 
    		         while(x.hasNext()){
//    		        	 list1.add(x.next());
    		        	 
    		        	 System.out.println(x.next());
    		         }
//    		         List<String> list2 = new ArrayList<String>(); 
    		         while(y.hasNext()){
//    		        	 list2.add(y.next());
    		        	 System.out.println(y.next());
    		         }
    				 
    		         return lines.iterator();
    		    });
    	rdd3.foreach(x->System.out.println(x));
    	
//    	*****************
//    	Hello
//    	1
//    	*****************
//    	How
//    	Moon
//    	2
//    	3
    	
//    	JavaRDD<String> rdd3 = rdd1.zipPartitions(rdd2,
//    			new FlatMapFunction2<
//    			                Iterator<String>,
//    			                Iterator<String>,
//    			                String>(){
//
//					@Override
//					public Iterator<String> call(
//							Iterator<String> x, Iterator<String> y)
//							throws Exception {
//						List<String> result = new ArrayList<String>();
//						while (x.hasNext() && y.hasNext()) {
//							result.add(x.next() + "_" + y.next());
//						}
//						return result.iterator();
//					}
//    	});
//    	System.out.println(rdd3.collect());
//    	rdd3.foreach(x->System.out.println(x));
    }
}
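
Since zip is stated above to be implemented in terms of zipPartitions, here is a minimal hand-rolled version of that equivalence (Spark 2.x Java API, appended inside the same main method after rdd1 and rdd2; manualZip is an illustrative name):

    	// A zip expressed through zipPartitions: pair elements up position by
    	// position within each partition. Spark's built-in zip additionally
    	// verifies that corresponding partitions hold the same number of elements.
    	JavaRDD<Tuple2<String, String>> manualZip = rdd1.zipPartitions(rdd2,
    			(x, y) -> {
    				List<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>();
    				while (x.hasNext() && y.hasNext()) {
    					pairs.add(new Tuple2<String, String>(x.next(), y.next()));
    				}
    				return pairs.iterator();
    			});
    	manualZip.foreach(x -> System.out.println(x));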