首先介紹Spark中的核心名詞概念,而後再逐一詳細說明。html
RDD:彈性分佈式數據集,是Spark最核心的數據結構。有分區機制,因此能夠分佈式進行處理。有容錯機制,經過RDD之間的依賴關係來恢復數據。java
依賴關係:RDD的依賴關係是經過各類Transformation(變換)來獲得的。父RDD和子RDD之間的依賴關係分兩種:①窄依賴②寬依賴。web
①窄依賴:父RDD的分區和子RDD的分區關係是:一對一。redis
窄依賴不會發生Shuffle,執行效率高,spark框架底層會針對多個連續的窄依賴執行流水線優化,從而提升性能。例如map、flatMap等方法都是窄依賴方法。算法
②寬依賴:父RDD的分區和子RDD的分區關係是:一對多。apache
寬依賴會產生shuffle,會產生磁盤讀寫,沒法優化。編程
DAG:有向無環圖,當一個RDD的依賴關係造成以後,就造成了一個DAG。通常來講,一個DAG,最後都至少會觸發一個Action操做,觸發執行。一個Action對應一個Job任務。api
Stage:一個DAG會根據RDD之間的依賴關係進行Stage劃分,流程是:以Action爲基準,向前回溯,遇到寬依賴,就造成一個Stage。遇到窄依賴,則執行流水線優化(將多個連續的窄依賴放到一塊兒執行)。數組
task:任務。一個分區對應一個task。能夠這樣理解:一個Stage是一組Task的集合。緩存
RDD的Transformation(變換)操做:懶執行,並不會當即執行。
RDD的Action(執行)操做:觸發真正的執行。
Resilient Distributed Datasets (RDDs)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
RDD彈性分佈式數據集:就是帶有分區的集合類型。特色是能夠並行操做,而且是容錯的。
有兩種方法能夠建立RDD:
1)執行Transform操做(變換操做),
2)讀取外部存儲系統的數據集,如HDFS,HBase,或任何與Hadoop有關的數據源。
Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
val data = Array(1, 2, 3, 4, 5) val r1 = sc.parallelize(data) val r2 = sc.parallelize(data,2)
你能夠這樣理解RDD:
它是spark提供的一個特殊集合類。諸如普通的集合類型,如傳統的Array:(1,2,3,4,5)是一個總體,但轉換成RDD後,咱們能夠對數據進行Partition(分區)處理,這樣作的目的就是爲了分佈式。
你可讓這個RDD有兩個分區,那麼有多是這個形式:RDD(1,2) (3,4)。
這樣設計的目的在於:能夠進行分佈式運算。
注:建立RDD的方式有多種,好比案例一中是基於一個基本的集合類型(Array)轉換而來,像parallelize這樣的方法還有不少,以後就會學到。此外,咱們也能夠在讀取數據集時就建立RDD。
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI) and reads it as a collection of lines. Here is an example invocation:
val distFile = sc.textFile("data.txt")
查看RDD
rdd.collect
收集rdd中的數據組成Array返回,此方法將會把分佈式存儲的rdd中的數據集中到一臺機器中組建Array。
在生產環境下必定要慎用這個方法,容易內存溢出。
查看RDD的分區數量:
rdd.partitions.size
查看RDD每一個分區的元素:
rdd.glom.collect
此方法會將每一個分區的元素以Array形式返回。
在上圖中,一個RDD有item1~item25個數據,共5個分區,分別在3臺機器上進行處理。此外,spark並無原生的提供rdd的分區查看工具,咱們能夠本身來寫一個。
示例代碼:
import org.apache.spark.rdd.RDD import scala.reflect.ClassTag object su { def debug[T: ClassTag](rdd: RDD[T]) = { rdd.mapPartitionsWithIndex((i: Int, iter: Iterator[T]) => { val m = scala.collection.mutable.Map[Int, List[T]]() var list = List[T]() while (iter.hasNext) { list = list :+ iter.next } m(i) = list m.iterator }).collect().foreach((x: Tuple2[Int, List[T]]) => { val i = x._1 println(s"partition:[$i]") x._2.foreach { println } }) } }
針對RDD的操做,分兩種,一種是Transformation(變換),一種是Actions(執行)。
Transformation(變換)操做屬於懶操做,不會真正觸發RDD的處理計算。
Actions(執行)操做纔會真正觸發。
①map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.
參數是函數,函數應用於RDD每個元素,返回值是新的RDD。
案例展現:
map將函數應用到rdd的每一個元素中。
val rdd = sc.makeRDD(List(1,3,5,7,9)) rdd.map(_*10)
②flatMap(func)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
扁平化map,對RDD每一個元素轉換,而後再扁平化處理。
案例展現:
flatMap 扁平map處理
val rdd = sc.makeRDD(List("hello world","hello count","world spark"),2) rdd.map(_.split{" "})//Array(Array(hello, world), Array(hello, count), Array(world, spark)) rdd.flatMap(_.split{" "})//Array[String] = Array(hello, world, hello, count, world, spark) //Array[String] = Array(hello, world, hello, count, world, spark)
注:map和flatMap有何不一樣?
map:對RDD每一個元素轉換。
flatMap:對RDD每一個元素轉換,而後再扁平化(即去除集合)
因此,通常咱們在讀取數據源後,第一步執行的操做是flatMap。
③filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.
參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD。
案例展現:
filter用來從rdd中過濾掉不符合條件的數據。
val rdd = sc.makeRDD(List(1,3,5,7,9)) rdd.filter(_<5)
④mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
該函數和map函數相似,只不過映射函數的參數由RDD中的每個元素變成了RDD中每個分區的迭代器。
案例展現:
scala>val rdd3 = rdd1.mapPartitions{ x => { val result = List[Int]() var i = 0 while(x.hasNext){ i += x.next() } result.::(i).iterator }} scala>rdd3.collect
⑤mapPartitionsWithIndex(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
函數做用同mapPartitions,不過提供了兩個參數,第一個參數爲分區的索引。
案例展現:
var rdd1 = sc.makeRDD(1 to 5,2) var rdd2 = rdd1.mapPartitionsWithIndex{ (index,iter) => { var result = List[String]() var i = 0 while(iter.hasNext){ i += iter.next() } result.::(index + "|" + i).iterator } }
案例展現:
val rdd = sc.makeRDD(List(1,2,3,4,5),2) rdd.mapPartitionsWithIndex((index,iter)=>{ var list = List[String]() while(iter.hasNext){ if(index==0) list = list :+ (iter.next + "a") else { list = list :+ (iter.next + "b") } } list.iterator })
⑥union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
union並集,也能夠用++實現。
案例展現:
val rdd1 = sc.makeRDD(List(1,3,5)) val rdd2 = sc.makeRDD(List(2,4,6,8)) val rdd = rdd1.union(rdd2) val rdd = rdd1 ++ rdd2
⑦intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.
intersection交集
案例展現:
val rdd1 = sc.makeRDD(List(1,3,5,7)) val rdd2 = sc.makeRDD(List(5,7,9,11)) val rdd = rdd1.intersection(rdd2)
⑧subtract
求差集。
案例展現:
val rdd1 = sc.makeRDD(List(1,3,5,7,9)) val rdd2 = sc.makeRDD(List(5,7,9,11,13)) val rdd = rdd1.subtract(rdd2)
⑨distinct([numTasks])
Return a new dataset that contains the distinct elements of the source dataset.
沒有參數,將RDD裏的元素進行去重操做。
案例展現:
val rdd = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7)) rdd.distinct
⑩groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note:If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note:By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
groupByKey對於數據格式是有要求的,即操做的元素必須是一個二元tuple,tuple._1是key,tuple._2是value。
好比下面兩種種數據格式都不符合要求:
sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) sc.parallelize(List(("cat",2,1),("dog",5,1),("cat",4,1),("dog",3,2), ("cat",6,2),("dog",3,4),("cat",9,4),("dog",1,4)),2)
案例展現:
scala>val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4), ("dog",3),("cat",6),("dog",3), ("cat",9),("dog",1)),2) scala>rdd.groupByKey()
⑪reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
reduceByKey操做的數據格式必須是一個二元tuple
案例展現:
scala>var rdd = sc.makeRDD( List( ("hello",1),("spark",1),("hello",1),("world",1) ) ) rdd.reduceByKey(_+_);
⑫aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(func1,func2)
zeroValue表示初始值,初始值會參與func1的計算,在分區內,按key分組,把每組的值進行fun1的計算,再將每一個分區每組的計算結果按fun2進行計算
scala> val rdd = sc.parallelize(List(("cat",2),("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);
查看分區結果:
partition:[0] (cat,2) (dog,5) (cat,4) (dog,3) partition:[1] (cat,6) (dog,3) (cat,9) (dog,1) scala> rdd.aggregateByKey(0)(_+_,_+_);
scala> rdd.aggregateByKey(0)(_+_,_*_);
⑬sortByKey([ascending],[numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
案例展現:
val d2 = sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",104),("cc",1),("ff",13),("gg",68),("bb",44))) d2.sortByKey(true).collect
⑭join(otherDataset,[numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin, and fullOuterJoin.
案例展現:
val rdd1 = sc.makeRDD(List(("cat",1),("dog",2))) val rdd2 = sc.makeRDD(List(("cat",3),("dog",4),("tiger",9))) rdd1.join(rdd2);
⑮cartesian(otherDataset)
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
參數是RDD,求兩個RDD的笛卡兒積。
案例展現:
cartesian 笛卡爾積
val rdd1 = sc.makeRDD(List(1,2,3)) val rdd2 = sc.makeRDD(List("a","b")) rdd1.cartesian(rdd2);
⑯coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
coalesce(n,true/false)擴大或縮小分區。
案例展現:
val rdd = sc.makeRDD(List(1,2,3,4,5),2) rdd9.coalesce(3,true);
若是是擴大分區,須要傳入一個true,表示要從新shuffle。
rdd9.coalesce(2);
若是是縮小分區,默認就是false,不須要明確的傳入。
⑰repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartition(n) 等價於上面的coalesce
⑱partitionBy
一般在建立RDD時指定分區規則,將會致使數據自動分區;也能夠經過partitionBy方法人爲指定分區方式來進行分區。
常見的分區器有:
HashPartitioner、RangePartitioner
案例展現:
import org.apache.spark._ val r1 = sc.makeRDD(List((2,"aaa"),(9,"bbb"),(7,"ccc"),(9,"ddd"),(3,"eee"),(2,"fff")),2); val r2=r1.partitionBy(new HashPartitioner(2))
按照鍵的hash%分區數獲得的編號去往指定的分區,這種方式能夠實現將相同鍵的數據分發給同一個分區的效果。
val r3=r1.partitionBy(new RangePartitioner(2,r1))
將數據按照值的字典順序進行排序,再分區。
①reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
並行整合全部RDD數據,例如求和操做。
②collect
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
返回RDD全部元素,將rdd分佈式存儲在集羣中不一樣分區的數據獲取到一塊兒組成一個數組返回。
要注意:這個方法將會把全部數據收集到一個機器內,容易形成內存的溢出,在生產環境下千萬慎用。
③count
Return the number of elements in the dataset.
統計RDD裏元素個數
案例展現:
val rdd = sc.makeRDD(List(1,2,3,4,5),2) rdd.count
④first
Return the first element of the dataset (similar to take(1)).
⑤take(n)
Return an array with the first n elements of the dataset.
take獲取前n個數據。
案例展現:
val rdd = sc.makeRDD(List(52,31,22,43,14,35)) rdd.take(2)
⑥takeOrdered(n,[ordering])
Return the first n elements of the RDD using either their natural order or a custom comparator.
takeOrdered(n)先將對象中的數據進行升序排序,而後取前n個。
案例展現:
val rdd = sc.makeRDD(List(52,31,22,43,14,35)) rdd.takeOrdered(3)
⑦top(n)
top(n)先將對象中的數據進行降序排序,而後取前n個。
val rdd = sc.makeRDD(List(52,31,22,43,14,35)) rdd.top(3)
⑧saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsTextFile 按照文本方式保存分區數據,到指定路徑。
案例示例:
val rdd = sc.makeRDD(List(1,2,3,4,5),2); rdd.saveAsTextFile("/root/work/aaa")
⑨countByKey()
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
⑩foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
經過rdd實現統計文件中的單詞數量。
sc.textFile("/root/work/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/root/work/wcresult")
RDD之間的關係能夠從兩個維度來理解:
一個是RDD是從哪些RDD轉換而來,也就是RDD的parent RDD(s)是什麼;還有就是依賴於parent RDD(s)的哪些Partition(s)。這個關係,就是RDD之間的依賴,org.apache.spark.Dependency。
根據依賴於parent RDD(s)的Partitions的不一樣狀況,Spark將這種依賴分爲兩種,一種是寬依賴,一種是窄依賴。
RDD和它依賴的parent RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
窄依賴指的是每個parent RDD的Partition最多被子RDD的一個Partition使用,以下圖所示。
寬依賴指的是多個子RDD的Partition會依賴同一個parent RDD的Partition。
咱們能夠從不一樣類型的轉換來進一步理解RDD的窄依賴和寬依賴的區別,以下圖所示。
對於窄依賴操做,它們只是將Partition的數據根據轉換的規則進行轉化,並不涉及其餘的處理,能夠簡單地認爲只是將數據從一個形式轉換到另外一個形式。
窄依賴底層的源碼:
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { //返回子RDD的partitionId依賴的全部的parent RDD的Partition(s) def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd } class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId) }
因此對於窄依賴,並不會引入昂貴的Shuffle。因此執行效率很是高。若是整個DAG中存在多個連續的窄依賴,則能夠將這些連續的窄依賴整合到一塊兒連續執行,中間不執行shuffle 從而提升效率,這樣的優化方式稱之爲流水線優化。
此外,針對窄依賴,若是子RDD某個分區數據丟失,只須要找到父RDD對應依賴的分區,恢復便可。但若是是寬依賴,當分區丟失時,最糟糕的狀況是要重算全部父RDD的全部分區。
對於groupByKey這樣的操做,子RDD的全部Partition(s)會依賴於parent RDD的全部Partition(s),子RDD的Partition是parent RDD的全部Partition Shuffle的結果。
寬依賴的源碼:
class ShuffleDependency[K, V, C]( @transient _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Option[Serializer] = None, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false) extends Dependency[Product2[K, V]] { override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]] //獲取新的shuffleId val shuffleId: Int = _rdd.context.newShuffleId() //向ShuffleManager註冊Shuffle的信息 val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.size, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) }
spark中一旦遇到寬依賴就須要進行shuffle的操做,所謂的shuffle的操做的本質就是將數據彙總後從新分發的過程。
這個過程數據要彙總到一塊兒,數據量可能很大因此不可避免的須要進行數據落磁盤的操做,會下降程序的性能,因此spark並非徹底內存不讀寫磁盤,只能說它盡力避免這樣的過程來提升效率 。
分佈式系統一般在一個機器集羣上運行,同時運行的幾百臺機器中某些出問題的機率大大增長,因此容錯設計是分佈式系統的一個重要能力。
Spark之前的集羣容錯處理模型,像MapReduce,將計算轉換爲一個有向無環圖(DAG)的任務集合,這樣能夠經過重複執行DAG裏的一部分任務來完成容錯恢復。可是因爲主要的數據存儲在分佈式文件系統中,沒有提供其餘存儲的概念,容錯過程須要在網絡上進行數據複製,從而增長了大量的消耗。因此,分佈式編程中常常須要作檢查點,即將某個時機的中間數據寫到存儲(一般是分佈式文件系統)中。
RDD也是一個DAG,每個RDD都會記住建立該數據集須要哪些操做,跟蹤記錄RDD的繼承關係,這個關係在Spark裏面叫lineage(血緣關係)。當一個RDD的某個分區丟失時,RDD是有足夠的信息記錄其如何經過其餘RDD進行計算,且只需從新計算該分區,這是Spark的一個創新。
相比Hadoop MapReduce來講,Spark計算具備巨大的性能優點,其中很大一部分緣由是Spark對於內存的充分利用,以及提供的緩存機制。
持久化在早期被稱做緩存(cache),但緩存通常指將內容放在內存中。雖然持久化操做在絕大部分狀況下都是將RDD緩存在內存中,但通常都會在內存不夠時用磁盤頂上去(比操做系統默認的磁盤交換性能高不少)。固然,也能夠選擇不使用內存,而是僅僅保存到磁盤中。因此,如今Spark使用持久化(persistence)這一更普遍的名稱。
若是一個RDD不止一次被用到,那麼就能夠持久化它,這樣能夠大幅提高程序的性能,甚至達10倍以上。
默認狀況下,RDD只使用一次,用完即扔,再次使用時須要從新計算獲得,而持久化操做避免了這裏的重複計算,實際測試也顯示持久化對性能提高明顯,這也是Spark剛出現時被人稱爲內存計算框架的緣由。
假設首先進行了RDD0→RDD1→RDD2的計算做業,那麼計算結束時,RDD1就已經緩存在系統中了。在進行RDD0→RDD1→RDD3的計算做業時,因爲RDD1已經緩存在系統中,所以RDD0→RDD1的轉換不會重複進行,計算做業只須進行RDD1→RDD3的計算就能夠了,所以計算速度能夠獲得很大提高。
持久化的方法是調用persist()函數,除了持久化至內存中,還能夠在persist()中指定storage level參數使用其餘的類型,具體以下:
①MEMORY_ONLY
MEMORY_ONLY:將RDD以反序列化的Java對象的形式存儲在JVM中。若是內存空間不足,部分數據分區將不會被緩存,在每次須要用到這些數據時從新進行計算。這是默認的級別。
persist()方法的默認級別就是cache()方法,cache()方法對應的級別就是MEMORY_ONLY級別。
②MEMORY_AND_DISK
MEMORY_AND_DISK:將RDD以反序列化的Java對象的形式存儲在JVM中。若是內存空間不夠,將未緩存的數據分區存儲到磁盤,在須要使用這些分區時從磁盤讀取,存入磁盤的對象也是沒有通過序列化的。
③MEMORY_ONLY_SER
MEMORY_ONLY_SER:將RDD以序列化的Java對象的形式進行存儲(每一個分區爲一個byte數組)。這種方式會比反序列化對象的方式節省不少空間,尤爲是在使用fast serialize時會節省更多的空間,可是在讀取時會使得CPU的read變得更加密集。若是內存空間不夠,部分數據分區將不會被緩存,在每次須要用到這些數據時從新進行計算。
④MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER:相似於MEMORY_ONLY_SER,可是溢出的分區會存儲到磁盤,而不是在用到它們時從新計算,存儲到磁盤上的對象會進行序列化。在須要使用這些分區時從磁盤讀取。
⑤DISK_ONLY
DISK_ONLY:只在磁盤上緩存RDD。
⑥MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :與上面的級別功能相同,只不過每一個分區在集羣中兩個節點上創建副本。
⑦OFF_HEAP
OFF_HEAP:將數據存儲在off-heap memory中。使用堆外內存,這是Java虛擬機裏面的概念,堆外內存意味着把內存對象分配在Java虛擬機的堆之外的內存,這些內存直接受操做系統管理(而不是虛擬機)。注意,可能帶來一些GC回收問題。
Spark也會自動持久化一些在shuffle操做過程當中產生的臨時數據(好比reduceByKey),即使是用戶並無調用持久化的方法。這樣作能夠避免當shuffle階段時若是一個節點掛掉了就得從新計算整個數據的問題。若是用戶打算屢次重複使用這些數據,咱們仍然建議用戶本身調用持久化方法對數據進行持久化。
須要先導包,而後才能調用命令。
scala> import org.apache.spark.storage._ scala> val rdd1=sc.makeRDD(1 to 5) scala> rdd1.cache //cache只有一種默認的緩存級別,即MEMORY_ONLY scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
Spark會自動監控每一個節點上的緩存數據,而後使用least-recently-used(LRU)機制來處理舊的緩存數據。若是你想手動清理這些緩存的RDD數據而不是去等待它們被自動清理掉,
可使用RDD.unpersist()方法。
cala> rdd1.unpersist()
Spark會根據用戶提交的計算邏輯中的RDD的轉換和動做來生成RDD之間的依賴關係,同時這個計算鏈也就生成了邏輯上的DAG。接下來以「Word Count」爲例,詳細描述這個DAG生成的實現過程。
Spark Scala版本的Word Count程序以下:
1:val file=sc.textFile("hdfs://hadoop01:9000/hello1.txt") 2:val counts = file.flatMap(line => line.split(" ")) 3: .map(word=>(word,1)) 4: .reduceByKey(_+_) 5:counts.saveAsTextFile("hdfs://...")
file和counts都是RDD,其中file是從HDFS上讀取文件並建立了RDD,而counts是在file的基礎上經過flatMap、map和reduceByKey這三個RDD轉換生成的。最後,counts調用了動做saveAsTextFile,用戶的計算邏輯就從這裏開始提交的集羣進行計算。那麼上面這5行代碼的具體實現是什麼呢?
行1:sc是org.apache.spark.SparkContext的實例,它是用戶程序和Spark的交互接口,會負責鏈接到集羣管理者,並根據用戶設置或者系統默認設置來申請計算資源,完成RDD的建立等。
sc.textFile("hdfs://...")就完成了一個org.apache.spark.rdd.HadoopRDD的建立,而且完成了一次RDD的轉換:經過map轉換到一個org.apache.spark.rdd.MapPartitions-RDD。也就是說,file其實是一個MapPartitionsRDD,它保存了文件的全部行的數據內容。
行2:將file中的全部行的內容,以空格分隔爲單詞的列表,而後將這個按照行構成的單詞列表合併爲一個列表。最後,以每一個單詞爲元素的列表被保存到MapPartitionsRDD。
行3:將第2步生成的MapPartittionsRDD再次通過map將每一個單詞word轉爲(word,1)的元組。這些元組最終被放到一個MapPartitionsRDD中。
行4:首先會生成一個MapPartitionsRDD,起到map端combiner的做用;而後會生成一個ShuffledRDD,它從上一個RDD的輸出讀取數據,做爲reducer的開始;最後,還會生成一個MapPartitionsRDD,起到reducer端reduce的做用。
行5:向HDFS輸出RDD的數據內容。最後,調用org.apache.spark.SparkContext#runJob向集羣提交這個計算任務。
原始的RDD(s)經過一系列轉換就造成了DAG。RDD之間的依賴關係,包含了RDD由哪些Parent RDD(s)轉換而來和它依賴parent RDD(s)的哪些Partitions,是DAG的重要屬性。
藉助這些依賴關係,DAG能夠認爲這些RDD之間造成了Lineage(血統,血緣關係)。藉助Lineage,能保證一個RDD被計算前,它所依賴的parent RDD都已經完成了計算;同時也實現了RDD的容錯性,即若是一個RDD的部分或者所有的計算結果丟失了,那麼就須要從新計算這部分丟失的數據。
Spark在執行任務(job)時,首先會根據依賴關係,將DAG劃分爲不一樣的階段(Stage)。
處理流程是:
1)Spark在執行Transformation類型操做時都不會當即執行,而是懶執行(計算)。
2)執行若干步的Transformation類型的操做後,一旦遇到Action類型操做時,纔會真正觸發執行(計算)。
3)執行時,從當前Action方法向前回溯,若是遇到的是窄依賴則應用流水線優化,繼續向前找,直到碰到某一個寬依賴。
4)由於寬依賴必需要進行shuffle,沒法實現優化,因此將這一次段執行過程組裝爲一個stage。
5)再從當前寬依賴開始繼續向前找。重複剛纔的步驟,從而將這個DAG還分爲若干的stage。
在stage內部能夠執行流水線優化,而在stage之間沒辦法執行流水線優化,由於有shuffle。可是這種機制已經盡力的去避免了shuffle。
原始的RDD通過一系列轉換後(一個DAG),會在最後一個RDD上觸發一個動做,這個動做會生成一個Job。
因此能夠這樣理解:一個DAG對應一個Spark的Job。
在Job被劃分爲一批計算任務(Task)後,這批Task會被提交到集羣上的計算節點去計算。
Spark的Task分爲兩種:
1)org.apache.spark.scheduler.ShuffleMapTask
2)org.apache.spark.scheduler.ResultTask
簡單來講,DAG的最後一個階段會爲每一個結果的Partition生成一個ResultTask,其他全部的階段都會生成ShuffleMapTask。
案例 單詞統計
scala>val data=sc.textFile("/home/software/hello.txt",2) scala> data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
1)打開web頁面控制檯(ip:4040端口地址),刷新,會發現剛纔的操做會出如今頁面上。
2)點擊 Description下的 collect at…… 進入job的詳細頁面
3)點擊 DAG Visualization 會出現以下圖形
數據樣例:
hello scala hello spark hello world
建立spark的項目,在scala中建立項目,導入spark相關的jar包。
統計單詞個數,並輸出
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object Driver { def main(args: Array[String]): Unit = { //建立Spark的環境對象,配置運行模式 //1:集羣模式:setMaster("spark://yun01:7077") //2:本地模式:setMaster("local") val conf=new SparkConf().setMaster("spark://yun01:7077").setAppName("wordcount") //獲取Spark上下文對象 val sc=new SparkContext(conf) val data=sc.textFile("hdfs://yun01:9000/words.txt", 2) val result=data.flatMap { x => x.split(" ") }.map { x => (x,1) }.reduceByKey(_+_) result.saveAsTextFile("hdfs://yun01:9000/wcresult") } }
將寫好的項目打成jar,上傳到服務器,進入bin目錄,執行以下命令:
spark-submit --class cn.tedu.WordCountDriver /home/software/spark/conf/wc.jar
注意輸出的目錄必須是不存在的,若是存在會報錯。
數據樣例:
第一列是編號,第二列是數據。
1 16 2 74 3 51 4 35 5 44 6 95 7 5 8 29 10 60 11 13 12 99 13 7 14 26
正確答案:42
①只拿第二列,造成RDD
②類型轉換->String->Int
③和/個數
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object Average { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("average") val sc=new SparkContext(conf) val data=sc.textFile("d://data/average.txt") val r11=data.map { line => line.split(" ")(1).toInt }.reduce(_+_) val r1=data.flatMap { x => x.split(" ").drop(1) }.map { x => x.toInt }.reduce(_+_) val r2=data.count() val result=r1/r2 println(result) } }
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object AverageDriver { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("AverageDriver") val sc = new SparkContext(conf) val data = sc.textFile("d://average.txt", 3) val ageData = data.map { line => { line.split(" ")(1).toInt } } val ageSum = ageData.mapPartitions { it => { val result = List[Int]() var i = 0 while (it.hasNext) { i += it.next() } result.::(i).iterator } }.reduce(_ + _) val pepopleCount = data.count() val average = ageSum / pepopleCount println(average) } }
數據樣例:
第一列是編號,第二列是性別,第三列是身高。
1 M 174 2 F 165 3 M 172 4 M 180 5 F 160 6 F 162 7 M 172 8 M 191 9 F 175 10 F 167
獲取最大、最小值。
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import breeze.linalg.split import scala.collection.Iterable object MaxMin { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("Max") val sc = new SparkContext(conf) val data = sc.textFile("d://data/MaxMin.txt") //第一種方法 val r1 = data.filter { line => line.contains("M") } .map { line => line.split(" ")(2).toInt } //第二種方法 val r2 = data.filter { line => line.split(" ")(1) .equals("M") }.map { line => line.split(" ")(2).toInt } val max = r1.max() val min = r1.min() println(max + " " + min) } }
獲取最大、最小值的所有信息。
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object MaxMinInfo { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("Info") val sc=new SparkContext(conf) val data=sc.textFile("d://data/MaxMin.txt") val r1=data.filter { line => line.split(" ")(1) .equals("M") } .map { x => (x.split(" ")(0),x.split(" ")(1),x.split(" ")(2).toInt) } val max=r1.sortBy(x=> -x._3, true, 1).take(1) val min=r1.sortBy(x=> x._3).take(1) println(max(0)._1+" "+max(0)._2+" "+max(0)._3) println(min(0)._1+" "+min(0)._2+" "+min(0)._3) } }
統計單詞出現的次數最多的前三個。
hello world bye world hello hadoop bye hadoop hello world java web hadoop scala java hive hadoop hive redis hbase hello hbase java redis
Top K算法有兩步,一是統計詞頻,二是找出詞頻最高的前K個詞。
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object Topk { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("topk") val sc=new SparkContext(conf) val data=sc.textFile("d://data/topk.txt") //方法一 val r1=data.flatMap { _.split(" ").map { x => (x,1) } } .reduceByKey(_+_).sortBy(x=> -x._2).take(3) r1.foreach(println(_)) //方法二 val r2=data.flatMap { x => x.split(" ") }.groupBy { x => x } .map{x=>(x._1,x._2.count { x => true })} .map{case(word,count)=>(count,word)}.top(3) r2.foreach(println(_)) //方法三 val wordcunt=data.flatMap { x => x.split(" ") .map { x => (x,1) } }.reduceByKey(_+_) val top3=wordcunt.top(3)(Ordering.by { case(word,count) => count }) top3.foreach(println(_)) } }
Top K的示例模型能夠應用在求過去一段時間消費次數最多的消費者、訪問最頻繁的IP地址和最近、更新、最頻繁的微博等應用場景。
數據樣例:
1 20 8 2 5 11 29 10 7 4 45 6 23 17 19
一共是15個數,正確答案是10
代碼示例
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.dmg.pmml.True object Median { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("median") val sc=new SparkContext(conf) val data=sc.textFile("d://data/median.txt") //方法一 val r1=data.flatMap { x => x.split(" ") }.count().toInt val zhi=r1/2 val r2=data.flatMap { x => x.split(" ") } .map { x => x.toInt } .sortBy(x=>x, true, 1).take(r1)(zhi) println(r2) //方法二 val sortData=data.flatMap { x => x.split(" ") } .map { x => x.toInt }.sortBy(x=>x) .take(zhi+1).last println(sortData) } }
數據樣例:
aa 12 bb 32 aa 3 cc 43 dd 23 cc 5 cc 8 bb 33 bb 12
要求:先按第一例升序排序,再按第二列降序排序。
①自定義排序類
import scala.math.Ordered class SecondarySort(v1:String,v2:Int) extends Ordered[SecondarySort] with Serializable { var col1=v1 var col2=v2 def compare(that: SecondarySort): Int = { //按第一列作升序排序 val tmp=this.col1.compareTo(that.col1) if(tmp==0){ //按第二列作降序排序 that.col2.compareTo(this.col2) }else{ tmp } } }
②Driver
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object Driver { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("ssort") val sc=new SparkContext(conf) val data=sc.textFile("d://data/ssort.txt") //用sortByKey來實現二次排序,因此先把數據組成一個二元Tuple //二元Tuple的形式(SecondarySort(col1,col2),line) val result=data.map { line => val infos=line.split(" ") (new SecondarySort(infos(0),infos(1).toInt),line) }.sortByKey().map(x=>x._2) result.foreach(println(_)) } }
數據樣例:
doc1.txt: hello spark hello hadoop doc2.txt: hello hive hello hbase hello spark doc3.txt: hadoop hbase hive scala
最後的結果形式爲:
import org.apache.spark.SparkConf import org.apache.spark.SparkContext object Driver { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("invert") val sc=new SparkContext(conf) //將指定目錄下的全部文件,返回到一個RDD中 val data=sc.wholeTextFiles("d://data/inverted/*") data.foreach(println(_)) //且分時,先按\r\n,而後按空格切 val result=data.map{case(filePath,text)=> //切分出文件名稱 val fileName=filePath.split("/").last.dropRight(4) (fileName,text) }.flatMap{case(filename,text)=> //切分文件內容 text.split("\r\n").flatMap { line => line.split(" ") } //調換內容和文件名稱的位置 .map { word => (word,filename) } } //經過單詞分組 .groupByKey() //聚合文件名稱 .map{case(word,buffer)=>(word,buffer.toSet.mkString(","))} result.foreach(println) } }
下一篇:Spark的架構