Spark編程指南

一、在maven裏面添加引用,spark和hdfs的客戶端的。html

groupId = org.apache.spark
artifactId = spark-core_2.9.3
version = 0.8.1-incubating 
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

二、把assembly/target/spark-assembly_2.9.3-0.8.1-incubating.jar添加到classpath裏面,而後咱們在程序裏面要添加如下引用。程序員

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

三、下面是官方的WorkCount的例子,能夠參考一下。算法

/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
 } 
}
val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
SparkContext是SparkContext的上下文對象,是很是核心的一個類,它的實例化方法是Hadoop datasets。
(1)Parallelized collectionsnew SparkContext(master, appName, [sparkHome], [jars])。
master:master的地址。
appName:應用的名稱。
sparkHome:spark的安裝地址。
jars:jar包的位置。

四、Spark老是圍繞這個一個概念來進行 resilient distributed dataset (RDD),是能夠並行操做的支持容錯的元素集合。目前支持兩種類型的RDDs,parallelized collections和是scala中存在的集合類,而且支持並行操做。
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

 正常狀況之下,spark會自動設置並行任務所須要的cpu的分片,通常是每一個cpu 2-4個分片,也能夠本身手動設置,sc.parallelize(data, 10)。apache

 (2)Spark支持hadoop上的任何數據集,好比text files, SequenceFiles,還有其它的InputFormat。api

   下面是text files的例子:數組

scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

   SequenceFiles則使用SparkContext’s sequenceFile[K, V] ,好比sequenceFile[Int, String],Int對應的是IntWritable,String對應的是Text。緩存

   別的數據格式使用SparkContext.hadoopRDD,以後再介紹,這個文檔沒有介紹。app

   正常狀況之下,spark是一個block一個任務。maven

   (3)RDDs只支持兩種操做: transformations,  從一個數據集轉換成另一種; actions, 經過對一個數據集進行運算以後返回一個值。oop

     Spark當中全部的transformations都是延遲執行的,等到真正使用的時候纔會進行運算。

     默認的,每個通過transformed的RDD當有action做用於它的時候,它會從新計算一遍,除非咱們進行persist (or cache) 操做。

     最後附錄一下RDD的API地址:http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD

 (4)RDD Persistence 

    Spark最重要的一個功能就是能夠把RDD持久化或者緩存,當你進行一個持久化操做的時候,Spark會在全部節點的內存當中保存這個RDD,第一個的時候計算,以後一直使用不須要再從新計算了。緩存是實現迭代式算法的關鍵。咱們可使用persist() or cache()方法來持久化一個RDD,它是容錯的,當這個RDD的任何分片丟失以後,它會在以前計算它的機器上從新計算。另外每個RDD,有它本身的存儲Level,存儲在硬盤或者存儲在內存,可是序列化成Java對象(節省空間),或者在集羣間複製。要設置它,咱們須要傳遞一個StorageLevel給persist(),cache()是默認的了是StorageLevel.MEMORY_ONLY (存儲爲反序列化對象在內存當中)

  當內存足夠的時候,咱們可使用MEMORY_ONLY;當內存不太好的時候,咱們能夠採用MEMORY_ONLY_SER,在內存中存儲爲一個字節數組,速度還能夠;當操做的數據集合足夠大的時候,咱們就把中間結果寫到硬盤上;若是要支持容錯,就使用備份到2個節點上的方式。若是要本身定義一個的話,要使用StorageLevel的apply()方法。      

 五、共享變量

   Spark提供了兩種限制的共享變量,BroadcastAccumulators。

   (1)Broadcast容許程序員持有一個只讀的變量在各個節點之間,它一個經常使用的場景就是用它來存儲一個很大的輸入的數據集給每一個節點使用,Spark會只用它獨有的廣播算法來減小通訊損失。下面是例子:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

 

    (2)Accumulators是用來計數或者求總數的,使用SparkContext.accumulator(v)來給它一個初始化的值,而後用「+=」來進行操做,可是任務之間不能獲得它的結果,只有驅動任務的程序能夠獲得它的結果。下面是例子:

scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10
相關文章
相關標籤/搜索