[spark]spark 編程教程

 

參考:html

英文:https://spark.apache.org/docs/latest/programming-guide.htmljava

中文:http://www.cnblogs.com/lujinhong2/p/4651025.html 1.2.1版本的python

 

(一)快速入門shell

老規矩,先看一個簡單示例,有個認識。這個示例來自官方example的SparkPi:express

package org.lujinhong.demo.spark

/*
 * 官方的sparkPi示例
 */

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }

}
 
注意以上的setMaster(「local」)是本身加上去的,方便直接在本地運行。若是在集羣上運行,則經過spark-submit的—master參數指定。
寫好代碼後,就能夠直接在eclipse中右鍵—>運行了。

 

 (二)理論介紹apache

 

一、spark中的全部操做都與RDD相關,包括建立RDD,transformation(將RDD轉換爲另外一個RDD)和action(觸發RDD的計算,以及輸出等)。緩存

In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.app

 二、RDD是一個不可變的分佈式對象集合,每一個RDD會被分紅多個分區,它們分別在不一樣的機器上被計算。它能夠是任何的python/java/scala對象,包括你本身建立的對象。dom

注意RDD是不可變的,所以若須要改變現有RDD的內容,只能經過建立一個新的RDD來實現,這也是transformation的做用。eclipse

RDD是一個集合,所以能夠經過一些迭代方法對內容進行處理

 三、RDD操做類型:對RDD的操做能夠分爲2種類型

(1)Transformation: 將一個RDD轉化爲另外一個RDD,如map, filter等操做

(2)Action:返回計算結果給driver,寫入存儲等操做。

最明顯的區別:transformation返回一個RDD, action返回其它數據類型

 

(三)Spark應用的主要4個工做流程以下:

一、create:     經過讀取外部數據源來建立RDD。(雖說也能夠將list/set等轉化爲RDD,但實際上這對於處理大數據沒什麼做用,通常只用做demo)

二、transformation:  將RDD將化爲另外一個RDD,如filter()等。

三、cache:  將RDD緩存下來,方便以後再使用,如persitst()等。

四、action:   執行真正的工做,計算結果並輸出,如count(),first()等。

幾個注意點

一、建立RDD有2種方法:

(1)從外部數據集中建立,如從文件,socket,kafka, flume等數據源

(2)將list/set等集合轉化爲RDD。scala> val lines = sc.parallelize(List("apple","pear"));

二、執行transformation只定義了操做,spark執行的是懶計算原則,即transformation不會觸發真正的計算,而是等到第一個action出現時纔開始真正的計算。這對於大數據量時成爲重要。如讀取一份大文件時,若立刻將其讀入內存,會佔用大量的內存空間,而有可能過很長時間也會開始計算。另外一方面,若是隻是執行first()相似的計算,這個文件徹底不必所有讀入內存,而是隻讀取到第一行就能夠了。

三、默認狀況下,對於每個action,spark會從新計算它用到的RDD,若一個RDD會被以後的多個action用到,能夠將其緩存到內存(看成也能夠到磁盤等),如讀取一個文件後,先通過filter,過濾出只包括」spark」的行,此時能夠將這個RDD保存到內存中,再分別計算它的count(),first()等操做。

cache() is the same as calling persist() with the default storage level.

四、action會觸發真正的計算。

看一個示例:

$ bin/spark-shell

 
(1)建立RDD
scala> val fileContent = sc.textFile("file:///home/hadoop/spark/README.md」)
 
(2)過濾RDD
scala> val pythonLine = fileContent.filter(line => line.contains("spark」))
 
(3)計算行數
scala> pythonLine.count
最後的輸出以下:
15/07/21 11:20:43 INFO scheduler.DAGScheduler: Job 2 finished: count at <console>:26, took 1.495956 s
res5: Long = 11
 
能夠看出使用了1.5秒左右。
 
(4)咱們試一下緩存後再計算
scala> pythonLine.cache()
scala> pythonLine.count
15/07/21 11:22:18 INFO scheduler.DAGScheduler: Job 3 finished: count at <console>:26, took 0.123537 s
res7: Long = 11
只使用了0.12秒
 
(5)繼續執行其它action
scala> pythonLine.first()
。。。。。
相關文章
相關標籤/搜索