Spark RDD的默認分區數:(spark 2.1.0)

本文基於Spark 2.1.0版本node

新手首先要明白幾個配置:shell

spark.default.parallelism:(默認的併發數)json

    若是配置文件spark-default.conf中沒有顯示的配置,則按照以下規則取值:併發

    本地模式(不會啓動executor,由SparkSubmit進程生成指定數量的線程數來併發):分佈式

    spark-shell                              spark.default.parallelism = 1oop

    spark-shell --master local[N] spark.default.parallelism = N (使用N個核)spa

    spark-shell --master local      spark.default.parallelism = 1線程

    僞集羣模式(x爲本機上啓動的executor數,y爲每一個executor使用的core數,scala

z爲每一個 executor使用的內存)orm

     spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

     mesos 細粒度模式

     Mesos fine grained mode  spark.default.parallelism = 8

    其餘模式(這裏主要指yarn模式,固然standalone也是如此)

    Others: total number of cores on all executor nodes or 2, whichever is larger

    spark.default.parallelism =  max(全部executor使用的core總數, 2)

通過上面的規則,就能肯定了spark.default.parallelism的默認值(前提是配置文件spark-default.conf中沒有顯示的配置,若是配置了,則spark.default.parallelism = 配置的值)

還有一個配置比較重要,spark.files.maxPartitionBytes = 128 M(默認)

The maximum number of bytes to pack into a single partition when reading files.

表明着rdd的一個分區能存放數據的最大字節數,若是一個400m的文件,只分了兩個區,則在action時會發生錯誤。

當一個spark應用程序執行時,生成spark.context,同時會生成兩個參數,由上面獲得的spark.default.parallelism推導出這兩個參數的值

sc.defaultParallelism     = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當sc.defaultParallelism和sc.defaultMinPartitions最終確認後,就能夠推算rdd的分區數了。

有兩種產生rdd的方式:

1,經過scala 集合方式parallelize生成rdd,

如, val rdd = sc.parallelize(1 to 10)

這種方式下,若是在parallelize操做時沒有指定分區數,則

rdd的分區數 = sc.defaultParallelism

2,經過textFile方式生成的rdd,

如, val rdd = sc.textFile(「path/file」)

有兩種狀況:

a,從本地文件file:///生成的rdd,操做時若是沒有指定分區數,則默認分區數規則爲:

(按照官網的描述,本地file的分片規則,應該按照hdfs的block大小劃分,但實測的結果是固定按照32M來分片,多是bug,不過不影響使用,由於spark能用全部hadoop接口支持的存儲系統,因此spark textFile使用hadoop接口訪問本地文件時和訪問hdfs仍是有區別的)

rdd的分區數 = max(本地file的分片數, sc.defaultMinPartitions)

b,從hdfs分佈式文件系統hdfs://生成的rdd,操做時若是沒有指定分區數,則默認分區數規則爲:

rdd的分區數 = max(hdfs文件的block數目, sc.defaultMinPartitions)

補充:

1,若是使用以下方式,從HBase的數據錶轉換爲RDD,則該RDD的分區數爲該Table的region數。

String tableName ="pic_test2";

conf.set(TableInputFormat.INPUT_TABLE,tableName);

conf.set(TableInputFormat.SCAN,convertScanToString(scan));

JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,

TableInputFormat.class,ImmutableBytesWritable.class,

Result.class);

Hbase Table:pic_test2的region爲10,則hBaseRDD的分區數也爲10。

2,若是使用以下方式,經過獲取json(或者parquet等等)文件轉換爲DataFrame,則該DataFrame的分區數和該文件在文件系統中存放的Block數量對應。

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

people.json大小爲300M,在HDFS中佔用了2個blocks,則該DataFrame df分區數爲2。

3,Spark Streaming獲取Kafka消息對應的分區數,不在本文討論。

做者:俺是亮哥 連接:https://www.jianshu.com/p/4b7d07e754fa 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。
相關文章
相關標籤/搜索