Spark(三):彈性分佈式數據集(RDD)

一:Scalaes6

    Scala 是一門現代的多範式編程語言,志在以簡練、優雅及類型安全的方式來表達經常使用編程模式。它平滑地集成了面向對象和函數語言的特性。Scala 運行於 Java 平臺(JVM,Java 虛擬機),併兼容現有的 Java 程序。算法

執行如下命令,啓動spark-shell:shell

hadoop@master:/mysoftware/spark-1.6.1$ spark-shell

二:彈性分佈式數據集(RDD)apache

1.RDD(Resilient Distributed Dataset,彈性分佈式數據集)。   編程

    Spark是一個分佈式計算框架, 而RDD是其對分佈式內存數據的抽象,能夠認爲RDD就是Spark分佈式算法的數據結構。而RDD之上的操做是Spark分佈式算法的核心原語,由數據結構和原語設計上層算法。Spark最終會將算法翻譯爲DAG形式的工做流進行調度,並進行分佈式任務的發佈。數組

    RDD,它在集羣中的多臺機器上進行了數據分區,邏輯上能夠認爲是一個分佈式的數組,而數組中每一個記錄能夠是用戶自定義的任意數據結構。RDD是Spark的核心數據結構,經過RDD的依賴關係造成Spark的調度順序,經過對RDD的操做造成了整個Spark程序。緩存

2.RDD的建立方式安全

    2.1 從Hadoop文件系統(或與Hadoop兼容的其餘持久化存儲系統,如Hive,HBase)輸出(HDFS)建立。ruby

    2.2 從父RDD轉換獲得新的RDD數據結構

    2.3 經過parallelize或makeRDD將單擊數據建立爲分佈式RDD。

scala> var textFile = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt");  

textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at <console>:27

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

scala>

3.RDD的兩種操做算子: 轉換(Transformation),行動(Action)。

    3.1  轉換(Transformation):延遲計算的,也就是說從一個RDD轉換生成另一個RDD的轉換操做不是立刻執行,須要等到有Action操做的時候纔會真正觸發運算。

    3.2  行動(Action):Action算子會觸發Spark提交做業(Job),並將數據輸出Spark系統。

4.RDD的重要內部屬性。

    4.1  分區列表:經過分區列表能夠找到一個RDD中包含的全部分區及其所在地址。

    4.2  計算每一個分片的函數:經過函數能夠對每一個數據塊進行RDD須要進行的用戶自定義函數                                                運算。

    4.3  對父RDD的依賴列表:爲了可以回溯帶父RDD,爲容錯等提供支持。

    4.4  對 key-value pair數據類型RDD的分區器,控制分區策略和分區數。經過分區函數能夠肯定數據記錄在各個分區和節點上的分配,減小分佈不平衡。

    4.5  每一個數據分區的地址列表(如 HDFS 上的數據塊的地址)。
            若是數據有副本,則經過地址列表能夠獲知單個數據塊的全部副本地址,爲負載均衡和容錯提供支持。

 

4.  Spark 計算工做流

    途中描述了Spark的輸入,運行轉換,輸出。在運行轉換中經過算子對RDD進行轉換。算子RDD中定義的函數,能夠對RDD中的數據進行轉換和操做。

    輸入:在Spark程序運行中,數據從外部數據空間(eg:HDFS)輸入到Spark,數據就進入了Spark運行時數據空間,會轉化爲Spark中的數據塊,經過BlockManager進行管理。

    運行:在Spark數據輸入造成RDD後,即可以經過變換算子fliter等,對數據操做並將RDD轉換爲新的RDD,經過行動Action算子,觸發Spark提交做業。若是數據須要服用,能夠經過Cache算子,將數據緩存到內存。

    輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分佈式存儲中(如saveAsTextFile 輸出到 HDFS)或 Scala 數據或集合中( collect 輸出到 Scala 集合,count 返回 Scala Int 型數據)。
 

    

 

 

    Spark的核心數據模型是RDD,但RDD是個抽象類,具體由各子類實現,如MappedRDD,ShuffledRDD等子類。Spark將經常使用的大數據操做都轉換成爲RDD的子類。

對其一些基本操做的使用:

scala> 3*7
res0: Int = 21

scala> var textFile = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt");  
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at <console>:27

scala> textFile.count()
res1: Long = 3                                                                  

scala> textFile.first()
res2: String = 1	spark

scala> textFile.filter(line => line.contains("berg")).count() 
res3: Long = 1

scala> textFile.filter(line => line.contains("bergs")).count() 
res4: Long = 0

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res5: Int = 1

scala> textFile.map(line => line.split("\t").size).reduce((a, b) => if (a > b) a else b)
res6: Int = 2

 

更多學習:

http://my.oschina.net/lgscofield/blog/497145

http://blog.csdn.net/tanggao1314/article/details/51557377

http://blog.csdn.net/yeruby/article/details/41043039

 

文獻: Spark大數據分析實戰。  

相關文章
相關標籤/搜索