http://spark.incubator.apache.org/
http://spark.incubator.apache.org/documentation.html
http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html, very good hands-on exercises
Source-code analysis:
http://jerryshao.me/archive.html
http://www.cnblogs.com/jerrylead/
Download
http://spark.apache.org/downloads.html
Download the version you need and just unpack it.
Of course you can also build it yourself.
Build
Spark currently supports building with Maven:
http://spark.apache.org/docs/latest/building-with-maven.html
The simplest build is:
mvn -DskipTests clean package
If Spark needs to access HDFS through the Hadoop client, the build has to match your specific Hadoop version, because the client protocol differs between Hadoop versions; see the link above.
Likewise, if you want to use YARN for resource management, that also has to be specified at build time.
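For example, the Spark 1.x build docs give commands roughly like the following for a YARN build against Hadoop 2.4 (adjust the profile and version flags to your cluster):
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package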
http://spark.apache.org/docs/latest/quick-start.html
Spark interactive
One very convenient thing about Spark is that it supports an interactive command line.
That makes it easy to learn and debug, or to do data analysis interactively. Very nice.
Scala shell (./spark-shell)
Python interpreter (./pyspark)
Use --help to see how to use them (./spark-shell --help)
The shell is not limited to local use; it can also connect to a real cluster and do large-scale data analysis: ./bin/spark-shell --master spark://IP:PORT
Here is a simple example that counts the lines in README.md. Note that the shell automatically creates a SparkContext, sc:
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

scala> textFile.count() // Number of items in this RDD
res0: Long = 126
A slightly more complex example, using Java's Math class:
compute how many words the longest line in the file contains.
scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
Standalone Applications
You can write Spark applications in Scala, Java, or Python.
This section shows how to do it in Scala; for Java and Python see the link above.
First write the application. It is very simple: count how many lines of the document contain "a" and how many contain "b".
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
Build it with sbt. Create simple.sbt; note that in the .sbt file the settings have to be separated by blank lines.
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
Create the following directory layout:
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
Build:
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
Submit the job and get the result:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23
Building a Spark application with Maven
References:
http://www.scala-lang.org/old/node/345
https://blogs.oracle.com/arungupta/entry/scala_and_maven_getting_started
I am not used to sbt and too lazy to learn it, and Spark itself is built with Maven anyway, so here is how to build a Scala Spark application with Maven.
It is fairly simple: Maven itself provides a Scala archetype, scala-archetype-simple.
mvn archetype:generate \
-DarchetypeGroupId=org.scala-tools.archetypes \
-DarchetypeArtifactId=scala-archetype-simple \
-DremoteRepositories=http://scala-tools.org/repo-releases \
-DgroupId=com.xxx \
-DartifactId=sparkapp \
-Dversion=1.0-SNAPSHOT
Then add the dependency to the pom:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>
There is no maven-assembly-plugin by default, so add it:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
Don't forget to update the Scala version in the pom; then run mvn package and you are done.
Running the Examples
Spark also ships with examples you can read or run directly, for instance:
./bin/run-example SparkPi
which prints something like: Pi is roughly 3.14302
You can also look at the SparkPi implementation, which is very simple:
package org.apache.spark.examples

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
http://spark.apache.org/docs/latest/quick-start.html
Initializing Spark
To start using Spark, you first need to create a SparkContext.
This is done through the following constructor:
new SparkContext(master, appName, [sparkHome], [jars])
1. The master parameter specifies the URL of the Spark cluster; the following formats are supported:
local, Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K], Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
spark://HOST:PORT, Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT, Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default.
If no master URL is specified, the spark shell defaults to 「local」.
2. appName is easy to understand: a name for your application.
3. If you deploy to a distributed cluster, you also need to specify the last two parameters:
sparkHome, the Spark installation directory on the workers, which must be the same on every node.
jars, all the jars containing your application and its dependencies; Spark ships them to every worker and automatically adds them to the classpath.
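With a recent Spark the same information is usually passed through a SparkConf instead; a minimal sketch (the master URL, path and jar name are illustrative):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://host:7077")     // or "local[4]" for local testing
  .setAppName("MyApp")
  .setSparkHome("/opt/spark")         // Spark install dir on the workers
  .setJars(Seq("target/myapp.jar"))   // app jar plus dependencies, shipped to the workers
val sc = new SparkContext(conf)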
Resilient Distributed Datasets (RDDs)
Before data can be processed in Spark, it first has to be loaded as an RDD.
There are two ways to create an RDD:
Parallelized collections
You can convert an in-process data structure into an RDD, which is quite convenient: read the data from any source first, then turn it into an RDD.
The precondition is that the data cannot be too large, otherwise efficiency becomes a problem, because in this case the data is copied to every node.
In Scala, call SparkContext's parallelize function on a Scala collection; it splits the collection into slices and distributes them to the nodes so they can be processed in parallel.
scala> val data = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data) // splits data into slices and distributes them to the nodes
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
Spark normally picks the number of slices automatically based on the cluster's CPUs, but you can also set it yourself:
sc.parallelize(data, 10)
External Datasets
More commonly, though, you read an external dataset and turn it into an RDD.
Most often Spark reads from files: local, HDFS, S3, and so on.
Spark can also read from databases such as Cassandra, HBase, or even MySQL (through JdbcRDD).
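A rough sketch of JdbcRDD against a MySQL table (the table, columns and connection string are made up; the SQL must contain two ? placeholders, which Spark binds to the partition bounds):
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://host:3306/mydb", "user", "pass"),
  "SELECT id, name FROM t WHERE id >= ? AND id <= ?",
  1, 1000, 3,                                           // lowerBound, upperBound, numPartitions
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2)))  // map each result row
rows.count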
First, loading a file with SparkContext's textFile method.
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, kfs://, etc. URI) and reads it as a collection of lines. Here is an example invocation:
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
textFile also takes a slices parameter. By default Spark creates one slice per file block; you can ask for more slices, but not fewer than the number of blocks.
Note that with external storage each worker reads its own data independently, so distributed storage is fine, but with local storage you must make sure the file exists at the same path on every node.
So if you are not using distributed storage, use shared storage; otherwise it is a hassle, since you have to copy the file to every node yourself.
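For example, to ask for more slices than the default one per block (the path is illustrative):
val logs = sc.textFile("hdfs:///data/access.log", 8) // read into 8 slices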
textFile reads the file as a collection of lines, so you can iterate over the lines directly.
It also supports directories, compressed files, and wildcards: textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
Besides textFile, other input interfaces are supported:
SparkContext.wholeTextFiles reads a directory and returns (filename, content) pairs.
SequenceFiles are read with the sequenceFile[K, V] method, where the types K and V should be subclasses of Writable.
SparkContext.hadoopRDD takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
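A minimal sketch of the first two (paths are illustrative; the SequenceFile is assumed to hold String/Int-compatible Writables):
val docs = sc.wholeTextFiles("hdfs:///data/docs")                 // RDD of (filename, content)
val counts = sc.sequenceFile[String, Int]("hdfs:///data/counts")  // RDD of (String, Int)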
RDD Operations
Now that the raw data has been loaded into an RDD, the next step is to process it.
Spark has two kinds of RDD operations: transformations and actions.
Transformations are lazy: calling one does not actually execute anything, it only records the operation.
Actions actually run a job and execute.
The most commonly used transformations and actions are listed below.
This link gives examples of every operation and is quite good:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
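A small illustration of the laziness described above:
val lines = sc.textFile("README.md") // nothing is read yet
val lengths = lines.map(_.length)    // transformation: only recorded
lengths.reduce(_ + _)                // action: a job actually runs here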
Transformations
map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func), map is 1->1 while flatMap is 1 -> 0 or more, so func returns a Seq, and in the end all the Seqs are flattened into one.
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
Example,
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
Why are the next two needed? When the map involves expensive setup, such as opening and closing a database connection, you certainly do not want to do that for every single item.
With mapPartitions it is done only once per partition; both input and output are iterators.
mapPartitionsWithSplit just additionally passes in the split index, which some scenarios may need to know.
mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
mapPartitionsWithSplit(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
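Example (a minimal sketch of mapPartitions: the whole partition arrives as one iterator, so per-partition setup such as a database connection would happen only once here),
val nums = sc.parallelize(1 to 9, 3)
nums.mapPartitions(iter => Iterator(iter.sum)).collect // one sum per partition: Array(6, 15, 24)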
pipe(command, [envVars]), process the RDD's data with a shell command
Takes the RDD data of each partition and sends it via stdin to a shell-command. The resulting output of the command is captured and returned as a RDD of string values.
Example,
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res2: Array[String] = Array(1, 4, 7)
sample(withReplacement, fraction, seed), random sampling
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
withReplacement: whether to sample with replacement.
fraction: the sampling proportion, e.g. 0.1 means 10%.
seed: the pseudo-random seed; the same seed produces the same random sequence, so running the same code twice with the same seed gives the same sample.
Example,
val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960
Set operations
union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
The following operations are based on key-value pairs, whereas most other operations put no restriction on the data type.
First, a few functions that produce key-value pairs:
keyBy(func)
Builds key-value pairs: func derives the key from each item, and the item itself becomes the value.
Example,
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
zip(otherDataset)
Joins two RDDs by combining the i-th elements of each partition with each other.
Examples,
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
cartesian(otherDataset), the Cartesian product
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
Example,
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
Now let's look at the operations on key-value pairs.
These operations require key-value pairs and live in the PairRDDFunctions class.
They usually involve a shuffle, such as grouping or aggregating.
They all take an optional numTasks parameter, which defaults to the parent's number of partitions but can also be specified.
groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
If the purpose of the grouping is a sum or an average, then reduceByKey or combineByKey will be more efficient.
Example,
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
b.groupByKey.collect
res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))
reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function.
The classic example, counting words in a text:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations.
This function looks a bit hard to understand at first.
aggregateByKey(zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
Written out like this it is easier to follow.
First, the difference from reduceByKey: reduceByKey's input and output are both (K, V),
while aggregateByKey's output is (K, U), which can differ from the input (K, V).
You need to define three things, much like combineByKey:
zeroValue: U, the initial value, for example an empty list.
seqOp: (U, V) => U, the seq operator, which describes how to merge a V into a U, for example how to add an item to the list.
combOp: (U, U) => U, the comb operator, which describes how to merge two Us, for example how to merge two lists.
So aggregateByKey can be seen as a higher-level, more flexible reduce or group.
Example (note: these two snippets actually use the closely related aggregate action, which applies the same seqOp/combOp pattern to a whole, non-keyed RDD),
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9
val z = sc.parallelize(List("a","b","c","d","e","f"),2)
z.aggregate("")(_ + _, _ + _)
res115: String = abcdef
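And a sketch of the actual aggregateByKey on (K, V) pairs, collecting the values of each key into a list (so U = List[Int] differs from V = Int):
val pairs = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2)), 2)
pairs.aggregateByKey(List[Int]())(
  (acc, v) => v :: acc,   // seqOp: fold a value into the per-partition list
  (l1, l2) => l1 ::: l2   // combOp: merge the per-partition lists
).collect                 // e.g. Array((a,List(1, 3)), (b,List(2))); order inside a list may vary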
sortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are also supported through leftOuterJoin and rightOuterJoin.
Example,
val a = sc.parallelize(List("dog", "salmon","elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","rabbit","turkey","wolf"), 3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)))
cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>) tuples. This operation is also called groupWith.
groupByKey groups the data within a single RDD; cogroup can group the data of several RDDs at the same time.
Example,
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
b.cogroup(c, d).collect
res2: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))
The following operations change the number of partitions.
Why change the number of partitions? Because some operations greatly increase or decrease the amount of data in an RDD.
For example, filter may drop the vast majority of the data, so afterwards you can use coalesce to reduce the number of partitions.
repartition can not only decrease but also increase the number of partitions, and it always reshuffles the data to randomly rebalance the partitions; see the sketch after the two entries below.
coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
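A short sketch of both (sizes are illustrative):
val big = sc.parallelize(1 to 1000000, 100)
val small = big.filter(_ % 1000 == 0) // keeps about 0.1% of the data
small.coalesce(10)                    // shrink to 10 partitions, avoids a full shuffle
small.repartition(200)                // full shuffle; can also increase the partition count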
Actions
reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
Example,
val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050
collect()
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
count()
Return the number of elements in the dataset.
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4
first()
Return the first element of the dataset (similar to take(1)).
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.first
res1: String = Gnu
take(n)
Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
Example,
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res18: Array[String] = Array(dog, cat)
takeSample(withReplacement, num, seed)
Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
Unlike sample:
it is an action, so it returns an array rather than an RDD;
the second parameter, num, specifies the exact number of samples rather than a fraction;
and the returned array is in random order.
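Example (the sampled values are omitted),
val a = sc.parallelize(1 to 10000, 3)
a.takeSample(false, 5, 0) // exactly 5 elements, no replacement, fixed seed; returns an Array[Int], not an RDD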
saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
Example,
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
This example writes three files, part-00000, part-00001, and part-00002, under the mydata_a directory.
saveAsSequenceFile(path)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
countByKey()
Only available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key.
Example,
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
Example,
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy
Passing Functions to Spark
Spark provides many transformations and actions as primitive operations, which is very convenient.
But there is always some special logic you need to apply to the data, for example via map, so how do you define those functions?
1. For very short logic, use the anonymous function syntax directly; this is the most convenient:
lineLengths.reduce((a, b) => a + b)
2. Of course not all logic is that short, so you can also use static methods in a global singleton object.
Since Scala has no static classes, a global singleton object is essentially the same concept as a static class.
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
The two forms above are the recommended ones. If you insist on using a member function of an ordinary class, or on referencing a class member inside the function:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
Then be aware that the whole class instance has to be sent to the cluster for execution, which is rather inefficient.
For the case that references a member variable, you can work around it by copying it into a local variable:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
RDD Persistence
Within a stage the transformations are just function calls and intermediate results are not kept; but for convenience, or to avoid recomputing, you can cache an intermediate result.
That is the purpose of RDD persistence.
You can choose among different storage levels; cache() uses the default, MEMORY_ONLY.
You can also use the persist() interface and pick a different level:
MEMORY_ONLY
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY
Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
Same as the levels above, but replicate each partition on two cluster nodes.
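A minimal sketch of picking a level explicitly (the path is illustrative):
import org.apache.spark.storage.StorageLevel

val events = sc.textFile("hdfs:///data/events")
events.persist(StorageLevel.MEMORY_AND_DISK_SER) // cache() would mean MEMORY_ONLY
events.count()                                   // the first action materializes the cache
events.unpersist()                               // drop it instead of waiting for LRU eviction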
How do you choose a storage level?
Prefer MEMORY_ONLY whenever possible.
If memory really is not enough, consider MEMORY_ONLY_SER, which saves some space; the drawback is that every use requires deserialization, so use a fast serialization library.
Try to avoid disk, because recomputing is usually faster than reading from disk, unless the computation is very expensive.
Replication is generally unnecessary, because RDDs recover from failures quickly by recomputation, unless you have real-time requirements and cannot tolerate the recomputation time.
Also, Spark evicts old cached data in least-recently-used (LRU) fashion; you can also clear a specific RDD's cache manually with RDD.unpersist().
Shared Variables
通常來講, 對於這種分佈式架構是很難提供shared variables的, 可是出於方便的要求, Spark提供兩種特殊的shared variables, broadcast variables and accumulators
Broadcast Variables
They let you broadcast a read-only variable to every node, which is more flexible than shipping a copy of it with each task.
Note that the value must not be changed, otherwise the nodes would become inconsistent.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) // broadcast the data
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value // read the broadcast value through value
res0: Array[Int] = Array(1, 2, 3)
Accumulators
They are used to implement counters. Spark supports accumulators of type Int and Double, and users can implement other types.
Note that every node can add to an accumulator with +=, but only the driver can read the accumulator's value.
The interpreter session below shows an accumulator being used to add up the elements of an array:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10