該部分分爲兩篇,分別介紹RDD與Dataset/DataFrame:html
1、RDDjava
先來看下官網對RDD、DataSet、DataFrame的解釋:sql
1.RDD數據庫
Resilient distributed dataset(RDD),which is a fault-tolerant collection of elements that can be operated on in parallelapache
RDD——彈性分佈式數據集,分佈在集羣的各個結點上具備容錯性的元素集,能夠被並行處理。api
參考連接:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds緩存
2. DataSet & DataFrameapp
A Dataset is a distributed collection of data.分佈式
A DataFrame is a Dataset organized into named columns.
type DataFrame = DataSet[Row]
Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs are replaced by Dataset, which is strongly-typed like an RDD, but with richer optimizations under the hood. The RDD interface is still supported, and you can get a more complete reference at the RDD programming guide. However, we highly recommend you to switch to use Dataset, which has better performance than RDD. See the SQL programming guide to get more information about Dataset.
能夠看到Spark2.0之後,DataSet取代了RDD,並具備更高的性能(其中一點即是DataSet支持sql(如select、join、union、groupBy等)操做,能夠像操做數據庫表/視圖似的來進行數據處理)。
參考連接:http://spark.apache.org/docs/latest/quick-start.html
固然,有的場景RDD比DataSet/DataFrame更方便數據處理,好比有個數據集,每行包含不少字段,可是咱們只須要獲取其中的某幾個字段,若是用DataSet/DataFrame,必須定義全部字段的結構,可是,若是使用RDD進行處理,直接獲取每行的指定字段便可,不須要關心其餘字段,後續對特定字段的操做再轉換爲DataSet/DataFrame處理便可,可見,RDD和DataSet結合使用有時候更方便數據數據。
下面分別對RDD、DataSet、DataFrame的使用方法進行介紹。
RDD操做主要分爲兩類:Transformations與Actions。官方將Transformations操做定義爲從一個數據集中生成另外一個數據集;將Actions操做定義爲對數據集進行一系列計算之後返回給驅動程序一個值。能夠看出數據轉換(map)、合併(union)、過濾(filter)等操做均爲Transformations類型,由於他們的結果仍然是一個數據集,而數據聚合(reduce)、統計(count)、保存(saveAsXXX)等操做均爲Actions類型,緣由是他們的最終都要將結果返回給驅動程序(即對結果進行彙總,而Transformations操做只須要在各個node/slave上執行)。
之因此要區分操做類型,是由於Transformations操做是滯後的,不會立刻執行,只有當程序要返回結果給驅動程序時纔會執行,因此定義了Transformations操做後立馬執行println來輸出某個值是得不到結果的,只有執行過Actions操做才能獲得結算結果,且Actions操做會被當即執行。
官方列出的經常使用Transformations操做包括:map、filter、flatMap、mapPartitions、mapPartitionsWithIndex、sample、union、intersection、distinct、groupByKey、reduceBykey、aggregateByKey、sortByKey、join、cogroup、cartesian、pipe、coalsce、repartition、repartitionAndSortWithinPartitions;Actions操做包括:reduce、collect、count、first、take、takeSample、takeOrdered、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile、countByKey、foreach。具體用法能夠參考官方API。
咱們能夠經過SparkContext來生成RDD,下面是兩種獲取SparkContext實例的方法。
//1.SparkContext val sc = new SparkContext(new SparkConf().setAppName("Spark Context")) val rdd1 = sc.textFile("data.txt") //2.SparkSession val spark = SparkSession.builder().appName("Spark Session").getOrCreate() val rdd2 = spark.sparkContext.textFile("data.txt")
上例中經過textFile讀取本地文件來生成RDD,textFile參數能夠是HDFS路徑、本地文件(非單機模式下須要每一個node上都有)或者任何hadoop文件系統支持的URI;除了textFile還可使用hadoopFile、hadoopRDD、parallelize、makeRDD來生成RDD。這裏提一下,textFile支持通配符形式的path,好比hdfs://xx.xx.xx.xx:9000/path1/ds=*/*.gz,特別適用於按分區存儲的數據處理。
下面經過一個例子演示一下RDD經常使用操做的用法:
下面的代碼對保存在HDFS上的日誌文件進行解析、過濾、統計Title字段的字節數並計算Title的最大長度。
package com.personal.test import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.storage.StorageLevel object RDDTest { def main(args: Array[String]): Unit = { val MinFieldsLength = 53 val VTitleIndex = 11 val inputPath = "hdfs://192.168.1.44:9000/user/name/input/attempt_1530774346064" val outputPath = "hdfs://192.68.1.44:9000/user/name/output/" val sparkConf = new SparkConf().setAppName("RDD Test") val sc = new SparkContext(sparkConf) val rdd = sc.textFile(inputPath) val lineCounter = sc.longAccumulator("LineCounter") val resultRdd = rdd.map(_.split("\t")) .filter( fields =>{ lineCounter.add(1) if(fields.length < MinFieldsLength) false else true } ) .map(fields => fields(VTitleIndex).length) .persist(StorageLevel.MEMORY_ONLY) resultRdd.saveAsTextFile(outputPath) val maxTitleLength = resultRdd.reduce((a, b) => if (a>b) a else b) println(s"Line count: ${lineCounter.value}") println(s"Max title length: ${maxTitleLength}") sc.stop() } }
例中先初始化一個SparkContext對象,而後經過textFile讀取hdfs中的文件,生成一個RDD,接着調用map逐行分割字符串,再調用filter對字段數不合法的行進行過濾,接着再計算每行的Title字段長度並寫入hdfs,同時使用reduce計算Title的最大長度,最後輸出統計信息。根據RDD操做類型定義,文中調用map->filter->map的過程是不會立刻被執行的,直到調用saveAsTextFile和reduce時纔會被執行。
上例中用到了一個特殊的變量——累加器(Accumulator),經過SparkContext.longAccumulator(name: String)定義,顧名思義,只能進行加法操做,用於計數器或求總和。這類變量在Spark中稱爲共享變量(Shared Variables),即在集羣的各個node中的值是相同的),與共享變量相反,程序中定義的其餘變量在集羣的各個node之間是互相獨立的。
除了計數器,Spark還支持另外一種共享變量——廣播變量(Broadcast Variables),它是隻讀的,被cache到每臺機器中,經常使用於各個node之間的大規模數據分發。Spark任務是分階段執行的,各個階段須要的數據即是經過broadcast方式分發的,cache時進行序列化,任務執行時再反序列化。所以,只有在各個階段須要同一份數據或須要cache反序列化後的值時才須要顯式定義broadcast變量,經過調用SparkContext.broadcast(value: T)來定義。
org.apache.spark.rdd.RDD.map原型爲:
def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U]
Return a new RDD by applying a function to all elements of this RDD.
能夠看到,map是一個高階函數,即參數也是一個函數;第二個爲隱式參數,不須要顯示賦值(須要初始化spark後"import spark.implicits._"),程序會根據上下文自動賦值。map經常使用於對數據逐行處理,返回值是個新的RDD,處理後的結果數不變。如上例中:
val resultRdd = rdd.map(_.split("\t"))
org.apache.spark.rdd.RDD.filter原型爲:
def filter(f: (T) ⇒ Boolean): RDD[T]
Return a new RDD containing only the elements that satisfy a predicate.
同map同樣,filter也是一個高階函數,函數返回值爲true時保留該數據,爲false時過濾掉該數據。如上例中:
val resultRdd = rdd.map(_.split("\t")) .filter( fields =>{ lineCounter.add(1) if(fields.length < MinFieldsLength) false else true } )
org.apache.spark.rdd.RDD.saveAsTextFile原型爲:
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
Save this RDD as a compressed text file, using string representations of elements.
def saveAsTextFile(path: String): Unit
Save this RDD as a text file, using string representations of elements.
saveAsTextFile爲Actions類型的方法,用於將rdd結果以text格式持久化到指定path下,寫入的時候會檢查是否path已經存在,存在則拋出異常。第二個參數用於指定壓縮類型,如org.apache.hadoop.io.compress.GzipCodec、com.hadoop.compression.lzo.LzoCodec,默認不壓縮。如上例中:
resultRdd.saveAsTextFile(outputPath)
org.apache.spark.rdd.RDD.reduce原型爲:
def reduce(f: (T, T) ⇒ T): T
Reduces the elements of this RDD using the specified commutative and associative binary operator.
reduce的參數爲同一類型的二元操做函數,即「T <operator> T」,可用於求最值,求和等聚合需求。如上例中:
val maxTitleLength = resultRdd.reduce((a, b) => if (a>b) a else b)
上例中還用到了一個Spark中很重要的功能——持久化(Persistence),它將RDD持久化/緩存到各個node的內存中以加速後續的計算。能夠經過調用persist() 或 cache()來使RDD持久化,cache的存儲方式是反序列化後寫入內存,persist的存儲方式(StorageLevel)能夠經過參數指定,不指定參數等同於cache,可選的存儲方式包括:
類型 | 說明 |
MEMORY_ONLY | 將RDD反序列化爲java objects寫入JVM。若沒法徹底寫入內存,則部分partiton內的數據將在須要的時候從新計算。 |
MEMORY_AND_DISK | 將RDD反序列化爲java objects寫入JVM。若沒法徹底寫入內存,則沒法寫入內存的寫入磁盤,須要的時候從磁盤讀取。 |
MEMORY_ONLY_SER | 將RDD序列化爲java objects寫入JVM。 |
MEMORY_AND_DISK_SER | 將RDD序列化爲java objects寫入JVM。若沒法徹底寫入,則沒法寫入內存的部分寫入磁盤。 |
DISK_ONLY | 將RDD寫入磁盤 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 |
與MEMORY_ONLY、MEMORY_AND_DISK相似,但每一個partition會備份到兩個nodes中。 |
OFF_HEAP | 與MEMORY_ONLY_SER相似,可是會寫入堆外內存(off-heap memory),前提是啓用了堆外內存。 |
其餘操做能夠查閱API文檔或其餘資料,這裏再也不舉例。