Spark 程序設計

1、Spark Shell on Client

scala> var rdd =sc.parallelize(1 to 100 ,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count
res0: Long = 100  
scala> val rdd2=rdd.map(_ + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.take(3)
res1: Array[Int] = Array(2, 3, 4)
scala> val rdd1=sc.textFile("file://home/hadoop/apps/sparkwc")
rdd1: org.apache.spark.rdd.RDD[String] = file://home/hadoop/apps/sparkwc MapPartitionsRDD[3] at textFile at <console>:24
cala> val rdd1=sc.textFile("file:///home/hadoop/apps/sparkwc")
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/apps/sparkwc MapPartitionsRDD[9] at textFile at <console>:24

scala> val rdd2=rdd
rdd   rdd1   rdd2   rdd3   rddToDatasetHolder

scala> val rdd2=rdd1.flatMap(_.split("\t"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26

scala> val rdd3=rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:28

scala> val rdd4=rdd3.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:30

scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))

scala> rdd4.saveAsTextFile("file:///home/hadoop/apps/out1")
[hadoop@hadoop01 apps]$ cd out1/
[hadoop@hadoop01 out1]$ ls
part-00000  _SUCCESS
[hadoop@hadoop01 out1]$ cat part-00000 
(spark,1)
(hadoop,1)
(hello,3)
(world,1)
[hadoop@hadoop01 out1]$ pwd
/home/hadoop/apps/out1

複製代碼

WebUI 地址:http://192.168.43.20:4040/jobs/node

2、Spark Shuffle

  • Shuffle Write:將Task中間結果數據寫入到本地磁盤
  • Shuffle Read:從Shuffle Write階段拉取數據到內存中並行計算
    SparkShuffle Write

3、Shuffle Write(hash-based)

  • Shuffle Write階段產生的總文件數=MapTaskNum * ReduceTaskNum
  • TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize
  • 產生大量小文件,佔用更多的內存緩衝區,形成沒必要要的內存開銷,增長 了磁盤IO和網絡開銷
    Shuffle Write

4、Shuffle Write(hash-based優化)

  • Shuffle Write階段產生的總文件數=CoreNum * ReduceTaskNum
  • TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize 減小了小文件產生的個數,可是佔用內存緩衝區的大小沒變
  • 設置方法
    • conf.set("spark.shuffle.manager", "hash")
    • 在conf/spark-default.conf 配置文件中添加spark.shuffle.manager=hash
      Shuffle Write優化

5、Shuffle Write(hash-based優化)Shuffle Write(sort-based)

  • Shuffle Write階段產生的總文件數= MapTaskNum * 2
  • 優勢: 順序讀寫可以大幅提升磁盤IO性能,不會產生過多小文件,下降文件緩存佔用內存空間大小,提升內存使用率。
  • 缺點:多了一次粗粒度的排序。
  • 設置方法
  • 代碼中設置:conf.set("spark.shuffle.manager", "sort")
  • 在conf/spark-default.conf 配置文件中添加spark.shuffle.manager=sort
    sort-based

6、Shuffle Read

  • hase-based和sort-based使用相同的shuffle read實現
    Shuffle Read

7、Spark History Server配置

  • spark history server查看運行完成的做業信息和日誌
  • 配置Hadoop的yarn-site.xml文件,全部節點配置文件同步,重啓yarn
<property>
<name>yarn.log.server.url</name>
<value>http://node02:19888/jobhistory/logs</value>
<description> Yarn JobHistoryServer訪問地址 </description>
</property>
複製代碼
  • 修改spark安裝包conf目錄下的spark-defaults.conf(若是沒有該文件, 經過spark-defaults.conf.template模板複製一個),spark history server 在192.168.183.100節點啓動,spark_logs這個目錄須要在HDFS上提早建立
spark.yarn.historyServer.address=192.168.183.100:18080  spark.history.ui.port=18080
spark.eventLog.enabled=true  spark.eventLog.dir=hdfs:///spark_logs
spark.history.fs.logDirectory=hdfs:///spark_logs
複製代碼

1.Spark History Server啓動

  • 啓動Spark History Server
sbin/start-history-server.sh
複製代碼
  • Spark History Server訪問地址
httpL://192.168.183.100:18080
複製代碼

Spark history

7、Spark運行環境優化

  • 將spark系統jar包上傳到HDFS上,直接使用HDFS上的文件
  • 在spark安裝目錄下運行:jar cv0f spark-libs.jar -C jars/ .
  • 將spark安裝目錄下生成的spark-libs.jar上傳到HDFS上的 /system/spark(須要手動建立)目錄下
hadoop fs -put spark-libs.jar /system/spark
複製代碼

修改spark安裝包conf目錄下spark-defaults.conf配置文件添加spark- libs.jar在HDFS上的路徑數據庫

spark.yarn.archive=hdfs:///system/spark/spark-libs.jar
複製代碼

8、Spark編程模型

  • 建立SparkContext
    • 封裝了spark執行環境信息
  • 建立RDD
    • 能夠用scala集合或hadoop數據文件建立
  • 在RDD上進行transformation和action
    • spark提供了豐富的transformation和action算子
  • 返回結果
    • 保存到hdfs、其餘外部存儲、直接打印

1.提交Spark程序到Yarn上

2.Spark RDD算子分類

  • Transformation轉換操做,惰性執行,不觸發app執行
    • 針對Value數據類型,如map、filter
    • 針對Key-Value數據類型,如groupByKey、reduceByKey
  • Action執行操做,觸發app執行

3.建立RDD

  • parallelize從集合建立RDD
    • 參數1:Seq集合,必須
    • 參數2:分區數
    • 建立RDD:val rdd = sc. parallelize(List(1,2,3,4,5,6,7),3)
    • 查看RDD分區數:rdd.partitions.size
  • textFile從外部數據源(本地文件或者HDFS數據集)建立RDD
    • 參數1:外部數據源路徑,必須
    • 參數2:最小分區數
    • 從本地文件建立RDD:val rdd = sc.textFile("file:///home/hadoop/apps/in")
    • 從HDFS數據集建立RDD:val rdd = sc.textFile("hdfs:///data/wc/in",1)

4.Value數據類型Transformation

  • map
    • 輸入是一個RDD,將一個RDD中的每一個數據項,經過map中的函數映射輸出一個新的RDD,輸入分區與輸出分區一一對應
  • flatMap
    • 與map算子功能相似,能夠將嵌套類型數據拆開展平
  • distinct
    • 對RDD元素進行去重
  • coalesce
    • 對RDD進行重分區
    • 第一個參數爲重分區的數目
    • 第二個爲是否進行shuffle,默認爲false,若是重分區以後分區數目大於 原RDD的分區數,則必須設置爲true
  • repartition
    • 對RDD進行重分區, 等價於coalesce第二個參數設置爲true
  • union
    • 將兩個RDD進行合併,不去重
  • mapPartitions
    • 針對RDD的每一個分區進行操做,接收一個可以處理迭代器的函數做爲參數
    • 若是RDD處理的過程當中,須要頻繁的建立額外對象,使用mapPartitions要比使用map的性能高不少,如:建立數據庫鏈接
  • mapPartitionsWithIndex
    • 與mapPartitions功能相似,接收一個第一個參數是分區索引,第二個參數是分區迭代器的函數
  • zip
    • 拉鍊操做,將兩個RDD組合成Key-Value形式的RDD,保證兩個RDD的partition數量和元素個數要相同,不然會拋出異常
  • mapValues
    • 針對[K,V]中的V值進行map操做
  • groupByKdy
    • 將RDD[K,V]中每一個K對應的V值,合併到一個集合Iterable[V]中
  • reduceByKey
    • 將RDD[K,V]中每一個K對應的V值根據傳入的映射函數計算
  • join -返回兩個RDD根據K能夠關聯上的結果,join只能用於兩個RDD之間的關聯,若是要多個RDD關聯,須要關聯屢次

5.RDD Action

  • collect
    • 將一個RDD轉換成數組,經常使用於調試
  • saveAsTextFile
    • 用於將RDD以文本文件的格式存儲到文件系統中
  • take
    • 根據傳入參數返回RDD的指定個數元素
  • count
    • 返回RDD中元素數量

6.Spark優化-Cache應用

Cache應用

7.Accumulator計數器

  • accumulator累加器,計數器
    • accumulator累加器,計數器
    • 一般用於監控,調試,記錄關鍵數據處理的數目等
    • 分佈式計數器,在Driver端彙總
val total_counter = sc.accumulator(0L,"total_counter")  
val resultRdd = rowRdd.flatMap(_.split("\t")).map(x=>{  total_counter += 1
(x,1)
}).reduceByKey(_ + _)
複製代碼

經過Spark Web UI查看 apache

Accumulator計數器
相關文章
相關標籤/搜索