[Spark] Spark 對RDD編程

本篇博客中的操做都在 ./bin/pyspark 中執行。python

RDD,即彈性分佈式數據集(Resilient Distributed Dataset),是Spark對數據的核心抽象。RDD是分佈式元素的集合,對手的全部操做均可以歸納爲:shell

  • 建立RDD
  • 轉化已有RDD
  • 調用RDD操做進行求值

在這些操做中,Spark會自動將RDD中的數據分發的集羣上,並將操做自動化執行。緩存

每一個RDD都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。分佈式

Get Started

用戶能夠:函數

  • 讀取一個外部數據集
  • 或者使用對象集合(好比 list 或者 set)

來建立 RDD。好比使用 SparkContext.textFile() 來建立一個字符串RDD:優化

lines = sc.textFile("README.md")

RDD建立以後,支持:url

  • 轉化操做(transformation):會由一個RDD生成一個新的RDD。
  • 行動操做(action):會計算出一個結果,並把結果返回到驅動器程序中,或存儲在外部存儲系統中。

Spark對RDD是惰性計算的,只有在行動操做(action)時,纔會真正計算。spa

回到shell 中,再執行:.net

pythonLines = lines.filter(lambda line: "Python" in line)

行動操做 first() 之中,Spark才進行真正的計算,而這時候只須要計算結果中真正須要的數據:在這裏,Spark只須要掃面文件知道找到第一個匹配的行(包含"Python"的行)就中止了。3d

默認狀況下,Spark的RDD會在每次進行行動(Action)操做的時候從新計算,若是想在多個行動操做中使用同一個RDD,可使用.persist()方法來讓Spark把這個RDD緩存下來,這個操做叫作:持久化。

持久化方便在之後的操做中重用數據。

總的來講,Spark會這樣工做:

  1. 建立出RDD
  2. 使用轉化操做(好比filter)對RDD進行轉化,建立出新RDD
  3. 告訴Spark咱們要重用哪些中間結果,對這些RDD進行持久化操做
  4. 使用行動(Action)操做,來觸發一次計算,Spark會對計算進行優化後再執行。

另:cache()persist() 使用的默認存儲級別是同樣的。

建立一個RDD

使用外部數據集的方式比較常見,這裏咱們就看一個文本文檔的例子:

lines = sc.textFile("README.md")

爲了下面的演示不麻煩,咱們這裏主要看經過將程序中的集合轉化爲RDD的方法,快速建立一個RDD:

lines = sc.parallelize(["Hello world", "News about Senate Hacking Hearing","US official says Russia undoubtedly meddled in US election"])

// 注意上面的RDD中出現了兩個"US",後面有用。

對進行RDD操做

RDD的轉化操做是返回一個新的RDD的操做,好比.filter()操做就是轉化操做。

RDD的行動操做是向驅動器程序返回結果,或者把結果寫入外部驅動器,行動操做會觸發實際的計算,好比.count()或者 .first()方法。

轉化操做

場景:找了個本身之前程序的log文件,咱們使用Spark找出其中的錯誤(ERROR)信息,文件link

下面是使用.filter()實現轉化操做:

>>> inputRDD   =  sc.textFile("url_Requests.log")
>>> errorsRDD  =  inputRDD.filter(lambda x : "ERROR"   in x)
>>> cautionRDD =  inputRDD.filter(lambda x : "CAUTION" in x)

注意 .filter() 方法不會改變已有的 inputRDD 中的數據,該操做會返回一個全新的RDD,inputRDD還在後面的程序中還能夠繼續使用。

而後再來一個.union() 操做:

>>> badlineRDD = errorsRDD.union(cautionRDD)

.union() 操做就是取並集,這個還比較好理解。

經過轉化操做,能夠從已有的RDD中派生出新的RDD。

行動(action)操做

好比說.count()操做,就是一個行動操做:

count

另一個常見的操做是.collect():

collect-operation

對於.collect()操做來講,能夠用來獲取整個RDD中的數據。只有當整個RDD的數據能在單臺機器的內存中放得下時,才能使用該方法。

當咱們每次調用一個新的行動操做時,整個RDD都會從頭開始計算,若是要避免這種行爲,用戶可讓中間結果持久化,這個在後面會提到。

關於惰性求值

RDD的轉化都是惰性求值的,就是說在被調用行動曹組偶以前,Spark不會開始計算。

惰性求值覺得這咱們對RDD調用轉化操做是,操做不會當即執行,Spark會在內部記錄下全部要執行的操做信息。咱們能夠把RDD當成咱們經過轉化操做構建出來的特定數據集。

上面操做過的把文本數據讀到RDD的操做一樣也是惰性的,當咱們調用 sc.textFile()時,數據並無讀取進來,而是在必要時纔會讀取。 和轉化操做相同的是,讀取數據的操做也有可能被屢次執行。

常見RDD的轉化操做和行動操做

對各個元素的轉化操做

其中的一個例子是 .map()方法,map能夠對RDD中的每一個數據進行操做:

>>> nums = sc.parallelize([1,2,3,4])
>>> squared = nums.map(lambda x : x ** 2)
>>> squared.collect()
[1, 4, 9, 16]

再好比咱們剛纔的日誌文件:

>>> numberOfLines = errorsRDD.map(lambda line: len(line))
>>> numberOfLines.collect()

這裏,咱們計算了每行錯誤日誌的字符數,結果爲:

另外一個是flatMap,看一個例子就懂了,還記得咱們剛纔建立的lines嗎:

lines.collect()
words = lines.flatMap(lambda line: line.split(" "))
words.collect()

其輸出結果爲:

.map() 有什麼區別呢,這是 map 的輸出結果,很容易懂:

difference-between-map-and-flatmap

使用 .distinct() 操做進行去重:

distinct

常見行動操做

.reduce() 是最經常使用的行動操做:

reduce 接受一個函數做爲參數,這個函數要操做2個相同元素類型的RDD數據,並返回一個一樣類型的新元素。

此外,還有toptake 等常見操做:

行動操做

相關文章
相關標籤/搜索