好程序員大數據學習路線分享彈性分佈式數據集RDD

  好程序員大數據學習路線分享彈性分佈式數據集RDD,RDD定義,RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變(數據和元數據)、可分區、裏面的元素可並行計算的集合。java

RDD的特色:自動容錯,位置感知性調度和可伸縮性程序員

RDD的屬性es6

1.一組分片shell

即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。apache

2.一個計算每一個分區的函數。數組

Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。分佈式

3.RDD之間的依賴關係。函數

RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。oop

容錯處理: 在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。學習

4.一個Partitioner,分區器

即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

5.一個列表

存儲存取每一個Partition的優先位置(preferred location)。-> 就近原則

對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。

RDD類型

1.Transformation -> 記錄計算過程(記錄參數,計算方法)

轉換
含義
map(func)
返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成
filter(func)
返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成
flatMap(func)
相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
mapPartitions(func)
相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func)
相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲T的RDD上運行時,func的函數類型必須是
(Int, Iterator[T]) => Iterator[U]
sample(withReplacement, fraction, seed)
根據fraction指定的比例對數據進行採樣,能夠選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset)
對源RDD和參數RDD求並集後返回一個新的RDD
intersection(otherDataset)
diff -> 差集
對源RDD和參數RDD求交集後返回一個新的RDD
distinct([numTasks]))
         [改變分區數]
對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks])
在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks])
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,與groupByKey相似,reduce任務的個數能夠經過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

sortByKey([ascending], [numTasks])
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks])
與sortByKey相似,可是更靈活
join(otherDataset, [numTasks])
在類型爲(K,V)和(K,W)的RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks])
在類型爲(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset)
笛卡爾積
pipe(command, [envVars])

coalesce(numPartitions)

repartition(numPartitions)
 從新分區
repartitionAndSortWithinPartitions(partitioner)

2.Action  -> 觸發生成job(一個job對應一個action算子)

動做
含義
reduce(func)
經過func函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的
collect()
在驅動程序中,以數組的形式返回數據集的全部元素
count()
返回RDD的元素個數
first()
返回RDD的第一個元素(相似於take(1))
take(n)
取數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed])
返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])
takeOrdered和top相似,只不過以和top相反的順序返回元素
saveAsTextFile(path)
將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本
saveAsSequenceFile(path) 
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,能夠使HDFS或者其餘Hadoop支持的文件系統。
saveAsObjectFile(path) 

countByKey()
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每個key對應的元素個數。
foreach(func)
在數據集的每個元素上,運行函數func進行更新。
建立RDD

Linux進入sparkShell:

/usr/local/spark.../bin/spark-shell \

--master spark://hadoop01:7077 \

--executor-memory 512m \

--total-executor-cores 2

或在Maven下:

object lx03 {
  def main(args: Array[String]): Unit = {
    val conf : SparkConf = new SparkConf()
      .setAppName("SparkAPI")
      .setMaster("local[*]")

    val sc: SparkContext = new SparkContext(conf)
    //經過並行化生成rdd
    val rdd1: RDD[Int] = sc.parallelize(List(24,56,3,2,1))
    //對add1的每一個元素乘以2而後排序
    val rdd2: RDD[Int] = rdd1.map(_ * 2).sortBy(x => x,true)

    println(rdd2.collect().toBuffer)
    //過濾出大於等於10的元素
//    val rdd3: RDD[Int] = rdd2.filter(_ >= 10)

//    println(rdd3.collect().toBuffer)
  }
練習2

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1裏面的每個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
//複雜的:
val rdd1 = sc.parallelize(List(List("a b c", "a b b"), List("e f g", "a f g"), List("h i j", "a a b")))
//將rdd1裏面的每個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.flatMap(_.split(" ")))
練習3

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)

//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
練習4

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求join
val rdd3 = rdd1.join(rdd2)  -> 相同的key組成新的key,value
//結果: Array[(String,(Int,Int))] = Array((tom,(1,1)),(jerry,(3,2)))
rdd3.collect
//求左鏈接和右鏈接
val rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect
val rdd3 = rdd1.rightOuterJoin(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
//按key進行分組
rdd4.groupByKey
rdd4.collect
//分別用groupByKey和reduceByKey實現單詞計數
val rdd3 = rdd1 union rdd2
rdd3.groupByKey().mapValues(_.sum).collect
rdd3.reduceByKey(_+_).collect
groupByKey和reduceByKey的區別

reduceByKey算子比較特殊,它首先會進行局部聚合,再全局聚合,咱們只須要傳一個局部聚合的函數就能夠了

圖片描述

練習5

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)

//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
//笛卡爾積
val rdd3 = rdd1.cartesian(rdd2)

計算元素個數

scala> val rdd1 = sc.parallelize(List(2,3,1,5,7,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd1.count
res0: Long = 7  
top先升序排序在取值

scala> rdd1.top(3)
res1: Array[Int] = Array(7, 5, 4)                                               

scala> rdd1.top(0)
res2: Array[Int] = Array()

scala> rdd1.top(100)
res3: Array[Int] = Array(7, 5, 4, 3, 3, 2, 1)
take原集合前N個,有幾個取幾個

scala> rdd1.take(3)
res4: Array[Int] = Array(2, 3, 1)

scala> rdd1.take(100)
res5: Array[Int] = Array(2, 3, 1, 5, 7, 3, 4)

scala> rdd1.first
res6: Int = 2
takeordered倒序排序再取值

scala> rdd1.takeOrdered(3)
res7: Array[Int] = Array(1, 2, 3)

scala> rdd1.takeOrdered(30)
res8: Array[Int] = Array(1, 2, 3, 3, 4, 5, 7)
                             

生成RDD的兩種方式

1.並行化方式生成 (默認分區兩個)

手動指定分區

scala> val rdd1 = sc.parallelize(List(1,2,3,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:27

scala> rdd1.partitions.length  //獲取分區數
res9: Int = 2

scala> val rdd1 = sc.parallelize(List(1,2,3,5),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

scala> rdd1.partitions.length
res10: Int = 3
2.使用textFile讀取文件存儲系統裏的數據  

scala> val rdd2 = sc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:27

scala> rdd2.collect  //調用算子獲得RDD顯示結果
res11: Array[(String, Int)] = Array((hello,6), (beijing,1), (java,1), (gp1808,1), (world,1), (good,1), (qianfeng,1))

scala> val rdd2 =  sc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt",4).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at <console>:27

scala> rdd2.partitions.length    //也能夠本身指定分區數res15: Int = 4

相關文章
相關標籤/搜索