筆記摘抄自 [美] Holden Karau 等著的《Spark快速大數據分析》python
彈性分佈式數據集(Resilient Distributed Dataset,簡稱 RDD)shell
在 Spark 中,對數據的全部操做不外乎 建立 RDD、轉化已有 RDD 以及 調用 RDD 操做 進行求值。而在這一切背後,Spark 會自動將 RDD 中的數據分發到集羣上,並將操做並行化執行。apache
建立 RDD 的兩種方式:api
- 讀取一個外部數據集
- 驅動器程序裏分發驅動器程序中的對象集合(好比 list 和 set)
這裏經過讀取文本文件做爲一個字符串 RDD:分佈式
>>> lines = sc.textFile("README.md")
RDD 的兩種操做:oop
- 轉化操做(transformation):由一個RDD 生成一個新的RDD,例如篩選數據
- 行動操做(action):對RDD 計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如HDFS)中
調用轉化操做 filter() :學習
>>> pythonLines = lines.filter(lambda line: "Python" in line)
調用 first() 行動操做 :大數據
>>> pythonLines.first() u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
@Noticespa
把RDD 持久化3到內存中
>>> pythonLines.persist <bound method PipelinedRDD.persist of PythonRDD[3] at RDD at PythonRDD.scala:53> >>> pythonLines.count() 3 >>> pythonLines.first() u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
把程序中一個已有的集合傳給 SparkContext 的 parallelize() 方法,這種方式須要把整個數據集先放到一臺機器的內存中,故不經常使用
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
RDD 的轉化操做是返回一個新的RDD 的操做,好比 map() 和 filter()
展現日誌文件中全部錯誤記錄
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.List; public class CountError { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("CountError"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaRDD<String> log = javaSparkContext.textFile(args[0]); JavaRDD<String> errorsRDD = log.filter( new Function<String, Boolean>() { public Boolean call(String x) { return x.contains("ERROR"); } }); List<String> errors = errorsRDD.collect(); for (String output : errors) { System.out.println(output); } javaSparkContext.stop(); } }
日誌文件內容
INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok...
運行效果
[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class CountError ~/SparkTest2.jar ~/SparkTest2.log ... 19/09/10 16:33:10 INFO DAGScheduler: Job 0 finished: collect at CountError.java:20, took 0.423698 s ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! ...
>>> lines = sc.textFile("/root/SparkTest2.log") >>> errorsRDD = lines.filter(lambda lines: "ERROR" in lines) >>> infoRDD = lines.filter(lambda lines: "INFO" in lines) >>> totalRDD = errorsRDD.union(infoRDD) >>> lines.count() 21 >>> errorsRDD.count() 4 >>> infoRDD.count() 17 >>> totalRDD.count() 21
@Notice
轉化操做能夠操做任意數量的輸入 RDD
Spark 會使用譜系圖(lineage graph)來記錄這些不一樣 RDD 之間的依賴關係,以此按需計算每一個 RDD
@P.s.
也能夠依靠譜系圖在持久化的RDD 丟失部分數據時恢復所丟失的數據
把最終求得的結果返回到驅動器程序,或者寫入外部存儲系統中的 RDD 操做
上文例程中的 count() 即是一個行動操做,另外還有 take() 、collect() 等操做
下面以 take() 爲例,獲取 union 後的 totalRDD 的前 10 條
>>> for line in totalRDD.take(10):print line ... ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... >>>
@P.s.
程序把RDD 篩選到一個很小的規模單臺機器內存足以放下時纔可使用 collect()
RDD 的轉化操做都是惰性求值的,在被調用行動操做以前 Spark 不會開始計算