Value型Transformation算子數組
處理數據類型爲Value型的Transformation算子能夠根據RDD變換算子的輸入分區與輸出分區關係分爲如下幾種類型。緩存
1)輸入分區與輸出分區一對一型。app
2)輸入分區與輸出分區多對一型。分佈式
3)輸入分區與輸出分區多對多型。函數
4)輸出分區爲輸入分區子集型。ui
5)還有一種特殊的輸入與輸出分區一對一的算子類型:Cache型。Cache算子對RDD分區進行緩存。this
這裏的對應指的是分區依賴的對應spa
1.輸入分區與輸出分區一對一型orm
(1)map(func)內存
map是對RDD中的每一個元素都執行一個指定的函數來產生一個新的RDD,新RDD叫做MappedRDD(this, sc.clean(f))。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
圖3-4中的每一個方框表示一個RDD分區,左側的分區通過用戶自定義函數f:T->U映射爲右側的新的RDD分區。可是實際只有等到Action算子觸發後,這個f函數纔會和其餘函數在一個Stage中對數據進行運算。V1輸入f轉換輸出V’1。
(2)flatMap(func)
相似於map,可是每個輸入元素,會被映射爲0到多個輸出元素(所以,func函數的返回值是一個Seq,而不是單一元素)。內部建立 FlatMappedRDD(this, sc.clean(f))。
圖3-5中小方框表示RDD的一個分區,對分區進行flatMap函數操做,flatMap中傳入的函數爲f:T->U,T和U能夠是任意的數據類型。將分區中的數據經過用戶自定義函數f轉換爲新的數據。外部大方框能夠認爲是一個RDD分區,小方框表明一個集合。V一、V二、V3在一個集合做爲RDD的一個數據項,轉換爲V’一、V’二、V’3後,將結合拆散,造成爲RDD中的數據項。
(3)mapPartitions(func)
mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每一個元素,而mapPartitions的輸入函數是應用於每一個分區,也就是把每一個分區中的內容做爲總體來處理的。
mapPartitions函數獲取到每一個分區的迭代器,在函數中經過這個分區總體的迭代器對整個分區的元素進行操做。內部實現是生成MapPartitionsRDD。圖3-6中的方框表明一個RDD分區。
圖3-6中,用戶經過函數f (iter )=>iter.filter(_>=3)對分區中的全部數據進行過濾,>=3的數據保留。一個方塊表明一個RDD分區,含有一、二、3的分區過濾只剩下元素3。
(4)glom()
glom函數將每一個分區造成一個數組,內部實現是返回的GlommedRDD。圖3-7中的每一個方框表明一個RDD分區。
圖3-7中的方框表明一個分區。該圖表示含有V一、V二、V3的分區經過函數glom造成一個數組Array[(V1),(V2),(V3)]。
2.輸入分區與輸出分區多對一型
(1)union(otherDataset)
使用union函數時須要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合併的RDD元素數據類型相同,並不進行去重操做,保存全部元素。若是想去重,可使用distinct()。++符號至關於uion函數操做。
圖3-8中左側的大方框表明兩個RDD,大方框內的小方框表明RDD的分區。右側大方框表明合併後的RDD,大方框內的小方框表明分區。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合併全部元素造成一個RDD。V一、V一、V二、V8造成一個分區,其餘元素同理進行合併。
(2)cartesian(otherDataset)
對兩個RDD內的全部元素進行笛卡爾積操做。操做後,內部實現返回CartesianRDD。
左側的大方框表明兩個RDD,大方框內的小方框表明RDD的分區。右側大方框表明合併後的RDD,大方框內的小方框表明分區。大方框表明RDD,大方框中的小方框表明RDD分區。 例如,V1和另外一個RDD中的W一、 W二、 Q5進行笛卡爾積運算造成(V1,W1)、(V1,W2)、(V1,Q5)。
3.輸入分區與輸出分區多對多型
groupBy (func)
將元素經過函數生成相應的Key,數據就轉化爲Key-Value格式,以後將Key相同的元素分爲一組。
圖中,方框表明一個RDD分區,相同key的元素合併到一個組。 例如,V1,V2合併爲一個Key-Value對,其中key爲「 V」 ,Value爲「 V1,V2」 ,造成V,Seq(V1,V2)。
4.輸出分區爲輸入分區子集型
(1)filter(func)
filter的功能是對元素進行過濾,對每一個元素應用f函數,返回值爲true的元素在RDD中保留,返回爲false的將過濾掉。內部實現至關於生成FilteredRDD(this,sc.clean(f))。
圖3-11中的每一個方框表明一個RDD分區。T能夠是任意的類型。經過用戶自定義的過濾函數f,對每一個數據項進行操做,將知足條件,返回結果爲true的數據項保留。例如,過濾掉V二、V3保留了V1,將區分命名爲V1'。
(2)distinct([numTasks]))
distinct將RDD中的元素進行去重操做。圖3-12中的方框表明RDD分區。
圖3-12中的每一個方框表明一個分區,經過distinct函數,將數據去重。例如,重複數據V一、V1去重後只保留一份V1。
(3)subtract(other, numPartitions=None)
subtract至關於進行集合的差操做,RDD 1去除RDD 1和RDD 2交集中的全部元素。
圖3-13中左側的大方框表明兩個RDD,大方框內的小方框表明RDD的分區。右側大方框表明合併後的RDD,大方框內的小方框表明分區。V1在兩個RDD中均有,根據差集運算規則,新RDD不保留,V2在第一個RDD有,第二個RDD沒有,則在新RDD元素中包含V2。
(4)sample(withReplacement, fraction, seed=None)
sample將RDD這個集合內的元素進行採樣,獲取全部元素的子集。用戶能夠設定是否有放回的抽樣、百分比、隨機種子,進而決定採樣方式。
內部實現是生成SampledRDD(withReplacement, fraction, seed)。
函數參數設置以下。
withReplacement=true,表示有放回的抽樣;
withReplacement=false,表示無放回的抽樣。
圖3-14中的每一個方框是一個RDD分區。經過sample函數,採樣50%的數據。V一、V二、U一、U二、U三、U4採樣出數據V1和U一、U2,造成新的RDD。
(5)takeSample(withReplacement, num, seed=None)
takeSample()函數和上面的sample函數是一個原理,可是不使用相對比例採樣,而是按設定的採樣個數進行採樣,同時返回結果再也不是RDD,而是至關於對採樣後的數據進行Collect(),返回結果的集合爲單機的數組。
圖3-15中左側的方框表明分佈式的各個節點上的分區,右側方框表明單機上返回的結果數組。經過takeSample對數據採樣,設置爲採樣一份數據,返回結果爲V1。
5.Cache型
(1)cache
cache將RDD元素從磁盤緩存到內存,至關於persist(MEMORY_ONLY)函數的功能。圖3-14中的方框表明RDD分區。
圖3-16中的每一個方框表明一個RDD分區,左側至關於數據分區都存儲在磁盤,經過cache算子將數據緩存在內存。
(2)persist(storageLevel=StorageLevel(False, True, False, False, 1))
persist函數對RDD進行緩存操做。數據緩存在哪裏由StorageLevel枚舉類型肯定。有如下幾種類型的組合(見圖3-15),DISK表明磁盤,MEMORY表明內存,SER表明數據是否進行序列化存儲。
下面爲函數定義,StorageLevel是枚舉類型,表明存儲模式,用戶能夠經過圖3-17按需選擇。
圖3-17中列出persist函數能夠緩存的模式。例如,MEMORY_AND_DISK_SER表明數據能夠存儲在內存和磁盤,而且以序列化的方式存儲。其餘同理。圖中,方框表明RDD分區。 disk表明存儲在磁盤,mem表明存儲在內存。 數據最初所有存儲在磁盤,經過persist(MEMORY_AND_DISK)將數據緩存到內存,可是有的分區沒法容納在內存,例如:圖3-18中將含有V1,V2,V3的RDD存儲到磁盤,將含有U1,U2的RDD仍舊存儲在內存。