persist&checkpoint&countApi

一、 persisit機制

import java.util.Iterator;

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;  
  


import java.util.Arrays;  
import java.util.Iterator;
import java.util.Scanner;


public class PersisitDemo {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local[3]").setAppName("wordcount").set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
//        ctx.setCheckpointDir("file:///d:/checkpoint");
        final JavaRDD<String> lines = ctx.textFile("words.txt").repartition(2);

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
            	long th=Thread.currentThread().getId();
            	System.out.println("flatMap...  thread  id:"+th);
                return Arrays.asList(s.split(" ")).iterator();
            }
        }).repartition(2)
        //.persist(StorageLevel.MEMORY_ONLY());
        .cache();
        
        //使用了緩存第二次調用的時候不會再次執行
        
        while(true){
        	Scanner sc = new Scanner(System.in);
        	String line = sc.next();
        	if(line.equals("END")){
        		break;
        	}
        	
	        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {  
	            @Override
	            public Tuple2<String, Integer> call(String s) throws Exception {  
	            	long th=Thread.currentThread().getId();
	            	System.out.println("mapToPair...  thread  id:"+th);
	                return new Tuple2<String, Integer>(s, 1);
	            }
	        }).repartition(2);
	        
	        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {  
	            @Override  
	            public Integer call(Integer integer, Integer integer2) throws Exception {  
	            	long th=Thread.currentThread().getId();
	            	System.out.println("reduceByKey...  thread  id:"+th);
	                return integer + integer2;  
	            }  
	        }).repartition(2);
	        //counts.saveAsTextFile(args[1]);
	        counts.foreach(x->System.out.println(x));

	        
//	        lines.unpersist();
        }
        ctx.stop();
    }  
}

 

二、checkpoint機制

import java.util.Iterator;

import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;  
  


import java.util.Arrays;  
import java.util.Iterator;
import java.util.Scanner;


public class CheckpointDemo {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("wordcount").set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        ctx.setCheckpointDir("file:///d:/checkpoint");
        final JavaRDD<String> lines = ctx.textFile("words.txt");

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
            	System.out.println("flatMap...");
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {  
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {  
            	System.out.println("mapToPair...");
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        ones.checkpoint();            //設置檢查點       斬斷依賴
        
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {  
            @Override  
            public Integer call(Integer integer, Integer integer2) throws Exception {  
            	System.out.println("reduceByKey...");
                return integer + integer2;  
            }
        });
        System.out.println(counts.toDebugString());
//        counts.saveAsTextFile(args[1]);
        counts.foreach(x->System.out.println(x));
        System.out.println("after action:" + counts.toDebugString());
        ctx.stop();
    }  
}

 

三、 count API 近似計算

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;

import org.apache.hive.com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.AbstractJavaRDDLike;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;
import org.apache.spark.storage.StorageLevel;





import scala.Tuple2;

//JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y)->x+y);
public class CountApiTest {
    public static void main(String[] xx){
    	SparkConf conf = new SparkConf();
    	conf.setMaster("local");
    	conf.setAppName("Count API");
    	conf.set("spark.testing.memory", "2147480000");
//    	conf.set("spark.default.parallelism", "4");
    	JavaSparkContext ctx = new JavaSparkContext(conf);
    	//建立RDD:1)經過讀取外部存儲 ----- 集羣環境使用 2)經過內存中的集合

    	List<Integer> list = new ArrayList<Integer>();
        for(int i = 0; i < 10000; i++){
    	    list.add(i);
        }
    	JavaRDD<Integer> rdd1 = ctx.parallelize(list, 2);
    	JavaRDD<Integer> rdd2 = rdd1.union(rdd1).union(rdd1).union(rdd1);
    	
    	//System.out.println(rdd2.count());                  //計算並集後的總數
    	PartialResult<BoundedDouble> result = rdd2.countApprox(450);//1000, 300  2秒內跑完給結果,若沒有完,也要返回結果
    	System.out.println(result.initialValue().mean());
    	System.out.println(result.initialValue().low());
    	System.out.println(result.initialValue().high());
    	System.out.println(result.initialValue().confidence()); //自信程度
    	
//    	40000.0             使用2000
//    	40000.0
//    	40000.0
//    	1.0
    	
//    	40000.6       使用450
//    	39696.95761248283
//    	40304.242387517166
//    	0.95
    	
    	//0.01  0.1  偏移度的大體跑完了的任務      執行的更快
//        System.out.println(rdd2.countApproxDistinct(0.01));   //9945  
    }
}
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息