RDD(Resilient Distributed Dataset)叫作彈性分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。java
(1)一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。數據庫
(2)一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。apache
(3)RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。編程
(4)一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。api
(5)一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。數組
其中hello.txt緩存
由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等app
scala> val file = sc.textFile("/spark/hello.txt")
由一個已經存在的Scala集合建立。maven
scala> val array = Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.parallelize(array) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26 scala>
讀取數據庫等等其餘的操做。也能夠生成RDD。分佈式
RDD能夠經過其餘的RDD轉換而來的。
Spark支持兩個類型(算子)操做:Transformation和Action
主要作的是就是將一個已有的RDD生成另一個RDD。Transformation具備lazy特性(延遲加載)。Transformation算子的代碼不會真正被執行。只有當咱們的程序裏面遇到一個action算子的時候,代碼纔會真正的被執行。這種設計讓Spark更加有效率地運行。
經常使用的Transformation:
轉換 |
含義 |
map(func) |
返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 |
filter(func) |
返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) |
相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是 (Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) |
對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
先按分區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(_+_,_+_) 對k/y的RDD進行操做 |
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey相似,可是更靈活 第一個參數是根據什麼排序 第二個是怎麼排序 false倒序 第三個排序後分區數 默認與原RDD同樣 |
join(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD 至關於內鏈接(求交集) |
cogroup(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD |
cartesian(otherDataset) |
兩個RDD的笛卡爾積 的成不少個K/V |
pipe(command, [envVars]) |
調用外部程序 |
coalesce(numPartitions) |
從新分區 第一個參數是要分多少區,第二個參數是否shuffle 默認false 少分區變多分區 true 多分區變少分區 false |
repartition(numPartitions) |
從新分區 必須shuffle 參數是要分多少區 少變多 |
repartitionAndSortWithinPartitions(partitioner) |
從新分區+排序 比先分區再排序效率高 對K/V的RDD進行操做 |
foldByKey(zeroValue)(seqOp) |
該函數用於K/V作摺疊,合併處理 ,與aggregate相似 第一個括號的參數應用於每一個V值 第二括號函數是聚合例如:_+_ |
combineByKey |
合併相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) |
partitionBy(partitioner) |
對RDD進行分區 partitioner是分區器 例如new HashPartition(2 |
cache |
RDD緩存,能夠避免重複計算從而減小時間,區別:cache內部調用了persist算子,cache默認就一個緩存級別MEMORY-ONLY ,而persist則能夠選擇緩存級別 |
persist |
|
|
|
Subtract(rdd) |
返回前rdd元素不在後rdd的rdd |
leftOuterJoin |
leftOuterJoin相似於SQL中的左外關聯left outer join,返回結果之前面的RDD爲主,關聯不上的記錄爲空。只能用於兩個RDD之間的關聯,若是要多個RDD關聯,多關聯幾回便可。 |
rightOuterJoin |
rightOuterJoin相似於SQL中的有外關聯right outer join,返回結果以參數中的RDD爲主,關聯不上的記錄爲空。只能用於兩個RDD之間的關聯,若是要多個RDD關聯,多關聯幾回便可 |
subtractByKey |
substractByKey和基本轉換操做中的subtract相似只不過這裏是針對K的,返回在主RDD中出現,而且不在otherRDD中出現的元素 |
觸發代碼的運行,咱們一段spark代碼裏面至少須要有一個action操做。
經常使用的Action:
動做 |
含義 |
reduce(func) |
經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的 |
collect() |
在驅動程序中,以數組的形式返回數據集的全部元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(相似於take(1)) |
take(n) |
返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) |
返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) |
|
saveAsTextFile(path) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 |
saveAsSequenceFile(path) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 |
saveAsObjectFile(path) |
|
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 |
foreach(func) |
在數據集的每個元素上,運行函數func進行更新。 |
aggregate |
先對分區進行操做,在整體操做 |
reduceByKeyLocally |
|
lookup |
|
top |
|
fold |
|
foreachPartition |
|
|
|
使用maven進行項目構建
查看官方網站,須要導入2個依賴包
詳細代碼
SparkWordCountWithScala.scala
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object SparkWordCountWithScala { def main(args: Array[String]): Unit = { val conf = new SparkConf() /** * 若是這個參數不設置,默認認爲你運行的是集羣模式 * 若是設置成local表明運行的是local模式 */ conf.setMaster("local") //設置任務名 conf.setAppName("WordCount") //建立SparkCore的程序入口 val sc = new SparkContext(conf) //讀取文件 生成RDD val file: RDD[String] = sc.textFile("E:\\hello.txt") //把每一行數據按照,分割 val word: RDD[String] = file.flatMap(_.split(",")) //讓每個單詞都出現一次 val wordOne: RDD[(String, Int)] = word.map((_,1)) //單詞計數 val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_) //按照單詞出現的次數 降序排序 val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false) //將最終的結果進行保存 sortRdd.saveAsTextFile("E:\\result") sc.stop() }
運行結果
SparkWordCountWithJava7.java
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class SparkWordCountWithJava7 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("WordCount"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> fileRdd = sc.textFile("E:\\hello.txt"); JavaRDD<String> wordRDD = fileRdd.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.split(",")).iterator(); } }); JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); } }); JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } }); JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return new Tuple2<>(tuple._2, tuple._1); } }); JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false); JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception { return new Tuple2<>(tuple._2, tuple._1); } }); resultRDD.saveAsTextFile("E:\\result7"); } }
lambda表達式
SparkWordCountWithJava8.java
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; public class SparkWordCountWithJava8 { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("WortCount"); conf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> fileRDD = sc.textFile("E:\\hello.txt"); JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator()); JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1)); JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y); JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false); JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)); resultRDD.saveAsTextFile("E:\\result8"); }
因爲RDD是粗粒度的操做數據集,每一個Transformation操做都會生成一個新的RDD,因此RDD之間就會造成相似流水線的先後依賴關係;RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。如圖所示顯示了RDD之間的依賴關係。
從圖中可知:
窄依賴:是指每一個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter、union等操做都會產生窄依賴;(獨生子女)
寬依賴:是指一個父RDD的Partition會被多個子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操做都會產生寬依賴;(超生)
須要特別說明的是對join操做有兩種狀況:
(1)圖中左半部分join:若是兩個RDD在進行join操做時,一個RDD的partition僅僅和另外一個RDD中已知個數的Partition進行join,那麼這種類型的join操做就是窄依賴,例如圖1中左半部分的join操做(join with inputs co-partitioned);
(2)圖中右半部分join:其它狀況的join操做就是寬依賴,例如圖1中右半部分的join操做(join with inputs not co-partitioned),因爲是須要父RDD的全部partition進行join的轉換,這就涉及到了shuffle,所以這種類型的join操做也是寬依賴。
總結:
在這裏咱們是從父RDD的partition被使用的個數來定義窄依賴和寬依賴,所以能夠用一句話歸納下:若是父RDD的一個Partition被子RDD的一個Partition所使用就是窄依賴,不然的話就是寬依賴。由於是肯定的partition數量的依賴關係,因此RDD之間的依賴關係就是窄依賴;由此咱們能夠得出一個推論:即窄依賴不只包含一對一的窄依賴,還包含一對固定個數的窄依賴。
一對固定個數的窄依賴的理解:即子RDD的partition對父RDD依賴的Partition的數量不會隨着RDD數據規模的改變而改變;換句話說,不管是有100T的數據量仍是1P的數據量,在窄依賴中,子RDD所依賴的父RDD的partition的個數是肯定的,而寬依賴是shuffle級別的,數據量越大,那麼子RDD所依賴的父RDD的個數就越多,從而子RDD所依賴的父RDD的partition的個數也會變得愈來愈多。
在spark中,會根據RDD之間的依賴關係將DAG圖(有向無環圖)劃分爲不一樣的階段,對於窄依賴,因爲partition依賴關係的肯定性,partition的轉換處理就能夠在同一個線程裏完成,窄依賴就被spark劃分到同一個stage中,而對於寬依賴,只能等父RDD shuffle處理完成後,下一個stage才能開始接下來的計算。
所以spark劃分stage的總體思路是:從後往前推,遇到寬依賴就斷開,劃分爲一個stage;遇到窄依賴就將這個RDD加入該stage中。所以在圖2中RDD C,RDD D,RDD E,RDDF被構建在一個stage中,RDD A被構建在一個單獨的Stage中,而RDD B和RDD G又被構建在同一個stage中。
在spark中,Task的類型分爲2種:ShuffleMapTask和ResultTask;
簡單來講,DAG的最後一個階段會爲每一個結果的partition生成一個ResultTask,即每一個Stage裏面的Task的數量是由該Stage中最後一個RDD的Partition的數量所決定的!而其他全部階段都會生成ShuffleMapTask;之因此稱之爲ShuffleMapTask是由於它須要將本身的計算結果經過shuffle到下一個stage中;也就是說上圖中的stage1和stage2至關於mapreduce中的Mapper,而ResultTask所表明的stage3就至關於mapreduce中的reducer。
在以前動手操做了一個wordcount程序,所以可知,Hadoop中MapReduce操做中的Mapper和Reducer在spark中的基本等量算子是map和reduceByKey;不過區別在於:Hadoop中的MapReduce天生就是排序的;而reduceByKey只是根據Key進行reduce,但spark除了這兩個算子還有其餘的算子;所以從這個意義上來講,Spark比Hadoop的計算算子更爲豐富。