spark RDD算子詳解1

Value型Transformation算子數組

處理數據類型爲Value型的Transformation算子能夠根據RDD變換算子的輸入分區與輸出分區關係分爲如下幾種類型。緩存

1)輸入分區與輸出分區一對一型。app

2)輸入分區與輸出分區多對一型。分佈式

3)輸入分區與輸出分區多對多型。函數

4)輸出分區爲輸入分區子集型。ui

5)還有一種特殊的輸入與輸出分區一對一的算子類型:Cache型。Cache算子對RDD分區進行緩存。this

這裏的對應指的是分區依賴的對應spa

1.輸入分區與輸出分區一對一型orm

(1)map(func)內存

map是對RDD中的每一個元素都執行一個指定的函數來產生一個新的RDD,新RDD叫做MappedRDD(this, sc.clean(f))。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

圖3-4中的每一個方框表示一個RDD分區,左側的分區通過用戶自定義函數f:T->U映射爲右側的新的RDD分區。可是實際只有等到Action算子觸發後,這個f函數纔會和其餘函數在一個Stage中對數據進行運算。V1輸入f轉換輸出V’1。

(2)flatMap(func)

相似於map,可是每個輸入元素,會被映射爲0到多個輸出元素(所以,func函數的返回值是一個Seq,而不是單一元素)。內部建立 FlatMappedRDD(this, sc.clean(f))。

圖3-5中小方框表示RDD的一個分區,對分區進行flatMap函數操做,flatMap中傳入的函數爲f:T->U,T和U能夠是任意的數據類型。將分區中的數據經過用戶自定義函數f轉換爲新的數據。外部大方框能夠認爲是一個RDD分區,小方框表明一個集合。V一、V二、V3在一個集合做爲RDD的一個數據項,轉換爲V’一、V’二、V’3後,將結合拆散,造成爲RDD中的數據項。

(3)mapPartitions(func)

mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每一個元素,而mapPartitions的輸入函數是應用於每一個分區,也就是把每一個分區中的內容做爲總體來處理的。

mapPartitions函數獲取到每一個分區的迭代器,在函數中經過這個分區總體的迭代器對整個分區的元素進行操做。內部實現是生成MapPartitionsRDD。圖3-6中的方框表明一個RDD分區。

圖3-6中,用戶經過函數f (iter )=>iter.filter(_>=3)對分區中的全部數據進行過濾,>=3的數據保留。一個方塊表明一個RDD分區,含有一、二、3的分區過濾只剩下元素3。

(4)glom()

glom函數將每一個分區造成一個數組,內部實現是返回的GlommedRDD。圖3-7中的每一個方框表明一個RDD分區。

圖3-7中的方框表明一個分區。該圖表示含有V一、V二、V3的分區經過函數glom造成一個數組Array[(V1),(V2),(V3)]。

2.輸入分區與輸出分區多對一型

(1)union(otherDataset)

使用union函數時須要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合併的RDD元素數據類型相同,並不進行去重操做,保存全部元素。若是想去重,可使用distinct()。++符號至關於uion函數操做。

圖3-8中左側的大方框表明兩個RDD,大方框內的小方框表明RDD的分區。右側大方框表明合併後的RDD,大方框內的小方框表明分區。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合併全部元素造成一個RDD。V一、V一、V二、V8造成一個分區,其餘元素同理進行合併。

(2)cartesian(otherDataset)

對兩個RDD內的全部元素進行笛卡爾積操做。操做後,內部實現返回CartesianRDD。

左側的大方框表明兩個RDD,大方框內的小方框表明RDD的分區。右側大方框表明合併後的RDD,大方框內的小方框表明分區。大方框表明RDD,大方框中的小方框表明RDD分區。 例如,V1和另外一個RDD中的W一、 W二、 Q5進行笛卡爾積運算造成(V1,W1)、(V1,W2)、(V1,Q5)。

3.輸入分區與輸出分區多對多型

groupBy (func)

將元素經過函數生成相應的Key,數據就轉化爲Key-Value格式,以後將Key相同的元素分爲一組。
圖中,方框表明一個RDD分區,相同key的元素合併到一個組。 例如,V1,V2合併爲一個Key-Value對,其中key爲「 V」 ,Value爲「 V1,V2」 ,造成V,Seq(V1,V2)。

4.輸出分區爲輸入分區子集型

(1)filter(func)

filter的功能是對元素進行過濾,對每一個元素應用f函數,返回值爲true的元素在RDD中保留,返回爲false的將過濾掉。內部實現至關於生成FilteredRDD(this,sc.clean(f))。
圖3-11中的每一個方框表明一個RDD分區。T能夠是任意的類型。經過用戶自定義的過濾函數f,對每一個數據項進行操做,將知足條件,返回結果爲true的數據項保留。例如,過濾掉V二、V3保留了V1,將區分命名爲V1'。

(2)distinct([numTasks]))

distinct將RDD中的元素進行去重操做。圖3-12中的方框表明RDD分區。

圖3-12中的每一個方框表明一個分區,經過distinct函數,將數據去重。例如,重複數據V一、V1去重後只保留一份V1。

 

(3)subtract(other, numPartitions=None)

subtract至關於進行集合的差操做,RDD 1去除RDD 1和RDD 2交集中的全部元素。

圖3-13中左側的大方框表明兩個RDD,大方框內的小方框表明RDD的分區。右側大方框表明合併後的RDD,大方框內的小方框表明分區。V1在兩個RDD中均有,根據差集運算規則,新RDD不保留,V2在第一個RDD有,第二個RDD沒有,則在新RDD元素中包含V2。

(4)sample(withReplacement, fraction, seed=None)

sample將RDD這個集合內的元素進行採樣,獲取全部元素的子集。用戶能夠設定是否有放回的抽樣、百分比、隨機種子,進而決定採樣方式。

內部實現是生成SampledRDD(withReplacement, fraction, seed)。

函數參數設置以下。

withReplacement=true,表示有放回的抽樣;

withReplacement=false,表示無放回的抽樣。

圖3-14中的每一個方框是一個RDD分區。經過sample函數,採樣50%的數據。V一、V二、U一、U二、U三、U4採樣出數據V1和U一、U2,造成新的RDD。

(5)takeSample(withReplacement, num, seed=None)

takeSample()函數和上面的sample函數是一個原理,可是不使用相對比例採樣,而是按設定的採樣個數進行採樣,同時返回結果再也不是RDD,而是至關於對採樣後的數據進行Collect(),返回結果的集合爲單機的數組。

圖3-15中左側的方框表明分佈式的各個節點上的分區,右側方框表明單機上返回的結果數組。經過takeSample對數據採樣,設置爲採樣一份數據,返回結果爲V1。

5.Cache型

(1)cache

cache將RDD元素從磁盤緩存到內存,至關於persist(MEMORY_ONLY)函數的功能。圖3-14中的方框表明RDD分區。

圖3-16中的每一個方框表明一個RDD分區,左側至關於數據分區都存儲在磁盤,經過cache算子將數據緩存在內存。

(2)persist(storageLevel=StorageLevel(False, True, False, False, 1))

persist函數對RDD進行緩存操做。數據緩存在哪裏由StorageLevel枚舉類型肯定。有如下幾種類型的組合(見圖3-15),DISK表明磁盤,MEMORY表明內存,SER表明數據是否進行序列化存儲。

下面爲函數定義,StorageLevel是枚舉類型,表明存儲模式,用戶能夠經過圖3-17按需選擇。

圖3-17中列出persist函數能夠緩存的模式。例如,MEMORY_AND_DISK_SER表明數據能夠存儲在內存和磁盤,而且以序列化的方式存儲。其餘同理。圖中,方框表明RDD分區。 disk表明存儲在磁盤,mem表明存儲在內存。 數據最初所有存儲在磁盤,經過persist(MEMORY_AND_DISK)將數據緩存到內存,可是有的分區沒法容納在內存,例如:圖3-18中將含有V1,V2,V3的RDD存儲到磁盤,將含有U1,U2的RDD仍舊存儲在內存。

相關文章
相關標籤/搜索