1、Spark Shell on Client
scala> var rdd =sc.parallelize(1 to 100 ,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count
res0: Long = 100
scala> val rdd2=rdd.map(_ + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.take(3)
res1: Array[Int] = Array(2, 3, 4)
scala> val rdd1=sc.textFile("file://home/hadoop/apps/sparkwc" )
rdd1: org.apache.spark.rdd.RDD[String] = file://home/hadoop/apps/sparkwc MapPartitionsRDD[3] at textFile at <console>:24
cala> val rdd1=sc.textFile("file:///home/hadoop/apps/sparkwc" )
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/apps/sparkwc MapPartitionsRDD[9] at textFile at <console>:24
scala> val rdd2=rdd
rdd rdd1 rdd2 rdd3 rddToDatasetHolder
scala> val rdd2=rdd1.flatMap(_.split("\t" ))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26
scala> val rdd3=rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:28
scala> val rdd4=rdd3.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:30
scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
scala> rdd4.saveAsTextFile("file:///home/hadoop/apps/out1" )
[hadoop@hadoop01 apps]$ cd out1/
[hadoop@hadoop01 out1]$ ls
part-00000 _SUCCESS
[hadoop@hadoop01 out1]$ cat part-00000
(spark,1)
(hadoop,1)
(hello,3)
(world,1)
[hadoop@hadoop01 out1]$ pwd
/home/hadoop/apps/out1
複製代碼
WebUI 地址:http://192.168.43.20:4040/jobs/ node
2、Spark Shuffle
Shuffle Write:將Task中間結果數據寫入到本地磁盤
Shuffle Read:從Shuffle Write階段拉取數據到內存中並行計算
3、Shuffle Write(hash-based)
Shuffle Write階段產生的總文件數=MapTaskNum * ReduceTaskNum
TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize
產生大量小文件,佔用更多的內存緩衝區,形成沒必要要的內存開銷,增長 了磁盤IO和網絡開銷
4、Shuffle Write(hash-based優化)
Shuffle Write階段產生的總文件數=CoreNum * ReduceTaskNum
TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize 減小了小文件產生的個數,可是佔用內存緩衝區的大小沒變
設置方法
conf.set("spark.shuffle.manager", "hash")
在conf/spark-default.conf 配置文件中添加spark.shuffle.manager=hash
5、Shuffle Write(hash-based優化)Shuffle Write(sort-based)
Shuffle Write階段產生的總文件數= MapTaskNum * 2
優勢: 順序讀寫可以大幅提升磁盤IO性能,不會產生過多小文件,下降文件緩存佔用內存空間大小,提升內存使用率。
缺點:多了一次粗粒度的排序。
設置方法
代碼中設置:conf.set("spark.shuffle.manager", "sort")
在conf/spark-default.conf 配置文件中添加spark.shuffle.manager=sort
6、Shuffle Read
hase-based和sort-based使用相同的shuffle read實現
7、Spark History Server配置
spark history server查看運行完成的做業信息和日誌
配置Hadoop的yarn-site.xml文件,全部節點配置文件同步,重啓yarn
<property>
<name>yarn.log.server.url</name>
<value>http://node02:19888/jobhistory/logs</value>
<description> Yarn JobHistoryServer訪問地址 </description>
</property>
複製代碼
修改spark安裝包conf目錄下的spark-defaults.conf(若是沒有該文件, 經過spark-defaults.conf.template模板複製一個),spark history server 在192.168.183.100節點啓動,spark_logs這個目錄須要在HDFS上提早建立
spark.yarn.historyServer.address=192.168.183.100:18080 spark.history.ui.port=18080
spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///spark_logs
spark.history.fs.logDirectory=hdfs:///spark_logs
複製代碼
1.Spark History Server啓動
sbin/start-history-server.sh
複製代碼
httpL://192.168.183.100:18080
複製代碼
7、Spark運行環境優化
將spark系統jar包上傳到HDFS上,直接使用HDFS上的文件
在spark安裝目錄下運行:jar cv0f spark-libs.jar -C jars/ .
將spark安裝目錄下生成的spark-libs.jar上傳到HDFS上的 /system/spark(須要手動建立)目錄下
hadoop fs -put spark-libs.jar /system/spark
複製代碼
修改spark安裝包conf目錄下spark-defaults.conf配置文件添加spark- libs.jar在HDFS上的路徑數據庫
spark.yarn.archive=hdfs:///system/spark/spark-libs.jar
複製代碼
8、Spark編程模型
建立SparkContext
建立RDD
在RDD上進行transformation和action
spark提供了豐富的transformation和action算子
返回結果
1.提交Spark程序到Yarn上
2.Spark RDD算子分類
Transformation轉換操做,惰性執行,不觸發app執行
針對Value數據類型,如map、filter
針對Key-Value數據類型,如groupByKey、reduceByKey
Action執行操做,觸發app執行
3.建立RDD
parallelize從集合建立RDD
參數1:Seq集合,必須
參數2:分區數
建立RDD:val rdd = sc. parallelize(List(1,2,3,4,5,6,7),3)
查看RDD分區數:rdd.partitions.size
textFile從外部數據源(本地文件或者HDFS數據集)建立RDD
參數1:外部數據源路徑,必須
參數2:最小分區數
從本地文件建立RDD:val rdd = sc.textFile("file:///home/hadoop/apps/in")
從HDFS數據集建立RDD:val rdd = sc.textFile("hdfs:///data/wc/in",1)
4.Value數據類型Transformation
map
輸入是一個RDD,將一個RDD中的每一個數據項,經過map中的函數映射輸出一個新的RDD,輸入分區與輸出分區一一對應
flatMap
distinct
coalesce
對RDD進行重分區
第一個參數爲重分區的數目
第二個爲是否進行shuffle,默認爲false,若是重分區以後分區數目大於 原RDD的分區數,則必須設置爲true
repartition
對RDD進行重分區, 等價於coalesce第二個參數設置爲true
union
mapPartitions
針對RDD的每一個分區進行操做,接收一個可以處理迭代器的函數做爲參數
若是RDD處理的過程當中,須要頻繁的建立額外對象,使用mapPartitions要比使用map的性能高不少,如:建立數據庫鏈接
mapPartitionsWithIndex
與mapPartitions功能相似,接收一個第一個參數是分區索引,第二個參數是分區迭代器的函數
zip
拉鍊操做,將兩個RDD組合成Key-Value形式的RDD,保證兩個RDD的partition數量和元素個數要相同,不然會拋出異常
mapValues
groupByKdy
將RDD[K,V]中每一個K對應的V值,合併到一個集合Iterable[V]中
reduceByKey
將RDD[K,V]中每一個K對應的V值根據傳入的映射函數計算
join -返回兩個RDD根據K能夠關聯上的結果,join只能用於兩個RDD之間的關聯,若是要多個RDD關聯,須要關聯屢次
5.RDD Action
collect
saveAsTextFile
take
count
6.Spark優化-Cache應用
7.Accumulator計數器
accumulator累加器,計數器
accumulator累加器,計數器
一般用於監控,調試,記錄關鍵數據處理的數目等
分佈式計數器,在Driver端彙總
val total_counter = sc.accumulator(0L,"total_counter" )
val resultRdd = rowRdd.flatMap(_.split("\t" )).map(x=>{ total_counter += 1
(x,1)
}).reduceByKey(_ + _)
複製代碼
經過Spark Web UI查看 apache