Spark RDD工做原理詳解+RDD JAVA API編程

第1章 RDD 概念

1.1 RDD 爲何會產生

   RDD:Resilient Distributed Dataset 彈性分佈式數據集
  RDD 是 Spark 的基石,是實現 Spark 數據處理的核心抽象。那麼 RDD 爲何會產生呢?
  Hadoop 的 MapReduce 是一種基於數據集的工做模式,面向數據,這種工做模式通常是從存儲上加載數據集,而後操做數據集,最後寫入物理存儲設備。數據更多面臨的是一次性處理。
  MR 的這種方式對數據領域兩種常見的操做不是很高效。第一種是迭代式的算法。好比機器學習中 ALS、凸優化梯度降低等。這些都須要基於數據集或者數據集的衍生數據反覆查詢反覆操做。MR 這種模式不太合適,即便多 MR 串行處理,性能和時間也是一個問題。數據的共享依賴於磁盤。另一種是交互式數據挖掘,MR 顯然不擅長。
  java

MR 中的迭代:算法


  
Spark中的迭代:

  
  咱們須要一個效率很是快,且可以支持迭代計算和有效數據共享的模型,Spark 應運而生。RDD 是基於工做集的工做模式,更多的是面向工做流。
  可是不管是 MR 仍是 RDD 都應該具備相似位置感知、容錯和負載均衡等特性。編程

 

 

1.2 RDD 概述

1.2.1 什麼是 RDD

RDD(Resilient Distributed Dataset)叫作分佈式數據集是 Spark 中最基本的數據抽象,它表明一個不可變可分區(分片)裏面的元素可並行計算的集合(彈性)。在 Spark 中,對數據的全部操做不外乎建立 RDD、轉化已有 RDD 以及調用 RDD 操做進行求值。每一個 RDD 都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。RDD 能夠包含 Python、Java、Scala 中任意類型的對象,甚至能夠包含用戶自定義的對象。RDD 具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD 容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。
  RDD 支持兩種操做:轉化操做和行動操做。RDD 的轉化操做是返回一個新的 RDD 的操做,好比 map() 和 filter(),而行動操做則是向驅動器程序返回結果或把結果寫入外部系統的操做。好比 count() 和 first()。 
  Spark 採用 惰性計算模式,RDD 只有第一次在一個行動操做中用到時,纔會真正計算。Spark 能夠優化整個計算過程。默認狀況下,Spark 的 RDD 會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個 RDD,可使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。緩存

1.2.2 RDD 的屬性

  1) 一組分片(Partition),即數據集的基本組成單位。對於 RDD 來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立 RDD 時指定 RDD 的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的 CPU Core 的數目。
  2) 一個計算每一個分區的函數。Spark 中 RDD 的計算是以分片爲單位的,每一個 RDD 都會實現 compute 函數以達到這個目的。compute 函數會對迭代器進行復合,不須要保存每次計算的結果。
  3) RDD 之間的依賴關係。RDD 的每次轉換都會生成一個新的 RDD,因此 RDD 之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark 能夠經過這個依賴關係從新計算丟失的分區數據,而不是對 RDD 的全部分區進行從新計算。
  4) 一個 Partitioner,即 RDD 的分片函數。當 前Spark 中實現了兩種類型的分片函數,一個是基於哈希的 HashPartitioner,另一個是基於範圍的 RangePartitioner。只有對於於 key-value 的 RDD,纔會有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函數不但決定了 RDD 自己的分片數量,也決定了Parent RDD Shuffle 輸出時的分片數量。
  5) 一個列表,存儲存取每一個 Partition 的優先位置(preferred location)。對於一個 HDFS 文件來講,這個列表保存的就是每一個 Partition 所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark 在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。
  RDD 是一個應用層面的邏輯概念。一個 RDD 多個分片。RDD 就是一個元數據記錄集,記錄了 RDD 內存全部的關係數據。
  網絡

1.3 RDD 彈性


1) 自動進行內存和磁盤數據存儲的切換
  Spark 優先把數據放到內存中,若是內存放不下,就會放到磁盤裏面,程序進行自動的存儲切換。
2) 基於血統的高效容錯機制
  在 RDD 進行轉換和動做的時候,會造成 RDD 的 Lineage 依賴鏈,當某一個 RDD 失效的時候,能夠經過從新計算上游的 RDD 來從新生成丟失的 RDD 數據。
3) Task 若是失敗會自動進行特定次數的重試
  RDD 的計算任務若是運行失敗,會自動進行任務的從新計算,默認次數是 4 次。
4) Stage 若是失敗會自動進行特定次數的重試
  若是 Job 的某個 Stage 階段計算失敗,框架也會自動進行任務的從新計算,默認次數也是 4 次。
5) Checkpoint 和 Persist 可主動或被動觸發
  RDD 能夠經過 Persist 持久化將 RDD 緩存到內存或者磁盤,當再次用到該 RDD 時直接讀取就行。也能夠將 RDD 進行檢查點,檢查點會將數據存儲在 HDFS 中,該 RDD 的全部父 RDD 依賴都會被移除。
6) 數據調度彈性
  Spark 把這個 JOB 執行模型抽象爲通用的有向無環圖 DAG,能夠將多 Stage 的任務串聯或並行執行,調度引擎自動處理 Stage 的失敗以及 Task 的失敗。
7) 數據分片的高度彈性
  能夠根據業務的特徵,動態調整數據分片的個數,提高總體的應用執行效率。

 

  RDD 全稱叫作彈性分佈式數據集(Resilient Distributed Datasets),它是一種分佈式的內存抽象,表示一個只讀的記錄分區的集合,它只能經過其餘 RDD 轉換而建立,爲此,RDD 支持豐富的轉換操做(如 map, join, filter, groupby 等),經過這種轉換操做,新的 RDD 則包含了如何從其餘 RDDs 衍生所必需的信息,因此說 RDDs 之間是有依賴關係的。基於 RDDs 之間的依賴,RDDs 會造成一個有向無環圖 DAG,該 DAG 描述了整個流式計算的流程,實際執行的時候,RDD 是經過血緣關係(Lineage)一鼓作氣的,即便出現數據分區丟失,也能夠經過血緣關係重建分區,總結起來,基於 RDD 的流式計算任務可描述爲:從穩定的物理存儲(如分佈式文件系統)中加載記錄,記錄被傳入由一組肯定性操做構成的 DAG,而後寫回穩定存儲。另外 RDD 還能夠將數據集緩存到內存中,使得在多個操做之間能夠重用數據集,基於這個特色能夠很方便地構建迭代型應用(圖計算、機器學習等)或者交互式數據分析應用。能夠說 Spark 最初也就是實現 RDD 的一個分佈式系統,後面經過不斷髮展壯大成爲如今較爲完善的大數據生態系統,簡單來說,Spark-RDD 的關係相似於 Hadoop-MapReduce 關係。負載均衡

1.4 RDD 特色

  RDD 表示只讀的分區的數據集,對 RDD 進行改動,只能經過 RDD 的轉換操做,由一個 RDD 獲得一個新的 RDD,新的 RDD 包含了從其餘 RDD 衍生所必需的信息。RDDs 之間存在依賴,RDD 的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化 RDD 來切斷血緣關係。框架

1.4.1 分區

  RDD 邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個 compute 函數獲得每一個分區的數據。若是 RDD 是經過已有的文件系統構建,則 compute 函數是讀取指定文件系統中的數據,若是 RDD 是經過其餘 RDD 轉換而來,則 compute 函數是執行轉換邏輯將其餘 RDD 的數據進行轉換。
  機器學習

1.4.2 只讀

以下圖所示,RDD 是隻讀的,要想改變 RDD 中的數據,只能在現有的 RDD 基礎上建立新的 RDD。
  分佈式


  
由一個 RDD 轉換到另外一個 RDD,能夠經過豐富的操做算子實現,再也不像 MapReduce 那樣只能寫 map 和 reduce 了,以下圖所示。
  
  
RDD 的操做算子包括兩類,一類叫作 transformations,它是用來將 RDD 進行轉化,構建 RDD 的血緣關係;另外一類叫作 actions,它是用來觸發 RDD 的計算,獲得 RDD 的相關計算結果或者將 RDD 保存的文件系統中。下圖是 RDD 所支持的操做算子列表。
  

1.4.3 依賴

  RDDs 經過操做算子進行轉換,轉換獲得的新 RDD 包含了從其餘 RDDs 衍生所必需的信息,RDDs 之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs 之間分區是一一對應的,另外一種是寬依賴,下游 RDD 的每一個分區與上游 RDD (也稱之爲父 RDD)的每一個分區都有關,是多對多的關係。
  ide


  經過 RDDs 之間的這種依賴關係,一個任務流能夠描述爲 DAG (有向無環圖),以下圖所示,在實際執行過程當中寬依賴對應於 Shuffle (圖中的 reduceByKey 和 join),窄依賴中的全部轉換操做能夠經過相似於管道的方式一鼓作氣執行(圖中 map 和 union 能夠一塊兒執行)。
  

1.4.4 緩存

  若是在應用程序中屢次使用同一個 RDD,能夠將該 RDD 緩存起來,該 RDD 只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該 RDD 的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用。以下圖所示,RDD-1 通過一系列的轉換後獲得 RDD-n 並保存到 hdfs,RDD-1 在這一過程當中會有個中間結果,若是將其緩存到內存,那麼在隨後的 RDD-1 轉換到 RDD-m 這一過程當中,就不會計算其以前的 RDD-0 了。
  

1.4.5 CheckPoint

  雖然 RDD 的血緣關係自然地能夠實現容錯,當 RDD 的某個分區數據失敗或丟失,能夠經過血緣關係重建。可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs 之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。爲此,RDD 支持 checkpoint 將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於 checkpoint 後的 RDD 不須要知道它的父 RDDs 了,它能夠從 checkpoint 處拿到數據。
  給定一個 RDD 咱們至少能夠知道以下幾點信息:
  一、分區數以及分區方式;
  二、由父 RDDs 衍生而來的相關依賴信息;
  三、計算每一個分區的數據,計算步驟爲:
    1)若是被緩存,則從緩存中取的分區的數據;
    2)若是被 checkpoint,則從 checkpoint 處恢復數據;
    3)根據血緣關係計算分區的數據。

 

第2章 RDD 編程

2.1 RDD 編程模型

  在 Spark 中,RDD 被表示爲對象,經過對象上的方法調用來對 RDD 進行轉換。通過一系列的 transformations 定義 RDD 以後,就能夠調用 actions 觸發 RDD 的計算,action 能夠是嚮應用程序返回結果(count, collect 等),或者是向存儲系統保存數據(saveAsTextFile 等)。在 Spark 中,只有遇到 action,纔會執行 RDD 的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。
  要使用 Spark,開發者須要編寫一個 Driver 程序,它被提交到集羣以調度運行 Worker,以下圖所示。Driver 中定義了一個或多個 RDD,並調用 RDD 上的 action,Worker 則執行 RDD 分區計算任務。
  


  Driver 和 Worker 內部示意圖:
  

2.2 RDD 建立

在 Spark 中建立 RDD 的建立方式大概能夠分爲三種:

1.從集合中建立 RDD;

2.從外部存儲建立 RDD;

3.從其餘 RDD 建立。

2.3 RDD 編程

  RDD 通常分爲數值 RDD 和鍵值對 RDD,本章不進行具體區分,先統一來看,下一章會對鍵值對 RDD 作專門說明。

2.3.1 Transformation(轉換)

  RDD 中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給 Driver
的動做時,這些轉換纔會真正運行。這種設計讓 Spark 更加有效率地運行。

經常使用的 Transformation 以下

  

 /**
* map算子案例:將集合中每個元素都乘以2
*/

private static void map(){
//建立SparkConf
SparkConf conf=new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sparkContext=new JavaSparkContext(conf);
// 構造集合
List<Integer> numbers= Arrays.asList(1,2,3,4,5,6,7);
// 並行化集合,建立初始RDD
JavaRDD<Integer> numberRDD=sparkContext.parallelize(numbers);

// 使用map算子,將集合中的每一個元素都乘以2
// map算子,是對任何類型的RDD,均可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,必定會讓你設置第二個泛型參數,這個泛型類型,就是返回的新元素的類型
// 同時call()方法的返回類型,也必須與第二個泛型類型同步
// 在call()方法內部,就能夠對原始RDD中的每個元素進行各類處理和計算,並返回一個新的元素
// 全部新的元素就會組成一個新的RDD
JavaRDD<Integer> doubleNumberRDD=numberRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) throws Exception {
return integer*3;
}
});


// 打印新的RDD
doubleNumberRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});

// 關閉JavaSparkContext
sparkContext.close();
}



/**
* filter算子案例:過濾集合中的偶數
*/

 private static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("filter")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// 並行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其餘的使用注意點,實際上和map是同樣的
// 可是,惟一的不一樣,就是call()方法的返回類型是Boolean
// 每個初始RDD中的元素,都會傳入call()方法,此時你能夠執行各類自定義的計算邏輯
// 來判斷這個元素是不是你想要的
// 若是你想在新的RDD中保留這個元素,那麼就返回true;不然,不想保留這個元素,返回false
JavaRDD<Integer> evenNumberRDD=numberRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) throws Exception {
return integer%2==0;
}
});

//打印
evenNumberRDD.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});

// 關閉JavaSparkContext
sc.close();
}



/**
* flatMap案例:將文本行拆分爲多個單詞
*/

private static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("flatMap")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");

// 並行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);

// 對RDD執行flatMap算子,將每一行文本,拆分爲多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 咱們須要本身定義FlatMapFunction的第二個泛型類型,即,表明了返回的新元素的類型
// call()方法,返回的類型,不是U,而是Iterable<U>,這裏的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每一個元素,並進行各類邏輯的計算和處理,返回能夠返回多個元素
// 多個元素,即封裝在Iterable集合中,可使用ArrayList等集合
// 新的RDD中,即封裝了全部的新元素;也就是說,新的RDD的大小必定是 >= 原始RDD的大小
JavaRDD<String> words=lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
words.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
// 關閉JavaSparkContext
sc.close();
}


/**
* groupByKey案例:按照班級對成績進行分組
*/

 private static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("groupByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));

// 並行化集合,建立JavaPairRDD
JavaPairRDD<String,Integer> scores=sc.parallelizePairs(scoreList);

// 針對scores RDD,執行groupByKey算子,對每一個班級的成績進行分組
// groupByKey算子,返回的仍是JavaPairRDD
// 可是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
// 也就是說,按照了key進行分組,那麼每一個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,咱們是否是就能夠經過groupedScores這種JavaPairRDD,很方便地處理某個分組內的數據
JavaPairRDD<String,Iterable<Integer>> groupedScores=scores.groupByKey();

// 打印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
@Override
public void call(Tuple2<String, Iterable<Integer>> s) throws Exception {
System.out.println("==========================");
System.out.println(s._1);
Iterator<Integer> ite=s._2.iterator();
while (ite.hasNext()){
System.out.println(ite.next());
}

}
});

// 關閉JavaSparkContext
sc.close();
}



/**
* reduceByKey案例:統計每一個班級的總分
*/

 private static void reduceByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("reduceByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(
new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75),
new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));

// 並行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

// 針對scores RDD,執行reduceByKey算子
// reduceByKey,接收的參數是Function2類型,它有三個泛型參數,實際上表明瞭三個值
// 第一個泛型類型和第二個泛型類型,表明了原始RDD中的元素的value的類型
// 所以對每一個key進行reduce,都會依次將第一個、第二個value傳入,將值再與第三個value傳入
// 所以此處,會自動定義兩個泛型類型,表明call()方法的兩個傳入參數的類型
// 第三個泛型類型,表明了每次reduce操做返回的值的類型,默認也是與原始RDD的value類型相同的
// reduceByKey算法返回的RDD,仍是JavaPairRDD<key, value>
JavaPairRDD<String,Integer> totalScores=scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
// 對每一個key,都會將其value,依次傳入call方法
// 從而聚合出每一個key對應的一個value
// 而後,將每一個key對應的一個value,組合成一個Tuple2,做爲新RDD的元素
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
// 打印totalScores RDD
totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> s) throws Exception {
System.out.println(s._1+":"+s._2);
}
});



// 關閉JavaSparkContext
sc.close();
}

 
 

reduceByKey 和 groupByKey 的區別:




/**
* sortByKey案例:按照學生分數進行排序
*/

private static void sortByKey() {
 // 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("sortByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Tuple2<Integer, String>> scoreList = Arrays.asList(
new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "tom"),
new Tuple2<Integer, String>(100, "marry"),
new Tuple2<Integer, String>(80, "jack"));

// 並行化集合,建立RDD
JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);

// 對scores RDD執行sortByKey算子
// sortByKey其實就是根據key進行排序,能夠手動指定升序,或者降序
// 返回的,仍是JavaPairRDD,其中的元素內容,都是和原始的RDD如出一轍的
// 可是就是RDD中的元素的順序,不一樣了
JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(true);

// 打印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}

});

// 關閉JavaSparkContext
sc.close();
}


/**
* join案例:打印學生成績
*/

 private static void join() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("join")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));

List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60));
//並行化兩個rdd
JavaPairRDD<Integer, String> students=sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores=sc.parallelizePairs(scoreList);
JavaPairRDD<Integer,Tuple2<String,Integer>> studentScores=students.join(scores);

studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
System.out.println("===============================");
System.out.println("class:"+t._1);
System.out.println("name:"+t._2._1);
System.out.println("name:"+t._2._2);
}
});
// 關閉JavaSparkContext
sc.close();
}


/**
* cogroup案例:打印學生成績
*/

 private static void cogroup() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("cogroup")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(
new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"),
new Tuple2<Integer, String>(3, "tom"));

List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
new Tuple2<Integer, Integer>(1, 100),
new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60),
new Tuple2<Integer, Integer>(1, 70),
new Tuple2<Integer, Integer>(2, 80),
new Tuple2<Integer, Integer>(3, 50));

// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
//cogroup與join不一樣
// 至關因而,一個key join上的全部value,都給放到一個Iterable裏面去了
// cogroup,不太好講解,但願你們經過動手編寫咱們的案例,仔細體會其中的奧妙
JavaPairRDD<Integer,Tuple2<Iterable<String>,Iterable<Integer>>> studentScores=students.cogroup(scores);
studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
@Override
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student_name:"+t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
});
}

 
  
 

2.3.2 Action(行動)

經常使用的 Action 以下:

1.reduce
 
 
private static void reduce() {
// 建立SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("reduce")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用reduce操做對集合中的數字進行累加
// reduce操做的原理:
// 首先將第一個和第二個元素,傳入call()方法,進行計算,會獲取一個結果,好比1 + 2 = 3
// 接着將該結果與下一個元素傳入call()方法,進行計算,好比3 + 3 = 6
// 以此類推
// 因此reduce操做的本質,就是聚合,將多個元素聚合成一個元素
int sum=numbers.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
System.out.println(sum);


// 關閉JavaSparkContext
sc.close();
}

2.collect
 
 
private static void collect() {
// 建立SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("collect")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用map操做將集合中全部數字乘以2
JavaRDD<Integer> doubleNumbers=numbers.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) throws Exception {
return integer*2;
}
});

// 不用foreach action操做,在遠程集羣上遍歷rdd中的元素
// 而使用collect操做,將分佈在遠程集羣上的doubleNumbers RDD的數據拉取到本地
// 這種方式,通常不建議使用,由於若是rdd中的數據量比較大的話,好比超過1萬條
// 那麼性能會比較差,由於要從遠程走大量的網絡傳輸,將數據獲取到本地
// 此外,除了性能差,還可能在rdd中數據量特別大的狀況下,發生oom異常,內存溢出
// 所以,一般,仍是推薦使用foreach action操做,來對最終的rdd元素進行處理
List<Integer> doubleNumberList = doubleNumbers.collect();
for(Integer num : doubleNumberList) {
System.out.println(num);
}

// 關閉JavaSparkContext
sc.close();
}

3.count


private static void count() {
// 建立SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("count")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 對rdd使用count操做,統計它有多少個元素
long count = numbers.count();
System.out.println(count);

// 關閉JavaSparkContext
sc.close();
}


4.take


private static void take() {
// 建立SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("take")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

// 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 對rdd使用count操做,統計它有多少個元素
// take操做,與collect相似,也是從遠程集羣上,獲取rdd的數據
// 可是collect是獲取rdd的全部數據,take只是獲取前n個數據
List<Integer> top3Numbers = numbers.take(3);

for(Integer num : top3Numbers) {
System.out.println(num);
}

// 關閉JavaSparkContext
sc.close();
}

5.saveAsTextFile



private static void saveAsTextFile() {
// 建立SparkConf和JavaSparkContext
SparkConf conf = new SparkConf()
.setAppName("saveAsTextFile");
JavaSparkContext sc = new JavaSparkContext(conf);

// 有一個集合,裏面有1到10,10個數字,如今要對10個數字進行累加
List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 使用map操做將集合中全部數字乘以2
JavaRDD<Integer> doubleNumbers = numbers.map(

new Function<Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}

});

// 直接將rdd中的數據,保存在HFDS文件中
// 可是要注意,咱們這裏只能指定文件夾,也就是目錄
// 那麼實際上,會保存爲目錄中的/double_number.txt/part-00000文件
doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");

// 關閉JavaSparkContext
sc.close();
}



7.countByKey

@SuppressWarnings("unchecked")
private static void countByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf()
.setAppName("countByKey")
.setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 模擬集合
List<Tuple2<String, String>> scoreList = Arrays.asList(
new Tuple2<String, String>("class1", "leo"),
new Tuple2<String, String>("class2", "jack"),
new Tuple2<String, String>("class1", "marry"),
new Tuple2<String, String>("class2", "tom"),
new Tuple2<String, String>("class2", "david"));

// 並行化集合,建立JavaPairRDD
JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);

// 對rdd應用countByKey操做,統計每一個班級的學生人數,也就是統計每一個key對應的元素個數
// 這就是countByKey的做用
// countByKey返回的類型,直接就是Map<String, Object>
Map<String, Long> studentCounts = students.countByKey();

for(Map.Entry<String, Long> studentCount : studentCounts.entrySet()) {
System.out.println(studentCount.getKey() + ": " + studentCount.getValue());
}

// 關閉JavaSparkContext
sc.close();
}
相關文章
相關標籤/搜索