參考來源:http://www.yiibai.com/spark/node
概述
Apache Spark是一個集羣計算設計的快速計算。它是創建在Hadoop MapReduce之上,它擴展了 MapReduce 模式,有效地使用更多類型的計算,其中包括交互式查詢和流處理。Spark的主要特徵是其內存集羣計算,增長的應用程序的處理速度。shell
三種部署方法:apache
Spark RDD
彈性分佈式數據集(RDD)是Spark的基本數據結構。它是對象的不可變的分佈式集合。在RDD中每一個數據集被劃分紅邏輯分區,這多是在羣集中的不一樣節點上計算的。RDDS能夠包含任何類型,如:Python,Java,或者Scala的對象,包括用戶定義的類。編程
安裝
按順序安裝Java、Scala、Spark數組
Spark核心編程
建立簡單RDD
Spark容器會自動建立Spark 上下文對象名爲sc緩存
$ spark-shell scala> val inputfile = sc.textFile(「input.txt」)
RDD轉換
S.No | 轉換&含義
--------|----------------
1 | map(func) 返回一個新的分佈式數據集,傳遞源的每一個元素造成經過一個函數 func
2 | filter(func) 返回由選擇在func返回true,源元素組成了一個新的數據集
3 | flatMap(func) 相似映射,但每一個輸入項目能夠被映射到0以上輸出項目(因此func應返回seq而不是單一的項目)
4 | mapPartitions(func) 相似映射,只不過是單獨的每一個分區(塊)上運行RDD,所以 func 的類型必須是Iterator
5 | mapPartitionsWithIndex(func) 相似映射分區,並且還提供func 來表示分區的索引的整數值,所以 func 必須是類型 (Int, Iterator
6 | sample(withReplacement, fraction, seed) 採樣數據的一小部分,有或沒有更換,利用給定的隨機數發生器的種子
7 | union(otherDataset) 返回一個新的數據集,其中包含源數據和參數元素的結合
8 | intersection(otherDataset) 返回包含在源數據和參數元素的新RDD交集
9 | distinct([numTasks]) 返回一個新的數據集包含源數據集的不一樣元素
10 | groupByKey([numTasks]) 當調用(K,V)數據集,返回(K, Iterable
11 | reduceByKey(func, [numTasks])
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
13 | sortByKey([ascending], [numTasks])
14 | join(otherDataset, [numTasks])
15 | cogroup(otherDataset, [numTasks])
16 | cartesian(otherDataset) 當上調用類型T和U的數據集,返回(T,U)對數據集(全部元素對)
17 | pipe(command, [envVars]) RDD經過shell命令每一個分區,例如:一個Perl或bash腳本。RDD元素被寫入到進程的標準輸入和線路輸出,標準輸出形式返回一個字符串RDD
18 | coalesce(numPartitions) 減小RDD到numPartitions分區的數量。過濾大型數據集後,更高效地運行的操做
19 | repartition(numPartitions) 打亂RDD數據隨機創造更多或更少的分區,並在它們之間平衡。這老是打亂的全部數據在網絡上
20 | repartitionAndSortWithinPartitions(partitioner) 根據給定的分區從新分區RDD及在每一個結果分區,排序鍵記錄。這是調用從新分配排序在每一個分區內,由於它能夠推進分揀向下進入混洗機制效率更高。
動做
S.No | 操做 & 含義
--------|---------------------
1 | reduce(func) 合計數據集的元素,使用函數 func (其中有兩個參數和返回一行). 該函數應該是可交換和可結合,以便它能夠正確地在並行計算。
2 | collect() 返回數據集的全部做爲數組在驅動程序的元素。這是一個過濾器或其它操做以後返回數據的一個足夠小的子集,一般是有用的
3 | count() 返回該數據集的元素數
4 | first() 返回的數據集的第一個元素(相似於使用(1))
5 | take(n) 返回與該數據集的前n個元素的陣列。
6 | takeSample (withReplacement,num, [seed]) 返回數組的數據集num個元素,有或沒有更換隨機抽樣,預指定的隨機數發生器的種子可選
7 | takeOrdered(n, [ordering]) 返回RDD使用或者按其天然順序或自定義比較的前第n個元素
8 | saveAsTextFile(path) 寫入數據集是一個文本文件中的元素(或一組文本文件),在給定的目錄的本地文件系統,HDFS或任何其餘的Hadoop支持的文件系統。Spark調用每一個元素的 toString,將其轉換爲文件中的文本行
9 | saveAsSequenceFile(path) (Java and Scala) 寫入數據集,爲Hadoop SequenceFile元素在給定的路徑寫入在本地文件系統,HDFS或任何其餘Hadoop支持的文件系統。 這是適用於實現Hadoop可寫接口RDDS的鍵 - 值對。在Scala中,它也能夠在屬於隱式轉換爲可寫(Spark包括轉換爲基本類型,如 Int, Double, String 等等)類型。
10 | saveAsObjectFile(path) (Java and Scala) 寫入數據集的內容使用Java序列化爲一個簡單的格式,而後可使用SparkContext.objectFile()加載。
11 | countByKey() 僅適用於RDDS的類型 (K, V). 返回(K, Int)對與每一個鍵的次數的一個HashMap。
12 | foreach(func) 數據集的每一個元素上運行函數func。這一般對於不良反應,例如更新累加器或與外部存儲系統進行交互進行。網絡
示例程序數據結構
//打開Spark-Shell $ spark-shell //建立一個RDD scala> val inputfile = sc.textFile("input.txt") //執行字數轉換 scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); //當前RDD scala> counts.toDebugString //緩存轉換 scala> counts.cache() //應用動做 scala> counts.saveAsTextFile("output")
Spark部署
Spark應用程序使用spark-submit(shell命令)來部署在集羣中的Spark應用程序
示例:
SparkWordCount.scalaapp
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) /* local = master URL; Word Count = application name; */ /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ /* Map = variables to work nodes */ /*creating an inputRDD to read text file (in.txt) through Spark context*/ val input = sc.textFile("in.txt") /* Transform the inputRDD into countRDD */ valcount = input.flatMap(line ⇒ line.split(" ")) .map(word ⇒ (word, 1)) .reduceByKey(_ + _) /* saveAsTextFile method is an action that effects on the RDD */ count.saveAsTextFile("outfile") System.out.println("OK"); } }
步驟:
一、下載Spark Ja
下載spark-core_2.10-1.3.0.jar
二、編譯程序
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
三、建立JAR
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
四、提交spark應用
spark-submit --class SparkWordCount --master local wordcount.jar