參考:html
英文:https://spark.apache.org/docs/latest/programming-guide.htmljava
中文:http://www.cnblogs.com/lujinhong2/p/4651025.html 1.2.1版本的python
(一)快速入門shell
老規矩,先看一個簡單示例,有個認識。這個示例來自官方example的SparkPi:express
package org.lujinhong.demo.spark /* * 官方的sparkPi示例 */ import scala.math.random import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi").setMaster("local") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() } }
注意以上的setMaster(「local」)是本身加上去的,方便直接在本地運行。若是在集羣上運行,則經過spark-submit的—master參數指定。
寫好代碼後,就能夠直接在eclipse中右鍵—>運行了。
(二)理論介紹apache
一、spark中的全部操做都與RDD相關,包括建立RDD,transformation(將RDD轉換爲另外一個RDD)和action(觸發RDD的計算,以及輸出等)。緩存
In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.app
二、RDD是一個不可變的分佈式對象集合,每一個RDD會被分紅多個分區,它們分別在不一樣的機器上被計算。它能夠是任何的python/java/scala對象,包括你本身建立的對象。dom
注意RDD是不可變的,所以若須要改變現有RDD的內容,只能經過建立一個新的RDD來實現,這也是transformation的做用。eclipse
RDD是一個集合,所以能夠經過一些迭代方法對內容進行處理
三、RDD操做類型:對RDD的操做能夠分爲2種類型
(1)Transformation: 將一個RDD轉化爲另外一個RDD,如map, filter等操做
(2)Action:返回計算結果給driver,寫入存儲等操做。
最明顯的區別:transformation返回一個RDD, action返回其它數據類型
(三)Spark應用的主要4個工做流程以下:
一、create: 經過讀取外部數據源來建立RDD。(雖說也能夠將list/set等轉化爲RDD,但實際上這對於處理大數據沒什麼做用,通常只用做demo)
二、transformation: 將RDD將化爲另外一個RDD,如filter()等。
三、cache: 將RDD緩存下來,方便以後再使用,如persitst()等。
四、action: 執行真正的工做,計算結果並輸出,如count(),first()等。
幾個注意點
一、建立RDD有2種方法:
(1)從外部數據集中建立,如從文件,socket,kafka, flume等數據源
(2)將list/set等集合轉化爲RDD。scala> val lines = sc.parallelize(List("apple","pear"));
二、執行transformation只定義了操做,spark執行的是懶計算原則,即transformation不會觸發真正的計算,而是等到第一個action出現時纔開始真正的計算。這對於大數據量時成爲重要。如讀取一份大文件時,若立刻將其讀入內存,會佔用大量的內存空間,而有可能過很長時間也會開始計算。另外一方面,若是隻是執行first()相似的計算,這個文件徹底不必所有讀入內存,而是隻讀取到第一行就能夠了。
三、默認狀況下,對於每個action,spark會從新計算它用到的RDD,若一個RDD會被以後的多個action用到,能夠將其緩存到內存(看成也能夠到磁盤等),如讀取一個文件後,先通過filter,過濾出只包括」spark」的行,此時能夠將這個RDD保存到內存中,再分別計算它的count(),first()等操做。
cache() is the same as calling persist() with the default storage level.
四、action會觸發真正的計算。
看一個示例:
$ bin/spark-shell