1. Spark RDD 建立操做html
1.1 數據集合es6
parallelize 能夠建立一個可以並行操做的RDD。其函數定義以下:sql
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
由定義可見有兩個參數,第一個參數指定數據集合,第二個參數指定數據分區。shell
實例:由普通數組建立RDD apache
scala> val data=Array(1,2,3,4,5,6,7,8,9)
data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> val rdd=sc.parallelize(data,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
1.2 外部數據源數組
textFiles 能夠經過Hadoop支持的外部數據源(包括本地文件系統、HDFS、Cassandra、HBase等)創建RDD。其定義以下:數據結構
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String]
第一個參數指定數據路徑,第二個參數指定數據分區。dom
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
scala> sc.defaultParallelism 機器學習
res0: Int = 2
由以上可知,若是第二個參數若是不設置默認爲2,默認的並行度最大不超過2.分佈式
實例1:讀取本地文件建立RDD
scala> val rdd1=sc.textFile("file:///usr/local/doc/name1.txt")
rdd1: org.apache.spark.rdd.RDD[String] = file:///usr/local/doc/name1.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> rdd1.collect
res7: Array[String] = Array(james, jack, jenny)
實例2:讀取hdfs上的文件建立RDD
[root@master doc]# hdfs dfs -cat /1.txt
hello world
hello terry
hello james
hello curry
hello bill
hello kact
hello james
scala> val rdd2=sc.textFile("hdfs://master:9000/1.txt")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/1.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd2.collect
res0: Array[String] = Array(hello world, hello terry, hello james, hello curry, hello bill, hello kact, hello james)
2. Spark RDD 轉換操做
2.1 map(func)
對集合的每個元素運用某個函數操做,而後將結果做爲一個新的列表返回。
實例:將列表中每一個元素值乘以2
scala> val rdd1=sc.parallelize(1 to 6,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2=rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> rdd2.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10, 12)
2.2 filter(func)
對RDD元素進行過濾,返回值爲true的元素組成的一個新的數據集。
實例:返回數據集中的偶數
scala> val rdd1=sc.parallelize(1 to 9,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd3=rdd1.filter(x=>x%2==0)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:25
scala> rdd3.collect
res3: Array[Int] = Array(2, 4, 6, 8)
2.3 flatMap(func)
對集合中每一個元素運用某個函數操做(每一個元素會被映射爲0到多個輸出元素)後,將結果扁平化組成一個新的集合。
實例2:每一個元素映射爲多個元素
scala> val rdd1=sc.parallelize(1 to 3,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd4=rdd1.flatMap(x=>x to 5)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at flatMap at <console>:25
scala> rdd4.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 2, 3, 4, 5, 3, 4, 5)
2.4 mapPartitions(func)
與map相似,map函數是應用到每一個元素,而mapPartitions的輸入函數是每一個分區的數據,把每一個分區中的內容做爲總體來處理的。 當map裏面有比較耗時的初始化操做時,好比鏈接db,能夠採用mapPartitions,它對每一個partition操做一次,其函數的輸入與輸出都是iterator類型。其定義以下:
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
實例以下:
scala> val rdd1=sc.parallelize(1 to 9,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={
| var res=List[(T,T)]()
| var pre=iter.next
| while (iter.hasNext) {
| val cur=iter.next
| res.::=(pre,cur)
| pre=cur
| }
| res.iterator
| }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
scala> rdd1.mapPartitions(myfunc)
res2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at mapPartitions at <console>:28
scala> res2.collect()
res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
2.5 mapPartitionsWithIndex
(func)
與 mapPartitions 相似,其傳入的函數除了數據集,還需一個分區的index.其定義以下:
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
實例以下:
scala> val rdd1=sc.parallelize(1 to 9,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val mapReslut=rdd1.mapPartitionsWithIndex{
| (index,iterator)=>{
| val list=iterator.toList
| list.map(x=>x +"->"+index).iterator
| }
| }
mapReslut: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at mapPartitionsWithIndex at <console>:25
scala> mapReslut.collect
res6: Array[String] = Array(1->0, 2->0, 3->0, 4->1, 5->1, 6->1, 7->2, 8->2, 9->2)
2.6 sample(withReplacement, fraction, seed)
根據給定的隨機種子seed,隨機抽樣出數量爲fraction的數據。其定義以下:
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
withReplacement取回的數據是否放回抽樣,fraction:比例,0.1表示10%,seed:隨機種子,相同的seed獲得的隨機序列同樣。
實例:
scala> val rdd1=sc.parallelize(1 to 1000,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd1.sample(false,0.1,1).count
res1: Long = 116
2.7 union(otherDataset)
兩個數據集合並,不去重,返回一個新的數據集,即全部舊的rdd的partition,直接移到新的rdd,新rdd 的Partition數量爲舊rdd的partition數量的和。
實例:
scala> val rdd1=sc.parallelize(1 to 6,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd2=rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:25
scala> val rdd3=rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[6] at union at <console>:27
scala> rdd3.collect
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 2, 4, 6, 8, 10, 12)
查看新rdd的partition數量
scala> rdd3.partitions.length
res3: Int = 6
2.8 intersection(otherDataset)
數據交集,相交的數據組成一個新的數據集返回。
實例:
scala> val rdd4=rdd1.intersection(rdd2)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at intersection at <console>:27
scala> rdd4.collect
res3: Array[Int] = Array(6, 4, 2)
2.9 distinct([numPartitions]))
去除兩個數據集的重複數據,返回去重後的數據集。
實例:
scala> val rdd5=rdd1.union(rdd2).distinct
rdd5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at distinct at <console>:27
scala> rdd5.collect
res4: Array[Int] = Array(6, 12, 1, 8, 2, 3, 4, 10, 5)
2.10 groupByKey([numPartitions]))
分組操做,在一個由(K,V)對組成的數據集上調用,返回一個(K,Iterable[V])對的數據集。
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
實例:
scala> val rdd=sc.parallelize(Array((1,2),(1,3),(1,4),(2,3),(2,4),(2,5)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd10=rdd.groupByKey()
rdd10: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupByKey at <console>:25
scala> rdd10.collect
res0: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(2, 3, 4)), (2,CompactBuffer(3, 4, 5)))
2.11 reduceByKey(func, [numPartitions])
分組聚合操做, 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集。K相同的值,都被使用相同的reduce函數聚合在一塊兒。
實例:相同的key的值加起來
scala> val rdd11=rdd.reduceByKey(_+_)
rdd11: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25
scala> rdd11.collect
res3: Array[(Int, Int)] = Array((1,9), (2,12))
2.12 aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
對PairRDD中相同的Key值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值。aggregateByKey返回值的類型不須要和RDD中value的類型一致。由於aggregateByKey是對相同Key中的值進行聚合操做,因此aggregateByKey'函數最終返回的類型仍是PairRDD,對應的結果是Key和聚合後的值.
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]
aggregateByKey函數的使用,需爲它提供如下三個參數:
1.zeroValue:U,初始值,即聚合的初始值
2.
seqOp: (U, V) => U,seq操做符, 描述如何將V
合併到數據結構U
3.
combOp: (U, U) => U,comb操做符,描述如何
合併兩個數據結構U。
實例1:
scala> val rdd=sc.parallelize(List((1,3),(1,2),(1,4),(2,3)),2)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val rdd12=rdd.aggregateByKey(0)(math.max(_,_),_+_)
rdd12: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at aggregateByKey at <console>:25
scala> rdd12.collect
res1: Array[(Int, Int)] = Array((2,3), (1,7))
分析過程:
分爲兩個分區,(1,3),(1,2)會落入0分區,(1,4),(2,3)落入1分區,每一個分區分開計算。
初始值爲0,因此第一步不對列表值產生影響。
seqOP:函數是相同key取最大值,0分區的結果爲(1,3),1分區的結果爲(1,4),(2,3)
combOP:函數是相同的key的value進行相加,結果爲(1,3+4)=>(1,7),(2,3)
實例2:
scala> val rdd=sc.parallelize(List((1,3),(1,2),(1,4),(2,3)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd12=rdd.aggregateByKey(0)(math.max(_,_),_+_)
rdd12: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[5] at aggregateByKey at <console>:25
scala> rdd12.collect
res2: Array[(Int, Int)] = Array((1,9), (2,3))
分析過程:
分爲三個分區,(1,3)會落入0分區,(1,2)進入1分區,(1,4),(2,3)落入2分區,每一個分區分開計算。
初始值爲0,因此第一步不對列表值產生影響。
seqOP:函數是相同key取最大值,0分區的結果爲(1,3),1分區的結果爲(1,2),2分區的結果爲(1,4),(2,3)
combOP:函數是相同的key的value進行相加,結果爲(1,3+2+4)=>(1,9),(2,3)
2.13 combineByKey
對RDD中的數據集按照key進行聚合操做。聚合操做經過自定義函數提供。
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
三個參數解釋:
createCombiner:在遍歷(k,v)時,若是combineByKey第一次遇到值爲k的key(類型K),那麼將這個(k,v)調用combineCombiner函數,將v轉換爲C.
mergeValue:在遍歷(k,v)時,若是combineByKey不是第一次遇到值爲k的key (類型K),那麼將這個(k,v)調用mergeValue函數,它的做用是將v累加到聚合對象(類型爲C)中。
mergeCombiners:combineByKey是在分佈式環境下執行的,RDD的生個分區單獨進行combineByKey操做,最後須要對各個分區的結果進行最後的聚合。
實例以下:
scala> val rdd=sc.parallelize(Array((1,1.0),(1,2.0),(1,3.0),(2,4.0),(2,5.0),(2,6.0)),2)
rdd: org.apache.spark.rdd.RDD[(Int, Double)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val combine=rdd.combineByKey(createCombiner = (v: Double) => (v: Double, 1),
| mergeValue = (c: (Double, Int), v: Double) => (c._1 + v, c._2 + 1),
| mergeCombiners = (c1: (Double, Int), c2: (Double, Int)) => (c1._1 + c2._1, c1._2 + c2._2),
| numPartitions = 2)
combine: org.apache.spark.rdd.RDD[(Int, (Double, Int))] = ShuffledRDD[2] at combineByKey at <console>:25
scala> combine.collect
res0: Array[(Int, (Double, Int))] = Array((2,(15.0,3)), (1,(6.0,3)))
2.14 sortByKey([ascending], [numPartitions])
按key對RDD進行排序,其定義以下:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
由上可知,該函數有兩個參數,第一個參數是排序方式,默認是true(升序),第二個參數能夠指定分區,即並行任務數。另外,排序的key需可排序的
Ordering
實例:
scala> val rdd=sc.parallelize(Array((1,3),(2,6),(2,3),(1,2),(1,8),(2,9)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.sortByKey()
res6: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[15] at sortByKey at <console>:26
scala> res6.collect
res7: Array[(Int, Int)] = Array((1,3), (1,2), (1,8), (2,6), (2,3), (2,9))
2.15 join(otherDataset, [numPartitions])
鏈接操做,將輸入數據集(K, V) 和另外一數據集 (K, W)進行join, 返回兩個集合匹配的(K, (V, W))集合對。即該操做過濾掉不匹配的key,而後返回相同K的V,W集合進行笛卡爾積操做。其定義以下:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
實例:
scala> val rdd=sc.parallelize(Array((1,2),(1,3),(2,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Array((1,2),(1,5),(2,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd14=rdd.join(rdd2)
rdd14: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:27
scala> rdd14.collect
res2: Array[(Int, (Int, Int))] = Array((2,(4,6)), (1,(2,2)), (1,(2,5)), (1,(3,2)), (1,(3,5)))
因而可知,不匹配的key(3,6)未顯示。與sql相似,還有左鏈接、右鏈接及全鏈接操做函數:
leftOuterJoin、rightOuterJoin及fullOuterJoin。
2.16 cogroup(otherDataset, [numPartitions])
相似於join,像上面Join的定義,其底層使用了cogroup. 輸入數據集(K, V) 和另外一數據集 (K, W)進行cogroup, 將返回格式爲(K, (Iterable<V>, Iterable<W>))的數據集,與join不一樣的是,兩個集合中不匹配的Key,也會返回。
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))]
實例:
scala> val rdd=sc.parallelize(Array((1,2),(1,3),(2,4),(3,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Array((1,2),(1,5),(2,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd15=rdd.cogroup(rdd2)
rdd15: org.apache.spark.rdd.RDD[(Int, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[3] at cogroup at <console>:27
scala> rdd15.collect
res0: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((2,(CompactBuffer(4),CompactBuffer(6))), (1,(CompactBuffer(2, 3),CompactBuffer(2, 5))), (3,(CompactBuffer(6),CompactBuffer())))
2.17 cartesian
(otherDataset)
兩個集合進行笛卡爾積
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
實例:
scala> val rdd1=sc.parallelize(Array(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> val rdd2=sc.parallelize(Array(4,5))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> val rdd17=rdd1.cartesian(rdd2)
rdd17: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[20] at cartesian at <console>:27
scala> rdd17.collect
res7: Array[(Int, Int)] = Array((1,4), (1,5), (2,4), (3,4), (2,5), (3,5))
2.18 pipe(command, [envVars])
能夠經過pipe使用shell命令來處理RDD
實例:
scala> val rdd1=sc.parallelize(1 to 9,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd2=rdd1.pipe("head -n 1").collect
rdd2: Array[String] = Array(1, 4, 7)
2.19 coalesce(numPartitions)
coalesce(numPartitions: Int)將RDD進行重分區,默認只能減小分區,默認不進行shuffle,當開啓shuffle時,能夠擴大分區。其定義以下:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
實例:減小分區
scala> val rdd1=sc.parallelize(1 to 9,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd1.partitions.length
res1: Int = 3
scala> val rdd2=rdd1.coalesce(2)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[4] at coalesce at <console>:25
scala> rdd2.partitions.length
res4: Int = 2
實例:擴大分區
scala> val rdd2=rdd1.coalesce(5)
rdd2: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:25
scala> rdd2.partitions.length
res0: Int = 3
由上面實例可見,不開啓shuffle是不能擴大分區的。
實例:開啓 shuffle後,能夠擴大分區數
scala> val rdd3=rdd1.coalesce(5,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at coalesce at <console>:25
scala> rdd3.partitions.length
res1: Int = 5
2.20 repartition(numPartitions)
對RDD進行重分區,能夠增長分區,也可減小分區,它建立新的分區,會進行shuffer操做
實例:
scala> val rdd1=sc.parallelize(1 to 9,3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val rdd3=rdd1.repartition(5)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at repartition at <console>:25
scala> rdd3.partitions.length
res6: Int = 5
2.21 repartitionAndSortWithinPartitions(partitioner)
根據給定的分區程序對RDD進行從新分區,並在每一個生成的分區內按鍵對記錄進行排序。 這比調用從新分區,它要比使用repartition And sortByKey 效率高,這是因爲它的排序是在shuffle過程當中進行,一邊shuffle,一邊排序。定義以下:
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
使用repartitionAndSortWithinPartitions時,須要本身傳入一個分區器,這個分區器可使用系統提供的,也能夠是自定義的,如下實例咱們使用系統的HashPartitioner
實例:
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val rdd=sc.parallelize(Array(2,4,8,6,23,12,123,98,18))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:25
scala> rdd.zipWithIndex().repartitionAndSortWithinPartitions(new HashPartitioner(1)).foreach(println)
(2,0)
(4,1)
(6,3)
(8,2)
(12,5)
(18,8)
(23,4)
(98,7)
(123,6)
3. 轉換操做
3.1 reduce(func)
對數據集中每一個元素執行指定的彙集函數(有兩個輸入參數,一個返回值) ,這個函數必須是可交換的和組合的。
實例:
scala> val arrays=Array(1,2,3,4,5,6,7,8,9,10);
arrays: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val rdd=sc.parallelize(arrays,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> val sum=rdd.reduce(_+_)
sum: Int = 55
3.2 collect()
將數據集的內容以Array數據的形式返回
實例:
scala> val rdd=sc.parallelize(1 to 9,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
3.3 count()
返回數據集元素的個數。
實例:
scala> rdd.count
res0: Long = 9
3.4 first()
返回集合中的第一個元素。
實例:
scala> val rdd=sc.parallelize(1 to 9,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.first
res2: Int = 1
3.5 take(n)
返回集合中前n個元素的數組。
3.6 takeSample(withReplacement, num, [seed])
返回包含隨機的num個元素的數組。第一個參數withReplacement是抽樣時是否放回,第二個參數num會精確指定抽樣數,而不是比例.第三個參數seed是隨機種子。
實例:
scala> rdd.take(3)
res3: Array[Int] = Array(1, 2, 3)
3.7 takeOrdered(n, [ordering])
按天然順序或者自定義比較器返回第1到n元素的數組
實例:
scala> rdd.takeOrdered(5)
res6: Array[Int] = Array(1, 2, 3, 4, 5)
3.8 saveAsTextFile(path)
把數據集中的元素轉換爲文本文件寫到指定的目錄(本地系統、HDFS或者其它hadoop支持的文件系統).Spark將每一個元素調用toString方法轉換爲文本文件中的一行。
實例:將數據轉爲文本存儲到hdfs中。
scala> val rdd=sc.parallelize(1 to 9,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.saveAsTextFile("hdfs://master:9000/number")
hdfs中查看是否有文件存在,並進行3個分區存儲
[root@master ~]# hdfs dfs -ls /number.txt
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-10-29 14:45 /number.txt/_SUCCESS
-rw-r--r-- 3 root supergroup 6 2018-10-29 14:45 /number.txt/part-00000
-rw-r--r-- 3 root supergroup 6 2018-10-29 14:45 /number.txt/part-00001
-rw-r--r-- 3 root supergroup 6 2018-10-29 14:45 /number.txt/part-00002
[root@master ~]# hdfs dfs -ls /number
Found 4 items
-rw-r--r-- 3 root supergroup 0 2018-10-29 14:47 /number/_SUCCESS
-rw-r--r-- 3 root supergroup 6 2018-10-29 14:47 /number/part-00000
-rw-r--r-- 3 root supergroup 6 2018-10-29 14:47 /number/part-00001
-rw-r--r-- 3 root supergroup 6 2018-10-29 14:47 /number/part-00002
[root@master ~]# hdfs dfs -cat /number/part-00000
1
2
3
3.9 saveAsSequenceFile
(path)
類型於saveAsTextFile,用於將RDD中元素轉換爲 hadoop SequenceFile保存到指定的目錄(本地系統、HDFS或者其它hadoop支持的文件系統)。
在RDD的鍵值對實現了hadoop的Writable接口是可用的,在Scala中,即類型能夠隱式轉爲Writable(Spark可轉換的基本類型如Int,Double,String等)
3.10 saveAsObjectFile
用於將RDD中的元素序列化成對象,存儲到文件中。
3.11 countByKey()
對於類型 (K, V)的RDD. 返回一個 (K, Int)的map,Int爲K的個數。
實例:
scala> val rdd=sc.parallelize(Array((1,2),(1,3),(1,5),(2,4),(2,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.countByKey()
res2: scala.collection.Map[Int,Long] = Map(3 -> 1, 1 -> 3, 2 -> 2)
3.12 foreach(func)
對數據集中每一個元素執行func函數。
實例:
scala> val rdd=sc.parallelize(1 to 6,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd.foreach(x=>println(x*2))
2
4
10
12
6
8
參考文獻:
http://spark.apache.org/docs/latest/rdd-programming-guide.html
Spark MLlib機器學習 -黃美靈