sparkRDD相關操做

RDD(彈性分佈式數據集)。RDD以分區中的每一行進行分佈式計算。父子依賴關係。shell

1、RDD建立操做apache

1)數據集合centos

Val data=Array(1, 2, 3, 4, 5, 6, 7, 8, 9)數組

Val distData = sc.parallelize(data, 3) #分區,生成RDD數據集dom

Val distData =sc.parallelize(1 to 10, 2) #2是並行程度,指定多少線程同時執行。分佈式

distData.collect函數

distData.take(1)spa

sc.makeRDD(1 to 10, 4).map(e=> {val tname=Thread.currentThread().getName; println(tname + ":" +e)}).collect線程

2)外部讀取scala

Val distFile1 = sc.textFile(「data.txt」) /#本地當前目錄下文件或指定目錄下文件

Val distFile2 = sc.textFile(「hdfs://192.168.1.100:9000/input/data.txt」)#HDFS文件

               textFile(「/input/001.txt, /input/002.txt」)#讀取多個文件

               textFile(「/input/*.txt」)#讀取含通配符路徑

 

2、RDD轉換操做(不會當即執行,返回RDD)

1)   Map

Map是對RDD中每一個元素都執行一個指定的函數來生成一個新的RDD

Val rdd1=sc.parallelize(1 to 9, 3)

Val rdd2=rdd1.map(x=>x*2)

Rdd2.collect

2)   Filter

Filter是對RDD元素進行過濾,返回一個新的數據集,是通過func函數後返回值爲True的原元素組成

Val rdd3=rdd2.filter (x=>x>10)

(12, 14, 16)

3)   Top

提取rdd最大的n個元素

rdd1.top(1)

rdd1.top(1)(scala.math.Ordering.String.reverse) #倒序 

4)   flatMap

相似於map,可是它是一對多關係

rdd3.flatMap(x => x to 20)

(12,13,14,15,16,17,18,19,20,14,15,16,17,18,19,20,16,17,18,19,20)

5)   mapPartitons

是map的一種變種,mapPartitions的輸入函數是每一個分區的數據,也就是把每一個分區中的內容做爲總體來處理 。

6)   repatition

再分區

rdd.repartition(4)

7)   sample

   Sample(withReplacement, fraction, seed) 第一個參數是是否爲又放回抽樣,第二個參數是比例。

   Val a=sc.parallelize(1 to 10000,3)

a.sample(false, 0.1).collect().foreach(println)

8)   union

數據合併,返回一個新的數據集

Val rdd8=rdd1.union(rdd3)

Rdd8.collect

9)   intersection

數據交集

Val rdd9=rdd8.intersection(rdd1)

10)  distinct

數據去重

Val rdd10=rdd8.union(rdd9).distinct

11)  groupBy

對RDD元素進行分組

val rdd = sc.parallelize(Array((「tom」,10),(「tomas」,12),(「tomlee」,12),(「tomsan」,10))

val rdd2 = rdd.groupBy(e => e _2)

rdd2.collect()

Array((10.CompareBuffer((tom,10),(tomsam,10)),12.CompareBuffer((tomas,12),(tomlee,12))))

12)  groupByKey

   根據Key進行分組,迭代部分都是value

   Val rdd0=sc.parallelize(Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3)),3)

   Val rdd1=rdd0.groupByKey()

   Array((1,ArrayBuffer(1,2,3)),(2,ArrayBuffer(1,2,3)))

13)  groupWith

兩個RDD

100->tom

200->tomas

 

100->20

200->30

Val rdd1 = sc.makeRDD(Array((100,」tom」),(200, 「tomas」)))

Val rdd2 = sc.makeRDD(Array((100, 20),(200, 30)))

rdd1.groupWith(rdd2)

Array((100,(CompactBuffer(tom),CompactBuffer(20))),(200,(CompactBuffer(tomas),CompactBuffer(30))))

14)  reduceBykey

數組的分組聚合操做

Val rdd12=rdd0.reduceByKey((x,y)=>x+y)

Array((1,6),(2,6))

15)  aggregeteByKey

更加靈活的一個函數。三個參數,第一個是初始值,第二個是給每一個元素值進行的函數操做,第三個是根據Key作相應的合併操做

Val z=sc.parallelize(list((1,3),(1,2),(1,4),(2,3)))

z.aggregateByKey(0)(math.max(_,_),_+_)  #先將每一個value值與初始值0比較大小,而後根據Key求和。

Array((2,3),(1,9))

16)  conbineByKey

    更加靈活的一個函數,與reduceByKey不一樣,它能夠同時計算求和和求次數。根據Key進行聚合操做。

17)  sortByKey

排序操做

Val rdd14=rdd0.sortBykey()

Rdd14.collect

18)  join

   鏈接兩個RDD,造成新的rdd,(跟groupwith區別是join是一對一,groupwith是分組,相同的放一塊兒)

 (1 tom) join (1 100) --> 1, (tom, 100)

(2 tomas) join (2 80) --> 2, (tomas, 80)

Val rdd1 = sc.makeRDD(Array((1, 「tom」), (2, 「tomas」)))

Val rdd1 = sc.makeRDD(Array((1, 100), (2, 80)))

rdd1.join(rdd2).collect()

array((1,(tom,800)),(2,(tomas,700)))

19)  intersection

提取RDD之間的交集

 val rdd1 = sc.makeRDD(Array(「tom」,」tomas」,」tomaslee」))

val rdd2 = sc.makeRDD(Array(「tomas」,」tomaslee」,」tomason」))

rdd1.intersection().collect()

Array(tomaslee, tomas)

20)  cogroup

輸入數據集(k, v)和另一個數據集(k, w)進行cogroup,獲得一個格式(k, Seq[v], Seq[W])的數據集。

rdd0 = sc.makeRDD(Array((1, 「tom」),(2,」tomas」),(3,」tomasLee」) ))

rdd0.cogroup(sc.makeRDD(Array((1,」hebei」),(2,」henan」),(3,」hexi」))))

Array((1,(CompactBuffer(tom),CompactBuffer(hebei))), (2,(CompactBuffer(tomas),CompactBuffer(henan))), (3,(CompactBuffer(tomasLee),CompactBuffer(hex))))

21)  cache / persist

cache是特殊的persist,只在內存中對RDD的結果進行保存(一旦關掉就沒有了)。

val rdd = sc.makeRDD(1 to 10).map(e=>(println(e);e))

rdd.collect

rdd.cache

rdd.collect

rdd.presist() == rdd.persist(StorageLevel.MEMORY_ONLY)

rdd.presist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

22)  pipe

對每一個分區執行shell命令

val r = sc.makeRDD(1 to 5).pipe(「echo hahah」).collect #hahah個數同分區數

val rdd18=sc.parallelize(1 to 9,3)

rdd18.pipe(「head -n 1」).collect #取每一個分區的第一個數

23)  randomSplit

 Val rdd19=rdd1.randomSplit(Array(0.3,0.7), 1)

rdd19(0).collect

rdd19(1).collect

24)  Zip

 Val rdd21_1=sc.parallelize(Array(1,2,3,4), 3)

 Val rdd21_2=sc.parallelize(Array(「a」,」b」,」c」,」d」), 3)

 Val rdd21_3=rdd21_1.zip(rdd21_2)

將每一個分區的全部元素放到一個數組中,造成RDD,RDD的每一個元素是數組,數組長度等於分區個數

 Val rdd = sc. parallelize(1 to 10, 4).glom().collect()

Array<array<1,2>, array<3,4,5>,array<6,7>,array<8,9,10>>

25)  keyBy

將rdd的元素和一個變換以後的值組合造成元組

val rdd = sc.makeRDD(1 to 10)

rdd.keyBy(_ * 2).collect

Array[(Int, Int)] = Array((2,1), (4,2), (6,3), (8,4), (10,5), (12,6), (14,7), (16,8), (18,9), (20,10))

26)  max | min | mean

Rdd.max 

27)  repartitionAndSortWithinPartitions

經過指定分區函數實現再分區並在分區內排序

 

2、RDD行動操做(會當即執行,返回數組)

1)     reduce

val rdd1=sc.parallelize(1 to 9, 3)

val rdd2=rdd1.reduce(_+_)

2)     collect

3)     count

4)     first

5)     take

6)     takesample

相似於sample,但takeSample是行動操做,因此返回的是數組

Rdd1.takeSample(true, 4)

7)     takeOrdered

takeOrdered(n, [ordering])是返回包含隨機的n個元素的數組,按照順序輸出

8)SaveAsTextFile

   把數據集中的元素寫到一個文本文件,Spark會對每一個元素調用toString方法來把每一個元素存成文本文件的一行。

   r.saveAsTextFile(「/home/centos/aa」)

   cd aa/

   find .

   ll

  nano part-0000

9)     saveAsSequenceFile

r.map(w=>(w,1)).saveAsSequenceFile(「home/centos/bb」)

cd bb/

ls

hdfs dfs -text file:///home/centos/bb/part-00000

10)countByKey

對於(k,v)類型的RDD,返回一個(k,int)的map, int爲k的個數。

 Val rdd =

sc.makeRDD(Array(1,」tom」),(2,」tomas」),(1,」tomasLee」))).countByKey.foreach(e=>println(e))

結果 (1,2)

        (2,1)

11)foreach

   Foreach(func)是對數據集中的每一個元素都執行func函數

相關文章
相關標籤/搜索