spark 運行模式

1. Spark中的基本概念javascript

在Spark中,有下面的基本概念。
Application:基於Spark的用戶程序,包含了一個driver program和集羣中多個executor
Driver Program:運行Application的main()函數並建立SparkContext。一般SparkContext表明driver program
Executor:爲某Application運行在worker node上的餓一個進程。該進程負責運行Task,並負責將數據存在內存或者磁盤上。每一個Application都有本身獨立的executors
Cluster Manager: 在集羣上得到資源的外部服務(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 集羣中任何可運行Application代碼的節點
Task:被送到executor上執行的工做單元。
Job:能夠被拆分紅Task並行計算的工做單元,通常由Spark Action觸發的一次執行做業。
Stage:每一個Job會被拆分紅不少組Task,每組任務被稱爲stage,也可稱TaskSet。該術語能夠常常在日誌中看打。
RDD :Spark的基本計算單元,經過Scala集合轉化、讀取數據集生成或者由其餘RDD通過算子操做獲得。
html


2. Spark應用框架





客戶Spark程序(Driver Program)來操做Spark集羣是經過SparkContext對象來進行,SparkContext做爲一個操做和調度的總入口,在初始化過程當中集羣管理器會建立DAGScheduler做業調度和TaskScheduler任務調度。java

DAGScheduler做業調度模塊是基於Stage的高層調度模塊(參考:Spark分析之DAGScheduler),DAG全稱 Directed Acyclic Graph,有向無環圖。簡單的來講,就是一個由頂點和有方向性的邊構成的圖中,從任意一個頂點出發,沒有任何一條路徑會將其帶回到出發的頂點。它爲每一個Spark Job計算具備依賴關係的多個Stage任務階段(一般根據Shuffle來劃分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就會產生新的stage),而後將每一個Stage劃分爲具體的一組任務,以TaskSets的形式提交給底層的任務調度模塊來具體執行。其中,不一樣stage以前的RDD爲寬依賴關係。 TaskScheduler任務調度模塊負責具體啓動任務,監控和彙報任務運行狀況。node

建立SparkContext通常要通過下面幾個步驟:web

a). 導入Spark的類和隱式轉換
算法

[java] view plaincopyprint?shell

  1. import org.apache.spark.{SparkContext, SparkConf}  apache

  2. import org.apache.spark.SparkContext._  api

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

b). 構建Spark應用程序的應用信息對象SparkConf 數組

[java] view plaincopyprint?

  1. val conf = new SparkConf().setAppName(appName).setMaster(master_url)  

val conf = new SparkConf().setAppName(appName).setMaster(master_url)

c). 利用SparkConf對象來初始化SparkContext

[java] view plaincopyprint?

  1. val sc = new SparkContext(conf)  

val sc = new SparkContext(conf)

d). 建立RDD、並執行相應的Transformation和action並獲得最終結果。
e). 關閉Context

在完成應用的設計和編寫後,使用spark-submit來提交應用的jar包。spark-submit的命令行參考以下:

Submitting Applications

[javascript] view plaincopyprint?

  1. ./bin/spark-submit   

  2.   --class <main-class>  

  3.   --master <master-url>   

  4.   --deploy-mode <deploy-mode>   

  5.   ... # other options  

  6.   <application-jar>   

  7.   [application-arguments]  

./bin/spark-submit 
  --class <main-class>
  --master <master-url> 
  --deploy-mode <deploy-mode> 
  ... # other options
  <application-jar> 
  [application-arguments]


Spark的運行模式取決於傳遞給SparkContext的MASTER環境變量的值。master URL能夠是如下任一種形式:
Master URL 含義
local 使用一個Worker線程本地化運行SPARK(徹底不併行)
local[*]使用邏輯CPU個數數量的線程來本地化運行Spark
local[K]使用K個Worker線程本地化運行Spark(理想狀況下,K應該根據運行機器的CPU核數設定)
spark://HOST:PORT鏈接到指定的Spark standalone master。默認端口是7077.
yarn-client以客戶端模式鏈接YARN集羣。集羣的位置能夠在HADOOP_CONF_DIR 環境變量中找到。
yarn-cluster 以集羣模式鏈接YARN集羣。集羣的位置能夠在HADOOP_CONF_DIR 環境變量中找到。
mesos://HOST:PORT 鏈接到指定的Mesos集羣。默認接口是5050.

而spark-shell會在啓動的時候自動構建SparkContext,名稱爲sc。


3. RDD的創造

Spark全部的操做都圍繞彈性分佈式數據集(RDD)進行,這是一個有容錯機制並能夠被並行操做的元素集合,具備只讀、分區、容錯、高效、無需物化、能夠緩存、RDD依賴等特徵。

目前有兩種類型的基礎RDD:

並行集合(Parallelized Collections):接收一個已經存在的Scala集合,而後進行各類並行計算。

Hadoop數據集(Hadoop Datasets) :在一個文件的每條記錄上運行函數。只要文件系統是HDFS,或者hadoop支持的任意存儲系統便可。 

這兩種類型的RDD均可以經過相同的方式進行操做,從而得到子RDD等一系列拓展,造成lineage血統關係圖。


(1). 並行化集合
並行化集合是經過調用SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq對象)。集合的對象將會被拷貝,建立出一個能夠被並行操做的分佈式數據集。例如,下面的解釋器輸出,演示瞭如何從一個數組建立一個並行集合。
例如:

val rdd = sc.parallelize(Array(1 to 10)) 根據能啓動的executor的數量來進行切分多個slice,每個slice啓動一個Task來進行處理。

val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的數量


(2). Hadoop數據集

Spark能夠將任何Hadoop所支持的存儲資源轉化成RDD,如本地文件(須要網絡文件系統,全部的節點都必須能訪問到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
a). 使用textFile()方法能夠將本地文件或HDFS文件轉換成RDD
支持整個文件目錄讀取,文件能夠是文本或者壓縮文件(如gzip等,自動執行解壓縮並加載數據)。如textFile(」file:///dfs/data」)
支持通配符讀取,例如:

[java] view plaincopyprint?

  1. val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");  

  2. val rdd2=rdd1.map(_.split("t")).filter(_.length==6)  

  3. rdd2.count()  

  4. ......  

  5. 14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903  

  6. ......  

val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");
val rdd2=rdd1.map(_.split("t")).filter(_.length==6)
rdd2.count()
......
14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903
......


textFile()可選第二個參數slice,默認狀況下爲每個block分配一個slice。用戶也能夠經過slice指定更多的分片,但不能使用少於HDFS block的分片數。

b). 使用wholeTextFiles()讀取目錄裏面的小文件,返回(用戶名、內容)對
c). 使用sequenceFile[K,V]()方法能夠將SequenceFile轉換成RDD。SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。
d). 使用SparkContext.hadoopRDD方法能夠將其餘任何Hadoop輸入類型轉化成RDD使用方法。通常來講,HadoopRDD中每個HDFS block都成爲一個RDD分區。
此外,經過Transformation能夠將HadoopRDD等轉換成FilterRDD(依賴一個父RDD產生)和JoinedRDD(依賴全部父RDD)等。


4. RDD操做



RDD支持兩類操做:
轉換(transformation)現有的RDD通關轉換生成一個新的RDD,轉換是延時執行(lazy)的。
動做(actions)在RDD上運行計算後,返回結果給驅動程序或寫入文件系統。

例如,map就是一種transformation,它將數據集每個元素都傳遞給函數,並返回一個新的分佈數據集表示結果。

reduce則是一種action,經過一些函數將全部的元素疊加起來,並將最終結果返回給Driver程序。


Transformations

(1). map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.
返回一個新分佈式數據集,由每個輸入元素通過func函數轉換後組成

2). filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.
返回一個新數據集,由通過func函數計算後返回值爲true的輸入元素組成

[java] view plaincopyprint?

  1. val num=sc.parallelize(1 to 100)  

  2. val num2 = num.map(_*2)  

  3. val num3 = num2.filter(_ % 3 == 0)  

  4. ......  

  5. num3.collect  

  6. //res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198)   

  7. num3.toDebugString  

  8. //res5: String =   

  9. //FilteredRDD[20] at filter at <console>:16 (48 partitions)   

  10. //  MappedRDD[19] at map at <console>:14 (48 partitions)   

  11. //    ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)  

val num=sc.parallelize(1 to 100)
val num2 = num.map(_*2)
val num3 = num2.filter(_ % 3 == 0)
......
num3.collect
//res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198)
num3.toDebugString
//res5: String =
//FilteredRDD[20] at filter at <console>:16 (48 partitions)
//  MappedRDD[19] at map at <console>:14 (48 partitions)
//    ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)


(3). flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(所以func應該返回一個序列,而不是單一元素

[java] view plaincopyprint?

  1. val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))  

  2. kv.flatMap(x=>x.map(_+1)).collect  

  3. //res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9)   

  4.   

  5. //Word Count   

  6. sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)  

val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))
kv.flatMap(x=>x.map(_+1)).collect
//res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9)

//Word Count
sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)


 

(4). mapPartitions(func)

mapPartitions(func):和map很像,可是map是每一個element,而mapPartitions是每一個partition


Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
相似於map,但獨立地在RDD的每個分塊上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。mapPartitions將會被每個數據集分區調用一次。各個數據集分區的所有內容將做爲順序的數據流傳入函數func的參數中,func必須返回另外一個Iterator[T]。被合併的結果自動轉換成爲新的RDD。下面的測試中,元組(3,4)和(6,7)將因爲咱們選擇的分區策略和方法而消失。
The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose

[java] view plaincopyprint?

  1. val nums = sc . parallelize (1 to 9 , 3)  

  2. def myfunc[T] ( iter : Iterator [T] ) : Iterator [( T , T ) ] = {  

  3.     var res = List [(T , T) ]()  

  4.     var pre = iter.next  

  5.     while ( iter.hasNext )  

  6.     {  

  7.         val cur = iter . next ;  

  8.         res .::= ( pre , cur )  

  9.         pre = cur ;  

  10.     }  

  11.     res . iterator  

  12. }  

  13. //myfunc: [T](iter: Iterator[T])Iterator[(T, T)]   

  14. nums.mapPartitions(myfunc).collect  

  15. //res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))  

val nums = sc . parallelize (1 to 9 , 3)
def myfunc[T] ( iter : Iterator [T] ) : Iterator [( T , T ) ] = {
    var res = List [(T , T) ]()
    var pre = iter.next
    while ( iter.hasNext )
    {
        val cur = iter . next ;
        res .::= ( pre , cur )
        pre = cur ;
    }
    res . iterator
}
//myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
nums.mapPartitions(myfunc).collect
//res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))



(5). mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T=>) ==> Iterator<U=> when running on an RDD of type T.
相似於mapPartitions, 其函數原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受兩個參數,第一個參數是分區的索引,第二個是一個數據集分區的迭代器。而輸出的是一個包含通過該函數轉換的迭代器。下面測試中,將分區索引和分區數據一塊兒輸出。

[java] view plaincopyprint?

  1. val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)  

  2. def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {  

  3. iter . toList . map ( x => index + "-" + x ) . iterator  

  4. }  

  5. //myfunc: (index: Int, iter: Iterator[Int])Iterator[String]   

  6. x . mapPartitionsWithIndex ( myfunc ) . collect()  

  7. res: Array[String] = Array(0-10-20-31-41-51-62-72-82-92-10)  

val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)
def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {
iter . toList . map ( x => index + "-" + x ) . iterator
}
//myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
x . mapPartitionsWithIndex ( myfunc ) . collect()
res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)


(6). sample(withReplacement,fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
根據fraction指定的比例,對數據進行採樣,能夠選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。

[java] view plaincopyprint?

  1. val a = sc . parallelize (1 to 10000 , 3)  

  2. a . sample ( false , 0.1 , 0) . count  

  3. res0 : Long = 960  

  4. a . sample ( true , 0.7 , scala.util.Random.nextInt(10000)) . count  

  5. res1: Long = 7073  

val a = sc . parallelize (1 to 10000 , 3)
a . sample ( false , 0.1 , 0) . count
res0 : Long = 960
a . sample ( true , 0.7 , scala.util.Random.nextInt(10000)) . count
res1: Long = 7073


(7). union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一個新的數據集,新數據集是由源數據集和參數數據集聯合而成。


(8). intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

(9). distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.
返回一個包含源數據集中全部不重複元素的新數據集

[java] view plaincopyprint?

  1. val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))  

  2. val kv2=sc.parallelize(List(("A",4),("A"2),("C",3),("A",4),("B",5)))  

  3. kv2.distinct.collect  

  4. res0: Array[(String, Int)] = Array((A,4), (C,3), (B,5), (A,2))  

  5. kv1.union(kv2).collect  

  6. res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,4), (A,2), (C,3), (A,4), (B,5))  

  7. kv1.union(kv2).collect.distinct  

  8. res2: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,2))  

  9. kv1.intersection(kv2).collect  

  10. res43: Array[(String, Int)] = Array((A,4), (C,3), (B,5))  

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv2=sc.parallelize(List(("A",4),("A", 2),("C",3),("A",4),("B",5)))
kv2.distinct.collect
res0: Array[(String, Int)] = Array((A,4), (C,3), (B,5), (A,2))
kv1.union(kv2).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,4), (A,2), (C,3), (A,4), (B,5))
kv1.union(kv2).collect.distinct
res2: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,2))
kv1.intersection(kv2).collect
res43: Array[(String, Int)] = Array((A,4), (C,3), (B,5))


(10.)groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

在一個(K,V)對的數據集上調用,返回一個(K,Seq[V])對的數據集
注意:默認狀況下,只有8個並行任務來作操做,可是你能夠傳入一個可選的numTasks參數來改變它。若是分組是用來計算聚合操做(如sum或average),那麼應該使用reduceByKey 或combineByKey 來提供更好的性能。
groupByKey, reduceByKey等transformation操做涉及到了shuffle操做,因此這裏引出兩個概念寬依賴和窄依賴。




窄依賴(narrow dependencies)
子RDD的每一個分區依賴於常數個父分區(與數據規模無關)
輸入輸出一對一的算子,且結果RDD的分區結構不變。主要是map/flatmap
輸入輸出一對一的算子,但結果RDD的分區結構發生了變化,如union/coalesce
從輸入中選擇部分元素的算子,如filter、distinct、substract、sample


寬依賴(wide dependencies)
子RDD的每一個分區依賴於全部的父RDD分區
對單個RDD基於key進行重組和reduce,如groupByKey,reduceByKey
對兩個RDD基於key進行join和重組,如join
通過大量shuffle生成的RDD,建議進行緩存。這樣避免失敗後從新計算帶來的開銷。
注意:reduce是一個action,和reduceByKey徹底不一樣。


(11).reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
在一個(K,V)對的數據集上調用時,返回一個(K,V)對的數據集,使用指定的reduce函數,將相同key的值聚合到一塊兒。相似groupByKey,reduce任務個數是能夠經過第二個可選參數來配置的


(12).sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
在一個(K,V)對的數據集上調用,K必須實現Ordered接口,返回一個按照Key進行排序的(K,V)對數據集。升序或降序由ascending布爾參數決定

[java] view plaincopyprint?

  1. val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))  

  2. res0: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))  

  3. kv1.sortByKey().collect //注意sortByKey的小括號不能省   

  4. res1: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))  

  5. kv1.groupByKey().collect  

  6. res1: Array[(String, Iterable[Int])] = Array((A,ArrayBuffer(14)), (B,ArrayBuffer(25)), (C,ArrayBuffer(3)))  

  7. kv1.reduceByKey(_+_).collect  

  8. res2: Array[(String, Int)] = Array((A,5), (B,7), (C,3))  

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
res0: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.sortByKey().collect //注意sortByKey的小括號不能省
res1: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.groupByKey().collect
res1: Array[(String, Iterable[Int])] = Array((A,ArrayBuffer(1, 4)), (B,ArrayBuffer(2, 5)), (C,ArrayBuffer(3)))
kv1.reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((A,5), (B,7), (C,3))


(13). join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
在類型爲(K,V)和(K,W)類型的數據集上調用時,返回一個相同key對應的全部元素對在一塊兒的(K, (V, W))數據集

(14).cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable, Iterable) tuples. This operation is also called groupWith.
在類型爲(K,V)和(K,W)的數據集上調用,返回一個 (K, Seq[V], Seq[W])元組的數據集。這個操做也能夠稱之爲groupwith

[java] view plaincopyprint?

  1. val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))  

  2. val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))  

  3. kv1.join(kv3).collect  

  4. res16: Array[(String, (Int, Int))] = Array((A,(1,10)), (A,(4,10)), (B,(2,20)), (B,(5,20)))  

  5. kv1.cogroup(kv3).collect  

  6. res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(ArrayBuffer(14),ArrayBuffer(10))), (B,(ArrayBuffer(25),ArrayBuffer(20))), (C,(ArrayBuffer(3),ArrayBuffer())), (D,(ArrayBuffer(),ArrayBuffer(30))))  

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
res16: Array[(String, (Int, Int))] = Array((A,(1,10)), (A,(4,10)), (B,(2,20)), (B,(5,20)))
kv1.cogroup(kv3).collect
res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(ArrayBuffer(1, 4),ArrayBuffer(10))), (B,(ArrayBuffer(2, 5),ArrayBuffer(20))), (C,(ArrayBuffer(3),ArrayBuffer())), (D,(ArrayBuffer(),ArrayBuffer(30))))


(15).cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
笛卡爾積,在類型爲 T 和 U 類型的數據集上調用時,返回一個 (T, U)對數據集(兩兩的元素對)

(16). pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
經過POSIX 管道來將每一個RDD分區的數據傳入一個shell命令(例如Perl或bash腳本)。RDD元素會寫入到進程的標準輸入,其標準輸出會做爲RDD字符串返回。

(17).coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
將RDD分區的數量下降爲numPartitions,對於通過過濾後的大數據集的在線處理更加有效。

(18).repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
隨機從新shuffle RDD中的數據,並建立numPartitions個分區。這個操做總會經過網絡來shuffle所有數據。


Actions

(19). reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
經過函數func(接受兩個參數,返回一個參數)彙集數據集中的全部元素。這個功能必須可交換且可關聯的,從而能夠正確的被並行執行。

(20). collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
在驅動程序中,以數組的形式,返回數據集的全部元素。這一般會在使用filter或者其它操做並返回一個足夠小的數據子集後再使用會比較有用。

(21). count()

Return the number of elements in the dataset.
返回數據集的元素的個數。

(22). first()

Return the first element of the dataset (similar to take(1)).
返回數據集的第一個元素(相似於take(1))

(23). take(n)

Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
返回一個由數據集的前n個元素組成的數組。注意,這個操做目前並不是並行執行,而是由驅動程序計算全部的元素

(24). countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
對(K,V)類型的RDD有效,返回一個(K,Int)對的Map,表示每個key對應的元素個數

(25). foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
在數據集的每個元素上,運行函數func進行更新。這一般用於邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互,例如HBase.

[java] view plaincopyprint?

  1. val num=sc.parallelize(1 to 10)  

  2. num.reduce (_ + _)  

  3. res1: Int = 55  

  4. num.take(5)  

  5. res2: Array[Int] = Array(12345)  

  6. num.first  

  7. res3: Int = 1  

  8. num.count  

  9. res4: Long = 10  

  10. num.take(5).foreach(println)  

  11. 1  

  12. 2  

  13. 3  

  14. 4  

  15. 5  

  16. val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("A",7),("B",7)))  

  17. val kv1_count=kv1.countByKey()  

  18. kv1_count: scala.collection.Map[String,Long] = Map(A -> 3, C -> 1, B -> 3)  

val num=sc.parallelize(1 to 10)
num.reduce (_ + _)
res1: Int = 55
num.take(5)
res2: Array[Int] = Array(1, 2, 3, 4, 5)
num.first
res3: Int = 1
num.count
res4: Long = 10
num.take(5).foreach(println)
1
2
3
4
5
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("A",7),("B",7)))
val kv1_count=kv1.countByKey()
kv1_count: scala.collection.Map[String,Long] = Map(A -> 3, C -> 1, B -> 3)



(26). takeSample(withReplacement,num, seed)

Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
返回一個數組,在數據集中隨機採樣num個元素組成,能夠選擇是否用隨機數替換不足的部分,Seed用於指定的隨機數生成器種子

(27). takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.
返回一個由數據集的前n個元素組成的有序數組,使用天然序或自定義的比較器。

(28). saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
將數據集的元素,以textfile的形式,保存到本地文件系統,HDFS或者任何其它hadoop支持的文件系統。對於每一個元素,Spark將會調用toString方法,將它轉換爲文件中的文本行

(29). saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
將數據集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支持的文件系統。這個只限於由key-value對組成,並實現了Hadoop的Writable接口,或者隱式的能夠轉換爲Writable的RDD。(Spark包括了基本類型的轉換,例如Int,Double,String,等等)

(30). saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
將數據集元素寫入Java序列化的能夠被SparkContext.objectFile()加載的簡單格式中。
固然,transformation和action的操做遠遠不止這些。其餘請參考API文檔:
RDD API


5. RDD緩存


Spark可使用 persist 和 cache 方法將任意 RDD 緩存到內存、磁盤文件系統中。緩存是容錯的,若是一個 RDD 分片丟失,能夠經過構建它的 transformation自動重構。被緩存的 RDD 被使用的時,存取速度會被大大加速。通常的executor內存60%作 cache, 剩下的40%作task。


Spark中,RDD類可使用cache() 和 persist() 方法來緩存。cache()是persist()的特例,將該RDD緩存到內存中。而persist能夠指定一個StorageLevel。StorageLevel的列表能夠在StorageLevel 伴生單例對象中找到:


[java] view plaincopyprint?

  1. object StorageLevel {  

  2.   val NONE = new StorageLevel(falsefalsefalsefalse)  

  3.   val DISK_ONLY = new StorageLevel(truefalsefalsefalse)  

  4.   val DISK_ONLY_2 = new StorageLevel(truefalsefalsefalse2)  

  5.   val MEMORY_ONLY = new StorageLevel(falsetruefalsetrue)  

  6.   val MEMORY_ONLY_2 = new StorageLevel(falsetruefalsetrue2)  

  7.   val MEMORY_ONLY_SER = new StorageLevel(falsetruefalsefalse)  

  8.   val MEMORY_ONLY_SER_2 = new StorageLevel(falsetruefalsefalse2)  

  9.   val MEMORY_AND_DISK = new StorageLevel(truetruefalsetrue)  

  10.   val MEMORY_AND_DISK_2 = new StorageLevel(truetruefalsetrue2)  

  11.   val MEMORY_AND_DISK_SER = new StorageLevel(truetruefalsefalse)  

  12.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(truetruefalsefalse2)  

  13.   val OFF_HEAP = new StorageLevel(falsefalsetruefalse// Tachyon   

  14. }  

  15. class StorageLevel private(    

  16.      private var useDisk_      :    Boolean,    

  17.      private var useMemory_   :  Boolean,    

  18.      private var useOffHeap_   : Boolean,    

  19.      private var deserialized_ : Boolean,    

  20.      private var replication_  : Int = 1  

  21. )  

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon
}
class StorageLevel private(  
     private var useDisk_      :    Boolean,  
     private var useMemory_   :  Boolean,  
     private var useOffHeap_   : Boolean,  
     private var deserialized_ : Boolean,  
     private var replication_  : Int = 1
)




Spark的不一樣StorageLevel ,目的知足內存使用和CPU效率權衡上的不一樣需求。咱們建議經過如下的步驟來進行選擇:
·若是你的RDDs能夠很好的與默認的存儲級別(MEMORY_ONLY)契合,就不須要作任何修改了。這已是CPU使用效率最高的選項,它使得RDDs的操做盡量的快。
·若是不行,試着使用MEMORY_ONLY_SER而且選擇一個快速序列化的庫使得對象在有比較高的空間使用率的狀況下,依然能夠較快被訪問。
·儘量不要存儲到硬盤上,除非計算數據集的函數,計算量特別大,或者它們過濾了大量的數據。不然,從新計算一個分區的速度,和與從硬盤中讀取基本差很少快。
·若是你想有快速故障恢復能力,使用複製存儲級別(例如:用Spark來響應web應用的請求)。全部的存儲級別都有經過從新計算丟失數據恢復錯誤的容錯機制,可是複製存儲級別可讓你在RDD上持續的運行任務,而不須要等待丟失的分區被從新計算。
·若是你想要定義你本身的存儲級別(好比複製因子爲3而不是2),可使用StorageLevel 單例對象的apply()方法。
在不會使用cached RDD的時候,及時使用unpersist方法來釋放它。



6. RDD的共享變量

在應用開發中,一個函數被傳遞給Spark操做(例如map和reduce),在一個遠程集羣上運行,它實際上操做的是這個函數用到的全部變量的獨立拷貝。這些變量會被拷貝到每一臺機器。一般看來,在任務之間中,讀寫共享變量顯然不夠高效。然而,Spark仍是爲兩種常見的使用模式,提供了兩種有限的共享變量:廣播變量和累加器。


(1). 廣播變量(Broadcast Variables)
– 廣播變量緩存到各個節點的內存中,而不是每一個 Task
– 廣播變量被建立後,能在集羣中運行的任何函數調用
– 廣播變量是隻讀的,不能在被廣播後修改
– 對於大數據集的廣播, Spark 嘗試使用高效的廣播算法來下降通訊成本
使用方法:


[java] view plaincopyprint?

  1. val broadcastVar = sc.broadcast(Array(123))  

val broadcastVar = sc.broadcast(Array(1, 2, 3))


(2). 累加器
累加器只支持加法操做,能夠高效地並行,用於實現計數器和變量求和。Spark 原生支持數值類型和標準可變集合的計數器,但用戶能夠添加新的類型。只有驅動程序才能獲取累加器的值
使用方法:

[java] view plaincopyprint?

  1. val accum = sc.accumulator(0)  

  2. sc.parallelize(Array(1234)).foreach(x => accum  + = x)  

  3. accum.value  

  4. val num=sc.parallelize(1 to 100)  

val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum  + = x)
accum.value
val num=sc.parallelize(1 to 100)
相關文章
相關標籤/搜索