1、概述html
Apache Spark 是一個快速的, 多用途的集羣計算系統。 它提供了 Java, Scala, Python 和 R 的高級 API,以及一個支持通用的執行圖計算的優化過的引擎. 它還支持一組豐富的高級工具, 包括使用 SQL 處理結構化數據處理的 Spark SQL, 用於機器學習的 MLlib, 用於圖計算的 GraphX, 以及 Spark Streaming。java
請注意, 在 Spark 2.0 以前, Spark 的主要編程接口是彈性分佈式數據集(RDD)。 在 Spark 2.0 以後, RDD 被 Dataset 替換, 它是像RDD 同樣的 strongly-typed(強類型), 可是在引擎蓋下更加優化。 RDD 接口仍然受支持,可是, 咱們強烈建議您切換到使用 Dataset(數據集), 其性能要更優於 RDD。node
每個 Spark 應用程序由一個在集羣上運行着用戶的 main
函數和執行各類並行操做的 driver program(驅動程序)組成。Spark 提供的主要抽象是一個彈性分佈式數據集(RDD),它是能夠執行並行操做且跨集羣節點的元素的集合。RDD 能夠從一個 Hadoop 文件系統(或者任何其它 Hadoop 支持的文件系統),或者一個在 driver program(驅動程序)中已存在的 Scala 集合,以及經過 transforming(轉換)來建立一個 RDD。用戶爲了讓它在整個並行操做中更高效的重用,也許會讓 Spark persist(持久化)一個 RDD 到內存中。最後,RDD 會自動的從節點故障中恢復。程序員
在 Spark 中的第二個抽象是可以用於並行操做的 shared variables(共享變量),默認狀況下,當 Spark 的一個函數做爲一組不一樣節點上的任務運行時,它將每個變量的副本應用到每個任務的函數中去。有時候,一個變量須要在整個任務中,或者在任務和 driver program(驅動程序)之間來共享。Spark 支持兩種類型的共享變量 : broadcast variables(廣播變量),它能夠用於在全部節點上緩存一個值,和 accumulators(累加器),他是一個只能被 「added(增長)」 的變量,例如 counters 和 sums。web
2、Spark依賴算法
Spark 2.x 默認使用 Scala 2.11 來構建和發佈直到運行。(固然,Spark 也能夠與其它的 Scala 版本一塊兒運行)。爲了使用 Scala 編寫應用程序,您須要使用可兼容的 Scala 版本(例如,2.11.X)。shell
要編寫一個 Spark 的應用程序,您須要在 Spark 上添加一個 Maven 依賴。Spark 能夠經過 Maven 中央倉庫獲取:數據庫
groupId = org.apache.spark artifactId = spark-core_2.11 version = 2.2.0
此外,若是您想訪問一個 HDFS 集羣,則須要針對您的 HDFS 版本添加一個 hadoop-client
(hadoop 客戶端)依賴。apache
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>
最後,您須要導入一些 Spark classes(類)到您的程序中去。添加下面幾行:編程
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf;
3、初始化Spark
Spark 程序必須作的第一件事情是建立一個 SparkContext 對象,它會告訴 Spark 如何訪問集羣。要建立一個 SparkContext
,首先須要構建一個包含應用程序的信息的 SparkConf 對象。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf);
這個 appName
參數是一個在集羣 UI 上展現應用程序的名稱。 master
是一個 Spark, Mesos 或 YARN 的 cluster URL,或者指定爲在 local mode(本地模式)中運行的 「local」 字符串。在實際工做中,當在集羣上運行時,您不但願在程序中將 master 給硬編碼,而是用 使用 spark-submit
啓動應用而且接收它。然而,對於本地測試和單元測試,您能夠經過 「local」 來運行 Spark 進程。
4、彈性分佈式數據集 (RDDs)
Spark 主要以一個 彈性分佈式數據集(RDD)的概念爲中心,它是一個容錯且能夠執行並行操做的元素的集合。有兩種方法能夠建立 RDD : 在你的 driver program(驅動程序)中 parallelizing 一個已存在的集合,或者在外部存儲系統中引用一個數據集,例如,一個共享文件系統,HDFS,HBase,或者提供 Hadoop InputFormat 的任何數據源。
1、A list of partiotions 一組分區(partition),partiotion是一個具體概念,指在一個節點中的連續的空間。一個partiotione確定使在一個節點上,可是一個節點上能夠有多個partiotione。用戶能夠在建立RDD時指定RDD的分區個數。 二、A function for computing each split 對RDD作計算,至關於對RDD的每一個split或partition作計算 3、A list of dependencies on other RDDs RDD之間有依賴關係,可溯源。 依賴還具體分爲寬依賴和窄依賴,但並非全部的RDD都有依賴。 RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。 四、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 能夠按key的hash值分區 五、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 數據本地性。計算每一個split時,在split所在機器的本地上運行task是最好的,避免了數據的移動;split有多個副本,因此preferred location不止一個
4.1建立RDD
能夠在您的 driver program (a Scala Seq
) 中已存在的集合上經過調用 SparkContext
的 parallelize
方法來建立並行集合。該集合的元素從一個能夠並行操做的 distributed dataset(分佈式數據集)中複製到另外一個 dataset(數據集)中去。例如,這裏是一個如何去建立一個保存數字 1 ~ 5 的並行集合。
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
在建立後,該 distributed dataset(分佈式數據集)(distData
)能夠並行的執行操做。例如,咱們能夠調用 distData.reduce((a, b) => a + b
) 來合計數組中的元素。後面咱們將介紹 distributed dataset(分佈式數據集)上的操做。
並行集合中一個很重要參數是 partitions(分區)的數量,它可用來切割 dataset(數據集)。Spark 將在集羣中的每個分區上運行一個任務。一般您但願羣集中的每個 CPU 計算 2-4 個分區。通常狀況下,Spark 會嘗試根據您的羣集狀況來自動的設置的分區的數量。固然,您也能夠將分區數做爲第二個參數傳遞到 parallelize
(例如sc.parallelize(data, 10)
) 方法中來手動的設置它。
Spark 能夠從 Hadoop 所支持的任何存儲源中建立 distributed dataset(分佈式數據集),包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3 等等。 Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。
可使用 SparkContext
的 textFile
方法來建立文本文件的 RDD。此方法須要一個文件的 URI(計算機上的本地路徑 ,hdfs://
,s3n://
等等的 URI),而且讀取它們做爲一個 lines(行)的集合。下面是一個調用示例:
JavaRDD<String> distFile = sc.textFile("data.txt");
使用 Spark 讀取文件時須要注意:
全部 Spark 基於文件的 input 方法, 包括 textFile
, 支持在目錄上運行, 壓縮文件, 和通配符. 例如, 您可使用 textFile("/my/directory")
, textFile("/my/directory/*.txt")
, and textFile("/my/directory/*.gz")
.
textFile
方法也能夠經過第二個可選的參數來控制該文件的分區數量. 默認狀況下, Spark 爲文件的每個 block(塊)建立的一 個 partition 分區(HDFS 中塊大小默認是 128MB),固然你也能夠經過傳遞一個較大的值來要求一個較高的分區數量。請注意,分區的數量不可以小於塊的數量。
除了文本文件以外,Spark 也支持一些其它的數據格式:
textFile
相比, 它的每個文件中的每一行將返回一個記錄. 分區由數據量來肯定, 某些狀況下, 可能致使分區太少. 針對這些狀況, wholeTextFiles
在第二個位置提供了一個可選的參數用戶控制分區的最小數量.sequenceFile[K, V]
方法,其中 K
和 V
指的是文件中 key 和 values 的類型. 這些應該是 Hadoop 的 Writable 接口的子類, 像 IntWritable and Text. 此外, Spark 可讓您爲一些常見的 Writables 指定原生類型; 例如, sequenceFile[Int, String]
會自動讀取 IntWritables 和 Texts.SparkContext.hadoopRDD
方法, 它接受一個任意的 JobConf
和 input format class, key class 和 value class. 經過相同的方法你能夠設置你的 input source(輸入源). 你還能夠針對 InputFormats 使用基於 「new」 MapReduce API (org.apache.hadoop.mapreduce
) 的 SparkContext.newAPIHadoopRDD
.RDD.saveAsObjectFile
和 SparkContext.objectFile
支持使用簡單的序列化的 Java objects 來保存 RDD. 雖然這不像 Avro 這種專用的格式同樣高效,但其提供了一種更簡單的方式來保存任何的 RDD。4.2 RDD操做
RDDs support 兩種類型的操做: transformations(轉換), 它會在一個已存在的 dataset 上建立一個新的 dataset, 和 actions(動做), 將在 dataset 上運行的計算後返回到 driver 程序. 例如, map
是一個經過讓每一個數據集元素都執行一個函數,並返回的新 RDD 結果的 transformation。reduce是 經過執行一些函數,聚合 RDD 中全部元素,並將最終結果給返回驅動程序的action.
Spark 中全部的 transformations 都是 lazy(懶加載的), 所以它不會馬上計算出結果. 他們只應用於一些基本數據集的轉換 (例如. 文件). 只有當須要返回結果給驅動程序時(action操做時),transformations 纔開始計算. 這種設計使 Spark 的運行更高效.
默認狀況下,每次你在 RDD 運行一個 action 的時, 每一個 transformed RDD 都會被從新計算。可是,您也可用 persist
(或 cache
) 方法將 RDD persist(持久化)到內存中;在這種狀況下,Spark 爲了下次查詢時能夠更快地訪問,會把數據保存在集羣上。此外,還支持持續持久化 RDDs 到磁盤,或複製到多個結點。
JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() { public Integer call(String s) { return s.length(); } }); int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() { public Integer call(Integer a, Integer b) { return a + b; } });
第一行從外部文件中定義了一個基本的 RDD,但這個數據集並未加載到內存中或即將被行動: line
僅僅是一個相似指針的東西,指向該文件. 第二行定義了 lineLengths
做爲 map
的結果。請注意,因爲 laziness
(延遲加載)lineLengths
不會被當即計算. 最後,咱們運行 reduce
,這是一個 action。此時,Spark 分發計算任務到不一樣的機器上運行,每臺機器都運行 map 的一部分並本地運行 reduce,僅僅返回它聚合後的結果給驅動程序.
若是咱們也但願之後再次使用 lineLengths
,咱們能夠在 reduce
以前添加如下代碼,這樣它就會被保存在 memory 中。
lineLengths.persist(StorageLevel.MEMORY_ONLY());
4.2.1 理解閉包
在集羣中執行代碼時,一個關於 Spark 更難的事情是理解變量和方法的範圍和生命週期。
int counter = 0; JavaRDD<Integer> rdd = sc.parallelize(data); // Wrong: Don't do this!! rdd.foreach(x -> counter += x); println("Counter value: " + counter);
Local(本地)vs. cluster(集羣)模式
foreach
函數引用的時候,它已經再也不是 driver node 的 counter 了。雖然在 driver node 仍然有一個 counter 在內存中,可是對 executors 已經不可見。executor 看到的只是序列化的閉包一個副本。因此 counter 最終的值仍是 0,由於對 counter
全部的操做均引用序列化的 closure 內的值。local
本地模式,在某些狀況下的 foreach
功能其實是同一 JVM 上的驅動程序中執行,並會引用同一個原始的 counter 計數器,實際上可能更新值。rdd.foreach(println)
或 rdd.map(println)
。在一臺機器上,這將產生預期的輸出和打印 RDD 的全部元素。cluster
模式下,stdout
輸出正在被執行寫操做 executors 的 stdout
代替,而不是在一個驅動程序上,所以 stdout
的 driver
程序不會顯示這些!driver
程序的全部元素,可使用的 collect()
方法首先把 RDD 放到 driver 程序節點上: rdd.collect().foreach(println)
。這可能會致使 driver 程序耗盡內存,雖然說,由於 collect()
獲取整個 RDD 到一臺機器; 若是你只須要打印 RDD 的幾個元素,一個更安全的方法是使用 take()
: rdd.take(100).foreach(println)
。4.2.2 Key-Value Pairs
雖然大多數 Spark 操做工做在包含任何類型對象的 RDDs 上,只有少數特殊的操做可用於 Key-Value 對的 RDDs. 最多見的是分佈式 「shuffle」 操做,如經過元素的 key 來進行 grouping 或 aggregating(聚合) 操做。
在java中, key-value pairs 是使用Scala標準庫中的 scala.Tuple2 類來表明,你可使用new Tuple2(a, b)
來建立一個Tuple,訪問它的元素使用tuple._1()
和 tuple._2()
可使用 mapToPair
和 flatMapToPair將JavaRDDs轉換爲
JavaPairRDDs.
JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
咱們也可使用 counts.sortByKey()
,例如,在對按字母順序排序,最後 counts.collect()
把他們做爲一個數據對象返回給 driver 程序。
當在 key-value pair 操做中使用自定義的 objects 做爲 key 時, 您必須確保有一個自定義的 equals()
方法和一個 hashCode()
方法。
4.3 算子
下表列出了一些 Spark 經常使用的 transformations(轉換). 詳情請參考 RDD API 文檔 (Scala, Java, Python, R) 和 pair RDD 函數文檔 (Scala, Java).
下表列出了一些 Spark 經常使用的 actions 操做。詳細請參考 RDD API 文檔 (Scala, Java, Python, R)和 pair RDD 函數文檔 (Scala, Java).
4.2.4 shuffle操做
Spark 裏的某些操做會觸發 shuffle。shuffle 是spark 從新分配數據的一種機制,使得這些數據能夠跨不一樣的區域進行分組。這一般涉及在 executors和機器之間拷貝數據,這使得 shuffle 成爲一個複雜的、代價高的操做。
爲了明白 reduceByKey
操做的過程,咱們以 reduceByKey
爲例。reduceBykey 操做產生一個新的 RDD,其中 key 全部相同的的值組合成爲一個 tuple - key 以及與 key 相關聯的全部值在 reduce 函數上的執行結果。面臨的挑戰是,一個 key 的全部值不必定都在一個同一個 paritition 分區裏,甚至是不必定在同一臺機器裏,可是它們必須共同被計算。
在 spark 裏,特定的操做須要數據不跨分區分佈。在計算期間,一個任務在一個分區上執行,爲了全部數據都在單個 reduceByKey
的 reduce 任務上運行,咱們須要執行一個 all-to-all 操做。它必須從全部分區讀取全部的 key 和 key對應的全部的值,而且跨分區彙集去計算每一個 key 的結果 - 這個過程就叫作 shuffle.。
儘管每一個分區新 shuffle 的數據集將是肯定的,分區自己的順序也是這樣,可是這些數據的順序是不肯定的。若是但願 shuffle 後的數據是有序的,可使用:
mapPartitions
對每一個 partition 分區進行排序,例如, .sorted
repartitionAndSortWithinPartitions
在分區的同時對分區進行高效的排序.sortBy
對 RDD 進行全局的排序觸發的 shuffle 操做包括 repartition 操做,如 repartition
和 coalesce
, ‘ByKey 操做像 groupByKey
和 reduceByKey
, 和 join操做, 像 cogroup
和 join
.
4.2.5 RDD Persistence(持久化)
Spark 中一個很重要的能力是將數據 persisting 持久化(或稱爲 caching 緩存),在多個操做間均可以訪問這些持久化的數據。當持久化一個 RDD 時,每一個節點的其它分區均可以使用 RDD 在內存中進行計算,在該數據上的其餘 action 操做將直接使用內存中的數據。這樣會讓之後的 action 操做計算速度加快(一般運行速度會加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。
RDD 可使用 persist()
方法或 cache()
方法進行持久化。數據將會在第一次 action 操做時進行計算,並緩存在節點的內存中。Spark 的緩存具備容錯機制,若是一個緩存的 RDD 的某個分區丟失了,Spark 將按照原來的計算過程,自動從新計算並進行緩存。
另外,每一個持久化的 RDD 可使用不一樣的 storage level 存儲級別進行緩存,例如,持久化到磁盤、已序列化的 Java 對象形式持久化到內存(能夠節省空間)、跨節點間複製、以 off-heap 的方式存儲在 Tachyon。這些存儲級別經過傳遞一個 StorageLevel
對象給 persist()
方法進行設置。cache()
方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是 StorageLevel.MEMORY_ONLY
(將反序列化的對象存儲到內存中)。詳細的存儲級別介紹以下:
Storage Level(存儲級別) | Meaning(含義) |
---|---|
MEMORY_ONLY | 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中. 若是內存空間不夠,部分數據分區將再也不緩存,在每次須要用到這些數據時從新進行計算. 這是默認的級別. |
MEMORY_AND_DISK | 將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。若是內存空間不夠,將未緩存的數據分區存儲到磁盤,在須要使用這些分區時從磁盤讀取. |
MEMORY_ONLY_SER (Java and Scala) |
將 RDD 以序列化的 Java 對象的形式進行存儲(每一個分區爲一個 byte 數組)。這種方式會比反序列化對象的方式節省不少空間,尤爲是在使用 fast serializer 時會節省更多的空間,可是在讀取時會增長 CPU 的計算負擔. |
MEMORY_AND_DISK_SER (Java and Scala) |
相似於 MEMORY_ONLY_SER ,可是溢出的分區會存儲到磁盤,而不是在用到它們時從新計算. |
DISK_ONLY | 只在磁盤上緩存 RDD. |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 與上面的級別功能相同,只不過每一個分區在集羣中兩個節點上創建副本. |
OFF_HEAP (experimental 實驗性) | 相似於 MEMORY_ONLY_SER, 可是將數據存儲在 off-heap memory 中. 這須要啓用 off-heap 內存. |
Spark 會自動監視每一個節點上的緩存使用狀況,並使用 least-recently-used(LRU)的方式來丟棄舊數據分區。 若是您想手動刪除 RDD 而不是等待它掉出緩存,使用 RDD.unpersist()
方法。
4.2.6 共享變量
v
上調用 SparkContext.broadcast(v)
方法來進行建立。廣播變量是 v
的一個 wrapper(包裝器),能夠經過調用 value
方法來訪問它的值。代碼示例以下:
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3]
在建立廣播變量以後,在集羣上執行的全部的函數中,應該使用該廣播變量代替原來的 v
值,因此節點上的 v
最多分發一次。另外,對象 v
在廣播後不該該再被修改,以保證分發到全部的節點上的廣播變量具備一樣的值。
Accumulators(累加器)
Accumulators(累加器)是一個僅能夠執行 「added」(添加)的變量來經過一個關聯和交換操做,所以能夠高效地執行支持並行。累加器能夠用於實現 counter( 計數,相似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數值型的累加器,而且程序員能夠添加新的支持類型。
做爲一個用戶,您能夠建立 accumulators(累加器)而且重命名. 以下圖所示, 一個命名的 accumulator 累加器(在這個例子中是 counter
)將顯示在 web UI 中,用於修改該累加器的階段。 Spark 在 「Tasks」 任務表中顯示由任務修改的每一個累加器的值。
SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
方法建立數值類型的 accumulator
(累加器)以分別累加 Long 或 Double 類型的值。集羣上正在運行的任務就可使用 add
方法來累計數值。然而,它們不可以讀取它的值。只有 driver program(驅動程序)纔可使用 value
方法讀取累加器的值。
LongAccumulator accum = jsc.sc().longAccumulator(); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); // ... // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s accum.value(); // returns 10
雖然此代碼使用 Long 類型的累加器的內置支持, 可是開發者經過 AccumulatorV2 它的子類來建立本身的類型. AccumulatorV2 抽象類有幾個須要 override(重寫)的方法: reset
方法可將累加器重置爲 0, add
方法可將其它值添加到累加器中, merge
方法可將其餘一樣類型的累加器合併爲一個. 其餘須要重寫的方法可參考 API documentation. 例如, 假設咱們有一個表示數學上 vectors(向量)的 MyVector
類,咱們能夠寫成:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> { private MyVector myVector = MyVector.createZeroVector(); public void reset() { myVector.reset(); } public void add(MyVector v) { myVector.add(v); } ... } // Then, create an Accumulator of this type: VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2(); // Then, register it into spark context: jsc.sc().register(myVectorAcc, "MyVectorAcc1");
注意,在開發者定義本身的 AccumulatorV2 類型時, resulting type(返回值類型)可能與添加的元素的類型不一致。
5、調度流程
5.1 寬窄依賴
窄依賴:
父RDD和子RDD partition之間的關係是一對一的。或者父RDD一個partition只對應一個子RDD的partition狀況下的父RDD和子RDD partition關係是多對一的。不會有shuffle的產生。父RDD的一個分區去到子RDD的一個分區。
寬依賴:
父RDD與子RDD partition之間的關係是一對多。會有shuffle的產生。父RDD的一個分區的數據去到子RDD的不一樣分區裏面。
(其實區分寬窄依賴主要就是看父RDD的一個Partition的流向,要是流向一個的話就是窄依賴,流向多個的話就是寬依賴。)
5.2 stage切分
Spark任務會根據RDD之間的依賴關係,造成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG劃分相互依賴的多個stage,劃分stage的依據就是RDD之間的寬窄依賴。遇到寬依賴就劃分stage,每一個stage包含一個或多個task任務。而後將這些task以taskSet的形式提交給TaskScheduler運行。stage是由一組並行的task組成。
切割規則:從後往前,遇到寬依賴就切割stage。
一個stage內的窄依賴是pipeline管道計算模式,pipeline只是一種計算思想,模式。
5.3 執行流程
Driver運行在客戶端:
(若是Driver運行在Worker上,客戶端提交做業給Master,Master讓一個Worker啓動Driver,即SchedulerBackend。)
Spark on Yarn:
5.4 任務調度