spark中RDD的轉化操做和行動操做

本文主要是講解spark裏RDD的基礎操做。RDD是spark特有的數據模型,談到RDD就會提到什麼彈性分佈式數據集,什麼有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,你們能夠就把RDD看成一個數組,這樣的理解對咱們學習RDD的API是很是有幫助的。本文全部示例代碼都是使用scala語言編寫的。html

  Spark裏的計算都是操做RDD進行,那麼學習RDD的第一個問題就是如何構建RDD,構建RDD從數據來源角度分爲兩類:第一類是從內存裏直接讀取數據,第二類就是從文件系統裏讀取,固然這裏的文件系統種類不少常見的就是HDFS以及本地文件系統了。java

  第一類方式從內存裏構造RDD,使用的方法:makeRDD和parallelize方法,以下代碼所示:shell

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* 使用makeRDD建立RDD */
/* List */
val  rdd 01  =  sc.makeRDD(List( 1 , 2 , 3 , 4 , 5 , 6 ))
val  r 01  =  rdd 01 .map { x  = > x * x }
println(r 01 .collect().mkString( "," ))
/* Array */
val  rdd 02  =  sc.makeRDD(Array( 1 , 2 , 3 , 4 , 5 , 6 ))
val  r 02  =  rdd 02 .filter { x  = > x <  5 }
println(r 02 .collect().mkString( "," ))
 
val  rdd 03  =  sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 ),  1 )
val  r 03  =  rdd 03 .map { x  = > x +  1  }
println(r 03 .collect().mkString( "," ))
/* Array */
val  rdd 04  =  sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 ),  1 )
val  r 04  =  rdd 04 .filter { x  = > x >  3  }
println(r 04 .collect().mkString( "," ))

 

  你們看到了RDD本質就是一個數組,所以構造數據時候使用的是List(鏈表)和Array(數組)類型。apache

  第二類方式是經過文件系統構造RDD,代碼以下所示:windows

1
2
3
val  rdd : RDD[String]  =  sc.textFile( "file:///D:/sparkdata.txt" 1 )
val  r : RDD[String]  =  rdd.flatMap { x  = > x.split( "," ) }
println(r.collect().mkString( "," ))

  這裏例子使用的是本地文件系統,因此文件路徑協議前綴是file://。數組

  構造了RDD對象了,接下來就是如何操做RDD對象了,RDD的操做分爲轉化操做(transformation)和行動操做(action),RDD之因此將操做分紅這兩類這是和RDD惰性運算有關,當RDD執行轉化操做時候,實際計算並無被執行,只有當RDD執行行動操做時候纔會促發計算任務提交,執行相應的計算操做。區別轉化操做和行動操做也很是簡單,轉化操做就是從一個RDD產生一個新的RDD操做,而行動操做就是進行實際的計算。服務器

  下面是RDD的基礎操做API介紹:app

操做類型框架

函數名eclipse

做用

轉化操做

map()

參數是函數,函數應用於RDD每個元素,返回值是新的RDD

flatMap()

參數是函數,函數應用於RDD每個元素,將元素數據進行拆分,變成迭代器,返回值是新的RDD

filter()

參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD

distinct()

沒有參數,將RDD裏的元素進行去重操做

union()

參數是RDD,生成包含兩個RDD全部元素的新RDD

intersection()

參數是RDD,求出兩個RDD的共同元素

subtract()

參數是RDD,將原RDD裏和參數RDD裏相同的元素去掉

cartesian()

參數是RDD,求兩個RDD的笛卡兒積

行動操做

collect()

返回RDD全部元素

count()

RDD裏元素個數

countByValue()

各元素在RDD中出現次數

reduce()

並行整合全部RDD數據,例如求和操做

fold(0)(func)

和reduce功能同樣,不過fold帶有初始值

aggregate(0)(seqOp,combop)

和reduce功能同樣,可是返回的RDD數據類型和原RDD不同

foreach(func)

對RDD每一個元素都是使用特定函數

  下面是以上API操做的示例代碼,以下:

  轉化操做:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
val  rddInt : RDD[Int]  =  sc.makeRDD(List( 1 , 2 , 3 , 4 , 5 , 6 , 2 , 5 , 1 ))
val  rddStr : RDD[String]  =  sc.parallelize(Array( "a" , "b" , "c" , "d" , "b" , "a" ),  1 )
val  rddFile : RDD[String]  =  sc.textFile(path,  1 )
 
val  rdd 01 : RDD[Int]  =  sc.makeRDD(List( 1 , 3 , 5 , 3 ))
val  rdd 02 : RDD[Int]  =  sc.makeRDD(List( 2 , 4 , 5 , 1 ))
 
/* map操做 */
println( "======map操做======" )
println(rddInt.map(x  = > x +  1 ).collect().mkString( "," ))
println( "======map操做======" )
/* filter操做 */
println( "======filter操做======" )
println(rddInt.filter(x  = > x >  4 ).collect().mkString( "," ))
println( "======filter操做======" )
/* flatMap操做 */
println( "======flatMap操做======" )
println(rddFile.flatMap { x  = > x.split( "," ) }.first())
println( "======flatMap操做======" )
/* distinct去重操做 */
println( "======distinct去重======" )
println(rddInt.distinct().collect().mkString( "," ))
println(rddStr.distinct().collect().mkString( "," ))
println( "======distinct去重======" )
/* union操做 */
println( "======union操做======" )
println(rdd 01 .union(rdd 02 ).collect().mkString( "," ))
println( "======union操做======" )
/* intersection操做 */
println( "======intersection操做======" )
println(rdd 01 .intersection(rdd 02 ).collect().mkString( "," ))
println( "======intersection操做======" )
/* subtract操做 */
println( "======subtract操做======" )
println(rdd 01 .subtract(rdd 02 ).collect().mkString( "," ))
println( "======subtract操做======" )
/* cartesian操做 */
println( "======cartesian操做======" )
println(rdd 01 .cartesian(rdd 02 ).collect().mkString( "," ))
println( "======cartesian操做======" )

  行動操做代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val  rddInt : RDD[Int]  =  sc.makeRDD(List( 1 , 2 , 3 , 4 , 5 , 6 , 2 , 5 , 1 ))
val  rddStr : RDD[String]  =  sc.parallelize(Array( "a" , "b" , "c" , "d" , "b" , "a" ),  1 )
 
/* count操做 */
println( "======count操做======" )
println(rddInt.count())
println( "======count操做======" )  
/* countByValue操做 */
println( "======countByValue操做======" )
println(rddInt.countByValue())
println( "======countByValue操做======" )
/* reduce操做 */
println( "======countByValue操做======" )
println(rddInt.reduce((x ,y)  = > x + y))
println( "======countByValue操做======" )
/* fold操做 */
println( "======fold操做======" )
println(rddInt.fold( 0 )((x ,y)  = > x + y))
println( "======fold操做======" )
/* aggregate操做 */
println( "======aggregate操做======" )
val  res : (Int,Int)  =  rddInt.aggregate(( 0 , 0 ))((x,y)  = > (x. _ 1  + x. _ 2 ,y),(x,y)  = > (x. _ 1  + x. _ 2 ,y. _ 1  + y. _ 2 ))
println(res. _ 1  ","  + res. _ 2 )
println( "======aggregate操做======" )
/* foeach操做 */
println( "======foeach操做======" )
println(rddStr.foreach { x  = > println(x) })
println( "======foeach操做======" )

  RDD操做暫時先學習到這裏,剩下的內容在下一篇裏再談了,下面我要說說如何開發spark,安裝spark的內容我後面會使用專門的文章進行講解,這裏咱們假設已經安裝好了spark,那麼咱們就能夠在已經裝好的spark服務器上使用spark-shell進行與spark交互的shell,這裏咱們直接能夠敲打代碼編寫spark程序。可是spark-shell畢竟使用太麻煩,並且spark-shell一次只能使用一個用戶,當另一個用戶要使用spark-shell就會把前一個用戶踢掉,並且shell也沒有IDE那種代碼補全,代碼校驗的功能,使用起來非常痛苦。

  不過spark的確是一個神奇的框架,這裏的神奇就是指spark本地開發調試很是簡單,本地開發調試不須要任何已經裝好的spark系統,咱們只須要創建一個項目,這個項目能夠是java的也能夠是scala,而後咱們將spark-assembly-1.6.1-hadoop2.6.0.jar這樣的jar放入項目的環境裏,這個時候咱們就能夠在本地開發調試spark程序了。

  你們請看咱們裝有scala插件的eclipse裏的完整代碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package  cn.com.sparktest
 
import  org.apache.spark.SparkConf
import  org.apache.spark.SparkConf
import  org.apache.spark.SparkContext
import  org.apache.spark.rdd.RDD
 
object  SparkTest {
   val  conf : SparkConf  =  new  SparkConf().setAppName( "xtq" ).setMaster( "local[2]" )
   val  sc : SparkContext  =  new  SparkContext(conf)
   
   /**
    * 建立數據的方式--從內存裏構造數據(基礎)
    */
   def  createDataMethod() : Unit  =  {
     /* 使用makeRDD建立RDD */
     /* List */
     val  rdd 01  =  sc.makeRDD(List( 1 , 2 , 3 , 4 , 5 , 6 ))
     val  r 01  =  rdd 01 .map { x  = > x * x }
     println( "===================createDataMethod:makeRDD:List=====================" )
     println(r 01 .collect().mkString( "," ))
     println( "===================createDataMethod:makeRDD:List=====================" )
     /* Array */
     val  rdd 02  =  sc.makeRDD(Array( 1 , 2 , 3 , 4 , 5 , 6 ))
     val  r 02  =  rdd 02 .filter { x  = > x <  5 }
     println( "===================createDataMethod:makeRDD:Array=====================" )
     println(r 02 .collect().mkString( "," ))
     println( "===================createDataMethod:makeRDD:Array=====================" )
     
     /* 使用parallelize建立RDD */
     /* List */
     val  rdd 03  =  sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 ),  1 )
     val  r 03  =  rdd 03 .map { x  = > x +  1  }
     println( "===================createDataMethod:parallelize:List=====================" )
     println(r 03 .collect().mkString( "," ))
     println( "===================createDataMethod:parallelize:List=====================" )
     /* Array */
     val  rdd 04  =  sc.parallelize(List( 1 , 2 , 3 , 4 , 5 , 6 ),  1 )
     val  r 04  =  rdd 04 .filter { x  = > x >  3  }
     println( "===================createDataMethod:parallelize:Array=====================" )
     println(r 04 .collect().mkString( "," ))
     println( "===================createDataMethod:parallelize:Array=====================" )
   }
   
   /**
    * 建立Pair Map
    */
   def  createPairRDD() : Unit  =  {
     val  rdd : RDD[(String,Int)]  =  sc.makeRDD(List(( "key01" , 1 ),( "key02" , 2 ),( "key03" , 3 )))
     val  r : RDD[String]  =  rdd.keys
     println( "===========================createPairRDD=================================" )
     println(r.collect().mkString( "," ))
     println( "===========================createPairRDD=================================" )
   }
   
   /**
    * 經過文件建立RDD
    * 文件數據:
    *    key01,1,2.3
           key02,5,3.7
       key03,23,4.8
       key04,12,3.9
       key05,7,1.3
    */
   def  createDataFromFile(path : String) : Unit  =  {
     val  rdd : RDD[String]  =  sc.textFile(path,  1 )
     val  r : RDD[String]  =  rdd.flatMap { x  = > x.split( "," ) }
     println( "=========================createDataFromFile==================================" )
     println(r.collect().mkString( "," ))
     println( "=========================createDataFromFile==================================" )
   }
   
   /**
    * 基本的RDD操做
    */
   def  basicTransformRDD(path : String) : Unit  =  {
     val  rddInt : RDD[Int]  =  sc.makeRDD(List( 1 , 2 , 3 , 4 , 5 , 6 , 2 , 5 , 1 ))
     val  rddStr : RDD[String]  =  sc.parallelize(Array( "a" , "b" , "c" , "d" , "b" , "a" ),  1 )
     val  rddFile : RDD[String]  =  sc.textFile(path,  1 )
     
     val  rdd 01 : RDD[Int]  =  sc.makeRDD(List( 1 , 3 , 5 , 3 ))
     val  rdd 02 : RDD[Int]  =  sc.makeRDD(List( 2 , 4 , 5 , 1 ))
 
     /* map操做 */
     println( "======map操做======" )
     println(rddInt.map(x  = > x +  1 ).collect().mkString( "," ))
     println( "======map操做======" )
     /* filter操做 */
     println( "======filter操做======" )
     println(rddInt.filter(x  = > x >  4 ).collect().mkString( "," ))
     println( "======filter操做======" )
     /* flatMap操做 */
     println( "======flatMap操做======" )
     println(rddFile.flatMap { x  = > x.split( "," ) }.first())
     println( "======flatMap操做======" )
     /* distinct去重操做 */
     println( "======distinct去重======" )
     println(rddInt.distinct().collect().mkString( "," ))
     println(rddStr.distinct().collect().mkString( "," ))
     println( "======distinct去重======" )
     /* union操做 */
     println( "======union操做======" )
     println(rdd 01 .union(rdd 02 ).collect().mkString( "," ))
     println( "======union操做======" )
     /* intersection操做 */
     println( "======intersection操做======" )
     println(rdd 01 .intersection(rdd 02 ).collect().mkString( "," ))
     println( "======intersection操做======" )
     /* subtract操做 */
     println( "======subtract操做======" )
     println(rdd 01 .subtract(rdd 02 ).collect().mkString( "," ))
     println( "======subtract操做======" )
     /* cartesian操做 */
     println( "======cartesian操做======" )
     println(rdd 01 .cartesian(rdd 02 ).collect().mkString( "," ))
     println( "======cartesian操做======" )   
   }
   
   /**
    * 基本的RDD行動操做
    */
   def  basicActionRDD() : Unit  =  {
     val  rddInt : RDD[Int]  =  sc.makeRDD(List( 1 , 2 , 3 , 4 , 5 , 6 , 2 , 5 , 1 ))
     val  rddStr : RDD[String]  =  sc.parallelize(Array( "a" , "b" , "c" , "d" , "b" , "a" ),  1 )
     
     /* count操做 */
     println( "======count操做======" )
     println(rddInt.count())
     println( "======count操做======" )  
     /* countByValue操做 */
     println( "======countByValue操做======" )
     println(rddInt.countByValue())
相關文章
相關標籤/搜索