spark partition

Starting Spark in HA mode

# Start Spark in HA mode: when the leader master goes down, the standby master becomes alive
./bin/spark-shell --master spark://xupan001:7070,xupan002:7070

 

Specifying the number of partitions

# Specify 2 partitions: 2 tasks are generated, and 2 files are written to HDFS
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1.partitions.length // 2
# saveAsTextFile
rdd1.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/partition")

Permission Owner Group Size Replication Block Size Name
-rw-r--r-- root supergroup 0 B 1 128 MB _SUCCESS
-rw-r--r-- root supergroup 8 B 1 128 MB part-00000
-rw-r--r-- root supergroup 10 B 1 128 MB part-00001
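To see why the two part files are 8 B and 10 B, glom() can show how the 9 elements were split across the partitions (a quick sketch from the same shell session; each element plus its newline takes 2 bytes in the text output):

rdd1.glom().collect()
// Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8, 9))
// partition 0: 4 elements -> 8 B (part-00000), partition 1: 5 elements -> 10 B (part-00001)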

 

Relation to the number of cores

If the number of partitions is not specified, the number of partitions (and hence the number of output files) depends on the cores, i.e. the total number of cores available to the application:
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
rdd1.partitions.length // 6
rdd1.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/partition2")

Permission Owner Group Size Replication Block Size Name
-rw-r--r-- root supergroup 0 B 1 128 MB _SUCCESS
-rw-r--r-- root supergroup 2 B 1 128 MB part-00000
-rw-r--r-- root supergroup 4 B 1 128 MB part-00001
-rw-r--r-- root supergroup 2 B 1 128 MB part-00002
-rw-r--r-- root supergroup 4 B 1 128 MB part-00003
-rw-r--r-- root supergroup 2 B 1 128 MB part-00004
-rw-r--r-- root supergroup 4 B 1 128 MB part-00005
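The six files alternate between 2 B and 4 B because parallelize spreads the 9 elements over the 6 default partitions as 1, 2, 1, 2, 1, 2 elements. A quick check (a sketch; the numbers assume the 6-core cluster shown below):

sc.defaultParallelism // 6 -- total cores granted to the application
rdd1.glom().collect().map(_.length) // Array(1, 2, 1, 2, 1, 2)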

 

For comparison, the basic configuration of the cluster (from the master web UI):

  • URL: spark://xupan001:7070
  • REST URL: spark://xupan001:6066 (cluster mode)
  • Alive Workers: 3
  • Cores in use: 6 Total, 6 Used
  • Memory in use: 6.0 GB Total, 3.0 GB Used
  • Applications: 1 Running, 5 Completed
  • Drivers: 0 Running, 0 Completed
  • Status: ALIVE

Workers

Worker Id Address State Cores Memory
worker-20171211031717-192.168.0.118-7071 192.168.0.118:7071 ALIVE 2 (2 Used) 2.0 GB (1024.0 MB Used)
worker-20171211031718-192.168.0.119-7071 192.168.0.119:7071 ALIVE 2 (2 Used) 2.0 GB (1024.0 MB Used)
worker-20171211031718-192.168.0.120-7071 192.168.0.120:7071 ALIVE 2 (2 Used) 2.0 GB (1024.0 MB Used)

 

 

======================================================

Relation to HDFS file size

When reading a file from HDFS without specifying the number of partitions, the default is 2 partitions:
scala> val rdd = sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out")
scala> rdd.partitions.length
res3: Int = 2

/**
 * Default min number of partitions for Hadoop RDDs when not given by user
 * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
 * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
 */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
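Both quantities can be inspected directly in the shell (a quick sketch; the values assume the 6-core application above):

sc.defaultParallelism   // 6
sc.defaultMinPartitions // 2 = math.min(6, 2), which is why textFile defaults to 2 partitions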

 

If the HDFS file is large, the number of partitions is roughly fileSize / 128 MB; if there is a remainder smaller than 128 MB, it becomes fileSize / 128 MB + 1 partitions.
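A minimal sketch of that estimate (estimatePartitions is a hypothetical helper, not Spark API; it mirrors the Size / 128 MB rule above rather than the exact Hadoop FileInputFormat split logic, which among other things tolerates a small overflow on the last split):

def estimatePartitions(fileSizeMB: Double, splitMB: Double = 128.0): Int =
  math.ceil(fileSizeMB / splitMB).toInt

estimatePartitions(284.35) // 3 -- matches userLogBig.txt in the test below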

 

Summary: the tests above were run on Spark 2.2.0.
1. If the RDD is created by parallelizing a Scala collection on the driver side and no partition count is specified, the number of partitions equals the total number of cores allocated to the application.
2. If the data is read from the HDFS file system:

2.1 A single file smaller than 128 MB:
scala> val rdd = sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out",1)
scala> rdd.partitions.length
res0: Int = 1
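For contrast (a sketch; rdd2 is just an illustrative name), dropping the explicit minPartitions argument falls back to defaultMinPartitions = 2 for the same small file, exactly as in the first textFile example above:

val rdd2 = sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out")
rdd2.partitions.length // 2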

2.2 Multiple files, one of which is large; the file sizes are:

Permission Owner Group Size Replication Block Size Name
-rw-r--r-- root supergroup 4.9 KB 1 128 MB userLog.txt
-rw-r--r-- root supergroup 284.35 MB 1 128 MB userLogBig.txt
-rw-r--r-- root supergroup 51.83 KB 1 128 MB zookeeper.out

 

scala> val rdd = sc.textFile("hdfs://xupan001:8020/user/root/spark/input")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://xupan001:8020/user/root/spark/input MapPartitionsRDD[3] at textFile at <console>:24

 

scala> rdd.partitions.length
res1: Int = 5

userLogBig.txt alone spans 3 blocks (284.35 MB / 128 MB rounds up to 3), so the directory yields 1 + 3 + 1 = 5 partitions.
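Applying the same per-file estimate (a sketch reusing the hypothetical estimatePartitions helper from above; sizes converted to MB from the listing):

Seq(0.005, 284.35, 0.051).map(mb => estimatePartitions(mb)).sum // 1 + 3 + 1 = 5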
