大部分東西都從 RDD Programming Guide 裏整理摘抄. 對 Scala 還不熟悉, 示例以 Java 爲主.java
RDD 的東西其實還有不少, 遠超這篇筆記的內容. 剛接觸 Spark, 理解得不深也講很差, 請以文檔和書籍爲主.web
Spark 圍繞着 RDD (Resilient Distributed Dataset) 展開, RDD 是一個容錯的可並行的集合.算法
RDD 建立以後, 能夠進行轉換和行動操做. 轉換操做生成一個新的 RDD, 新的 RDD 依賴於原先的 RDDs(一個或多個). RDD 的轉換和計算是 lazy 的, 行動操做會觸發計算, 並將結果返回給驅動程序.shell
Spark 會將 RDD 中的元素作分區. 不一樣的分區可能會分佈在不一樣的機器上. 分區的數量決定了task的數量.apache
RDD 能夠經過並行化驅動程序中的已有集合或者讀取外部存儲系統的數據集來建立.api
在建立 RDD 以前, 須要先初始化 SparkContext:緩存
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc = new JavaSparkContext(conf);
在 shell 中能夠省略這一步.網絡
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data);
Spark 可以用任何 Hadoop 支持的存儲元來建立 RDD, 包括本地文件系統、HDFS、Cassandra、HBase、Amazon S3 等等. Spark 支持文本文件、SequenceFiles、和其餘的任何 Hadoop 輸入格式. External Datasets閉包
// 文件系統 JavaRDD<String> distFile = sc.textFile("README.md"); // 從HDFS中讀取 JavaRDD<String> hdfsFile = sc.textFile("hdfs://host:port/path/README.md");
下面這段代碼在本地模式運行和在集羣中運行的表現將會不同 (實際上與是否在同一個JVM中運行相關).
int counter = 0; JavaRDD<Integer> rdd = sc.parallelize(data); // Wrong: Don't do this!! rdd.foreach(x -> counter += x); println("Counter value: " + counter);
在執行 job 以前, Spark 會將 RDD 的操做切分紅 task, 每一個 task 都將在一個 executor 中執行. 在執行以前, Spark 會計算 task 的閉包(executor 執行計算所須要的變量、方法). 這個閉包會被序列化, 並被髮送給每一個 executor. 被髮送給 executor 的閉包都是一個 copy, 所以閉包的中變量都再也不是驅動節點中的變量了. 原來驅動節點中的變量對 executor 再也不可見. 在上面的例子中, counter 的最終值將會是0.
在本地模式中, 一些狀況下, foreach 方法會與驅動程序在同一個 JVM 中執行, 這時將會引用同一個 counter.
爲了確保在這些場景中的良好行爲, 應該使用 Accumulator.
閉包中不該改變某些全局狀態.
RDD 是不可變的, 轉換操做將會在原 RDD 的基礎上生成一個新的 RDD. 全部的轉換操做都是 lazy 的, 在行動操做以前不會進行計算. 默認狀況下, 每次行動操做, 轉換 RDD 會被從新計算, 可使用 persist
方法來把一個 RDD 放到內存、磁盤裏, 這樣就只須要計算一次.
Spark 支持的轉換操做請看 Transformations
行動操做會觸發計算, 並將結果返回給驅動程序. Spark 支持的行動操做請看 Actions
分區是 RDD 並行計算的單元. 一個 RDD 會被分爲若干個分區, 這些分區甚至能夠不在同一臺機器上. 每一個分區的技術計算都在一個 task 中進行, task 的個數也由分區數決定. 分區的最小數量能夠在建立 RDD 時指定:
JavaRDD<String> distFile = sc.textFile("README.md", 4);
默認的分區數量能夠經過修改 spark.default.parallelism
來配置. 若是沒有這個配置, 則與不一樣的集羣環境相關.
RDD 經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區是一一對應的,另外一種是寬依賴,下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。在執行過程當中, 寬依賴和 Shuffle 對應. 經過RDDs之間的這種依賴關係,一個任務流能夠描述爲DAG(有向無環圖).
shuffle 是 Spark 從新分配數據的一個機制. 一些跨分區的操做會觸發 shuffle. shuffle 一般涉及跨 executor 和機器的數據拷貝(會涉及磁盤I/O、數據序列化和網絡I/O), 所以很是昂貴和複雜. Shuffle operations
Spark 的一項重要功能是跨操做持久化或緩存 RDD, 而且可使用不一樣的存儲級別.RDD Persistence.
持久化能夠經過 RDD 的 cache()
或 persist()
方法, 移除持久化可使用 unpersist()
方法.
以前提到過, 在集羣中運行時, 變量會被拷貝到每臺機器上, 而且對變量的改動不會被傳播回驅動程序. Spark 提供了兩種類型的共享變量 broadcast variable 和 accumulator.
Spark 支持數字型的累加器, 用戶也能自定義累加器.
LongAccumulator accum = jsc.sc().longAccumulator(); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); accum.value(); // returns 10
用戶自定義累加器, 須要實現 AccumulatorV2 接口. 自定義累加器時, 結果類型能夠和元素類型不一樣.
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> { private MyVector myVector = MyVector.createZeroVector(); public void reset() { myVector.reset(); } public void add(MyVector v) { myVector.add(v); } ... } // Then, create an Accumulator of this type: VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2(); // Then, register it into spark context: jsc.sc().register(myVectorAcc, "MyVectorAcc1");
注意, 累加器只有在 RDD 做爲操做的部分計算時纔會進行更新:
LongAccumulator accum = jsc.sc().longAccumulator(); data.map(x -> { accum.add(x); return f(x); }); // Here, accum is still 0 because no actions have caused the `map` to be computed.
Broadcast variable 容許你在每臺機器上建立一個只讀的變量緩存, 而不是和任務一塊兒傳輸. Spark 同時嘗試使用更高效的廣播算法來減小網絡開銷. 使用 SparkContext.broadcast(v)
來建立一個廣播變量.
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); broadcastVar.value(); // returns [1, 2, 3]
若是 v 是一個對象, 在建立完廣播變量後不要修改 v, 防止將不一樣的值傳給不一樣的機器. 咱們能夠建立命名的和匿名(named or unnamed)的累加器, 命名的累加器將會被展現在 webUI中.
LongAccumulator accum = jsc.sc().longAccumulator(); sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); accum.value(); // returns 10
廣播變量會在每臺機器上都有一個拷貝, 數據量較大時建議直接使用 RDD.