Spark RDD、DataFrame原理及操做詳解

RDD是什麼?

  RDD (resilientdistributed dataset),指的是一個只讀的,可分區的分佈式數據集,這個數據集的所有或部分能夠緩存在內存中,在屢次計算間重用。html

  RDD內部能夠有許多分區(partitions),每一個分區又擁有大量的記錄(records)。java

五個特徵:sql

  dependencies:創建RDD的依賴關係,主要rdd之間是寬窄依賴的關係,具備窄依賴關係的rdd能夠在同一個stage中進行計算。數據庫

  partition:一個rdd會有若干個分區,分區的大小決定了對這個rdd計算的粒度,每一個rdd的分區的計算都在一個單獨的任務中進行。json

  preferedlocations:按照「移動數據不如移動計算」原則,在spark進行任務調度的時候,優先將任務分配到數據塊存儲的位置數組

  compute:spark中的計算都是以分區爲基本單位的,compute函數只是對迭代器進行復合,並不保存單次計算的結果。緩存

  partitioner:只存在於(K,V)類型的rdd中,非(K,V)類型的partitioner的值就是None。網絡

 

  rdd的算子action會觸發真正的做業提交,而transformation算子是不會當即觸發做業提交的。app

  在Spark中,全部RDD的轉換都是是惰性求值的。RDD的轉換操做transformation會生成新的RDD,新的RDD的數據依賴於原來的RDD的數據,每一個RDD又包含多個分區。那麼一段程序實際上就構造了一個由相互依賴的多個RDD組成的有向無環圖(DAG)。並經過在RDD上執行action動做將這個有向無環圖做爲一個Job提交給Spark執行分佈式

  在DAG中又進行stage的劃分,劃分的依據是依賴算子是不是shuffle(如reduceByKey,Join等)的,每一個stage又能夠劃分紅若干task。接下來的事情就是driver發送task到executor,executor本身的線程池去執行這些task,完成以後將結果返回給driver。action算子是劃分不一樣job的依據。

  Spark對於有向無環圖Job進行調度,肯定階段(Stage),分區(Partition),流水線(Pipeline),任務(Task)和緩存(Cache),進行優化,並在Spark集羣上運行Job。RDD之間的依賴分爲寬依賴(依賴多個分區)和窄依賴(只依賴一個分區),在肯定階段時,須要根據寬依賴shuffle劃分階段。根據分區劃分任務。

  Spark支持故障恢復的方式也不一樣,提供兩種方式,Linage,經過數據的血緣關係,再執行一遍前面的處理,Checkpoint,將數據集存儲到持久存儲中。  Spark爲迭代式數據處理提供更好的支持。每次迭代的數據能夠保存在內存中,而不是寫入文件

 

這裏注意兩個算子coalesce()和repartition()

coalesce

def coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] 
該函數用於將RDD進行重分區,使用HashPartitioner。 
第一個參數爲重分區的數目,第二個爲是否進行shuffle,默認爲false。

repartition

def repartition(numPartitions: Int): RDD[T] 
該函數其實就是coalesce函數第二個參數爲true的實現。

使用注意

他們兩個都是RDD的分區進行從新劃分,repartition只是coalesce接口中shuffle爲true的簡易實現,(假設RDD有N個分區,須要從新劃分紅M個分區) 
  1)N < M。通常狀況下N個分區有數據分佈不均勻的情況,利用HashPartitioner函數將數據從新分區爲M個,這時須要將shuffle設置爲true。 
  2)若是N > M而且N和M相差很少,(假如N是1000,M是100)那麼就能夠將N個分區中的若干個分區合併成一個新的分區,最終合併爲M個分區,這時能夠將shuff設置爲false,在shuffl爲false的狀況下,若是M>N時,coalesce爲無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關係。 
  3)若是N > M而且二者相差懸殊,這時若是將shuffle設置爲false,父子RDD是窄依賴關係,他們同處在一個stage中,就可能形成Spark程序的並行度不夠,從而影響性能,若是在M爲1的時候,爲了使coalesce以前的操做有更好的並行度,能夠講shuffle設置爲true。

  總之:若是shuff爲false時,若是傳入的參數大於現有的分區數目,RDD的分區數不變,也就是說不通過shuffle,是沒法將RDDde分區數變多的

參考:

Spark算子:RDD基本轉換操做(2)–coalesce、repartition

更多RDD算子內容推薦參考

    Spark函數詳解系列之RDD基本轉換

    Spark經常使用函數講解之鍵值RDD轉換

    Spark經常使用函數講解之Action操做

 

Spark的RDD原理以及2.0特性的介紹

 

 

窄依賴和寬依賴

  shuffle 是劃分 DAG 中 stage 的標識,同時影響 Spark 執行速度的關鍵步驟
  RDD 的 Transformation 函數中,又分爲窄依賴(narrow dependency)和寬依賴(wide dependency)的操做.窄依賴跟寬依賴的區別是是否發生 shuffle(洗牌) 操做.寬依賴會發生 shuffle 操做. 窄依賴是子 RDD的各個分片(partition)不依賴於其餘分片,可以獨立計算獲得結果,寬依賴指子 RDD 的各個分片會依賴於父RDD 的多個分片,因此會形成父 RDD 的各個分片在集羣中從新分片  

以下圖所示:map就是一種窄依賴,而join則會致使寬依賴

 

如上面的map,filter,union屬於第一類窄依賴,而join with inputs co-partitioned(對輸入進行協同劃分的join操做,也就是說先按照key分組而後shuffle write的時候一個父分區對應一個子分區)則爲第二類窄依賴

 groupByKey和對輸入未協同劃分的join操做就是寬依賴,這是shuffle類操做。

細說:

  首先,窄依賴容許在單個集羣節點上流水線式執行,這個節點能夠計算全部父級分區。例如,能夠逐個元素地依次執行filter操做和map操做。相反,寬依賴須要全部的父RDD數據可用而且數據已經經過類MapReduce的操做shuffle完成。 
  其次,在窄依賴中,節點失敗後的恢復更加高效。由於只有丟失的父級分區須要從新計算,而且這些丟失的父級分區能夠並行地在不一樣節點上從新計算。與此相反,在寬依賴的繼承關係中,單個失敗的節點可能致使一個RDD的全部先祖RDD中的一些分區丟失,致使計算的從新執行。

 

// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))

 

第一個 Map 操做將 RDD 裏的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,能夠在集羣的各個內存中獨立計算,也就是並行化,第二個 groupby 以後的 Map 操做,爲了計算相同 key 下的元素個數,須要把相同 key 的元素彙集到同一個 partition 下,因此形成了數據在內存中的從新分佈,即 shuffle 操做.shuffle 操做是 spark 中最耗時的操做,應儘可能避免沒必要要的 shuffle.

根據是否發生 shuffle 操做可以將其分紅以下的 stage 類型

(join 須要針對同一個 key 合併,因此須要 shuffle) 
  運行到每一個 stage 的邊界時,數據在父 stage 中按照 Task 寫到磁盤上,而在子 stage 中經過網絡從上一個 Task 中去讀取數據。這些操做會致使很嚴重的網絡傳輸以及磁盤的I/O,因此 stage 的邊界是很是佔資源的,在編寫 Spark 程序的時候須要儘可能避免的 。父 stage 中 partition 個數與子 stage 的 partition 個數可能不一樣,因此那些產生 stage 邊界的 Transformation 經常須要接受一個 numPartition 的參數來以爲子 stage 中的數據將被切分爲多少個 partition。 
PS:shuffle 操做的時候能夠用 combiner 壓縮數據,減小 IO 的消耗

 

參考:那些年咱們對Spark RDD的理解

 

DataFrame是什麼?
  在Spark中,DataFrame是一種以RDD爲基礎的分佈式數據集,相似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背後的數據源以及做用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提高運行時效率的目標。反觀RDD,因爲無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
更多參考

 

一:DataFrame建立

SparkSQL能夠以其餘RDD對象、parquet文件、json文件、hive表,以及經過JDBC鏈接到其餘關係型數據庫做爲數據源來生成DataFrame對象。

1)jdbc

【讀】

postgresUrl="jdbc:postgresql://127.0.0.1:5432/testdb" dimDF = sqlContext.read.format('jdbc').options(url=postgresUrl,dbtable=tableName,user="root",password="root") .load() dimDF.registerTempTable(tmpTableName)

 【寫】

 
 
self.postgresURL = str(self.postgresIP) + ":" + str(self.postgresPort) + "/" + str(self.postgresDB)
self.postgresqlDatasource = {
"url" : "jdbc:postgresql://" + self.postgresURL,
"user" : self.postgresUser,
"password" : self.postgresPwd
}
resultDF.coalesce(int(partitionNum)).write.jdbc(url=postgresqlDatasource["url"], table=reportTable, mode='append', properties=postgresqlDatasource)

 

2)parquet

【讀】

telematicFilePath = "/user/spark/test/telematic.parquet/key=" + handleRecordDateStr if( common.fileExist(telematicFilePath, self.sc) ): df = self.sqlContext.read.schema(TELEMATIC_PARQUET_SCHEMA).parquet(telematicFilePath).coalesce(int(self.partitionNum))
# schema for /user/spark/test/telematic.parquet
TELEMATIC_PARQUET_SCHEMA = SQLType.StructType([
  SQLType.StructField('dm_transct_date_hr_key', SQLType.LongType(), True), SQLType.StructField('dm_vehicle_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_driver_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_company_dim_key', SQLType.IntegerType(), True), SQLType.StructField('deviceId', SQLType.StringType(), True), SQLType.StructField('companyId', SQLType.StringType(), True)])

 【寫】

df.write.parquet(parquetPath, mode="overwrite")

 

3)json 

df = sqlContext.read.json(path)

 

4)list列表

dataList = resultDF.collect()
resultDF = self.sqlContext.createDataFrame(dataList)

 

5)Rdd

if rddSchema is None:
    df = sqlContext.createDataFrame(rdd)
else:
    df = sqlContext.createDataFrame(rdd, rddSchema)
rdd = sc.parallelize(resultList)
df = self.sqlContext.createDataFrame(rdd)

 

二:Transform操做

三:Action操做

一、 collect() ,返回一個數組,包括dataframe集合全部的行

df = sqlContext.createDataFrame(parquetRecordList, PARQUET_FILE_SCHEMA) for key in df.rdd.map(lambda x: x["key"]).distinct().collect(): filePath = "/user/spark/test.parquet/key=20171110" df.filter("key="+str(key)).drop("key").write.parquet(filePath, mode="append")

 

二、 collectAsList() 返回值是一個java類型的數組,返回dataframe集合全部的行
三、 count() 返回一個number類型的,返回dataframe集合的行數
四、 toJson
五、 first() 返回第一行 ,類型是row類型
六、 head() 返回第一行 ,類型是row類型
七、 head(n:Int)返回n行  ,類型是row 類型
八、 show()返回dataframe集合的值 默認是20行,返回類型是unit
九、 show(n:Int)返回n行,,返回值類型是unit
十、table(n:Int) 返回n行  ,類型是row 類型

 

dataframe的基本操做
一、 cache()同步數據的內存

 

data = self.sqlContext.sql(queryStr).toJSON().cache().collect()


二、 columns 返回一個string類型的數組,返回值是全部列的名字
三、 dtypes返回一個string類型的二維數組,返回值是全部列的名字以及類型
四、 explan()打印執行計劃  物理的
五、 toJSON 轉換爲json格式數據
六、 isLocal 返回值是Boolean類型,若是容許模式是local返回true 不然返回false
七、 persist(newlevel:StorageLevel) 返回一個dataframe.this.type 輸入存儲模型類型

稍後詳解
八、 printSchema() 打印出字段名稱和類型 按照樹狀結構來打印
九、 registerTempTable(tablename:String) 返回Unit ,將df的對象只放在一張表裏面,這個表隨着對象的刪除而刪除了
十、 schema 返回structType 類型,將字段名稱和類型按照結構體類型返回
十一、 toDF()返回一個新的dataframe類型的
十二、 toDF(colnames:String*)將參數中的幾個字段返回一個新的dataframe類型的,
1三、 unpersist() 返回dataframe.this.type 類型,去除模式中的數據
1四、 unpersist(blocking:Boolean)返回dataframe.this.type類型 true 和unpersist是同樣的做用false 是去除RDD

 

集成查詢:
一、 agg(expers:column*) 返回dataframe類型 ,按每一個device分組查最小時間

df = sqlContext.createDataFrame(tensRdd)
resultDF = df.groupBy("device_id").agg({RegularDataEtlConstants.TIME: 'min'})
resultDF.repartition(self._partitionNum).foreachPartition(lambda iterator: self.__saveToHBase(iterator))

 

startTime = df.filter((df.startTime != "") & (df.startTime >= minStartTimeCurrent)).agg({"startTime": "min"}).collect()[0][0]


四、 apply(colName: String) 返回column類型,捕獲輸入進去列的對象
五、 as(alias: String) 返回一個新的dataframe類型,就是原來的一個別名
六、 col(colName: String)  返回column類型,捕獲輸入進去列的對象
七、 cube(col1: String, cols: String*) 返回一個GroupedData類型,根據某些字段來彙總
八、 distinct 去重 返回一個dataframe類型
九、 drop(col: Column) 刪除某列 返回dataframe類型

 

columnList = ['key', 'type', 'timestamp', 'data']
df = sqlContext.createDataFrame(dataList[index], columnList)
for key in df.rdd.map(lambda x: x["key"]).distinct().collect():
   parquetPath = parquetList[index] + "/key=" + str(key)
   df.filter("key="+str(key)).drop("key").write.parquet(parquetPath, mode="append", partitionBy="type")


十、 dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個dataframe
十一、 except(other: DataFrame) 返回一個dataframe,返回在當前集合存在的在其餘集合不存在的


十二、 explode[A, B](inputColumn: String, outputColumn: String)行轉列

根據c3字段中的空格將字段內容進行分割,分割的內容存儲在新的字段c3_中
jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

 


1三、 filter(conditionExpr: String): 刷選部分數據,返回dataframe類型

df.filter("age>10").show(); 
df.filter(df("age")>10).show();  
df.where(df("age")>10).show();

 

1四、 groupBy(col1: String, cols: String*) 分組  

dfgroupBy("age").avg().show();
1五、 intersect(other: DataFrame) 返回一個dataframe,在2個dataframe都存在的元素
1六、 join(right: DataFrame, joinExprs: Column, joinType: String)
一個是關聯的dataframe,第二個關聯的條件,第三個關聯的類型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
1七、 limit(n: Int) 返回dataframe類型  去n 條數據出來
1八、 na: DataFrameNaFunctions ,能夠調用dataframenafunctions的功能區作過濾 df.na.drop().show(); 刪除爲空的行
1九、 orderBy(sortExprs: Column*) 作alise排序
20、 select(cols:string*) dataframe 作字段的刷選 df.select($"colA", $"colB" + 1)
2一、 selectExpr(exprs: String*) 作字段的刷選 df.selectExpr("name","name as names","upper(name)","age+1").show();
2二、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默認是asc
2三、 unionAll(other:Dataframe) 合併 

 

df = df.unionAll(dfTemp).coalesce(int(self.partitionNum))


2四、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
2五、 withColumn(colName: String, col: Column) 增長一列

df中新增一個名爲aa的列,值與列name的同樣

 

df.withColumn("aa",df("name")).show(); 
將該列時間值計算加上時區偏移值 mergeDF
= mergeDF.withColumn("dm_transct_date_hr_key", functions.lit(self.__datehandle(mergeDF["dm_transct_date_hr_key"], self.timezoneOffset)))

 

Spark-SQL之DataFrame操做大全

http://blog.csdn.net/mtj66/article/details/52064827

相關文章
相關標籤/搜索