一:Scalaes6
Scala 是一門現代的多範式編程語言,志在以簡練、優雅及類型安全的方式來表達經常使用編程模式。它平滑地集成了面向對象和函數語言的特性。Scala 運行於 Java 平臺(JVM,Java 虛擬機),併兼容現有的 Java 程序。算法
執行如下命令,啓動spark-shell:shell
hadoop@master:/mysoftware/spark-1.6.1$ spark-shell
二:彈性分佈式數據集(RDD)apache
1.RDD(Resilient Distributed Dataset,彈性分佈式數據集)。 編程
Spark是一個分佈式計算框架, 而RDD是其對分佈式內存數據的抽象,能夠認爲RDD就是Spark分佈式算法的數據結構。而RDD之上的操做是Spark分佈式算法的核心原語,由數據結構和原語設計上層算法。Spark最終會將算法翻譯爲DAG形式的工做流進行調度,並進行分佈式任務的發佈。數組
RDD,它在集羣中的多臺機器上進行了數據分區,邏輯上能夠認爲是一個分佈式的數組,而數組中每一個記錄能夠是用戶自定義的任意數據結構。RDD是Spark的核心數據結構,經過RDD的依賴關係造成Spark的調度順序,經過對RDD的操做造成了整個Spark程序。緩存
2.RDD的建立方式安全
2.1 從Hadoop文件系統(或與Hadoop兼容的其餘持久化存儲系統,如Hive,HBase)輸出(HDFS)建立。ruby
2.2 從父RDD轉換獲得新的RDD數據結構
2.3 經過parallelize或makeRDD將單擊數據建立爲分佈式RDD。
scala> var textFile = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt"); textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at <console>:27 scala> val a = sc.parallelize(1 to 9, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27 scala>
3.RDD的兩種操做算子: 轉換(Transformation),行動(Action)。
3.1 轉換(Transformation):延遲計算的,也就是說從一個RDD轉換生成另一個RDD的轉換操做不是立刻執行,須要等到有Action操做的時候纔會真正觸發運算。
3.2 行動(Action):Action算子會觸發Spark提交做業(Job),並將數據輸出Spark系統。
4.RDD的重要內部屬性。
4.1 分區列表:經過分區列表能夠找到一個RDD中包含的全部分區及其所在地址。
4.2 計算每一個分片的函數:經過函數能夠對每一個數據塊進行RDD須要進行的用戶自定義函數 運算。
4.3 對父RDD的依賴列表:爲了可以回溯帶父RDD,爲容錯等提供支持。
4.4 對 key-value pair數據類型RDD的分區器,控制分區策略和分區數。經過分區函數能夠肯定數據記錄在各個分區和節點上的分配,減小分佈不平衡。
4.5 每一個數據分區的地址列表(如 HDFS 上的數據塊的地址)。
若是數據有副本,則經過地址列表能夠獲知單個數據塊的全部副本地址,爲負載均衡和容錯提供支持。
4. Spark 計算工做流
途中描述了Spark的輸入,運行轉換,輸出。在運行轉換中經過算子對RDD進行轉換。算子RDD中定義的函數,能夠對RDD中的數據進行轉換和操做。
輸入:在Spark程序運行中,數據從外部數據空間(eg:HDFS)輸入到Spark,數據就進入了Spark運行時數據空間,會轉化爲Spark中的數據塊,經過BlockManager進行管理。
運行:在Spark數據輸入造成RDD後,即可以經過變換算子fliter等,對數據操做並將RDD轉換爲新的RDD,經過行動Action算子,觸發Spark提交做業。若是數據須要服用,能夠經過Cache算子,將數據緩存到內存。
輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分佈式存儲中(如saveAsTextFile 輸出到 HDFS)或 Scala 數據或集合中( collect 輸出到 Scala 集合,count 返回 Scala Int 型數據)。
Spark的核心數據模型是RDD,但RDD是個抽象類,具體由各子類實現,如MappedRDD,ShuffledRDD等子類。Spark將經常使用的大數據操做都轉換成爲RDD的子類。
對其一些基本操做的使用:
scala> 3*7 res0: Int = 21 scala> var textFile = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt"); textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at <console>:27 scala> textFile.count() res1: Long = 3 scala> textFile.first() res2: String = 1 spark scala> textFile.filter(line => line.contains("berg")).count() res3: Long = 1 scala> textFile.filter(line => line.contains("bergs")).count() res4: Long = 0 scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res5: Int = 1 scala> textFile.map(line => line.split("\t").size).reduce((a, b) => if (a > b) a else b) res6: Int = 2
更多學習:
http://my.oschina.net/lgscofield/blog/497145
http://blog.csdn.net/tanggao1314/article/details/51557377
http://blog.csdn.net/yeruby/article/details/41043039
文獻: Spark大數據分析實戰。