RDD
全稱爲 Resilient Distributed Datasets,是 Spark 最基本的數據抽象,它是隻讀的、分區記錄的集合,支持並行操做,能夠由外部數據集或其餘 RDD 轉換而來,它具備如下特性:html
RDD[T]
抽象類的部分相關代碼以下:java
// 由子類實現以計算給定分區 def compute(split: Partition, context: TaskContext): Iterator[T] // 獲取全部分區 protected def getPartitions: Array[Partition] // 獲取全部依賴關係 protected def getDependencies: Seq[Dependency[_]] = deps // 獲取優先位置列表 protected def getPreferredLocations(split: Partition): Seq[String] = Nil // 分區器 由子類重寫以指定它們的分區方式 @transient val partitioner: Option[Partitioner] = None
RDD 有兩種建立方式,分別介紹以下:git
這裏使用 spark-shell
進行測試,啓動命令以下:github
spark-shell --master local[4]
啓動 spark-shell
後,程序會自動建立應用上下文,至關於執行了下面的 Scala 語句:shell
val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]") val sc = new SparkContext(conf)
由現有集合建立 RDD,你能夠在建立時指定其分區個數,若是沒有指定,則採用程序所分配到的 CPU 的核心數:apache
val data = Array(1, 2, 3, 4, 5) // 由現有集合建立 RDD,默認分區數爲程序所分配到的 CPU 的核心數 val dataRDD = sc.parallelize(data) // 查看分區數 dataRDD.getNumPartitions // 明確指定分區數 val dataRDD = sc.parallelize(data,2)
執行結果以下:編程
引用外部存儲系統中的數據集,例如本地文件系統,HDFS,HBase 或支持 Hadoop InputFormat 的任何數據源。數組
val fileRDD = sc.textFile("/usr/file/emp.txt") // 獲取第一行文本 fileRDD.take(1)
使用外部存儲系統時須要注意如下兩點:緩存
二者均可以用來讀取外部文件,可是返回格式是不一樣的:網絡
RDD[String]
,返回的是就是文件內容,RDD 中每個元素對應一行數據;RDD[(String, String)]
,元組中第一個參數是文件路徑,第二個參數是文件內容;def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...} def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}
RDD 支持兩種類型的操做:transformations(轉換,從現有數據集建立新數據集)和 actions(在數據集上運行計算後將值返回到驅動程序)。RDD 中的全部轉換操做都是惰性的,它們只是記住這些轉換操做,但不會當即執行,只有遇到 action 操做後纔會真正的進行計算,這相似於函數式編程中的惰性求值。
val list = List(1, 2, 3) // map 是一個 transformations 操做,而 foreach 是一個 actions 操做 sc.parallelize(list).map(_ * 10).foreach(println) // 輸出: 10 20 30
Spark 速度很是快的一個緣由是 RDD 支持緩存。成功緩存後,若是以後的操做使用到了該數據集,則直接從緩存中獲取。雖然緩存也有丟失的風險,可是因爲 RDD 之間的依賴關係,若是某個分區的緩存數據丟失,只須要從新計算該分區便可。
Spark 支持多種緩存級別 :
Storage Level (存儲級別) |
Meaning(含義) |
---|---|
MEMORY_ONLY |
默認的緩存級別,將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。若是內存空間不夠,則部分分區數據將再也不緩存。 |
MEMORY_AND_DISK |
將 RDD 以反序列化的 Java 對象的形式存儲 JVM 中。若是內存空間不夠,將未緩存的分區數據存儲到磁盤,在須要使用這些分區時從磁盤讀取。 |
MEMORY_ONLY_SER |
將 RDD 以序列化的 Java 對象的形式進行存儲(每一個分區爲一個 byte 數組)。這種方式比反序列化對象節省存儲空間,但在讀取時會增長 CPU 的計算負擔。僅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER |
相似於 MEMORY_ONLY_SER ,可是溢出的分區數據會存儲到磁盤,而不是在用到它們時從新計算。僅支持 Java 和 Scala。 |
DISK_ONLY |
只在磁盤上緩存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 , etc |
與上面的對應級別功能相同,可是會爲每一個分區在集羣中的兩個節點上創建副本。 |
OFF_HEAP |
與 MEMORY_ONLY_SER 相似,但將數據存儲在堆外內存中。這須要啓用堆外內存。 |
啓動堆外內存須要配置兩個參數:
- spark.memory.offHeap.enabled :是否開啓堆外內存,默認值爲 false,須要設置爲 true;
- spark.memory.offHeap.size : 堆外內存空間的大小,默認值爲 0,須要設置爲正值。
緩存數據的方法有兩個:persist
和 cache
。cache
內部調用的也是 persist
,它是 persist
的特殊化形式,等價於 persist(StorageLevel.MEMORY_ONLY)
。示例以下:
// 全部存儲級別均定義在 StorageLevel 對象中 fileRDD.persist(StorageLevel.MEMORY_AND_DISK) fileRDD.cache()
Spark 會自動監視每一個節點上的緩存使用狀況,並按照最近最少使用(LRU)的規則刪除舊數據分區。固然,你也可使用 RDD.unpersist()
方法進行手動刪除。
在 Spark 中,一個任務對應一個分區,一般不會跨分區操做數據。但若是遇到 reduceByKey
等操做,Spark 必須從全部分區讀取數據,並查找全部鍵的全部值,而後彙總在一塊兒以計算每一個鍵的最終結果 ,這稱爲 Shuffle
。
Shuffle 是一項昂貴的操做,由於它一般會跨節點操做數據,這會涉及磁盤 I/O,網絡 I/O,和數據序列化。某些 Shuffle 操做還會消耗大量的堆內存,由於它們使用堆內存來臨時存儲須要網絡傳輸的數據。Shuffle 還會在磁盤上生成大量中間文件,從 Spark 1.3 開始,這些文件將被保留,直到相應的 RDD 再也不使用並進行垃圾回收,這樣作是爲了不在計算時重複建立 Shuffle 文件。若是應用程序長期保留對這些 RDD 的引用,則垃圾回收可能在很長一段時間後纔會發生,這意味着長時間運行的 Spark 做業可能會佔用大量磁盤空間,一般可使用 spark.local.dir
參數來指定這些臨時文件的存儲目錄。
因爲 Shuffle 操做對性能的影響比較大,因此須要特別注意使用,如下操做都會致使 Shuffle:
repartition
和 coalesce
;groupByKey
和 reduceByKey
,但 countByKey
除外;cogroup
和 join
。RDD 和它的父 RDD(s) 之間的依賴關係分爲兩種不一樣的類型:
以下圖,每個方框表示一個 RDD,帶有顏色的矩形表示分區:
區分這兩種依賴是很是有用的:
RDD(s) 及其之間的依賴關係組成了 DAG(有向無環圖),DAG 定義了這些 RDD(s) 之間的 Lineage(血統) 關係,經過血統關係,若是一個 RDD 的部分或者所有計算結果丟失了,也能夠從新進行計算。那麼 Spark 是如何根據 DAG 來生成計算任務呢?主要是根據依賴關係的不一樣將 DAG 劃分爲不一樣的計算階段 (Stage):
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南