1.從集合中建立RDDshell
val conf = new SparkConf().setAppName("Test").setMaster("local")
//這兩個方法都有第二參數是一個默認值2 分片數量(partition的數量)
//scala集合經過makeRDD建立RDD,底層實現也是parallelize
val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
//scala集合經過parallelize建立RDD
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6)) |
2.從外部存儲建立RDD數組
//從外部存儲建立RDD
val rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt") |
RDD支持兩種操做:轉化操做和行動操做。RDD 的轉化操做是返回一個新的 RDD的操做,好比 map()和 filter(),而行動操做則是向驅動器程序返回結果或把結果寫入外部系統的操做。好比 count() 和 first()。 緩存
Spark採用惰性計算模式,RDD只有第一次在一個行動操做中用到時,纔會真正計算。Spark能夠優化整個計算過程。默認狀況下,Spark 的 RDD 會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個 RDD,可使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。函數
Transformation算子oop
RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。大數據
轉換優化 |
含義scala |
map(func)設計 |
返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 |
filter(func) |
返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) |
相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) |
相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) |
對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) |
對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) |
對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
相同的Key值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值zeroValue:中立值,定義返回value的類型,並參與運算seqOp:用來在同一個partition中合併值combOp:用來在不一樣partiton中合併值 |
sortByKey([ascending], [numTasks]) |
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
與sortByKey相似,可是更靈活 |
join(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD |
cartesian(otherDataset) |
笛卡爾積 |
pipe(command, [envVars]) |
將一些shell命令用於Spark中生成新的RDD |
coalesce(numPartitions) |
從新分區 |
repartition(numPartitions) |
從新分區 |
repartitionAndSortWithinPartitions(partitioner) |
從新分區和排序 |
Action算子
在RDD上運行計算,並返回結果給Driver或寫入文件系統
動做 |
含義 |
reduce(
func
|
經過func函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的 |
collect() |
在驅動程序中,以數組的形式返回數據集的全部元素 |
count() |
返回RDD的元素個數 |
first() |
返回RDD的第一個元素(相似於take(1)) |
take(
n
|
返回一個由數據集的前n個元素組成的數組 |
takeSample(
withReplacement
num
seed
|
返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(
n
[ordering]
|
takeOrdered和top相似,只不過以和top相反的順序返回元素 |
saveAsTextFile(
path
|
將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 |
saveAsSequenceFile(
path
|
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 |
saveAsObjectFile(
path
|
|
countByKey() |
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 |
foreach(
func
|
在數據集的每個元素上,運行函數func進行更新。 |
RDD支持兩種操做:轉化操做和行動操做。RDD 的轉化操做是返回一個新的 RDD的操做,好比 map()和 filter(),而行動操做則是向驅動器程序返回結果或把結果寫入外部系統的操做。好比 count() 和 first()。
Spark採用惰性計算模式,RDD只有第一次在一個行動操做中用到時,纔會真正計算。Spark能夠優化整個計算過程。默認狀況下,Spark 的 RDD 會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個 RDD,可使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。
Transformation算子****
RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。
轉換 | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成 |
filter(func) | 返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成 |
flatMap(func) | 相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和參數RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD和參數RDD求交集後返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 相同的Key值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值zeroValue:中立值,定義返回value的類型,並參與運算seqOp:用來在同一個partition中合併值combOp:用來在不一樣partiton中合併值 |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey相似,可是更靈活 |
join(otherDataset, [numTasks]) | 在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable,Iterable))類型的RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | 將一些shell命令用於Spark中生成新的RDD |
coalesce(numPartitions) | 從新分區 |
repartition(numPartitions) | 從新分區 |
repartitionAndSortWithinPartitions(partitioner) | 從新分區和排序 |
** Action算子**
在RDD上運行計算,並返回結果給Driver或寫入文件系統
動做 | 含義 |
---|---|
reduce(
func
) |
經過func函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的 |
collect() | 在驅動程序中,以數組的形式返回數據集的全部元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(相似於take(1)) |
take(
n
) |
返回一個由數據集的前n個元素組成的數組 |
takeSample(
withReplacement
,
num
, [
seed
]) |
返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(
n
,
[ordering]
) |
takeOrdered和top相似,只不過以和top相反的順序返回元素 |
saveAsTextFile(
path
) |
將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本 |
saveAsSequenceFile(
path
) |
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。 |
saveAsObjectFile(
path
) |
|
countByKey() | 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。 |
foreach(
func
) |
在數據集的每個元素上,運行函數func進行更新。 |