// Load the input file as an RDD of strings, one element per line.
JavaRDD<String> lines=sc.textFile(inputFile);
(2)分發對象集合,這裏以list爲例
// Distribute a driver-side collection as an RDD; a List is used here.
List<String> list = new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
JavaRDD<String> temp = sc.parallelize(list);
// The above is equivalent to parallelizing an inline list:
JavaRDD<String> temp2 = sc.parallelize(Arrays.asList("a", "b", "c"));
List<String> list=new ArrayList<String>();
//創建列表,列表中包含如下自定義表項
list.add("error:a");
list.add("error:b");
list.add("error:c");
list.add("warning:d");
list.add("hadppy ending!");
//將列表轉換爲RDD對象
JavaRDD<String> lines = sc.parallelize(list);
// Keep only the entries of lines that contain "error".
JavaRDD<String> errorLines = lines.filter(
        new Function<String, Boolean>() {
            public Boolean call(String entry) throws Exception {
                return entry.contains("error");
            }
        }
);
// Bring the filtered entries back to the driver and print each one.
List<String> errorList = errorLines.collect();
for (String line : errorList) {
    System.out.println(line);
}
// Select the warning entries, union them with the error entries,
// then collect and print the combined RDD.
JavaRDD<String> warningLines = lines.filter(
        new Function<String, Boolean>() {
            public Boolean call(String entry) throws Exception {
                return entry.contains("warning");
            }
        }
);
JavaRDD<String> unionLines = errorLines.union(warningLines);
for (String line : unionLines.collect()) {
    System.out.println(line);
}
/**程序示例:接上
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]
// Union of the error and warning entries. (The prose "輸出:" fused onto
// this line by the extraction has been moved into comments.)
JavaRDD<String> unionLines = errorLines.union(warningLines);
// take(2) returns only the first two elements, scanning partitions one
// by one — use collect() instead when most of the RDD is needed.
for (String line : unionLines.take(2)) {
    System.out.println(line);
}
// collect() fetches the whole RDD; iterate and print every element.
List<String> unions = unionLines.collect();
for (String line : unions) {
    System.out.println(line);
}
函數名 | 實現的方法 | 用途
Function&lt;T,R&gt; | R call(T) | 接收一個輸入值並返回一個輸出值,用於相似map()和filter()的操做中
Function2&lt;T1,T2,R&gt; | R call(T1,T2) | 接收兩個輸入值並返回一個輸出值,用於相似aggregate()和fold()等操做中
FlatMapFunction&lt;T,R&gt; | Iterable&lt;R&gt; call(T) | 接收一個輸入值並返回任意個輸出,用於相似flatMap()這樣的操做中
// Filter the lines RDD down to the entries containing "error".
JavaRDD<String> errorLines = lines.filter(
        new Function<String, Boolean>() {
            public Boolean call(String entry) throws Exception {
                return entry.contains("error");
            }
        }
);
// Build a small list of sentences. (Fixed: the original was missing the
// semicolon after the third add() call.)
List<String> strLine = new ArrayList<String>();
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me");
JavaRDD<String> input = sc.parallelize(strLine);
// flatMap splits each sentence into words, emitting one element per word
// rather than one per sentence.
JavaRDD<String> words = input.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        }
);
List<String> strLine=new ArrayList<String>();
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me");
JavaRDD<String> input=sc.parallelize(strLine);
JavaRDD<String> words=input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
// Map every word to a (word, 1) pair so occurrences can be summed by key.
JavaPairRDD<String, Integer> counts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                // Fixed: use the parameterized constructor instead of the
                // raw type `new Tuple2(s, 1)`.
                return new Tuple2<String, Integer>(s, 1);
            }
        }
);
// Sum the counts of each distinct word.
JavaPairRDD<String, Integer> results = counts.reduceByKey(
        new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {
            public Integer call(Integer left, Integer right) throws Exception {
                return left + right;
            }
        }
);
// Named predicate class: true when the line contains "error".
class ContainError implements Function<String, Boolean> {
    public Boolean call(String line) throws Exception {
        return line.contains("error");
    }
}
// Apply the named predicate class, then collect and print the matches.
JavaRDD<String> errorLines = lines.filter(new ContainError());
for (String line : errorLines.collect()) {
    System.out.println(line);
}
// Square every element with map(). Output: 1,4,9,16
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(
        new Function<Integer, Integer>() {
            // Fixed: the original read "throwsException" (missing space).
            public Integer call(Integer v1) throws Exception {
                return v1 * v1;
            }
        }
);
System.out.println(StringUtils.join(result.collect(), ","));
// Drop the elements equal to 1 with filter(). Result: 2,3,4
// (The prose "結果:" fused onto the first line has been moved here.)
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> results = rdd.filter(
        new Function<Integer, Boolean>() {
            public Boolean call(Integer v1) throws Exception {
                return v1 != 1;
            }
        }
);
System.out.println(StringUtils.join(results.collect(), ","));
// Split each sentence into words with flatMap(). Output: one word per line.
JavaRDD<String> rdd = sc.parallelize(
        Arrays.asList("hello world", "hello you", "world i love you"));
JavaRDD<String> words = rdd.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        }
);
System.out.println(StringUtils.join(words.collect(), '\n'));
函數 | 用途
RDD1.distinct() | 生成一個只包含不一樣元素的新RDD。須要數據混洗。
RDD1.union(RDD2) | 返回一個包含兩個RDD中全部元素的RDD
RDD1.intersection(RDD2) | 只返回兩個RDD中都有的元素
RDD1.subtract(RDD2) | 返回一個只存在於第一個RDD而不存在於第二個RDD中的全部元素組成的RDD。須要數據混洗。
RDD1.cartesian(RDD2) | 返回兩個RDD數據集的笛卡爾集
// cartesian() pairs every element of rdd1 with every element of rdd2.
// Output: 1->1, 1->2, 2->1, 2->2
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2));
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2));
JavaPairRDD<Integer, Integer> rdd = rdd1.cartesian(rdd2);
for (Tuple2<Integer, Integer> tuple : rdd.collect()) {
    System.out.println(tuple._1() + "->" + tuple._2());
}
// Sum all elements with reduce(). Output: 55
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
Integer sum = rdd.reduce(
        new Function2<Integer, Integer, Integer>() {
            // Fixed: the original read "Integercall" (missing space).
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }
);
System.out.println(sum.intValue());
// fold() is like reduce() but takes a zero value; 0 is the identity for +.
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
Integer sum = rdd.fold(0,
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }
);
System.out.println(sum);
// Product of all elements via fold(); 1 is the identity for *.
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
Integer result = rdd.fold(1,
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 * v2;
            }
        }
);
System.out.println(result);
public class AvgCount implements Serializable{這個程序示例能夠實現求出RDD對象集的平均數的功能。其中addAndCount將RDD對象集中的元素合併起來放入AvgCount對象之中,combine提供兩個AvgCount對象的合併的實現。咱們初始化AvgCount(0,0),表示有0個對象,對象的和爲0,最終返回的result對象中total中儲存了全部元素的和,num儲存了元素的個數,這樣調用result對象的函數avg()就可以返回最終所需的平均數,即avg=tatal/(double)num。
public int total;
public int num;
public AvgCount(int total,int num){
this.total=total;
this.num=num;
}
public double avg(){
return total/(double)num;
}
static Function2<AvgCount,Integer,AvgCount> addAndCount=
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) throws Exception {
a.total+=x;
a.num+=1;
return a;
}
};
static Function2<AvgCount,AvgCount,AvgCount> combine=
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) throws Exception {
a.total+=b.total;
a.num+=b.num;
return a;
}
};
public static void main(String args[]){
SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);
AvgCount intial =new AvgCount(0,0);
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
AvgCount result=rdd.aggregate(intial,addAndCount,combine);
System.out.println(result.avg());
}
}
級別 | 使用的空間 | cpu時間 | 是否在內存 | 是否在磁盤 | 備註
MEMORY_ONLY | 高 | 低 | 是 | 否 | 直接儲存在內存
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | 序列化後儲存在內存裏
MEMORY_AND_DISK | 低 | 中等 | 部分 | 部分 | 若是數據在內存中放不下,溢寫在磁盤上
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 數據在內存中放不下,溢寫在磁盤中。內存中存放序列化的數據。
DISK_ONLY | 低 | 高 | 否 | 是 | 直接儲存在硬盤裏面
// Cache the RDD in memory before reusing it in two actions.
// (unpersist() would remove it from the cache again.)
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
rdd.persist(StorageLevel.MEMORY_ONLY());
System.out.println(rdd.count());
System.out.println(StringUtils.join(rdd.collect(), ','));
// Square each integer as a double via mapToDouble(), yielding a
// JavaDoubleRDD with numeric helpers such as max().
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD result = rdd.mapToDouble(
        new DoubleFunction<Integer>() {
            public double call(Integer value) throws Exception {
                return (double) value * value;
            }
        }
);
System.out.println(result.max());