本篇博客中的操做都在 ./bin/pyspark
中執行。python
RDD,即彈性分佈式數據集(Resilient Distributed Dataset),是Spark對數據的核心抽象。RDD是分佈式元素的集合,對手的全部操做均可以歸納爲:shell
在這些操做中,Spark會自動將RDD中的數據分發的集羣上,並將操做自動化執行。緩存
每一個RDD都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。分佈式
用戶能夠:函數
來建立 RDD。好比使用 SparkContext.textFile()
來建立一個字符串RDD:優化
lines = sc.textFile("README.md")
RDD建立以後,支持:url
Spark對RDD是惰性計算的,只有在行動操做(action)時,纔會真正計算。spa
回到shell 中,再執行:.net
pythonLines = lines.filter(lambda line: "Python" in line)
在行動操做 first()
之中,Spark才進行真正的計算,而這時候只須要計算結果中真正須要的數據:在這裏,Spark只須要掃面文件知道找到第一個匹配的行(包含"Python"的行)就中止了。3d
默認狀況下,Spark的RDD會在每次進行行動(Action)操做的時候從新計算,若是想在多個行動操做中使用同一個RDD,可使用.persist()
方法來讓Spark把這個RDD緩存下來,這個操做叫作:持久化。
持久化方便在之後的操做中重用數據。
總的來講,Spark會這樣工做:
另:cache()
和 persist()
使用的默認存儲級別是同樣的。
使用外部數據集的方式比較常見,這裏咱們就看一個文本文檔的例子:
lines = sc.textFile("README.md")
爲了下面的演示不麻煩,咱們這裏主要看經過將程序中的集合轉化爲RDD的方法,快速建立一個RDD:
lines = sc.parallelize(["Hello world", "News about Senate Hacking Hearing","US official says Russia undoubtedly meddled in US election"])
// 注意上面的RDD中出現了兩個"US",後面有用。
RDD的轉化操做是返回一個新的RDD的操做,好比.filter()
操做就是轉化操做。
RDD的行動操做是向驅動器程序返回結果,或者把結果寫入外部驅動器,行動操做會觸發實際的計算,好比.count()
或者 .first()
方法。
場景:找了個本身之前程序的log文件,咱們使用Spark找出其中的錯誤(ERROR)信息,文件link。
下面是使用.filter()
實現轉化操做:
>>> inputRDD = sc.textFile("url_Requests.log") >>> errorsRDD = inputRDD.filter(lambda x : "ERROR" in x) >>> cautionRDD = inputRDD.filter(lambda x : "CAUTION" in x)
注意 .filter()
方法不會改變已有的 inputRDD
中的數據,該操做會返回一個全新的RDD,inputRDD還在後面的程序中還能夠繼續使用。
而後再來一個.union()
操做:
>>> badlineRDD = errorsRDD.union(cautionRDD)
.union()
操做就是取並集,這個還比較好理解。
經過轉化操做,能夠從已有的RDD中派生出新的RDD。
好比說.count()
操做,就是一個行動操做:
另一個常見的操做是.collect()
:
對於.collect()
操做來講,能夠用來獲取整個RDD中的數據。只有當整個RDD的數據能在單臺機器的內存中放得下時,才能使用該方法。
當咱們每次調用一個新的行動操做時,整個RDD都會從頭開始計算,若是要避免這種行爲,用戶可讓中間結果持久化,這個在後面會提到。
RDD的轉化都是惰性求值的,就是說在被調用行動曹組偶以前,Spark不會開始計算。
惰性求值覺得這咱們對RDD調用轉化操做是,操做不會當即執行,Spark會在內部記錄下全部要執行的操做信息。咱們能夠把RDD當成咱們經過轉化操做構建出來的特定數據集。
上面操做過的把文本數據讀到RDD的操做一樣也是惰性的,當咱們調用 sc.textFile()
時,數據並無讀取進來,而是在必要時纔會讀取。 和轉化操做相同的是,讀取數據的操做也有可能被屢次執行。
其中的一個例子是 .map()
方法,map能夠對RDD中的每一個數據進行操做:
>>> nums = sc.parallelize([1,2,3,4]) >>> squared = nums.map(lambda x : x ** 2) >>> squared.collect() [1, 4, 9, 16]
再好比咱們剛纔的日誌文件:
>>> numberOfLines = errorsRDD.map(lambda line: len(line)) >>> numberOfLines.collect()
這裏,咱們計算了每行錯誤日誌的字符數,結果爲:
另外一個是flatMap,看一個例子就懂了,還記得咱們剛纔建立的lines嗎:
lines.collect() words = lines.flatMap(lambda line: line.split(" ")) words.collect()
其輸出結果爲:
和.map()
有什麼區別呢,這是 map
的輸出結果,很容易懂:
使用 .distinct()
操做進行去重:
.reduce()
是最經常使用的行動操做:
reduce
接受一個函數做爲參數,這個函數要操做2個相同元素類型的RDD數據,並返回一個一樣類型的新元素。
此外,還有top
, take
等常見操做: