Spark Notes: Basic RDD Operations (Part 1)

  This article covers the basic operations on RDDs in Spark. The RDD is Spark's own data model; whenever RDDs come up, so do terms like "resilient distributed dataset" and "directed acyclic graph". We will not dig into those deeper concepts here. For the purposes of this article you can simply treat an RDD as an array, and that mental model is very helpful for learning the RDD API. All example code in this article is written in Scala.

  All computation in Spark operates on RDDs, so the first question when learning RDDs is how to construct one. By data source there are two ways: the first reads data directly from memory, the second reads from a file system. There are many kinds of file systems; the most common are HDFS and the local file system.

  The first way constructs an RDD from in-memory data, using the makeRDD and parallelize methods, as shown below:

 

    /* Create an RDD with makeRDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println(r01.collect().mkString(","))
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5 }
    println(r02.collect().mkString(","))

    /* Create an RDD with parallelize */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println(r03.collect().mkString(","))
    /* Array */
    val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println(r04.collect().mkString(","))

 

  As you can see, an RDD behaves essentially like an array, which is why the data is constructed with the List (linked list) and Array types.

  The second way constructs an RDD from a file system, as shown below:

    val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println(r.collect().mkString(","))

  This example uses the local file system, so the file path is prefixed with the file:// protocol.
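
  Reading from HDFS works exactly the same way; only the path prefix changes. Here is a minimal sketch (the namenode host, port, and file location below are placeholders, not from this article's environment):

    /* Hypothetical HDFS path; replace host, port, and file location with your cluster's values */
    val hdfsRdd:RDD[String] = sc.textFile("hdfs://namenode:9000/user/spark/sparkdata.txt", 1)
    println(hdfsRdd.count())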

  Once we have constructed an RDD, the next question is how to operate on it. RDD operations fall into two categories: transformations and actions. The split exists because RDDs are lazily evaluated: when a transformation is called, no computation actually runs; only when an action is called is a job submitted and the corresponding computation executed. Telling the two apart is simple: a transformation produces a new RDD from an existing one, while an action triggers the actual computation.
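
  A quick way to see this laziness: the map call below returns immediately because it only records the transformation, and nothing runs until the count action is called. This is just an illustrative sketch, not code from the project later in this article:

    val bigRdd = sc.parallelize(1 to 1000000)
    /* Returns immediately: map only records the transformation */
    val squared = bigRdd.map { x => x * x }
    /* Only now is a job submitted and the computation actually executed */
    println(squared.count())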

  Below is an overview of the basic RDD API:

Operation type: transformations

    map()           Takes a function; applies it to every element of the RDD and returns a new RDD
    flatMap()       Takes a function; applies it to every element, splits each element into an iterator, and flattens the results into a new RDD
    filter()        Takes a predicate function; drops the elements that do not satisfy it and returns a new RDD
    distinct()      Takes no arguments; removes duplicate elements from the RDD
    union()         Takes another RDD; produces a new RDD containing all elements of both
    intersection()  Takes another RDD; produces the elements common to both
    subtract()      Takes another RDD; removes from the original RDD the elements that also appear in the argument RDD
    cartesian()     Takes another RDD; produces the Cartesian product of the two RDDs

Operation type: actions

    collect()                        Returns all elements of the RDD
    count()                          Returns the number of elements in the RDD
    countByValue()                   Returns how many times each element occurs in the RDD
    reduce()                         Aggregates all elements of the RDD in parallel, e.g. a sum
    fold(zero)(func)                 Same as reduce, but takes an initial (zero) value
    aggregate(zero)(seqOp, combOp)   Like reduce, but the result type may differ from the RDD's element type; takes a zero value, a per-partition seqOp and a cross-partition combOp
    foreach(func)                    Applies the given function to every element of the RDD

  Here is example code for the API calls above.

  Transformations:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
    
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map */
    println("====== map ======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("====== map ======")
    /* filter */
    println("====== filter ======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("====== filter ======")
    /* flatMap */
    println("====== flatMap ======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("====== flatMap ======")
    /* distinct */
    println("====== distinct ======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("====== distinct ======")
    /* union */
    println("====== union ======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("====== union ======")
    /* intersection */
    println("====== intersection ======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("====== intersection ======")
    /* subtract */
    println("====== subtract ======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("====== subtract ======")
    /* cartesian */
    println("====== cartesian ======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("====== cartesian ======")

  And the action examples:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    
    /* count */
    println("====== count ======")
    println(rddInt.count())
    println("====== count ======")
    /* countByValue */
    println("====== countByValue ======")
    println(rddInt.countByValue())
    println("====== countByValue ======")
    /* reduce */
    println("====== reduce ======")
    println(rddInt.reduce((x, y) => x + y))
    println("====== reduce ======")
    /* fold */
    println("====== fold ======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("====== fold ======")
    /* aggregate */
    println("====== aggregate ======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("====== aggregate ======")
    /* foreach */
    println("====== foreach ======")
    rddStr.foreach { x => println(x) }
    println("====== foreach ======")

  That is as far as we will go with RDD operations for now; the rest will be covered in the next article. Next, a few words on how to develop Spark programs. Installing Spark will get its own article later; here we assume Spark is already installed, so we can use spark-shell on the Spark server to interact with Spark and type code directly. But spark-shell is awkward to work with: only one user can use it at a time (when another user starts spark-shell, the previous one gets kicked off), and the shell has none of an IDE's code completion or validation, which makes it fairly painful to use.

  Fortunately, Spark is remarkably convenient in one respect: local development and debugging is very simple and does not require an installed Spark cluster at all. We only need to create a project (Java or Scala) and add a jar such as spark-assembly-1.6.1-hadoop2.6.0.jar to the project's classpath, and we can then develop and debug Spark programs locally.
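
  If you prefer a build tool over dropping the assembly jar in by hand, the equivalent dependency declaration looks roughly like this (a sketch assuming sbt; adjust the Scala and Spark versions to your own environment):

    // build.sbt (hypothetical project settings)
    name := "sparktest"
    scalaVersion := "2.10.6"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"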

  Here is the complete code, from Eclipse with the Scala plug-in installed:

package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SparkTest {
  val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)
  
  /**
   * How to create an RDD: construct the data in memory (the basics)
   */
  def createDataMethod():Unit = {
    /* Create an RDD with makeRDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println("===================createDataMethod:makeRDD:List=====================")
    println(r01.collect().mkString(","))
    println("===================createDataMethod:makeRDD:List=====================")
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println("===================createDataMethod:makeRDD:Array=====================")
    println(r02.collect().mkString(","))
    println("===================createDataMethod:makeRDD:Array=====================")
    
    /* Create an RDD with parallelize */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println("===================createDataMethod:parallelize:List=====================")
    println(r03.collect().mkString(","))
    println("===================createDataMethod:parallelize:List=====================")
    /* Array */
    val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println("===================createDataMethod:parallelize:Array=====================")
    println(r04.collect().mkString(","))
    println("===================createDataMethod:parallelize:Array=====================")
  }
  
  /**
   * Create a pair RDD
   */
  def createPairRDD():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
    val r:RDD[String] = rdd.keys
    println("===========================createPairRDD=================================")
    println(r.collect().mkString(","))
    println("===========================createPairRDD=================================")
  }
  
  /**
   * Create an RDD from a file
   * File contents:
   *   key01,1,2.3
   *   key02,5,3.7
   *   key03,23,4.8
   *   key04,12,3.9
   *   key05,7,1.3
   */
  def createDataFromFile(path:String):Unit = {
    val rdd:RDD[String] = sc.textFile(path, 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println("=========================createDataFromFile==================================")
    println(r.collect().mkString(","))
    println("=========================createDataFromFile==================================")
  }
  
  /**
   * Basic RDD transformations
   */
  def basicTransformRDD(path:String):Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
    
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map */
    println("====== map ======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("====== map ======")
    /* filter */
    println("====== filter ======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("====== filter ======")
    /* flatMap */
    println("====== flatMap ======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("====== flatMap ======")
    /* distinct */
    println("====== distinct ======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("====== distinct ======")
    /* union */
    println("====== union ======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("====== union ======")
    /* intersection */
    println("====== intersection ======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("====== intersection ======")
    /* subtract */
    println("====== subtract ======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("====== subtract ======")
    /* cartesian */
    println("====== cartesian ======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("====== cartesian ======")
  }
  
  /**
   * Basic RDD actions
   */
  def basicActionRDD():Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    
    /* count */
    println("====== count ======")
    println(rddInt.count())
    println("====== count ======")
    /* countByValue */
    println("====== countByValue ======")
    println(rddInt.countByValue())
    println("====== countByValue ======")
    /* reduce */
    println("====== reduce ======")
    println(rddInt.reduce((x, y) => x + y))
    println("====== reduce ======")
    /* fold */
    println("====== fold ======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("====== fold ======")
    /* aggregate */
    println("====== aggregate ======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("====== aggregate ======")
    /* foreach */
    println("====== foreach ======")
    rddStr.foreach { x => println(x) }
    println("====== foreach ======")
  }
  
  def main(args: Array[String]): Unit = {
    println(System.getenv("HADOOP_HOME"))
    createDataMethod()
    createPairRDD()
    createDataFromFile("file:///D:/sparkdata.txt")
    basicTransformRDD("file:///D:/sparkdata.txt")
    basicActionRDD()
    /* Program output: */
    /*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
2,3,4,5,6,7,3,6,2
====== map ======
====== filter ======
5,6,5
====== filter ======
====== flatMap ======
key01
====== flatMap ======
====== distinct ======
4,6,2,1,3,5
====== distinct ======
====== union ======
1,3,5,3,2,4,5,1
====== union ======
====== intersection ======
1,5
====== intersection ======
====== subtract ======
3,3
====== subtract ======
====== cartesian ======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
====== cartesian ======
====== count ======
9
====== count ======
====== countByValue ======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
====== countByValue ======
====== reduce ======
29
====== reduce ======
====== fold ======
29
====== fold ======
====== aggregate ======
19,10
====== aggregate ======
====== foreach ======
a
b
c
d
b
a
====== foreach ======*/
  }
}

  When Spark runs, we need to construct a SparkContext, and constructing one requires a SparkConf object, for example: setAppName("xtq").setMaster("local[2]").

  appName is simply the name of the Spark application; a master of local[2] means local mode, running the Spark job with 2 worker threads.
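
  Other common master settings look like this (a sketch; the cluster URL is a placeholder):

    /* Use as many worker threads as there are logical cores on the local machine */
    val confLocalAll = new SparkConf().setAppName("xtq").setMaster("local[*]")
    /* Connect to a standalone cluster; the host and port here are hypothetical */
    val confCluster = new SparkConf().setAppName("xtq").setMaster("spark://master-host:7077")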

  When running a Spark program in Eclipse, the following error is reported:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
	at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
	at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
	at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
	at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
	at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
	at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
	at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10)
	at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala)
	at cn.com.sparktest.SparkTest.main(SparkTest.scala)

  The error does not affect the computation, but it is unpleasant to look at. It occurs because Spark depends on Hadoop, and Hadoop cannot really be installed on Windows (it can only be emulated with Cygwin); newer versions of Hadoop require winutils.exe to run on Windows. The fix is simple: download a winutils.exe that matches your OS architecture (32-bit or 64-bit) and place it in a directory like this:

  D:\hadoop\bin\winutils.exe

  Then define the environment variable HADOOP_HOME=D:\hadoop.

  Environment variable changes require restarting Eclipse to take effect; after that, the program runs without reporting the error.
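
  If you would rather not touch system environment variables, you can also point Hadoop at the winutils directory from code, before the SparkContext is created. This is a sketch assuming winutils.exe sits under D:\hadoop\bin as above:

    /* hadoop.home.dir must point at the directory that contains bin\winutils.exe */
    System.setProperty("hadoop.home.dir", "D:\\hadoop")
    val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
    val sc:SparkContext = new SparkContext(conf)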
