spark——spark中常說RDD,究竟RDD是什麼?

本文始發於我的公衆號:TechFlow,原創不易,求個關注web


今天是spark專題第二篇文章,咱們來看spark很是重要的一個概念——RDD。編輯器

在上一講當中咱們在本地安裝好了spark,雖然咱們只有local一個集羣,可是仍然不妨礙咱們進行實驗。spark最大的特色就是不管集羣的資源如何,進行計算的代碼都是同樣的,spark會自動爲咱們作分佈式調度工做分佈式

RDD概念

介紹spark離不開RDD,RDD是其中很重要的一個部分。可是不少初學者每每都不清楚RDD到底是什麼,我本身也是同樣,我在系統學習spark以前代碼寫了一堆,可是對於RDD等概念仍然雲裏霧裏。函數

RDD的英文全名是Resilient Distributed Dataset,我把英文寫出來就清楚了不少。即便第一個單詞不認識,至少也能夠知道它是一個分佈式的數據集。第一個單詞是彈性的意思,因此直譯就是彈性分佈式數據集。雖然咱們仍是不夠清楚,可是已經比只知道RDD這個概念清楚多了,學習

RDD是一個不可變的分佈式對象集合,每一個RDD都被分爲多個分區,這些分區運行在集羣的不一樣節點上。spa

不少資料裏只有這麼一句粗淺的解釋,看起來講了不少,可是咱們都get不到。細想有不少疑問,最後我在大神的博客裏找到了詳細的解釋,這位大神翻了spark的源碼,找到了其中RDD的定義,一個RDD當中包含如下內容:3d

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

咱們一條一條來看:調試

  1. 它是一組分區, 分區是spark中數據集的最小單位。也就是說spark當中數據是以分區爲單位存儲的,不一樣的分區被存儲在不一樣的節點上。這也是分佈式計算的基礎。
  2. 一個應用在各個分區上的計算任務。在spark當中 數據和執行的操做是分開的,而且spark基於懶計算的機制,也就是在真正觸發計算的行動操做出現以前,spark會存儲起來對哪些數據執行哪些計算。數據和計算之間的映射關係就存儲在RDD中。
  3. RDD之間的依賴關係, RDD之間存在轉化關係,一個RDD能夠經過轉化操做轉化成其餘RDD,這些轉化操做都會被記錄下來。當部分數據丟失的時候,spark能夠經過記錄的依賴關係從新計算丟失部分的數據,而不是從新計算全部數據。
  4. 一個分區的方法,也就是 計算分區的函數。spark當中支持基於hash的hash分區方法和基於範圍的range分區方法。
  5. 一個列表,存儲的是存儲每一個分區的優先存儲的位置。

經過以上五點,咱們能夠看出spark一個重要的理念。即移動數據不如移動計算,也就是說在spark運行調度的時候,會傾向於將計算分發到節點,而不是將節點的數據蒐集起來計算。RDD正是基於這一理念而生的,它作的也正是這樣的事情。code

建立RDD

spark中提供了兩種方式來建立RDD,一種是讀取外部的數據集,另外一種是將一個已經存儲在內存當中的集合進行並行化orm

咱們一個一個來看,最簡單的方式固然是並行化,由於這不須要外部的數據集,能夠很輕易地作到。

在此以前,咱們先來看一下SparkContext的概念,SparkContext是整個spark的入口,至關於程序的main函數。在咱們啓動spark的時候,spark已經爲咱們建立好了一個SparkContext的實例,命名爲sc,咱們能夠直接訪問到。

咱們要建立RDD也須要基於sc進行,好比下面我要建立一個有字符串構成的RDD:

texts = sc.parallelize(['now test''spark rdd'])

返回的texts就是一個RDD:

除了parallelize以外呢,咱們還能夠從外部數據生成RDD,好比我想從一個文件讀入,可使用sc當中的textFile方法獲取:

text = sc.textFile('/path/path/data.txt')

通常來講,除了本地調試咱們不多會用parallelize進行建立RDD,由於這須要咱們先把數據讀取在內存。因爲內存的限制,使得咱們很難將spark的能力發揮出來。

轉化操做和行動操做

剛纔咱們在介紹RDD的時候其實提到過,RDD支持兩種操做,一種叫作轉化操做(transformation)一種叫作行動操做(action)。

顧名思義,執行轉化操做的時候,spark會將一個RDD轉化成另外一個RDD。RDD中會將咱們此次轉化的內容記錄下來,可是不會進行運算。因此咱們獲得的仍然是一個RDD而不是執行的結果。

好比咱們建立了texts的RDD以後,咱們想要對其中的內容進行過濾,只保留長度超過8的,咱們能夠用filter進行轉化:

textAfterFilter = texts.filter(lambda x: len(x) > 8)

咱們調用以後獲得的也是一個RDD,就像咱們剛纔說的同樣,因爲filter是一個轉化操做,因此spark只會記錄下它的內容,並不會真正執行。

轉化操做能夠操做任意數量的RDD,好比若是我執行以下操做,會一共獲得4個RDD:

inputRDD = sc.textFile('path/path/log.txt')
lengthRDD = inputRDD.filter(lambda x: len(x) > 10)
errorRDD = inputRDD.filter(lambda x: 'error' in x)
unionRDD = errorRDD.union(lengthRDD)

最後的union會將兩個RDD的結果組合在一塊兒,若是咱們執行完上述代碼以後,spark會記錄下這些RDD的依賴信息,咱們把這個依賴信息畫出來,就成了一張依賴圖:

不管咱們執行多少次轉化操做,spark都不會真正執行其中的操做,只有當咱們執行行動操做時,記錄下來的轉化操做纔會真正投入運算。像是first(),take(),count()等都是行動操做,這時候spark就會給咱們返回計算結果了。

其中first的用處是返回第一個結果,take須要傳入一個參數,指定返回的結果條數,count則是計算結果的數量。和咱們逾期的同樣,當咱們執行了這些操做以後,spark爲咱們返回告終果。

本文着重講的是RDD的概念,咱們下篇文章還會着重對轉化操做和行動操做進行深刻解讀。感興趣的同窗不妨期待一下吧~

今天的文章就是這些,若是以爲有所收穫,請順手點個關注或者轉發吧,大家的舉手之勞對我來講很重要。

相關文章
相關標籤/搜索