spark api之一:Spark官方文檔 - 中文翻譯

轉載請註明出處:http://www.cnblogs.com/BYRans/

html

 

1 概述(Overview)

整體來說,每個Spark驅動程序應用都由一個驅動程序組成,該驅動程序包含一個由用戶編寫的main方法,該方法會在集羣上並行執行一些列並行計算操做。Spark最重要的一個概念是彈性分佈式數據集,簡稱RDD(resilient distributed dataset )。RDD是一個數據容器,它將分佈在集羣上各個節點上的數據抽象爲一個數據集,而且RDD可以進行一系列的並行計算操做。能夠將RDD理解爲一個分佈式的List,該List的數據爲分佈在各個節點上的數據。RDD經過讀取Hadoop文件系統中的一個文件進行建立,也能夠由一個RDD通過轉換獲得。用戶也能夠將RDD緩存至內存,從而高效的處理RDD,提升計算效率。另外,RDD有良好的容錯機制。java

Spark另一個重要的概念是共享變量(shared variables)。在並行計算時,能夠方便的使用共享變量。在默認狀況下,執行Spark任務時會在多個節點上並行執行多個task,Spark將每一個變量的副本分發給各個task。在一些場景下,須要一個可以在各個task間共享的變量。Spark支持兩種類型的共享變量:python

  • 廣播變量(broadcast variables):將一個只讀變量緩存到集羣的每一個節點上。例如,將一份數據的只讀緩存分發到每一個節點。git

  • 累加變量(accumulators):只容許add操做,用於計數、求和。github

 

2 引入Spark(Linking with Spark)

在Spark 1.6.0上編寫應用程序,支持使用Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。若是使用Java 8,支持lambda表達式(lambda expressions)。
在編寫Spark應用時,須要在Maven依賴中添加Spark,Spark的Maven Central爲:web

groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.6.0

另外,若是Spark應用中須要訪問HDFS集羣,則須要在hadoop-client中添加對應版本的HDFS依賴:算法

groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>

最後,須要在程序中添加Spark類。代碼以下:shell

import org.apache.spark.SparkContext import org.apache.spark.SparkConf

(在Spark 1.3.0以前的版本,使用Scala語言編寫Spark應用程序時,須要添加import org.apache.spark.SparkContext._來啓用必要的隱式轉換)數據庫

 

3 初始化Spark(Initializing Spark)

使用Scala編寫Spark程序的須要作的第一件事就是建立一個SparkContext對象(使用Java語言時建立JavaSparkContext)。SparkContext對象指定了Spark應用訪問集羣的方式。建立SparkContext須要先建立一個SparkConf對象,SparkConf對象包含了Spark應用的一些列信息。代碼以下:express

  • Scala
val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)
  • java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf);

appName參數爲應用程序在集羣的UI上顯示的名字。master爲Spark、Mesos、YARN URL或local。使用local值時,表示在本地模式下運行程序。應用程序的執行模型也能夠在使用spark-submit命令提交任務時進行指定。

 

3.1 使用Spark Shell(Using the Shell)

在Spark Shell下,一個特殊的SparkContext對象已經幫用戶建立好,變量爲sc。使用參數--master設置master參數值,使用參數--jars設置依賴包,多個jar包使用逗號分隔。可使用--packages參數指定Maven座標來添加依賴包,多個座標使用逗號分隔。可使用參數--repositories添加外部的repository。示例以下:

  • 本地模式下,使用4個核運行Spark程序:
$ ./bin/spark-shell --master local[4]
  • 將code.jar包添加到classpath:
$ ./bin/spark-shell --master local[4] --jars code.jar
  • 使用Maven座標添加一個依賴:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

詳細的Spark Shell參數描述請執行命令spark-shell --help。更多的spark-submit腳本請見spark-submit script

 

4 彈性分佈式數據集(RDDs)

Spark最重要的一個概念就是RDD,RDD是一個有容錯機制的元素容器,它能夠進行並行運算操做。獲得RDD的方式有兩個:

  • 經過並行化驅動程序中已有的一個集合而得到
  • 經過外部存儲系統(例如共享的文件系統、HDFS、HBase等)的數據集進行建立

 

4.1 並行集合(Parallelized Collections)

在驅動程序中,在一個已經存在的集合上(例如一個Scala的Seq)調用SparkContext的parallelize方法能夠建立一個並行集合。集合裏的元素將被複制到一個可被並行操做的分佈式數據集中。下面爲並行化一個保存數字1到5的集合示例:

  • Scala
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
  • Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data);

當分佈式數據集建立以後,就能夠進行並行操做。例如,能夠調用方法distData.reduce((a,b) => a + b)求數組內元素的和。Spark支持的分佈式數據集上的操做將在後面章節中詳細描述。

並行集合的一個重要的參數是表示將數據劃分爲幾個分區(partition)的分區數。Spark將在集羣上每一個數據分區上啓動一個task。一般狀況下,你能夠在集羣上爲每一個CPU設置2-4個分區。通常狀況下,Spark基於集羣自動設置分區數目。也能夠手動進行設置,設置該參數須要將參數值做爲第二參數傳給parallelize方法,例如:sc.parallelize(data, 10)。注意:在代碼中,部分位置使用術語slices(而不是partition),這麼作的緣由是爲了保持版本的向後兼容性。

 

4.2 外部數據庫(External Datasets)

Spark能夠經過Hadoop支持的外部數據源建立分佈式數據集,Hadoop支持的數據源有本地文件系統、HDFS、Cassandra、HBase、Amazon S3、Spark支持的文本文件、SequenceFiles、Hadoop InputFormat

SparkContext的testFile方法能夠建立文本文件RDD。使用這個方法須要傳遞文本文件的URI,URI能夠爲本機文件路徑、hdfs://、s3n://等。該方法讀取文本文件的每一行至容器中。示例以下:

  • Scala
scala> val distFile = sc.textFile("data.txt") distFile: RDD[String] = MappedRDD@1d4cee08
  • Java
JavaRDD<String> distFile = sc.textFile("data.txt");

建立以後,distFile就能夠進行數據集的通用操做。例如,使用map和reduce操做計算全部行的長度的總和:distFile.map(s => s.length).reduce((a, b) => a + b)
使用Spark讀取文件須要注意一下幾點:

  • 程序中若是使用到本地文件路徑,在其它worker節點上該文件必須在同一目錄,並有訪問權限。在這種狀況下,能夠將文件複製到全部的worker節點,也可使用網絡內的共享文件系統。
  • Spark全部的基於文件輸入的方法(包括textFile),都支持文件夾、壓縮文件、通配符。例如:textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")
  • textFile方法提供了一個可選的第二參數,用於控制文件的分區數。默認狀況下,Spark爲文件的每一個塊建立一個分區(塊使用HDFS的默認值64MB),經過設置這個第二參數能夠修改這個默認值。須要注意的是,分區數不能小於塊數。

除了文本文件以外,Spark還支持其它的數據格式:

  • SparkContext.wholeTextFiles可以讀取指定目錄下的許多小文本文件,返回(filename,content)對。而textFile只能讀取一個文本文件,返回該文本文件的每一行。
  • 對於SequenceFiles可使用SparkContext的sequenceFile[K,V]方法,其中K是文件中key和value的類型。它們必須爲像IntWritable和Text那樣,是Hadoop的Writable接口的子類。另外,對於通用的Writable,Spark容許用戶指定原生類型。例如,sequenceFile[Int,String]將自動讀取IntWritable和Text。
  • 對於其餘Hadoop InputFormat,可使用SparkContext.hadoopRDD方法,該方法接收任意類型的JobConf和輸入格式類、鍵類型和值類型。能夠像設置Hadoop job那樣設置輸入源。對於InputFormat還可使用基於新版本MapReduce API(org.apache.hadoop.mapreduce)的SparkContext.newAPIHadoopRDD。(老版本接口爲:SparkContext.newHadoopRDD
  • RDD.saveAsObjectFileSparkContext.objectFile可以保存包含簡單的序列化Java對象的RDD。可是這個方法不如Avro高效,Avro可以方便的保存任何RDD。

 

4.3 RDD操做(RDD Operations)

RDD支持兩種類型的操做:

  • transformation:從一個RDD轉換爲一個新的RDD。
  • action:基於一個數據集進行運算,並返回RDD。

例如,map是一個transformation操做,map將數據集的每個元素按指定的函數轉換爲一個RDD返回。reduce是一個action操做,reduce將RDD的全部元素按指定的函數進行聚合並返回結果給驅動程序(還有一個並行的reduceByKey可以返回一個分佈式的數據集)。

Spark的全部transformation操做都是懶執行,它們並不立馬執行,而是先記錄對數據集的一系列transformation操做。在執行一個須要執行一個action操做時,會執行該數據集上全部的transformation操做,而後返回結果。這種設計讓Spark的運算更加高效,例如,對一個數據集map操做以後使用reduce只返回結果,而不返回龐大的map運算的結果集。

默認狀況下,每一個轉換的RDD在執行action操做時都會從新計算。即便兩個action操做會使用同一個轉換的RDD,該RDD也會從新計算。在這種狀況下,可使用persist方法或cache方法將RDD緩存到內存,這樣在下次使用這個RDD時將會提升計算效率。在這裏,也支持將RDD持久化到磁盤,或在多個節點上覆制。

 

4.3.1 基礎(Basics)

參考下面的程序,瞭解RDD的基本輪廓:

  • Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

 

  • Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

 

第一行經過讀取一個文件建立了一個基本的RDD。這個數據集沒有加載到內存,也沒有進行其餘的操做,變量lines僅僅是一個指向文件的指針。第二行爲transformation操做map的結果。此時lineLengths也沒有進行運算,由於map操做爲懶執行。最後,執行action操做reduce。此時Spark將運算分隔成多個任務分發給多個機器,每一個機器執行各自部分的map並進行本地reduce,最後返回運行結果給驅動程序。

若是在後面的運算中仍會用到lineLengths,能夠將其緩存,在reduce操做以前添加以下代碼,該persist操做將在lineLengths第一次被計算獲得後將其緩存到內存:

  • Scala
lineLengths.persist()
  • Java
lineLengths.persist(StorageLevel.MEMORY_ONLY());

 

4.3.2 把函數傳遞到Spark(Passing Functions to Spark)

  • Scala
    Spark的API,在很大程度上依賴於把驅動程序中的函數傳遞到集羣上運行。這有兩種推薦的實現方式:
    • 使用匿名函數的語法,這可讓代碼更加簡潔。
    • 使用全局單例對象的靜態方法。好比,你能夠定義函數對象object MyFunctions,而後將該對象的MyFunction.func1方法傳遞給Spark,以下所示:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

 

注意:因爲可能傳遞的是一個類實例方法的引用(而不是一個單例對象),在傳遞方法的時候,應該同時傳遞包含該方法的類對象。舉個例子:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

 

上面示例中,若是咱們建立了一個類實例new MyClass,而且調用了實例的doStuff方法,該方法中的map操做調用了這個MyClass實例的func1方法,因此須要將整個對象傳遞到集羣中。相似於寫成:rdd.map(x=>this.func1(x))。

相似地,訪問外部對象的字段時將引用整個對象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

 

等同於寫成rdd.map(x=>this.field+x),引用了整個this。爲了不這種問題,最簡單的方式是把field拷貝到本地變量,而不是去外部訪問它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

 

  • Java
    Spark的API,在很大程度上依賴於把驅動程序中的函數傳遞到集羣上運行。在Java中,函數由那些實現了org.apache.spark.api.java.function包中的接口的類表示。有兩種建立這樣的函數的方式:
    • 在你本身的類中實現Function接口,能夠是匿名內部類,或者命名類,而且傳遞類的一個實例到Spark。
    • 在Java8中,使用lambda表達式來簡明地定義函數的實現。

爲了保持簡潔性,本指南中大量使用了lambda語法,這在長格式中很容易使用全部相同的APIs。好比,咱們能夠把上面的代碼寫成:

JavaRDD<String>  lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new Function Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2 Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

 

一樣的功能,使用內聯式的實現顯得更爲笨重繁瑣,代碼以下:

class GetLength implements Function Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2 Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

 

注意,java中的內部匿名類,只要帶有final關鍵字,就能夠訪問類範圍內的變量。Spark也會把變量複製到每個worker節點。

 

4.3.3 理解閉包(Understanding closures)

使用Spark的一個難點爲:理解程序在集羣中執行時變量和方法的生命週期。RDD操做能夠在變量範圍以外修改變量,這是一個常常致使迷惑的地方。好比下面的例子,使用foreach()方法增長計數器(counter)的值(相似的狀況,在其餘的RDD操做中常常出現)。

4.3.3.1 示例(Example)

參考下面簡單的RDD元素求和示例,求和運算是否在同一個JVM中執行,其複雜度也不一樣。Spark能夠在local模式下(--master = local[n])執行應用,也能夠將該Spark應用提交到集羣上執行(例如經過spark-submit提交到YARN):

  • Scala
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

 

  • Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter); 

上面是錯誤的,應該使用累加器,下面有說明。

4.3.3.2 本地模式 VS 集羣模式(Local vs. cluster modes)

在本地模式下僅有一個JVM,上面的代碼將直接計算RDD中元素和,並存儲到counter中。此時RDD和變量counter都在driver節點的同一內存空間中。

然而,在集羣模式下,狀況會變得複雜,上面的代碼並不會按照預期的方式執行。爲了執行這個job,Spark把處理RDD的操做分割成多個任務,每一個任務將被一個executor處理。在執行以前,Spark首先計算閉包(closure)。閉包是必須對executor可見的變量和方法,在對RDD進行運算時將會用到這些變量和方法(在本例子中指foreach())。這個閉包會被序列化,併發送給每一個executor。在local模式下,只有一個executor,因此全部的變量和方法都使用同一個閉包。在其餘模式下狀況跟local模式不同,每一個executor在不一樣的worker節點上運行,每一個executor都有一個單獨的閉包。

在這裏,發送給每一個executor的閉包內的變量是當前變量的副本,所以當counter在foreach中被引用時,已經不是在driver節點上的counter了。在driver節點的內存中仍然有一個counter,但這個counter對executors不可見。executor只能操做序列化的閉包中的counter副本。所以,最終counter的值仍然是0,由於全部對counter的操做都是在序列化的閉包內的counter上進行的

在相似這種場景下,爲了保證良好的行爲確保,應該使用累加器。Spark中的累加器專門爲在集羣中多個節點間更新變量提供了一種安全機制。在本手冊的累加器部分將對累加器進行詳細介紹。

通常狀況下,像環或本地定義方法這樣的閉包結構,不該該用於更改全局狀態。Spark不定義也不保證來自閉包外引用致使的對象變化行爲。有些狀況下,在local模式下能夠正常運行的代碼,在分佈式模式下也許並不會像預期那樣執行。在分佈式下運行時,建議使用累加器定義一些全局集合。

4.3.3.3 打印RDD的元素(Printing elements of an RDD)

打印一個RDD的元素也是一個經常使用的語法,帶引RDD元素可使用方法rdd.foreach(println)rdd.map(println)。在本地模式下,該方法將生成預期的輸出並打印RDD全部的元素。然而,在集羣模式下各個executor調用stdout,將結果打印到executor的stdout中。由於不是打印到driver節點上,因此在driver節點的stdout上不會看到這些輸出。若是想將RDD的元素打印到driver節點上,可使用collect()方法將RDD發送到driver節點上而後再打印該RDD:rdd.collect().foreach(println)。這個操做可能會致使driver節點內存不足,由於collect()方法將RDD所有的數據都發送到一臺節點上。若是僅僅打印RDD的部分元素,一個安全的方法是使用take()方法:rdd.take(100).foreach(println)

 

4.3.4 操做鍵值對(Working with Key­Value Pairs)

Spark大部分的RDD操做都是對任意類型的對象的,可是,有部分特殊的操做僅支持對鍵值對的RDD進行操做。最經常使用的是分佈式「shuffle」操做,好比按照key將RDD的元素進行分組或彙集操做。

  • Scala
    在Scala中,包含Tuple2對象在內的RDD鍵值對操做,都是能夠自動可用的(Tuple2對象是Scala語言內置的元組類型,能夠經過簡單的編寫進行(a,b)建立)。鍵值對操做接口在PairRDDFunctions類中,該類中的接口自動使用RDD的元組。
    例如,在下面的代碼中使用reduceByKey操做對鍵值對進行計數,計算每行的文本出現的次數:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

 

  • Java
    在Java中,鍵值對使用的是scala.Tuple2類。用戶可使用特定的map操做將JavaRDDs轉換爲JavaPairRDDs,例如mapToPairflatMapToPair。JavaPairRDD擁有標準RDD和特殊鍵值對的方法。
    例如,在下面的代碼中使用reduceByKey操做對鍵值對進行計數,計算每行的文本出現的次數:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

 

咱們還可使用counts.sortByKey()按照字母順序將鍵值對排序,使用counts.collect()將結果以一個數組的形式發送給driver節點。

注意,當在鍵值對操做中使用自定義對象做爲key時,你必須保證自定義的equals()方法有一個對應的hashCode()方法。詳細的細節,請閱讀Object.hashCode() documentation

 

4.3.5 Transformations

下面列出了Spark經常使用的transformation操做。詳細的細節請參考RDD API文檔(ScalaJavaPythonR)和鍵值對RDD方法文檔(ScalaJava)。

  • map(func)
    將原來RDD的每一個數據項,使用map中用戶自定義的函數func進行映射,轉變爲一個新的元素,並返回一個新的RDD。

  • filter(func)
    使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回。

  • flatMap(func)
    相似於map,可是輸入數據項能夠被映射到0個或多個輸出數據集合中,因此函數func的返回值是一個數據項集合而不是一個單一的數據項。

  • mapPartitions(func)
    相似於map,可是該操做是在每一個分區上分別執行,因此當操做一個類型爲T的RDD時func的格式必須是Iterator<T> => Iterator<U>。即mapPartitions須要獲取到每一個分區的迭代器,在函數中經過這個分區的迭代器對整個分區的元素進行操做。

  • mapPartitionsWithIndex(func)
    相似於mapPartitions,可是須要提供給func一個整型值,這個整型值是分區的索引,因此當處理T類型的RDD時,func的格式必須爲(Int, Iterator<T>) => Iterator<U>

  • sample(withReplacement, fraction, seed)
    對數據採樣。用戶能夠設定是否有放回(withReplacement)、採樣的百分比(fraction)、隨機種子(seed)。

  • union(otherDataset)
    返回原數據集和參數指定的數據集合並後的數據集。使用union函數時須要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合併的RDD元素數據類型相同。該操做不進行去重操做,返回的結果會保存全部元素。若是想去重,可使用distinct()。

  • intersection(otherDataset)
    返回兩個數據集的交集。

  • distinct([numTasks]))
    將RDD中的元素進行去重操做。

  • groupByKey([numTasks])
    操做(K,V)格式的數據集,返回 (K, Iterable)格式的數據集。
    注意,若是分組是爲了按key進行聚合操做(例如,計算sum、average),此時使用reduceByKeyaggregateByKey計算效率會更高。
    注意,默認狀況下,並行狀況取決於父RDD的分區數,但能夠經過參數numTasks來設置任務數。

  • reduceByKey(func, [numTasks])
    使用給定的func,將(K,V)對格式的數據集中key相同的值進行彙集,其中func的格式必須爲(V,V) => V。可選參數numTasks能夠指定reduce任務的數目。

  • aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])
    對(K,V)格式的數據按key進行聚合操做,聚合時使用給定的合併函數和一個初試值,返回一個(K,U)對格式數據。須要指定的三個參數:zeroValue爲在每一個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變量;seqOp用於在每一個分區中,相同的key中V類型的值合併到zeroValue建立的U類型的變量中。combOp是對從新分區後兩個分區中傳入的U類型數據的合併函數。

  • sortByKey([ascending], [numTasks])
    (K,V)格式的數據集,其中K已實現了Ordered,通過sortByKey操做返回排序後的數據集。指定布爾值參數ascending來指定升序或降序排列。

  • join(otherDataset, [numTasks])
    用於操做兩個鍵值對格式的數據集,操做兩個數據集(K,V)和(K,W)返回(K, (V, W))格式的數據集。經過leftOuterJoinrightOuterJoinfullOuterJoin完成外鏈接操做。

  • cogroup(otherDataset, [numTasks])
    用於操做兩個鍵值對格式數據集(K,V)和(K,W),返回數據集格式爲 (K,(Iterable, Iterable)) 。這個操做也稱爲groupWith。對在兩個RDD中的Key-Value類型的元素,每一個RDD相同Key的元素分別聚合爲一個集合,而且返回兩個RDD中對應Key的元素集合的迭代器。

  • cartesian(otherDataset)
    對類型爲T和U的兩個數據集進行操做,返回包含兩個數據集全部元素對的(T,U)格式的數據集。即對兩個RDD內的全部元素進行笛卡爾積操做。

  • pipe(command, [envVars])
    以管道(pipe)方式將 RDD的各個分區(partition)使用 shell命令處理(好比一個 Perl或 bash腳本)。 RDD的元素會被寫入進程的標準輸入(stdin),將進程返回的一個字符串型 RDD(RDD of strings),以一行文本的形式寫入進程的標準輸出(stdout)中。

  • coalesce(numPartitions)
    把RDD的分區數下降到經過參數numPartitions指定的值。在獲得的更大一些數據集上執行操做,會更加高效。

  • repartition(numPartitions)
    隨機地對RDD的數據從新洗牌(Reshuffle),從而建立更多或更少的分區,以平衡數據。老是對網絡上的全部數據進行洗牌(shuffles)。

  • repartitionAndSortWithinPartitions(partitioner)
    根據給定的分區器對RDD進行從新分區,在每一個結果分區中,按照key值對記錄排序。這在每一個分區中比先調用repartition再排序效率更高,由於它能夠將排序過程在shuffle操做的機器上進行。

 

4.3.6 Actions

下面列出了Spark支持的經常使用的action操做。詳細請參考RDD API文檔(ScalaJavaPythonR)和鍵值對RDD方法文檔(ScalaJava)。

  • reduce(func)
    使用函數func彙集數據集中的元素,這個函數func輸入爲兩個元素,返回爲一個元素。這個函數應該符合結合律和交換了,這樣才能保證數據集中各個元素計算的正確性。

  • collect()
    在驅動程序中,以數組的形式返回數據集的全部元素。一般用於filter或其它產生了大量小數據集的狀況。

  • count()
    返回數據集中元素的個數。

  • first()
    返回數據集中的第一個元素(相似於take(1))。

  • take(n)
    返回數據集中的前n個元素。

  • takeSample(withReplacement,num, [seed])
    對一個數據集隨機抽樣,返回一個包含num個隨機抽樣元素的數組,參數withReplacement指定是否有放回抽樣,參數seed指定生成隨機數的種子。

  • takeOrdered(n, [ordering])
    返回RDD按天然順序或自定義順序排序後的前n個元素。

  • saveAsTextFile(path)
    將數據集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。Spark將在每一個元素上調用toString方法,將數據元素轉換爲文本文件中的一行記錄。

  • saveAsSequenceFile(path) (Java and Scala)
    將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。該操做只支持對實現了Hadoop的Writable接口的鍵值對RDD進行操做。在Scala中,還支持隱式轉換爲Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)。

  • saveAsObjectFile(path) (Java and Scala)
    將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可使用SparkContext.objectFile()進行加載。

  • countByKey()
    僅支持對(K,V)格式的鍵值對類型的RDD進行操做。返回(K,Int)格式的Hashmap,(K,Int)爲每一個key值對應的記錄數目。

  • foreach(func)
    對數據集中每一個元素使用函數func進行處理。該操做一般用於更新一個累加器(Accumulator)或與外部數據源進行交互。注意:在foreach()以外修改累加器變量可能引發不肯定的後果。詳細介紹請閱讀Understanding closures部分。

 

4.3.7 Shuffle操做(Shuffle operations)

Spark內的一個操做將會觸發shuffle事件。shuffle是Spark將多個分區的數據從新分組從新分佈數據的機制。shuffle是一個複雜且代價較高的操做,它須要完成將數據在executor和機器節點之間進行復制的工做

 

4.3.7.1 背景(Background)

經過reduceByKey操做的例子,來理解shuffle過程。reduceByKey操做生成了一個新的RDD,原始數據中相同key的全部記錄的聚合值合併爲一個元組,這個元組中的key對應的值爲執行reduce函數以後的結果。這個操做的挑戰是,key相同的全部記錄不在同一個分區中,甚至不在同一臺機器上,可是該操做必須將這些記錄聯合運算。

在Spark中,一般一條數據不會跨分區分佈,除非爲了一個特殊的操做在必要的地方纔會跨分區分佈。在計算過程當中,一個分區由一個task進行處理。所以,爲了組織全部的數據讓一個reduceByKey任務執行,Spark須要進行一個all-to-all操做。all-to-all操做須要讀取全部分區上的數據的全部的key,以及key對應的全部的值,而後將多個分區上的數據進行彙總,並將每一個key對應的多個分區的數據進行計算得出最終的結果,這個過程稱爲shuffle

雖然每一個分區中新shuffle後的數據元素是肯定的,分區間的順序也是肯定的,可是全部的元素是無序的。若是想在shuffle操做後將數據按指定規則進行排序,可使用下面的方法:

  • 使用mapPartitions操做在每一個分區上進行排序,排序可使用.sorted等方法。
  • 使用repartitionAndSortWithinPartitions操做在從新分區的同時高效的對分區進行排序。
  • 使用sortBy將RDD進行排序。

會引發shuffle過程的操做有:

  • repartition操做,例如:repartitioncoalesce
  • ByKey操做(除了counting相關操做),例如:groupByKeyreduceByKey
  • join操做,例如:cogroupjoin

 

4.3.7.2 性能影響(Performance Impact)

shuffle是一個代價比較高的操做,它涉及磁盤IO、數據序列化、網絡IO。爲了準備shuffle操做的數據,Spark啓動了一系列的map任務和reduce任務,map任務完成數據的處理工做,reduce完成map任務處理後的數據的收集工做。這裏的map、reduce來自MapReduce,跟Spark的map操做和reduce操做沒有關係。

在內部,一個map任務的全部結果數據會保存在內存,直到內存不能所有存儲爲止。而後,這些數據將基於目標分區進行排序並寫入一個單獨的文件中。在reduce時,任務將讀取相關的已排序的數據塊。

某些shuffle操做會大量消耗堆內存空間,由於shuffle操做在數據轉換先後,須要在使用內存中的數據結構對數據進行組織。須要特別說明的是,reduceByKeyaggregateByKey在map時會建立這些數據結構,ByKey操做在reduce時建立這些數據結構。當內存滿的時候,Spark會把溢出的數據存到磁盤上,這將致使額外的磁盤IO開銷和垃圾回收開銷的增長。

shuffle操做還會在磁盤上生成大量的中間文件。在Spark 1.3中,這些文件將會保留至對應的RDD不在使用並被垃圾回收爲止。這麼作的好處是,若是在Spark從新計算RDD的血統關係(lineage)時,shuffle操做產生的這些中間文件不須要從新建立。若是Spark應用長期保持對RDD的引用,或者垃圾回收不頻繁,這將致使垃圾回收的週期比較長。這意味着,長期運行Spark任務可能會消耗大量的磁盤空間。臨時數據存儲路徑能夠經過SparkContext中設置參數spark.local.dir進行配置。

shuffle操做的行爲能夠經過調節多個參數進行設置。詳細的說明請看Configuration Guide中的「Shuffle Behavior」部分。

 

4.4 RDD持久化(RDD Persistence)

Spark中一個很重要的能力是將數據持久化(或稱爲緩存),在多個操做間均可以訪問這些持久化的數據。當持久化一個RDD時,每一個節點會將本節點計算的數據塊存儲到內存,在該數據上的其餘action操做將直接使用內存中的數據。這樣會讓之後的action操做計算速度加快(一般運行速度會加速10倍)。緩存是迭代算法和快速的交互式使用的重要工具。

RDD可使用persist()方法或cache()方法進行持久化。數據將會在第一次action操做時進行計算,並在各個節點的內存中緩存。Spark的緩存具備容錯機制,若是一個緩存的RDD的某個分區丟失了,Spark將按照原來的計算過程,自動從新計算並進行緩存。

另外,每一個持久化的RDD可使用不一樣的存儲級別進行緩存,例如,持久化到磁盤、已序列化的Java對象形式持久化到內存(能夠節省空間)、跨節點間複製、以off-heap的方式存儲在 Tachyon。這些存儲級別經過傳遞一個StorageLevel對象(ScalaJavaPython)給persist()方法進行設置。cache()方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是StorageLevel.MEMORY_ONLY(將反序列化的對象存儲到內存中)。詳細的存儲級別介紹以下:

  • MEMORY_ONLY:將RDD以反序列化Java對象的形式存儲在JVM中。若是內存空間不夠,部分數據分區將再也不緩存,在每次須要用到這些數據時從新進行計算。這是默認的級別。
  • MEMORY_AND_DISK:將RDD以反序列化Java對象的形式存儲在JVM中。若是內存空間不夠,將未緩存的數據分區存儲到磁盤,在須要使用這些分區時從磁盤讀取。
  • MEMORY_ONLY_SER:將RDD以序列化的Java對象的形式進行存儲(每一個分區爲一個byte數組)。這種方式會比反序列化對象的方式節省不少空間,尤爲是在使用fast serializer時會節省更多的空間,可是在讀取時會增長CPU的計算負擔。
  • MEMORY_AND_DISK_SER:相似於MEMORY_ONLY_SER,可是溢出的分區會存儲到磁盤,而不是在用到它們時從新計算。
  • DISK_ONLY:只在磁盤上緩存RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等:與上面的級別功能相同,只不過每一個分區在集羣中兩個節點上創建副本。
  • OFF_HEAP (實驗中):以序列化的格式 (serialized format) 將 RDD存儲到 Tachyon。相比於MEMORY_ONLY_SER, OFF_HEAP 下降了垃圾收集(garbage collection)的開銷,使得 executors變得更小,並且共享了內存池,在使用大堆(heaps)和多應用並行的環境下有更好的表現。此外,因爲 RDD存儲在Tachyon中, executor的崩潰不會致使內存中緩存數據的丟失。在這種模式下, Tachyon中的內存是可丟棄的。所以,Tachyon不會嘗試重建一個在內存中被清除的分塊。若是你打算使用Tachyon進行off heap級別的緩存,Spark與Tachyon當前可用的版本相兼容。詳細的版本配對使用建議請參考Tachyon的說明

注意,在Python中,緩存的對象老是使用Pickle進行序列化,因此在Python中不關心你選擇的是哪種序列化級別。

在shuffle操做中(例如reduceByKey),即使是用戶沒有調用persist方法,Spark也會自動緩存部分中間數據。這麼作的目的是,在shuffle的過程當中某個節點運行失敗時,不須要從新計算全部的輸入數據。若是用戶想屢次使用某個RDD,強烈推薦在該RDD上調用persist方法。

 

4.4.1 如何選擇存儲級別(Which Storage Level to Choose?)

Spark的存儲級別的選擇,核心問題是在內存使用率和CPU效率之間進行權衡。建議按下面的過程進行存儲級別的選擇:

  • 若是使用默認的存儲級別(MEMORY_ONLY),存儲在內存中的RDD沒有發生溢出,那麼就選擇默認的存儲級別。默認存儲級別能夠最大程度的提升CPU的效率,可使在RDD上的操做以最快的速度運行。
  • 若是內存不能所有存儲RDD,那麼使用MEMORY_ONLY_SER,並挑選一個快速序列化庫將對象序列化,以節省內存空間。使用這種存儲級別,計算速度仍然很快。
  • 除了在計算該數據集的代價特別高,或者在須要過濾大量數據的狀況下,儘可能不要將溢出的數據存儲到磁盤。由於,從新計算這個數據分區的耗時與從磁盤讀取這些數據的耗時差很少。
  • 若是想快速還原故障,建議使用多副本存儲界別(例如,使用Spark做爲web應用的後臺服務,在服務出故障時須要快速恢復的場景下)。全部的存儲級別都經過從新計算丟失的數據的方式,提供了徹底容錯機制。可是多副本級別在發生數據丟失時,不須要從新計算對應的數據庫,可讓任務繼續運行。
  • 在高內存消耗或者多任務的環境下,還處於實驗性的OFF_HEAP模式有下列幾個優點:
    • 它支持多個executor使用Tachyon中的同一個內存池。
    • 它顯著減小了內存回收的代價。
    • 若是個別executor崩潰掉,緩存的數據不會丟失。

 

4.4.2 移除數據(Removing Data)

Spark自動監控各個節點上的緩存使用率,並以最近最少使用的方式(LRU)將舊數據塊移除內存。若是想手動移除一個RDD,而不是等待該RDD被Spark自動移除,可使用RDD.unpersist()方法。

 

5 共享變量(Shared Variables)

一般狀況下,一個傳遞給Spark操做(例如mapreduce)的方法是在遠程集羣上的節點執行的。方法在多個節點執行過程當中使用的變量,是同一份變量的多個副本。這些變量的以副本的方式拷貝到每一個機器上,各個遠程機器上變量的更新並不會傳回driver程序。然而,爲了知足兩種常見的使用場景,Spark提供了兩種特定類型的共享變量廣播變量(broadcast variables)和累加器(accumulators)

5.1 廣播變量(broadcast variables)

廣播變量容許編程者將一個只讀變量緩存到每臺機器上,而不是給每一個任務傳遞一個副本。例如,廣播變量能夠用一種高效的方式給每一個節點傳遞一份比較大的數據集副本。在使用廣播變量時,Spark也嘗試使用高效廣播算法分發變量,以下降通訊成本。

Spark的action操做是經過一些列的階段(stage)進行執行的,這些階段(stage)是經過分佈式的shuffle操做進行切分的。Spark自動廣播在每一個階段內任務須要的公共數據。這種狀況下廣播的數據使用序列化的形式進行緩存,並在每一個任務在運行前進行反序列化。這明確說明了,只有在跨越多個階段的多個任務任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的狀況下,使用廣播變量會有比較好的效果。

廣播變量經過在一個變量v上調用SparkContext.broadcast(v)方法進行建立。廣播變量是v的一個封裝器,能夠經過value方法訪問v的值。代碼示例以下:

  • Scala
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
  • Java
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3]

廣播變量建立以後,在集羣上執行的全部的函數中,應該使用該廣播變量代替原來的v值。因此,每一個節點上的v最多分發一次。另外,對象v在廣播後不該該再被修改,以保證分發到全部的節點上的廣播變量有一樣的值(例如,在分發廣播變量以後,又對廣播變量進行了修改,而後又須要將廣播變量分發到新的節點)。

 

5.2 累加器(Accumulators)

累加器只容許關聯操做進行"added"操做,所以在並行計算中能夠支持特定的計算。累加器能夠用於實現計數(相似在MapReduce中那樣)或者求和。原生Spark支持數值型的累加器,編程者能夠添加新的支持類型。建立累加器並命名以後,在Spark的UI界面上將會顯示該累加器。這樣能夠幫助理解正在運行的階段的運行狀況(注意,在Python中還不支持)。

一個累加器能夠經過在原始值v上調用SparkContext.accumulator(v)。而後,集羣上正在運行的任務就可使用add方法或+=操做對該累加器進行累加操做。只有driver程序能夠讀取累加器的值,讀取累加器的值使用value方法。
下面代碼將數組中的元素進行求和:

  • Scala
scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10
  • Java
Accumulator<Integer> accum = sc.accumulator(0); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10

上面的代碼示例使用的是Spark內置的Int類型的累加器,開發者能夠經過集成AccumulatorParam類建立新的累加器類型。AccumulatorParam接口有兩個方法:zero方法和addInPlace方法。zero方法給數據類型提供了一個0值,addInPlace方法可以將兩個值進行累加。例如,假設咱們有一個表示數學上向量的Vector類,咱們能夠寫成:

  • Scala
object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
  • Java
class VectorAccumulatorParam implements AccumulatorParam<Vector> { public Vector zero(Vector initialValue) { return Vector.zeros(initialValue.size()); } public Vector addInPlace(Vector v1, Vector v2) { v1.addInPlace(v2); return v1; } } // Then, create an Accumulator of this type: Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());

Spark也支持使用更通用的 Accumulable接口去累加數據,其結果數據的類型和累加的元素類型不一樣(例如,經過收集數據元素建立一個list)。在Scala中,SparkContext.accumulableCollection方法可用於累加經常使用的Scala集合類型。

累加器的更新只發生在action操做中,Spark保證每一個任務只能更新累加器一次,例如從新啓動一個任務,該重啓的任務不容許更新累加器的值。在transformation用戶須要注意的是,若是任務過job的階段從新執行,每一個任務的更新操做將會執行屢次。

累加器沒有改變Spark懶執行的模式。若是累加器在RDD中的一個操做中進行更新,該累加器的值只在該RDD進行action操做時進行更新。所以,在一個像map()這樣的轉換操做中,累加器的更新並無執行。下面的代碼片斷證實了這個特性:

  • Scala
val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } // Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.
  • Java
Accumulator<Integer> accum = sc.accumulator(0); data.map(x -> { accum.add(x); return f(x); }); // Here, accum is still 0 because no actions have caused the `map` to be computed.

 

6 將應用提交到集羣(Deploying to a Cluster)

應用提交手冊描述瞭如何將應用提交到集羣。簡單的說,當你將你的應用打包成一個JAR(Java/Scala)或者一組.py.zip文件(Python)後,就能夠經過bin/spark-submit腳本將腳本提交到集羣支持的管理器中。

 更多關於spark-submit見《spark提交模式

7 Java/Scala中啓動Spark做業(Launching Spark jobs from Java / Scala)

使用org.apache.spark.launcher包提供的簡單的Java API,能夠將Spark做業以該包中提供的類的子類的形式啓動。

 

8 單元測試(Unit Testing)

Spark能夠友好的使用流行的單元測試框架進行單元測試。在test中簡單的建立一個SparkContext,master的URL設置爲local,運行幾個操做,而後調用SparkContext.stop()將該做業中止。由於Spark不支持在同一個程序中運行兩個context,因此須要請確保使用finally塊或者測試框架的tearDown方法將context中止。

 

9 從Spark1.0以前的版本遷移(Migrating from pre­1.0 Versions of Spark)

Spark 1.0凍結了1.X系列的Spark核的API,所以,當前沒有標記爲"experimental"或者「developer API」的API都將在將來的版本中進行支持。

  • Scala的變化

對於Scala的變化是,分組操做(例如groupByKeycogroupjoin)的返回類型由(Key,Seq[Value])變爲(Key,Iterable[Value])

  • Java API的變化
    • 1.0中org.apache.spark.api.java.function類中的Function類變成了接口,這意味着舊的代碼中extends Function應該改成implement Function
    • 增長了新的map型操做,例如mapToPairmapToDouble,增長的這些操做可用於建立特殊類型的RDD。
    • 分組操做(例如groupByKeycogroupjoin)的返回類型由(Key,Seq[Value])變爲(Key,Iterable[Value])

這些遷移指導對Spark Streaming、MLlib和GraphX一樣有效。

 

10 下一步(Where to Go from Here)

你能夠在Spark網站看一些Spark編程示例。另外,Spark在examples目錄下包含了許多例子(ScalaJavaPythonR)。運行Java和Scala例子,能夠經過將例子的類名傳給Spark的bin/run-example腳本進行啓動。例如:

./bin/run-example SparkPi

Python示例,使用spark-submit命令提交:

./bin/spark-submit examples/src/main/python/pi.py

R示例,使用spark-submit命令提交:

./bin/spark-submit examples/src/main/r/dataframe.R

configurationtuning手冊中,有許多優化程序的實踐。這些優化建議,可以確保你的數據以高效的格式存儲在內存中。對於部署的幫助信息,請閱讀cluster mode overview,該文檔描述了分佈式操做和支持集羣管理器的組件。

最後,完整的API文檔請查閱ScalaJavaPythonR

相關文章
相關標籤/搜索