java實現spark經常使用算子之join



import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
* join(otherDataSet,[numTasks]) 算子:
* 一樣的也是按照key將兩個RDD中進行彙總操做,會對每一個key所對應的兩個RDD中的數據進行笛卡爾積計算。
*
*按照key進行分類彙總,而且作笛卡爾積
*/
public class JoinOperator {

public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("join");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String,String>> stus = Arrays.asList(
new Tuple2<>("w1","1"),
new Tuple2<>("w2","2"),
new Tuple2<>("w3","3"),
new Tuple2<>("w2","22"),
new Tuple2<>("w1","11")
);
List<Tuple2<String,String>> scores = Arrays.asList(
new Tuple2<>("w1","a1"),
new Tuple2<>("w2","a2"),
new Tuple2<>("w2","a22"),
new Tuple2<>("w1","a11"),
new Tuple2<>("w3","a3")
);

JavaPairRDD<String,String> stusRdd = sc.parallelizePairs(stus);
JavaPairRDD<String,String> scoresRdd = sc.parallelizePairs(scores);
JavaPairRDD<String,Tuple2<String,String>> result = stusRdd.join(scoresRdd);

result.foreach(new VoidFunction<Tuple2<String, Tuple2<String, String>>>() {
@Override
public void call(Tuple2<String, Tuple2<String, String>> tuple) throws Exception {
System.err.println(tuple._1+":"+tuple._2);
}
});

}
}

微信掃描下圖二維碼加入博主知識星球,獲取更多大數據、人工智能、算法等免費學習資料哦!java

 

相關文章
相關標籤/搜索