This article covers the basic operations on RDDs in Spark. The RDD is Spark's own data model; whenever RDDs come up, so do terms like "resilient distributed dataset" and "directed acyclic graph". We will not dig into those deeper concepts here. While reading this article, you can simply think of an RDD as an array; that mental model is very helpful for learning the RDD API. All example code in this article is written in Scala.
All computation in Spark is expressed as operations on RDDs, so the first question when learning RDDs is how to construct one. By data source, there are two ways to build an RDD: the first is to build it directly from data in memory, and the second is to read it from a file system. There are many kinds of file systems; the most common are HDFS and the local file system.
The first way is to construct an RDD from in-memory data, using the makeRDD and parallelize methods, as shown below:
```scala
/* Create RDDs with makeRDD */
/* from a List */
val rdd01 = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
/* from an Array */
val rdd02 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6))
val r02 = rdd02.filter { x => x < 5 }
println(r02.collect().mkString(","))

/* Create RDDs with parallelize */
/* from a List */
val rdd03 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
val r03 = rdd03.map { x => x + 1 }
println(r03.collect().mkString(","))
/* from a List */
val rdd04 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
val r04 = rdd04.filter { x => x > 3 }
println(r04.collect().mkString(","))
```
As you can see, an RDD behaves much like an array, which is why the data above is built from List (linked list) and Array values.
The second way is to construct an RDD from a file system, as shown below:
```scala
val rdd: RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
val r: RDD[String] = rdd.flatMap { x => x.split(",") }
println(r.collect().mkString(","))
```
This example uses the local file system, so the file path starts with the file:// scheme.
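Reading from HDFS works the same way; only the URI scheme in front of the path changes. A minimal sketch (the namenode host, port, and HDFS path below are placeholders, not from the original article):

```scala
// Local file system: file:// prefix (path taken from the example above)
val localRdd = sc.textFile("file:///D:/sparkdata.txt", 1)
// HDFS: hdfs:// prefix (host, port, and path are placeholders for illustration)
val hdfsRdd = sc.textFile("hdfs://namenode:9000/data/sparkdata.txt", 1)
```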
Once an RDD has been constructed, the next question is how to operate on it. RDD operations fall into two categories: transformations and actions. This split exists because RDDs are evaluated lazily: when a transformation is applied, no computation actually runs; only when an action is invoked does Spark submit a job and perform the computation. Telling them apart is simple: a transformation produces a new RDD from an existing one, while an action triggers the actual computation.
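As a quick illustration of this laziness, here is a small sketch (variable names are made up for the example): the map call only records lineage, and the job runs when collect is called.

```scala
val nums = sc.makeRDD(List(1, 2, 3, 4))
// Transformation: nothing is computed here; Spark only records how to derive `squared` from `nums`
val squared = nums.map { x => x * x }
// Action: only now is a job submitted and the map actually executed
println(squared.collect().mkString(","))
```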
The basic RDD API is summarized below:
| Operation type | Function | Purpose |
| --- | --- | --- |
| Transformation | map() | Takes a function, applies it to every element of the RDD, and returns a new RDD |
| | flatMap() | Takes a function applied to every element that splits each element into an iterator of values; returns a new RDD |
| | filter() | Takes a predicate function that filters out the elements not satisfying the condition; returns a new RDD |
| | distinct() | Takes no arguments; removes duplicate elements from the RDD |
| | union() | Takes another RDD; returns a new RDD containing all elements of both RDDs |
| | intersection() | Takes another RDD; returns the elements the two RDDs have in common |
| | subtract() | Takes another RDD; removes from the original RDD the elements that also appear in the argument RDD |
| | cartesian() | Takes another RDD; returns the Cartesian product of the two RDDs |
| Action | collect() | Returns all elements of the RDD |
| | count() | Returns the number of elements in the RDD |
| | countByValue() | Returns how many times each element occurs in the RDD |
| | reduce() | Aggregates all elements of the RDD in parallel, e.g. a sum |
| | fold(0)(func) | Same as reduce, but takes an initial (zero) value |
| | aggregate(0)(seqOp, combOp) | Like reduce, but the result type may differ from the element type of the original RDD |
| | foreach(func) | Applies the given function to every element of the RDD |
Below is example code for the API calls above.
Transformations:
```scala
// path is a file path such as "file:///D:/sparkdata.txt" (see the full program below)
val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)
val rddFile: RDD[String] = sc.textFile(path, 1)
val rdd01: RDD[Int] = sc.makeRDD(List(1, 3, 5, 3))
val rdd02: RDD[Int] = sc.makeRDD(List(2, 4, 5, 1))

/* map */
println("======map======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map======")
/* filter */
println("======filter======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter======")
/* flatMap */
println("======flatMap======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap======")
/* distinct */
println("======distinct======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct======")
/* union */
println("======union======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union======")
/* intersection */
println("======intersection======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection======")
/* subtract */
println("======subtract======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract======")
/* cartesian */
println("======cartesian======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian======")
```
Actions:
```scala
val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)

/* count */
println("======count======")
println(rddInt.count())
println("======count======")
/* countByValue */
println("======countByValue======")
println(rddInt.countByValue())
println("======countByValue======")
/* reduce */
println("======reduce======")
println(rddInt.reduce((x, y) => x + y))
println("======reduce======")
/* fold */
println("======fold======")
println(rddInt.fold(0)((x, y) => x + y))
println("======fold======")
/* aggregate */
println("======aggregate======")
val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y), (x, y) => (x._1 + x._2, y._1 + y._2))
println(res._1 + "," + res._2)
println("======aggregate======")
/* foreach */
println("======foreach======")
rddStr.foreach { x => println(x) }
println("======foreach======")
```
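The aggregate call above compiles and runs, but its seqOp is a little unusual. A more common pattern, not from the original program but shown here as a sketch, is to compute a sum and a count in a single pass and then derive the average (reusing the rddInt defined above):

```scala
// Compute (sum, count) in one pass, then the average
val (sum, cnt) = rddInt.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp: fold one element into the per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge the accumulators of different partitions
)
println(s"sum=$sum, count=$cnt, avg=${sum.toDouble / cnt}")
```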
That is as far as we will go with RDD operations for now; the rest will be covered in the next article. Next, let's look at how to develop Spark programs. Installing Spark will be covered in a separate article, so here we assume Spark is already installed. On a machine with Spark installed, you can use spark-shell to interact with Spark and type code directly. However, spark-shell is awkward to use: only one user can use it at a time (when another user starts spark-shell, the previous one gets kicked off), and the shell has none of the code completion or validation an IDE provides, which makes it painful to work with.
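For completeness, here is roughly what an interactive spark-shell session looks like. The shell pre-creates the SparkContext as sc, so the snippets above can be pasted in directly; the launch path assumes you are in the Spark installation directory.

```scala
// $ ./bin/spark-shell                 launch the interactive shell on the Spark machine
scala> val rdd = sc.makeRDD(List(1, 2, 3, 4))
scala> rdd.map(_ * 2).collect()        // returns Array(2, 4, 6, 8)
```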
Spark is a remarkable framework in this respect: local development and debugging are very simple and require no installed Spark system at all. Just create a project (Java or Scala), add a jar such as spark-assembly-1.6.1-hadoop2.6.0.jar to the project's classpath, and you can develop and debug Spark programs locally.
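If the project uses a build tool instead of a hand-copied assembly jar, the same dependency can be pulled in through dependency management. Here is a build.sbt sketch matching the assembly jar mentioned above; the Scala version is an assumption and may need adjusting for your setup.

```scala
// build.sbt (sketch) -- assumes Scala 2.10.x, to match spark-assembly-1.6.1-hadoop2.6.0.jar
name := "sparktest"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
```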
Here is the complete code, written in Eclipse with the Scala plugin installed:
```scala
package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SparkTest {
  val conf: SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
  val sc: SparkContext = new SparkContext(conf)

  /**
   * How to create RDDs: from in-memory data (the basics)
   */
  def createDataMethod(): Unit = {
    /* Create RDDs with makeRDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
    val r01 = rdd01.map { x => x * x }
    println("===================createDataMethod:makeRDD:List=====================")
    println(r01.collect().mkString(","))
    println("===================createDataMethod:makeRDD:List=====================")
    /* Array */
    val rdd02 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6))
    val r02 = rdd02.filter { x => x < 5 }
    println("===================createDataMethod:makeRDD:Array=====================")
    println(r02.collect().mkString(","))
    println("===================createDataMethod:makeRDD:Array=====================")
    /* Create RDDs with parallelize */
    /* List */
    val rdd03 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println("===================createDataMethod:parallelize:List=====================")
    println(r03.collect().mkString(","))
    println("===================createDataMethod:parallelize:List=====================")
    /* List */
    val rdd04 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println("===================createDataMethod:parallelize:Array=====================")
    println(r04.collect().mkString(","))
    println("===================createDataMethod:parallelize:Array=====================")
  }

  /**
   * Create a pair RDD (key/value pairs)
   */
  def createPairRDD(): Unit = {
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("key01", 1), ("key02", 2), ("key03", 3)))
    val r: RDD[String] = rdd.keys
    println("===========================createPairRDD=================================")
    println(r.collect().mkString(","))
    println("===========================createPairRDD=================================")
  }

  /**
   * Create an RDD from a file.
   * File contents:
   *   key01,1,2.3
   *   key02,5,3.7
   *   key03,23,4.8
   *   key04,12,3.9
   *   key05,7,1.3
   */
  def createDataFromFile(path: String): Unit = {
    val rdd: RDD[String] = sc.textFile(path, 1)
    val r: RDD[String] = rdd.flatMap { x => x.split(",") }
    println("=========================createDataFromFile==================================")
    println(r.collect().mkString(","))
    println("=========================createDataFromFile==================================")
  }

  /**
   * Basic RDD transformations
   */
  def basicTransformRDD(path: String): Unit = {
    val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
    val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)
    val rddFile: RDD[String] = sc.textFile(path, 1)
    val rdd01: RDD[Int] = sc.makeRDD(List(1, 3, 5, 3))
    val rdd02: RDD[Int] = sc.makeRDD(List(2, 4, 5, 1))

    /* map */
    println("======map======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map======")
    /* filter */
    println("======filter======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter======")
    /* flatMap */
    println("======flatMap======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap======")
    /* distinct */
    println("======distinct======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct======")
    /* union */
    println("======union======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union======")
    /* intersection */
    println("======intersection======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection======")
    /* subtract */
    println("======subtract======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract======")
    /* cartesian */
    println("======cartesian======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian======")
  }

  /**
   * Basic RDD actions
   */
  def basicActionRDD(): Unit = {
    val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
    val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)

    /* count */
    println("======count======")
    println(rddInt.count())
    println("======count======")
    /* countByValue */
    println("======countByValue======")
    println(rddInt.countByValue())
    println("======countByValue======")
    /* reduce */
    println("======reduce======")
    println(rddInt.reduce((x, y) => x + y))
    println("======reduce======")
    /* fold */
    println("======fold======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("======fold======")
    /* aggregate */
    println("======aggregate======")
    val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y), (x, y) => (x._1 + x._2, y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate======")
    /* foreach */
    println("======foreach======")
    rddStr.foreach { x => println(x) }
    println("======foreach======")
  }

  def main(args: Array[String]): Unit = {
    println(System.getenv("HADOOP_HOME"))
    createDataMethod()
    createPairRDD()
    createDataFromFile("file:///D:/sparkdata.txt")
    basicTransformRDD("file:///D:/sparkdata.txt")
    basicActionRDD()
    /* Printed output:
    D://hadoop
    ===================createDataMethod:makeRDD:List=====================
    1,4,9,16,25,36
    ===================createDataMethod:makeRDD:List=====================
    ===================createDataMethod:makeRDD:Array=====================
    1,2,3,4
    ===================createDataMethod:makeRDD:Array=====================
    ===================createDataMethod:parallelize:List=====================
    2,3,4,5,6,7
    ===================createDataMethod:parallelize:List=====================
    ===================createDataMethod:parallelize:Array=====================
    4,5,6
    ===================createDataMethod:parallelize:Array=====================
    ===========================createPairRDD=================================
    key01,key02,key03
    ===========================createPairRDD=================================
    key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
    =========================createDataFromFile==================================
    2,3,4,5,6,7,3,6,2
    ======map======
    ======filter======
    5,6,5
    ======filter======
    ======flatMap======
    key01
    ======flatMap======
    ======distinct======
    4,6,2,1,3,5
    ======distinct======
    ======union======
    1,3,5,3,2,4,5,1
    ======union======
    ======intersection======
    1,5
    ======intersection======
    ======subtract======
    3,3
    ======subtract======
    ======cartesian======
    (1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
    ======cartesian======
    ======count======
    9
    ======count======
    ======countByValue======
    Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
    ======countByValue======
    ======reduce======
    29
    ======reduce======
    ======fold======
    29
    ======fold======
    ======aggregate======
    19,10
    ======aggregate======
    ======foreach======
    a b c d b a
    ======foreach======
    */
  }
}
```
To run Spark we need to construct a SparkContext, and constructing it requires a SparkConf object, for example: setAppName("xtq").setMaster("local[2]").
appName is the name of the Spark application; a master of local[2] means local mode, running the job with 2 worker threads.
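Besides local[2], a few other master settings are common; the sketch below shows some of them (the standalone cluster URL is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("xtq")
  .setMaster("local[*]")              // local mode, one thread per available core
  // .setMaster("local")              // local mode, a single thread
  // .setMaster("spark://host:7077")  // connect to a standalone cluster (host is a placeholder)
val sc = new SparkContext(conf)
```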
When running a Spark program inside Eclipse, you may see the following error:
```
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
    at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10)
    at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala)
    at cn.com.sparktest.SparkTest.main(SparkTest.scala)
```
The error does not affect the program's results, but it is annoying. It occurs because Spark depends on Hadoop, and Hadoop cannot really be installed natively on Windows (it can only be emulated with Cygwin); newer Hadoop versions need winutils.exe to run on Windows. The fix is simple: download a winutils.exe that matches your OS (32-bit or 64-bit) and place it in a directory like this:
D:\hadoop\bin\winutils.exe
Then define the environment variable HADOOP_HOME=D:\hadoop.
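If editing the system environment variables is inconvenient, the same effect can usually be achieved in code by setting the hadoop.home.dir system property before the SparkContext is created; a minimal sketch using the path from this article:

```scala
// Point Hadoop's Shell utilities at the directory that contains bin\winutils.exe
System.setProperty("hadoop.home.dir", "D:\\hadoop")
```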
Restart Eclipse after changing the environment variable so that the new value takes effect; the program will then run without this error.