一、RDD是Spark的核心數據模型,可是個抽象類,全稱爲Resillient Distributed Dataset,即彈性分佈式數據集。java
二、RDD在抽象上來講是一種元素集合,包含了數據。它是被分區的,分爲多個分區,每一個分區分佈在集羣中的不一樣節點上,從而讓RDD中的數據能夠被並行操做。(分佈式數據集)算法
三、RDD一般經過Hadoop上的文件,即HDFS文件或者Hive表,來進行建立;有時也能夠經過應用程序中的集合來建立。sql
四、RDD最重要的特性就是,提供了容錯性,能夠自動從節點失敗中恢復過來。即若是某個節點上的RDDpartition,由於節點故障,致使數據丟了,那麼RDD會自動經過本身的數據來源從新計算該partition。這一切對使用者是透明的。apache
五、RDD的數據默認狀況下存放在內存中的,可是在內存資源不足時,Spark會自動將RDD數據寫入磁盤。(彈性)編程
進行Spark核心編程的第一步就是建立一個初始的RDD。該RDD,一般就表明和包含了Spark應用程序的輸入源數據。而後經過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其餘的RDD。數組
Spark Core提供了三種建立RDD的方式:緩存
1.使用程序中的集合建立RDD(主要用於測試)網絡
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10); JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
2.使用本地文件建立RDD(主要用於臨時性處理有大量數據的文件)app
SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate(); JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();
3.使用HDFS文件建立RDD(生產環境的經常使用方式)分佈式
SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate(); JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
使用HDFS文件建立RDD對比使用本地文件建立RDD,須要修改的,只有兩個地方:
第一,將SparkSession對象的master("local")方法去掉
第二,咱們針對的不是本地文件了,修改成hadoop hdfs上的真正的存儲大數據的文件
Spark支持兩種RDD操做:transformation和action。
transformation操做會針對已有的RDD建立一個新的RDD。transformation具備lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所作的操做,不會自發的執行。只有執行了一個action,以前的全部transformation纔會執行。
經常使用的transformation介紹:
map :將RDD中的每一個元素傳人自定義函數,獲取一個新的元素,而後用新的元素組成新的RDD。
filter:對RDD中每一個元素進行判斷,若是返回true則保留,返回false則剔除。
flatMap:與map相似,可是對每一個元素均可以返回一個或多個元素。
groupByKey:根據key進行分組,每一個key對應一個Iterable<value>。
reduceByKey:對每一個key對應的value進行reduce操做。
sortByKey:對每一個key對應的value進行排序操做。
join:對兩個包含<key,value>對的RDD進行join操做,每一個keyjoin上的pair,都會傳入自定義函數進行處理。
cogroup:同join,可是每一個key對應的Iterable<value>都會傳入自定義函數進行處理。
sparkRDD算子:
map與flatmap的區別:扁平化
map函數:會對每一條輸入進行指定func操做,而後爲每一條輸入返回一個對象
flatmap函數:先進行map映射,而後在flatten(進行扁平化操做)
reducebykey算子:
首先會觸發shuffle,會進行兩次聚合操做
1,按照key將數據放到一塊兒(本地聚合--Shuffle Write)
2,將相同key的數據聚合(全局聚合--Shuffle Reader)
action操做主要對RDD進行最後的操做,好比遍歷,reduce,保存到文件等,並能夠返回結果給Driver程序。action操做執行,會觸發一個spark job的運行,從而觸發這個action以前全部的transformation的執行,這是action的特性。
經常使用的action介紹:
reduce:將RDD中的全部元素進行聚合操做。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。
collect:將RDD中全部元素獲取到本地客戶端(通常不建議使用)。
count:獲取RDD元素總數。
take(n):獲取RDD中前n個元素。
saveAsTextFile:將RDD元素保存到文件中,對每一個元素調用toString方法。
countByKey:對每一個key對應的值進行count計數。
foreach:遍歷RDD中的每一個元素。
要持久化一個RDD,只要調用其cache()或者persist()方法便可。在該RDD第一次被計算出來時,就會直接緩存在每一個節點中。可是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等建立了一個RDD以後,直接連續調用cache()或persist()才能夠。
若是你先建立一個RDD,而後單獨另起一行執行cache()或persist()方法,是沒有用的,並且會報錯,大量的文件會丟失。
val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()
Spark提供的多種持久化級別,主要是爲了在CPU和內存消耗之間進行取捨。
通用的持久化級別的選擇建議:
一、優先使用MEMORY_ONLY,若是能夠緩存全部數據的話,那麼就使用這種策略。由於純內存速度最快,並且沒有序列化,不須要消耗CPU進行反序列化操做。
二、若是MEMORY_ONLY策略,沒法存儲全部數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操做仍是很是快,只是要消耗CPU進行反序列化。
三、若是須要進行快速的失敗恢復,那麼就選擇帶後綴爲_2的策略,進行數據的備份,這樣在失敗時,就不須要從新計算了。
四、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如從新計算一次。
Spark提供了兩種共享變量:Broadcast Variable(廣播變量)和Accumulator(累加變量)。
BroadcastVariable會將使用到的變量,僅僅爲每一個節點拷貝一份,更大的用處是優化性能,減小網絡傳輸以及內存消耗。廣播變量是隻讀的。
val factor = 3 val broadcastVars = sc.broadcast(factor); val numberList = Array(1,2,3,4,5) val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變量讀值broadcastVars.value
Accumulator則可讓多個task共同操做一份變量,主要能夠進行累加操做。task只能對Accumulator進行累加操做,不能讀取它的值。只有Driver程序能夠讀取Accumulator的值。
val numberList = Array(1,2,3,4,5) val numberRDD = sc.parallelize(numberList,1) val sum = sc.accumulator(0) numberRDD.foreach{m => sum += m}
一、對文本文件內的每一個單詞都統計出其出現的次數。
二、按照每一個單詞出現次數的數量,降序排序。
scala編寫:
package cn.spark.study.core import org.apache.spark.sql.SparkSession object WordCount { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("WordCount").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\\spark.txt") val words = lines.flatMap{line => line.split(" ")} val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _) val countWord = wordCounts.map{word =>(word._2,word._1)} val sortedCountWord = countWord.sortByKey(false) val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)} sortedWordCount.foreach(s=> { println("word \""+s._1+ "\" appears "+s._2+" times.") }) spark.stop() } }
一、按照文件中的第一列排序。
二、若是第一列相同,則按照第二列排序。
scala編寫:
class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{ override def compare(that: SecondSortKey): Int = { if(this.first - that.first !=0){ this.first-that.first }else{ this.second-that.second } } } object SecondSort { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\\sort.txt") val pairs = lines.map{line => ( new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line )} val sortedParis = pairs.sortByKey() val sortedLines = sortedParis.map(pairs => pairs._2) sortedLines.foreach(s => println(s)) spark.stop() } }
對每一個班級內的學生成績,取出前3名。(分組取topn)
1.建立初始RDD
2.對初始RDD的文本行按空格分割,映射爲key-value鍵值對
3.對鍵值對按鍵分組
4.獲取分組後每組前3的成績:
5.打印輸出
如下是使用scala實現:
object GroupTop3 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate() //建立初始RDD val lines = spark.sparkContext.textFile("D:\\score.txt") //對初始RDD的文本行按空格分割,映射爲key-value鍵值對 val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt)) //對pairs鍵值對按鍵分組 val groupedPairs = pairs.groupByKey() //獲取分組後每組前3的成績 val top3Score = groupedPairs.map(classScores => { var className = classScores._1 //獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三 var top3 = classScores._2.toBuffer.sortWith(_>_).take(3) Tuple2(className,top3) }) top3Score.foreach(m => { println(m._1) for(s <- m._2) println(s) println("------------------") }) } }
以上三個小案例都用Scala實現了,用到了Scala中的集合的操做、高階函數、鏈式調用、隱式轉換等知識,本身動手實現,對Scala有個比較好的理解和掌握。