Spark學習筆記2——RDD(上)

Spark學習筆記2——RDD(上)

筆記摘抄自 [美] Holden Karau 等著的《Spark快速大數據分析》python

RDD是什麼?

彈性分佈式數據集(Resilient Distributed Dataset,簡稱 RDD)shell

  • Spark 的核心概念
  • 一個不可變的分佈式對象集合
  • 每一個 RDD 都被分爲多個分區運行在集羣的不一樣節點上
  • RDD 能夠包含 Python、Java、Scala 中任意類型的對象(能夠自定義)

在 Spark 中,對數據的全部操做不外乎 建立 RDD轉化已有 RDD 以及 調用 RDD 操做 進行求值。而在這一切背後,Spark 會自動將 RDD 中的數據分發到集羣上,並將操做並行化執行。apache

例子

建立 RDD 的兩種方式:api

  • 讀取一個外部數據集
  • 驅動器程序裏分發驅動器程序中的對象集合(好比 list 和 set)

這裏經過讀取文本文件做爲一個字符串 RDD:分佈式

>>> lines = sc.textFile("README.md")

RDD 的兩種操做:oop

  • 轉化操做(transformation):由一個RDD 生成一個新的RDD,例如篩選數據
  • 行動操做(action):對RDD 計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如HDFS)中

調用轉化操做 filter() :學習

>>> pythonLines = lines.filter(lambda line: "Python" in line)

調用 first() 行動操做 :大數據

>>> pythonLines.first()
u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

@Noticespa

  • 惰性計算」:RDD 只有在進行第一個 行動操做 時纔會被計算1
  • 持久化」:RDD默認會在每次行動操做時從新計算2,若是想要在多個行動操做中重複使用同一個 RDD ,須要對該 RDD 進行 「持久化」

把RDD 持久化3到內存中

>>> pythonLines.persist
<bound method PipelinedRDD.persist of PythonRDD[3] at RDD at PythonRDD.scala:53>
>>> pythonLines.count()
3
>>> pythonLines.first()
u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

建立 RDD

並行化方式

把程序中一個已有的集合傳給 SparkContext 的 parallelize() 方法,這種方式須要把整個數據集先放到一臺機器的內存中,故不經常使用

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

讀取外部數據集方式

JavaRDD<String> lines = sc.textFile("/path/to/README.md");

RDD 操做

轉化操做

RDD 的轉化操做是返回一個新的RDD 的操做,好比 map() 和 filter()

例程(Java)

展現日誌文件中全部錯誤記錄

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.List;

public class CountError {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("CountError");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        JavaRDD<String> log = javaSparkContext.textFile(args[0]);
        JavaRDD<String> errorsRDD = log.filter(
                new Function<String, Boolean>() {
                    public Boolean call(String x) {
                        return x.contains("ERROR");
                    }
                });
        List<String> errors = errorsRDD.collect();
        for (String output : errors) {
            System.out.println(output);
        }
        javaSparkContext.stop();
    }
}

日誌文件內容

INFO:everything gonna be ok...
ERROR:something is wrong!
INFO:everything gonna be ok...
ERROR:something is wrong!
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
ERROR:something is wrong!
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
ERROR:something is wrong!
INFO:everything gonna be ok...
INFO:everything gonna be ok...

運行效果

[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class CountError ~/SparkTest2.jar ~/SparkTest2.log
...
19/09/10 16:33:10 INFO DAGScheduler: Job 0 finished: collect at CountError.java:20, took 0.423698 s
ERROR:something is wrong!
ERROR:something is wrong!
ERROR:something is wrong!
ERROR:something is wrong!
...

例程(Python)

>>> lines = sc.textFile("/root/SparkTest2.log")
>>> errorsRDD = lines.filter(lambda lines: "ERROR" in lines)
>>> infoRDD = lines.filter(lambda lines: "INFO" in lines)
>>> totalRDD = errorsRDD.union(infoRDD)
>>> lines.count()
21
>>> errorsRDD.count()
4
>>> infoRDD.count()
17
>>> totalRDD.count()
21

@Notice

  • 轉化操做能夠操做任意數量的輸入 RDD

  • Spark 會使用譜系圖(lineage graph)來記錄這些不一樣 RDD 之間的依賴關係,以此按需計算每一個 RDD

Spark譜系圖.png

@P.s.

​ 也能夠依靠譜系圖在持久化的RDD 丟失部分數據時恢復所丟失的數據

行動操做

把最終求得的結果返回到驅動器程序,或者寫入外部存儲系統中的 RDD 操做

上文例程中的 count() 即是一個行動操做,另外還有 take() 、collect() 等操做

下面以 take() 爲例,獲取 union 後的 totalRDD 的前 10 條

>>> for line in totalRDD.take(10):print line
... 
ERROR:something is wrong!
ERROR:something is wrong!
ERROR:something is wrong!
ERROR:something is wrong!
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
INFO:everything gonna be ok...
>>>

@P.s.

程序把RDD 篩選到一個很小的規模單臺機器內存足以放下時纔可使用 collect()

惰性求值

RDD 的轉化操做都是惰性求值的,在被調用行動操做以前 Spark 不會開始計算

  • 不該該把 RDD 看做存放着特定數據的數據集,而最好把每一個 RDD 看成咱們經過轉化操做構建出來的、記錄如何計算數據的 指令列表
  • 把數據讀取到 RDD 的操做也一樣是惰性的
  • 讀取數據的操做也有可能會屢次執行


  1. 若是建立 RDD 或轉化 RDD 時就把文件中全部的行數都存儲起來,會消耗大量存儲空間,Spark 瞭解完整的操做鏈後,能夠只計算結果然正須要的數據,例如行動操做爲 first() 則只存儲 「README.md」 中第一行 「Python」

  2. 若是不這樣作也會致使重複建立 RDD 浪費存儲空間

  3. 默認存儲級別調用 persist() 和 cache() 是同樣的

相關文章
相關標籤/搜索