The Meaning of Partitioning
A Spark RDD is a distributed dataset: because the data volume is large, it is split into partitions that are stored in the memory of the Worker nodes. When we operate on an RDD, we are in fact operating on the data of each partition in parallel. Partitioning by a field in Spark is similar to partitioning in a relational database; it increases parallelism and improves execution efficiency. When Spark reads a file from HDFS, the default number of partitions equals the number of HDFS blocks of the file; a block is the smallest unit of distributed storage in HDFS.
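For instance, the partition count of an RDD can be inspected right after loading a file; a minimal sketch, where the HDFS path is only an illustrative assumption and sc is an existing SparkContext:
val lines = sc.textFile("hdfs:///data/events.log")      // by default, one partition per HDFS block
println(lines.getNumPartitions)                         // number of partitions = number of blocks
val lines8 = sc.textFile("hdfs:///data/events.log", 8)  // a larger minimum can also be requested explicitly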
1. The difference between RDD repartition and partitionBy
repartition and partitionBy are the two commonly used repartitioning operators for Spark RDDs. Both redistribute the data and both use a HashPartitioner by default. The difference is that partitionBy can only be used on a pair RDD (key-value data), and even when both are applied to a pair RDD the results are not the same: repartition spreads records across partitions fairly arbitrarily, with no particular pattern, whereas partitionBy places all records with the same key in the same partition.
val parRDD = pairRDD.repartition(10) // repartition into 10 partitions
val parRDD = pairRDD.partitionBy(new HashPartitioner(10)) // repartition into 10 partitions
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

object PartitionDemo {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List("hello", "jason", "what", "are", "you", "doing", "hi", "jason",
      "do", "you", "eat", "dinner", "hello", "jason", "do", "you", "have", "some", "time",
      "hello", "jason", "time", "do", "you", "jason", "jason"), 4) // start with 4 partitions
    val word_count = rdd.flatMap(_.split(",")).map((_, 1))
    val repar = word_count.repartition(10)                      // repartition into 10 partitions
    val parby = word_count.partitionBy(new HashPartitioner(10)) // repartition into 10 partitions
    print(repar)
    print(parby)
  }

  def print(rdd: RDD[(String, Int)]) = {
    rdd.foreachPartition(pair => {
      println("partition " + TaskContext.get.partitionId + ":")
      pair.foreach(p => {
        println("  " + p)
      })
    })
    println()
  }
}
partitionBy supports three kinds of partitioners:
1. HashPartitioner
val parRDD = pairRDD.partitionBy(new HashPartitioner(3))
HashPartitioner determines the partition as: partition = key.hashCode() % numPartitions
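As a rough illustration of that rule (a simplified sketch, not Spark's exact implementation, which additionally forces the result to be non-negative):
def hashPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw // keep the partition id in [0, numPartitions)
}
hashPartition("jason", 3) // every "jason" key always maps to the same partition id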
2. RangePartitioner
val parRDD = pairRDD.partitionBy(new RangePartitioner(3, counts))
RangePartitioner sorts the keys and then splits them into 3 contiguous key ranges, one per partition.
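In the call above, counts stands for the pair RDD whose keys are sampled to pick the range boundaries; a self-contained sketch with made-up data (sc is assumed to be an existing SparkContext):
import org.apache.spark.RangePartitioner
val counts = sc.parallelize(Seq(("a", 1), ("d", 1), ("b", 1), ("e", 1), ("c", 1)))
val ranged = counts.partitionBy(new RangePartitioner(3, counts))
// Keys are sampled and ordered, so partition 0 gets the smallest keys and partition 2 the largest.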
3. CustomPartitioner
A custom partitioner lets you define your own partitioning rule to match the specific needs of your application.
import org.apache.spark.Partitioner

class CustomPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    if (key == 1) {
      0
    } else if (key == 2) {
      1
    } else {
      2
    }
  }
}

val parRDD = pairRDD.partitionBy(new CustomPartitioner(3))
2. DataFrame partitioning
1. repartition: partition by column
val regionDf = peopleDF.repartition($"region")
2. coalesce: coalesce is generally used to merge/reduce partitions, moving data from some partitions into the remaining ones.
val peopleDF2 = peopleDF.coalesce(2) // originally 4 partitions, reduced to 2; the partition count cannot be increased, e.g. peopleDF.coalesce(6) still leaves 4 partitions
The difference between the two: the repartition algorithm does a full shuffle of the data and creates equal sized partitions of data, while coalesce combines existing partitions to avoid a full shuffle.
Why use repartition rather than coalesce? A full data shuffle is an expensive operation for large data sets, but our data puddle is only 2,000 rows. The repartition method returns equal sized text files, which are more efficient for downstream consumers. (non-partitioned) It took 241 seconds to count the rows in the data puddle when the data wasn't repartitioned (on a 5 node cluster). (partitioned) It only took 2 seconds to count the data puddle when the data was partitioned, a 124x speed improvement!
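A quick way to see the difference between the two calls is to compare partition counts before and after each one; a minimal sketch, assuming the peopleDF above starts with 4 partitions:
peopleDF.rdd.getNumPartitions                // 4
peopleDF.coalesce(2).rdd.getNumPartitions    // 2: existing partitions are merged, no full shuffle
peopleDF.coalesce(6).rdd.getNumPartitions    // still 4: coalesce cannot add partitions
peopleDF.repartition(6).rdd.getNumPartitions // 6: full shuffle into roughly equal-sized partitions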
3. DataFrameWriter bucketing and partitioning
1. bucketBy: bucketing and sorting are applicable only to persistent tables. For file-based data sources, the output can also be bucketed and sorted.
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
2. partitionBy: partitioning can be used with both save and saveAsTable
peopleDF.write.partitionBy("region").format("parquet").save("people_partitioned.parquet")
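On disk, partitionBy creates one sub-directory per distinct value of the partition column (for example people_partitioned.parquet/region=US/part-....parquet), and a reader that filters on that column can skip whole directories. A small sketch, assuming the same peopleDF and an existing SparkSession spark with spark.implicits._ imported:
// Only the region=US directory needs to be scanned, thanks to partition pruning.
val usPeople = spark.read.parquet("people_partitioned.parquet").where($"region" === "US")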
saveAsTable: saving the data and persisting the table
A DataFrame can be saved as a persistent table in the Hive metastore with the saveAsTable command. Spark will create a default local Hive metastore (using Derby) for you. Unlike the createOrReplaceTempView command, saveAsTable materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore. Persistent tables still exist after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.
For persistent tables you can customize the table path, e.g. df.write.option("path", "/some/path").saveAsTable("t"). When the table is dropped, the custom table path is not removed and the table data is still there. If no custom table path is specified, Spark writes the data to a default table path under the warehouse directory, and when the table is dropped that default path is removed as well.
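For example, a persistent table saved this way can be turned back into a DataFrame by name; a short sketch, assuming the peopleDF and SparkSession spark used earlier (the table name is illustrative):
peopleDF.write.option("path", "/some/path").saveAsTable("people_t") // custom table path
val restored = spark.table("people_t") // works even in a later session connected to the same metastore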
4. JDBC partition
Spark provides the jdbc method for reading from and writing to databases, and each RDD partition opens its own JDBC connection. Although users can set the number of partitions of an RDD, after some distributed shuffle operations (such as reduceByKey and join) the RDD falls back to the default partition count spark.default.parallelism, in which case the number of JDBC connections can exceed the database's maximum number of connections. Spark 2.1 added the numPartitions option to set the number of partitions used for JDBC reads and writes, which solves this problem. If the number of partitions when writing still exceeds that limit, we can call coalesce(numPartitions) before the write to reduce the partition count.
val userDF = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> sourceTable, "lowerBound"->"1", "upperBound"->"886500", "partitionColumn"->"user_id", "numPartitions"->"10")).load()
userDF.write.option("maxRecordsPerFile", 10000).mode("overwrite").parquet(outputDirectory)
userDF.repartition(10000).write.mode("overwrite").parquet(outputDirectory)
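Writing back over JDBC follows the same rule, one connection per partition, so reducing the partition count first keeps the number of connections bounded. A sketch under assumptions (the target table name and credentials are illustrative):
import java.util.Properties
val props = new Properties()
props.setProperty("user", "dbuser")   // illustrative credentials
props.setProperty("password", "secret")
// coalesce(8) caps the number of concurrent JDBC connections at 8 for this write.
userDF.coalesce(8).write.mode("append").jdbc(url, "user_backup", props)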
Partitioning case study
val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> sourceTable, "lowerBound"->"1", "upperBound"->"825485207", "partitionColumn"->"org_id", "numPartitions"->"10")).load()
(1) jdbc partition: df.write.format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_readpar")
(2) maxRecordsPerFile: df.write.option("maxRecordsPerFile", 10000).format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_maxRecd")
(3) repartition: df.repartition(4).write.format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_repar")
(4) rdd key-value partitionBy: df.rdd.map(r => (r.getInt(1), r)).partitionBy(new HashPartitioner(10)).values.saveAsTextFile(s"$filePath/$filename"+"_rddhash")
(1) jdbc partition: the data is distributed unevenly, with some partitions holding far more rows than others; the keys are ordered, because keys are assigned to partitions according to the [lowerBound, upperBound] ranges (the per-partition row counts can be checked with the sketch after this list).
(2) maxRecordsPerFile: same as above, except that a partition with more than maxRecordsPerFile rows is split into several sub-partitions (output files), so the same key may end up in different ones.
(3) repartition: produces partitions of roughly equal size (the row counts are not guaranteed to be identical); the keys are unordered, and the same key may appear in different partitions.
(4) rdd key-value partitionBy: partitions the data according to a chosen rule through the partitioner; a custom partitioning rule can also be supplied.
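The uneven distribution described in (1) and the roughly equal sizes in (3) can be verified by counting the records in each partition; a small sketch using the df loaded above:
df.rdd
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()
  .foreach { case (idx, n) => println(s"partition $idx: $n rows") }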