RDD(Resilient Distributed Dataset) 叫着 彈性分佈式數據集 ,是Spark 中最基本的抽象,它表明一個不可變、可分區、裏面元素能夠並行計算的集合。php
RDD 具備數據流模型特色:自動容錯、位置感知性調度和可伸縮。java
RDD 容許用戶在執行多個查詢時,顯示地將工做集緩存在內存中,後續的查詢可以重用工做集,這將會極大的提高查詢的效率。linux
咱們能夠認爲 RDD 就是一個代理,咱們操做這個代理就像操做本地集合同樣,不需去關心任務調度、容錯等問題。android
在 RDD 源碼中這樣來描述 RDD算法
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
複製代碼
#經過並行化scala集合建立RDD,通常在測試的時候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
複製代碼
var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
複製代碼
這種 RDD 中的全部轉換都是延遲加載的,也就是說,他們並不會直接就計算結果。相反的,他們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個返回結果的 Driver 的動做時,這些操做纔會真正的運行。這種設計會讓Spark 更加有效率的運行。apache
經常使用的 Transformation 操做:編程
轉換 | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 |
filter(func) | 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) | 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按分區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進行操做 |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey相似,可是更靈活 |
join(otherDataset, [numTasks]) | 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | 調用外部程序 |
coalesce(numPartitions) | 從新分區 第一個參數是要分多少區,第二個參數是否shuffle 默認false ;少分區變多分區 true ; 多分區變少分區 false |
repartition(numPartitions) | 從新分區 必須shuffle 參數是要分多少區 少變多 |
repartitionAndSortWithinPartitions(partitioner) | 從新分區+排序 比先分區再排序效率高 對K/V的RDD進行操做 |
觸發代碼的運行操做,咱們一個Spark 應用,至少須要一個 Action 操做。windows
動做 | 含義 |
---|---|
reduce(func) | 經過func函數彙集RDD中的全部元素,這個功能必須是課交換且可並聯的 |
collect() | 在驅動程序中,以數組的形式返回數據集的全部元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(相似於take(1)) |
take(n) | 返回一個由數據集的前n個元素組成的數組 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 |
saveAsSequenceFile(path) | 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 |
saveAsObjectFile(path) | |
countByKey() | 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 |
foreach(func) | 在數據集的每個元素上,運行函數func進行更新。 |
foreachPartition(func) | 在每一個分區上,運行函數 func |
執行流程圖: api
pom.xml 依賴數組
<!-- 導入scala的依賴 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 導入spark的依賴 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
複製代碼
scala 版本代碼實現:
package com.zhouq.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* scala 版本實現 wc
*
*/
object ScalaWordCount {
def main(args: Array[String]): Unit = {
//這行代碼是由於我在windows 上直接跑,須要去讀取 hadoop 上的文件,設置個人用戶名。若是是linux 環境能夠不設置。視狀況而定
System.setProperty("HADOOP_USER_NAME", "root")
//建立spark 配置,設置應用程序名字
// val conf = new SparkConf().setAppName("scalaWordCount")
val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")
// conf.set("spark.testing.memory","102457600")
//建立spark 執行的入口
val sc = new SparkContext(conf)
//指定之後從哪裏讀取數據建立RDD (彈性分佈式數據集)
//取到一行數據
val lines: RDD[String] = sc.textFile(args(0))
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//按單詞和一組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//按key 進行聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
// 排序, false 表示倒序
val sorted = reduced.sortBy(_._2, false)
//將結果保存到hdfs中
sorted.saveAsTextFile(args(1))
//釋放資源
sc.stop()
}
}
複製代碼
Java7 版本:
package com.zhouq.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* Java 版WordCount
*/
public class JavaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//建立SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
//指定讀取數據的位置
JavaRDD<String> lines = jsc.textFile(args[0]);
//切分壓平
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception{
return Arrays.asList(line.split(" ")).iterator();
}
});
//將單詞進行組合 (a,1),(b,1),(c,1),(a,1)
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String tp) throws Exception {
return new Tuple2<>(tp, 1);
}
});
//先交換再排序,由於 只有groupByKey 方法
JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
// return new Tuple2<>(tp._2, tp._1);
return tp.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再次交換順序
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
return tp.swap();
}
});
//輸出到hdfs
result.saveAsTextFile(args[1]);
jsc.stop();
}
}
複製代碼
Java8 版本:
package com.zhouq.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* Java Lambda 表達式版本的 WordCount
*/
public class JavaLambdaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//建立SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
//指定讀取數據的位置
JavaRDD<String> lines = jsc.textFile(args[0]);
//切分壓平
// lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
//將單詞進行組合 (a,1),(b,1),(c,1),(a,1)
// words.mapToPair(tp -> new Tuple2<>(tp,1));
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));
//先交換再排序,由於 只有groupByKey 方法
// swaped.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
// return new Tuple2<>(tp._2, tp._1);
return tp.swap();
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再次交換順序
// sorted.mapToPair(tp -> tp.swap());
JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());
//輸出到hdfs
result.saveAsTextFile(args[1]);
jsc.stop();
}
}
複製代碼
RDD 和它依賴的 父 RDD(可能有多個) 的關係有兩種不一樣的類型,即 窄依賴(narrow dependency)和寬依賴(wide dependency)。
窄依賴:窄依賴指的是每個父 RDD 的 Partition 最多被子 RDD 的一個分區使用。能夠比喻爲獨生子女。 寬依賴:寬依賴是多個字 RDD 的Partition 會依賴同一個父 RDD 的 Partition
Spark中最重要的功能之一是跨操做在內存中持久化(或緩存)數據集。當您持久保存RDD時,每一個節點都會存儲它在內存中計算的任何分區,並在該數據集(或從中派生的數據集)的其餘操做中重用它們。這使得將來的行動更快(一般超過10倍)。緩存是迭代算法和快速交互使用的關鍵工具。
您可使用persist()或cache()方法標記要保留的RDD 。第一次在動做中計算它,它將保留在節點的內存中。Spark的緩存是容錯的 - 若是丟失了RDD的任何分區,它將使用最初建立它的轉換自動從新計算。
使用 rdd.persist()或者rdd.cache()
val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法來緩存數據到內存
val cache = lines.cache()
//注意查看下面兩次count 的時間
cached.count
cached.count
複製代碼
咱們在 StorageLevel.scala 源碼中能夠看到:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
複製代碼
解釋一下各個參數的意思:
第一個參數表示: 放到磁盤 第二個參數表示: 放到內存 第三個參數表示: 磁盤中的數據是否以Java 對象的方式保存,true 表示是, false表示以序列化的方式存放 第四個參數表示: 內存中的數據是否以Java 對象的方式保存,true 表示是, false表示以序列化的方式存放 第五個參數表示: 存放幾份數據(目的是爲了怕executor 出現故障致使分區數據丟失,當從新分配任務時,去另外的機器讀取備份數據進行從新計算)
OFF_HEAP : 堆外內存,以序列化的格式存儲RDD到Tachyon(一個分佈式內存存儲系統)中
Spark的多個存儲級別意味着在內存利用率和cpu利用效率間的不一樣權衡。咱們推薦經過下面的過程選擇一個合適的存儲級別:
Spark自動的監控每一個節點緩存的使用狀況,利用最近最少使用原則刪除老舊的數據。若是你想手動的刪除RDD,可使用 RDD.unpersist()方法
咱們除了把數據緩存到內存中,還能夠把數據緩存到HDFS 中,保證中間數據不丟失.
何時咱們須要作chechpoint?
怎麼作 checkpoint ?
首先設置 checkpoint 目錄,而後再執行計算邏輯,再執行 checkpoint() 操做。
下面代碼使用cache 和 checkpoint 兩種方式實現計算每門課最受歡迎老師的 topN
package com.zhouq.spark
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 求每門課程最受歡迎老師TopN --2
* -- 使用cache
* -- 使用checkpoint 通常設置hdfs 目錄
*/
object GroupFavTeacher2_cache_checkpoint {
def main(args: Array[String]): Unit = {
//前 N
val topN = args(1).toInt
//學科集合
val subjects = Array("bigdata", "javaee", "php")
val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
//建立spark 執行入口
val sc = new SparkContext(conf)
//checkpoint 得先設置 sc 的checkpoint 的dir
// sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")
//指定讀取數據
val lines: RDD[String] = sc.textFile(args(0))
val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
val index = line.lastIndexOf("/")
var teacher = line.substring(index + 1)
var httpHost = line.substring(0, index)
var subject = new URL(httpHost).getHost.split("[.]")(0)
((subject, teacher), 1)
})
//將學科,老師聯合當作key
val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)
//第一種使用cache RDD 把數據緩存在內存中.標記爲cache 的RDD 之後被反覆使用,才使用cache
val cached: RDD[((String, String), Int)] = reduced.cache()
//第二種 使用checkpoint,得先設置 sc 的 checkpointDir
// val cached: RDD[((String, String), Int)] = reduced.checkpoint()
/**
* 先對學科進行過濾,而後再進行排序,調用RDD 的sortBy進行排序,避免scala 的排序當數據量大時,內存不足的狀況.
* take 是Action 操做,每次take 都會進行一次任務提交,具體查看日誌打印狀況
*/
for (sub <- subjects) {
//過濾出當前的學科
val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
//使用RDD 的 sortBy ,內存+磁盤排序,避免scala 中的排序因內存不足致使異常狀況.
//take 是Action 的,因此每次循環都會觸發一次提交任務,祥見日誌打印狀況
val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
println(favTeacher.toBuffer)
}
/**
* 前面cache的數據已經計算完了,後面還有不少其餘的指標要計算
* 後面計算的指標也要觸發不少次Action,最好將數據緩存到內存
* 原來的數據佔用着內存,把原來的數據釋放掉,才能緩存新的數據
*/
//把原來緩存的數據釋放掉
cached.unpersist(true)
sc.stop()
}
}
複製代碼
DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。
微信公衆號文章連接:Spark RDD
有興趣歡迎關注,你們一塊兒交流學習。