RDD,全稱Resilient Distributed Datasets(彈性分佈式數據集),是Spark最爲核心的概念,是Spark對數據的抽象。RDD是分佈式的元素集合,每一個RDD只支持讀操做,且每一個RDD都被分爲多個分區存儲到集羣的不一樣節點上。除此以外,RDD還容許用戶顯示的指定數據存儲到內存和磁盤中,掌握了RDD編程是SPARK開發的第一步。java
DEMO代碼地址:https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/rdddemo/OneRDD.javagit
建立RDD有兩種方式:
1 讀取一個數據集(SparkContext.textFile()) :github
2 讀取一個集合(SparkContext.parallelize()) :redis
1:單個RDD轉換操做
map() : 對每一個元素進行操做,返回一個新的RDD
System.out.println("RDD每一個元素乘10:" + rdd.map(v -> v * 10)編程
filter() : 最每一個元素進行篩選,返回符合條件的元素組成的一個新RDD
System.out.println("RDD去掉1的元素:" + rdd.filter(v -> v != 1));緩存
flatMap() : 對每一個元素進行操做,將返回的迭代器的全部元素組成一個新的RDD返回
r.dd.flatMap(x -> x.to(3)).collect()socket
distinct():去重操做
System.out.println("RDD去重操做:" + rdd.distinct());分佈式
rdd最大和最小值
ui
Integer max= rdd.reduce((v1, v2) -> Math.max(v1, v2));spa
Integer min= rdd.reduce((v1, v2) -> Math.min(v1, v2))
2:兩個RDD的轉化操做:
[1, 2, 3] [3, 4, 5] 兩個個RDD簡單相關操做
union() :合併,不去重
System.out.println("兩個RDD集合:" + rdd1.union(rdd2).collect());
intersection() :交集
System.out.println("兩個RDD集合共同元素:" + rdd1.intersection(rdd2).collect());
cartesian() :笛卡兒積
System.out.println("和另一個RDD集合的笛卡爾積:" + rdd1.cartesian(rdd2).collect());
subtract() : 移除相同的內容
rdd1.subtract(rdd2).collect()
collect() :返回全部元素
System.out.println("原始數據:" + rdd.collect());
count() :返回元素個數
System.out.println("統計RDD的全部元素:" + rdd.count());
countByValue() : 各個元素出現的次數
System.out.println("每一個元素出現的次數:" + rdd.countByValue());
take(num) : 返回num個元素
System.out.println("取出rdd返回2個元素:" + rdd.take(2));
top(num) : 返回前num個元素
System.out.println("取出rdd返回最前2個元素:" + rdd.top(2));
reduce(func) :並行整合RDD中的全部數據(最經常使用的)
System.out.println("整合RDD中全部數據(sum):" + rdd.reduce((v1, v2) -> v1 + v2));
foreach(func):對每一個元素使用func
rdd.foreach(t -> System.out.print(t));
cache():
persist():保留着RDD的依賴關係
checkpoint(level:StorageLevel):RDD[T]切斷RDD依賴關係
所謂的控制操做就是持久化你能經過persist()或者cache()方法持久化一個rdd。首先,在action中計算獲得rdd;而後,將其保存在每一個節點的內存中。Spark的緩存是一個容錯的技術-若是RDD的任何一個分區丟失,它 能夠經過原有的轉換(transformations)操做自動的重複計算而且建立出這個分區。此外,咱們能夠利用不一樣的存儲級別存儲每個被持久化的RDD。Spark自動的監控每一個節點緩存的使用狀況,利用最近最少使用原則刪除老舊的數據。若是你想手動的刪除RDD,可使用RDD.unpersist()方法。在實際操做當中咱們能夠藉助第三方進行數據持久化 如:redis