spark 中的RDD編程 -如下基於Java api

1.RDD介紹:java

    RDD,彈性分佈式數據集,即分佈式的元素集合。在spark中,對全部數據的操做不外乎是建立RDD、轉化已有的RDD以及調用RDD操做進行求值。在這一切的背後,Spark會自動將RDD中的數據分發到集羣中,並將操做並行化。
    Spark中的RDD就是一個不可變的分佈式對象集合。每一個RDD都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。RDD能夠包含Python,Java,Scala中任意類型的對象,甚至能夠包含用戶自定義的對象。
    用戶可使用兩種方法建立RDD: 讀取一個外部數據集,或在驅動器程序中分發驅動器程序中的對象集合,好比list或者set。
    RDD的轉化操做都是惰性求值的,這意味着咱們對RDD調用轉化操做,操做不會當即執行。相反,Spark會在內部記錄下所要求執行的操做的相關信息。咱們不該該把RDD看作存放着特定數據的數據集,而最好把每一個RDD當作咱們經過轉化操做構建出來的、記錄如何計算數據的指令列表。數據讀取到RDD中的操做也是惰性的,數據只會在必要時讀取。轉化操做和讀取操做都有可能屢次執行。
2.建立RDD數據集
    (1)讀取一個外部數據集
JavaRDD<String> lines=sc.textFile(inputFile);
    (2)分發對象集合,這裏以list爲例
List<String> list=new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
JavaRDD<String> temp=sc.parallelize(list);
//上述方式等價於
JavaRDD<String> temp2=sc.parallelize(Arrays.asList("a","b","c"));
3.RDD操做
(1)轉化操做
    用java實現過濾器轉化操做:
List<String> list=new ArrayList<String>();
//創建列表,列表中包含如下自定義表項
list.add("error:a");
list.add("error:b");
list.add("error:c");
list.add("warning:d");
list.add("hadppy ending!");
//將列表轉換爲RDD對象
JavaRDD<String> lines = sc.parallelize(list);
//將RDD對象lines中有error的表項過濾出來,放在RDD對象errorLines中
JavaRDD<String> errorLines = lines.filter(
new Function<String, Boolean>() {
public Boolean call(String v1) throws Exception {
return v1.contains("error");
}
}
);
//遍歷過濾出來的列表項
List<String> errorList = errorLines.collect();
for (String line : errorList)
System.out.println(line);
       
輸出:
error:a
error:b
error:c
可見,列表list中包含詞語error的表項都被正確的過濾出來了。
(2)合併操做
將兩個RDD數據集合併爲一個RDD數據集
接上述程序示例:
  1. JavaRDD<String> warningLines=lines.filter(
  2. new Function<String, Boolean>() {
  3. public Boolean call(String v1) throws Exception {
  4. return v1.contains("warning");
  5. }
  6. }
  7. );
  8. JavaRDD<String> unionLines=errorLines.union(warningLines);
  9. for(String line :unionLines.collect())
  10. System.out.println(line);
輸出:
error:a
error:b
error:c
warning:d
可見,將原始列表項中的全部error項和warning項都過濾出來了。
(3)獲取RDD數據集中的部分或者所有元素
①獲取RDD數據集中的部分元素   .take(int num)  返回值List<T>   
獲取RDD數據集中的前num項。
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]
程序示例:接上
JavaRDD<String> unionLines=errorLines.union(warningLines);

for(String line :unionLines.take(2))
System.
out.println(line);
輸出:
error:a
error:b
可見,輸出了RDD數據集unionLines的前2項
②獲取RDD數據集中的所有元素 .collect() 返回值 List<T>
程序示例:
List<String> unions=unionLines.collect();
for(String line :unions)
System.
out.println(line);
遍歷輸出RDD數據集unions的每一項
4.向spark傳遞函數
函數名
實現的方法
用途
Function<T,R>
R call(T)
接收一個輸入值並返回一個輸出值,用於相似map()和filter()的操做中
Function<T1,T2,R>
R call(T1,T2)
接收兩個輸入值並返回一個輸出值,用於相似aggregate()和fold()等操做中
FlatMapFunction<T,R>
Iterable <R> call(T)
接收一個輸入值並返回任意個輸出,用於相似flatMap()這樣的操做中
 ①Function<T,R>
JavaRDD<String> errorLines=lines.filter(
new Function<String, Boolean>() {
public Boolean call(String v1)throws Exception {
return v1.contains("error");
}
}
);
 
過濾RDD數據集中包含error的表項,新建RDD數據集errorLines
②FlatMapFunction<T,R> 
List<String> strLine=new ArrayList<String>();
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me")
JavaRDD<String> input=sc.parallelize(strLine);
JavaRDD<String> words=input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
將文本行的單詞過濾出來,並將全部的單詞保存在RDD數據集words中。
 ③ Function<T1,T2,R>
List<String> strLine=new ArrayList<String>();
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me");
JavaRDD<String> input=sc.parallelize(strLine);
JavaRDD<String> words=input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
JavaPairRDD<String,Integer> counts=words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2(s, 1);
}
}
);
JavaPairRDD <String,Integer> results=counts.reduceByKey(
new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}
) ;
上述程序是spark中的wordcount實現方式,其中的reduceByKey操做的Function2函數定義了遇到相同的key時,value是如何reduce的->直接將二者的value相加。
*注意:
能夠將咱們的函數類定義爲使用匿名內部類,就像上述程序實現的那樣,也能夠建立一個具名類,就像這樣:
class ContainError implements Function<String,Boolean>{
public Boolean call(String v1) throws Exception {
return v1.contains("error");
}
}
JavaRDD<String> errorLines=lines.filter(new ContainError());
for(String line :errorLines.collect())
System.out.println(line);
具名類也能夠有參數,就像上述過濾出含有」error「的表項,咱們能夠自定義到底含有哪一個詞語,就像這樣,程序就更有普適性了。
 
5.針對每一個元素的轉化操做:
    轉化操做map()接收一個函數,把這個函數用於RDD中的每一個元素,將函數的返回結果做爲結果RDD中對應的元素。關鍵詞: 轉化
    轉化操做filter()接受一個函數,並將RDD中知足該函數的元素放入新的RDD中返回。關鍵詞: 過濾
示例圖以下所示:
①map()
計算RDD中各值的平方
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> result=rdd.map(
new Function<Integer, Integer>() {
public Integer call(Integer v1) throwsException {
return v1*v1;
}
}
);
System.out.println( StringUtils.join(result.collect(),","));
輸出:
1,4,9,16
filter()
② 去除RDD集合中值爲1的元素:
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> results=rdd.filter(
new Function<Integer, Boolean>() {
public Boolean call(Integer v1) throws Exception {
return v1!=1;
}
}
);
System.out.println(StringUtils.join(results.collect(),","));
結果:
2,3,4
③ 有時候,咱們但願對每一個輸入元素生成多個輸出元素。實現該功能的操做叫作flatMap()。和map()相似,咱們提供給flatMap()的函數被分別應用到了輸入的RDD的每一個元素上。不過返回的不是一個元素,而是一個返回值序列的迭代器。輸出的RDD倒不是由迭代器組成的。咱們獲得的是一個包含各個迭代器能夠訪問的全部元素的RDD。flatMap()的一個簡單用途是將輸入的字符串切分紅單詞,以下所示: 
JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello world","hello you","world i love you"));
JavaRDD<String> words=rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
System.out.println(StringUtils.join(words.collect(),'\n'));
輸出:
hello
world
hello
you
world
i
love
you
6.集合操做
 
RDD中的集合操做
函數
用途
RDD1.distinct()
生成一個只包含不一樣元素的新RDD。須要數據混洗。
RDD1.union(RDD2)
返回一個包含兩個RDD中全部元素的RDD
RDD1.intersection(RDD2)
只返回兩個RDD中都有的元素
RDD1.substr(RDD2)
返回一個只存在於第一個RDD而不存在於第二個RDD中的全部元素組成的RDD。須要數據混洗。
集合操做對笛卡爾集的處理:
 
RDD1.cartesian(RDD2)
返回兩個RDD數據集的笛卡爾集
程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2));
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2));
JavaPairRDD<Integer ,Integer> rdd=rdd1.cartesian(rdd2);
for(Tuple2<Integer,Integer> tuple:rdd.collect())
System.out.println(tuple._1()+"->"+tuple._2());
輸出:
1->1
1->2
2->1
2->2
7.行動操做
(1)reduce操做
    reduce()接收一個函數做爲參數,這個函數要操做兩個RDD的元素類型的數據並返回一個一樣類型的新元素。一個簡單的例子就是函數+,能夠用它來對咱們的RDD進行累加。使用reduce(),能夠很方便地計算出RDD中全部元素的總和,元素的個數,以及其餘類型的聚合操做。
    如下是求RDD數據集全部元素和的程序示例:
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer sum =rdd.reduce(
new Function2<Integer, Integer, Integer>() {
public Integercall(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
);
System.out.println(sum.intValue());
輸出:55
(2)fold()操做
    接收一個與reduce()接收的函數簽名相同的函數,再加上一個初始值來做爲每一個分區第一次調用時的結果。你所提供的初始值應當是你提供的操做的單位元素,也就是說,使用你的函數對這個初始值進行屢次計算不會改變結果(例如+對應的0,*對應的1,或者拼接操做對應的空列表)。
    程序實例:
①計算RDD數據集中全部元素的和:
zeroValue=0;//求和時,初始值爲0。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer sum =rdd.fold(0,
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
);
System.out.println(sum);
②計算RDD數據集中全部元素的積:
zeroValue=1;//求積時,初始值爲1。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer result =rdd.fold(1,
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1*v2;
}
}
);
System.out.println(result);
(3)aggregate()操做
    aggregate()函數返回值類型 沒必要與所操做的RDD類型相同
    與fold()相似,使用aggregate()時,須要提供咱們期待返回的類型的初始值。而後經過一個函數把RDD中的元素合併起來放入累加器。考慮到每一個節點是在本地進行累加的,最終,還須要提供第二個函數來將累加器兩兩合併。
如下是程序實例:
public class AvgCount implements Serializable{
public int total;
public int num;
public AvgCount(int total,int num){
this.total=total;
this.num=num;
}
public double avg(){
return total/(double)num;
}
static Function2<AvgCount,Integer,AvgCount> addAndCount=
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) throws Exception {
a.total+=x;
a.num+=1;
return a;
}
};
static Function2<AvgCount,AvgCount,AvgCount> combine=
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) throws Exception {
a.total+=b.total;
a.num+=b.num;
return a;
}
};
public static void main(String args[]){

SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);

AvgCount intial =new AvgCount(0,0);
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
AvgCount result=rdd.aggregate(intial,addAndCount,combine);
System.out.println(result.avg());

}

}
這個程序示例能夠實現求出RDD對象集的平均數的功能。其中addAndCount將RDD對象集中的元素合併起來放入AvgCount對象之中,combine提供兩個AvgCount對象的合併的實現。咱們初始化AvgCount(0,0),表示有0個對象,對象的和爲0,最終返回的result對象中total中儲存了全部元素的和,num儲存了元素的個數,這樣調用result對象的函數avg()就可以返回最終所需的平均數,即avg=tatal/(double)num。
8.持久化緩存
    由於Spark RDD是惰性求值的,而有時咱們但願能屢次使用同一個RDD。 若是簡單地對RDD調用行動操做,Spark每次都會重算RDD以及它的全部依賴。這在迭代算法中消耗格外大,由於迭代算法經常會屢次使用同一組數據。
    爲了不屢次計算同一個RDD,可讓Spark對數據進行持久化。當咱們讓Spark持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。
    出於不一樣的目的,咱們能夠爲RDD選擇不一樣的持久化級別。默認狀況下persist()會把數據以序列化的形式緩存在JVM的堆空間中
                                                         不一樣關鍵字對應的存儲級別表
級別
使用的空間
cpu時間
是否在內存
是否在磁盤
備註
MEMORY_ONLY
直接儲存在內存
MEMORY_ONLY_SER
序列化後儲存在內存裏
MEMORY_AND_DISK
中等
部分
部分
若是數據在內存中放不下,溢寫在磁盤上
MEMORY_AND_DISK_SER
部分
部分
數據在內存中放不下,溢寫在磁盤中。內存中存放序列化的數據。
DISK_ONLY
直接儲存在硬盤裏面
程序示例:將RDD數據集持久化在內存中。
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));
rdd.persist(StorageLevel.MEMORY_ONLY());
System.out.println(rdd.count());
System.out.println(StringUtils.join(rdd.collect(),','));
RDD還有unpersist()方法,調用該方法能夠手動把持久化的RDD從緩存中移除。
9.不一樣的RDD類型
    Java中有兩個專門的類JavaDoubleRDD和JavaPairRDD,來處理特殊類型的RDD,這兩個類還針對這些類型提供了額外的函數,折讓你能夠更加了解所發生的一切,可是也顯得有些累贅。
    要構建這些特殊類型的RDD,須要使用特殊版本的類來替代通常使用的Function類。若是要從T類型的RDD建立出一個DoubleRDD,咱們就應當在映射操做中使用DoubleFunction<T>來替代Function<T,Double>。
程序實例:如下是一個求RDD每一個對象的平方值的程序實例,將普通的RDD對象轉化爲DoubleRDD對象,最後調用DoubleRDD對象的max()方法,返回生成的平方值中的最大值。
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaDoubleRDD result=rdd.mapToDouble(
new DoubleFunction<Integer>() { public double call(Integer integer) throws Exception { return (double) integer*integer; } });System.out.println(result.max());
相關文章
相關標籤/搜索