大數據-spark

Spark是用於大規模數據處理的快速通用的計算引擎。

相較MR快的緣由:其任務中間結果存在內存中,在迭代運算中尤其明顯,DAG的設置。java

架構說明:node

  • Dirver:負責節點通信,task分發,結果回收linux

  • Worker:資源管理的從節點shell

  • Master:資源調度的主節點apache

RDD 彈性分佈式數據集

五大特性

  • RDD由一系列partition組成編程

  • 函數做用在partition上vim

  • RDD之間存在相互依賴數組

  • 分區器做用在KV格式的RDD上瀏覽器

  • RDD提供了一系列最佳的計算位置,數據本地化,計算向數據移動緩存

RDD自己實際不存儲數據,爲了便於理解暫時認爲是存數據的

屬性說明

  • RDD的彈性來源於:partition的大小和數量可變

  • RDD的容錯來源於:RDD之間的依賴關係

  • RDD的分佈式來源於:partation分佈在不一樣的節點

  • KV格式的數據,RDD中的數據以二元組對象的形式存儲

RDD建立

scala:經過SparkConf對象設置參數,SparkContext接收SparkConf對象,生成上下文context,context的textFile方法載入數據源,返回第一個RDD,基於算子對RDD進行處理

val conf = new SparkConf()
conf.setMaster("local").setAppName("wc")
val sc = new SparkContext(conf)
//sc可以設置checkpiont目錄,日誌打印級別等
sc.setlogLevel("WARN");
sc.checkpoint("hdfs://node1:9000/spark/checkpoint");
val RDD: RDD[String] = sc.textFile("data")

java:SparkConf類設置參數,JavaSparkContext接收SparkConf對象,生成上下文context,context的textFile方法載入數據源,返回第一個RDD,基於算子對RDD進行處理

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("location").setAppName("wc");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
//可以將jsc對象轉爲sc對象,執行sc的方法
SparkContext sc = jsc.sc();
JavaRDD<String> data = jsc.textFile("data");

建立RDD的主要方法

  • textFile

    JavaRDD<String> data = jsc.textFile("data");
  • parallelize 將容器轉爲RDD,可以執行分區數

    JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1,2,3),3);
  • parallelizePair

    JavaPairRDD<String, Integer> rdd1 = 
       jsc.parallelizePairs(Arrays.asList(
               new Tuple2<>("a", 1),
               new Tuple2<>("a", 2)
      ));

部署

Standalone模式

  • 安裝

    • tar -zxvf spark-2.3.1-bin-hadoop2.6.tgz

    • mv spark-2.3.1-bin-hadoop2.6.tgz spark-2.3.1 更名

  • 配置

    • cd /opt/sxt/spark-2.3.1/conf 進入目錄

    • mv slaves.template slaves 修改slaves文件

      • vim slaves

        • node1 node2 node3

    • mv spark-env.sh.template spark-env.sh

      • vim spark-env.sh

        • JAVA_HOME=/usr/java/jdk1.8.0_11 配置java_home路徑

        • SPARK_MASTER_HOST=node1 master的ip

        • SPARK_MASTER_PORT=7077 提交任務的端口,默認是7077

        • SPARK_WORKER_CORES=1 每一個worker從節點可以支配的核數

        • SPARK_WORKER_MEMORY=2g 每一個worker從節點可以支配的內存

  • 拷貝到其餘節點

    cd /opt/sxt
    scp -r spark-2.3.1 node2:`pwd`
    scp -r spark-2.3.1 node3:`pwd`
  • 啓動(在主節點上)

    cd /opt/sxt/spark-2.3.1/sbin

    ./start-all.sh (關閉: ./stop-all.sh)

    UI鏈接:node1:8080 (修改端口:start-master.sh)

yarn模式

  • 修改spark配置文件spark-env.sh

    添加hadoop配置路徑 HADOOP_CONF_DIR=/opt/sxt/hadoop-2.6.5/etc/hadoop

  • 修改hadoop的配置,處理兼容

    vim /opt/sxt/hadoop-2.6.5/etc/hadoop/yarn-site.xml

    <property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
    </property>
  • 啓動

    yarn:start-all.sh

    spark:cd /opt/sxt/spark/sbin ./start-all.sh

Master-HA配置

  • 修改spark-env.sh配置文件

    export SPARK_DAEMON_JAVA_OPTS="
    -Dspark.deploy.recoveryMode=ZOOKEEPER
    -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181
    -Dspark.deploy.zookeeper.dir=/sparkmaster0821"
  • 配置到各節點 scp spark-env.sh node2://opt/sxt/spark/conf

  • 配置備用master節點,修改該節點的 spark-env.sh配置中的SPARK_MASTER_HOST

    SPARK_MASTER_HOST=node2
  • HA啓動

    • 主節點啓動進程:./start-all.sh

    • 備用節點啓動master進程:./start-master.sh

任務執行流程

Standalone-client模式

cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master spark://node1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
  • 執行流程

    • 集羣啓動時,worker向master彙報資源

    • client提交任務,客戶端中啓動Driver進程

    • Driver向Master申請啓動Application的資源

    • Master啓動worker節點的excutor進程,excutor進程反向註冊到Driver上

    • Driver將task發送到worker的excutor進程中執行

    • Driver監控task,接收worker的執行結果

  • 適用測試環境,多任務執行時Driver網卡壓力過大

Standalone-cluster模式

cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar  
100
#須要jar包存放值hdfs中,或者每臺節點上
  • 執行過程

    1. 集羣啓動時,worker向master彙報資源

    2. 客戶端將任務提交到集羣,向Master請求啓動Driver

    3. Master選擇隨機節點建立Driver

    4. Driver啓動後向Master請求application的資源

    5. Master啓動worker節點的excutor進程,excutor進程反向註冊到Driver上

    6. Driver將task發送到worker的excutor進程中執行

    7. Driver監控task,接收worker的執行結果

  • 適用生產環境,客戶端只負責提交任務,Driver均衡分佈在集羣中下降單節點網卡壓力

yarn-clinet模式

cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master yarn–client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar  
100
  • 執行流程

    • client將任務提交到RM(ResourceManager)中,在客戶端內建立Driver進程

    • RM隨機選取NM節點(NodeManager),建立AM進程(applicationMaster)

    • AM進程向RM請求applictaion的資源,RM響應一批container資源

    • AM根據資源啓動NM節點中的excutor進程

    • excutor反向註冊到Driver進程上,接收Driver發出的task並執行

    • Driver監控task,接收worker的執行結果

  • 使用測試環境,客戶端過多Driver進程致使網卡壓力大

  • 注:AM只執行啓動任務,不負責監控

yarn-cluster模式

cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master yarn–cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar  
100
  • 執行流程

    • client向RM節點提交任務

    • RM選擇隨機NM節點建立AM(ApplicationMaster)

    • AM向RM請求application執行的資源,並接收響應的資源

    • AM根據響應,將請求發送到NM中啓動Excutor進程

    • Excutor反向註冊到AM所在節點,從AM接收task並執行

    • AM監控task,接收worker的執行結果

  • 適用於生產環境,執行Driver功能的AM隨機分佈在NM中,單點網卡壓力下降

中止集羣任務命令:yarn application -kill applicationID

寬窄依賴

形容RDD之間的依賴關係,基於父子RDD之間的partition關聯來判斷

  • 窄依賴

    • 父RDD中的partition與子RDD中的partition爲一對一或一對多

    • 不須要shuffle

  • 寬依賴

    • 父RDD中的partition指向多個子RDD中的partition,呈多對一關係

    • 須要執行shuffle

stage+管道

  • 一個application包括若干並行的job,一個觸發算子對應一個job,每一個job會被拆分爲多組相互關聯的任務組,這些任務組就是stage

  • stage劃分流程

    • spark根據RDD之間依賴關係構建DAG有向無環圖,並提交給DAGScheduler

    • DAGScheduler將DAG劃分若干相互依賴的stage,劃分依據是RDD之間的寬窄依賴

    • 逆向切分,沿DAG從後向前,遇到寬依賴劃分一個出stage

  • stage之間存在並行和串行兩種關聯

  • stage內部由一組並行的task構成,stage內部的並行度由最後一個RDD的分區數決定

  • task被送到executor上執行的工做單元

  • RDD的分區數主要在如下狀況中能夠指定

    • 讀取數據時指定

    • 具有寬依賴RDD位置

  • stage內部計算採用管道計算模式

    • 每一個task至關於一個管道,同一時間一次處理一條數據

    • task以高階函數 f4(f3(f2(f1(x)))) 的形式處理stage內多個RDD的代碼邏輯

    • 一個stage內部的task能夠具有不一樣邏輯

  • 管道中數據落地的環節

    • 指定持久化的RDD節點

    • 執行shuffle write 的過程當中

 

資源調度與任務調度

  1. spark集羣啓動,worker節點向Master節點彙報資源,Master掌握了集羣的資源。

  2. 客戶端向spark提交application,Master根據app的RDD依賴關係構建DAG有向無環圖。

  3. Master建立Driver進程,Driver進程中建立DAGScheduler和TaskScheduler調度器。

  4. TaskScheduler建立後向Master節點請求app的資源,Master根據請求在worker節點上啓動Executor進程,Executor進程反向註冊到Driver進程中。

  5. DAGScheduler根據DAG的寬窄依賴劃分stage,將stage封裝TaskSet爲交給TaskScheduler,TaskSet中封裝了stage中並行的task。

  6. TaskScheduler遍歷TaskSet,將task分配給Executor執行,即發送到Executor中的線程池ThreadPool中

  7. TaskScheduler監控task執行,Executor的ThreadPool狀態會響應給TaskScheduler。

  8. 監控過程

    1. 若task執行失敗,TaskScheduler從新發送task到Executor中,默認重試3次

    2. 若task重試失敗,則對應stage執行失敗,DAGScheduler從新發送stage到TaskScheduler中從新執行,上述重試默認4次。若重試失敗,則job失敗,app失敗

    3. TaskScheduler還不然重試執行緩慢(straggling)的task,TaskScheduler會發送新的task並行執行。關於執行結果採用推測執行機制,兩個task以先執行完的結果爲準,默認是關閉的,配置屬性爲spark.speculation。推測執行機制不適應ETL等數據插入的操做(數據衝恢復插入)和數據傾斜的狀況

  9. app的資源申請是粗粒度的,application申請的資源,須要等待所有task執行完畢纔會釋放。優勢是:不須要每一個task反覆請求資源,任務執行效率高;缺點:資源沒法充分利用。

    注:MR使用細粒度資源申請方法,task本身申請資源執行任務,每一個task執行完畢釋放資源,資源充分利用,但task啓動變慢。

內存管理

靜態內存管理

  • 60% spark.storage.memoryFraction 存儲內存分區

    • 90%存儲+序列化

      • 80% RDD存儲+廣播變量

      • 20% 解壓序列化

    • 10% 預留OOM

  • 20% spark.shuffle.memoryFraction shuffle內存分區

    • 80% shuffle聚合內存

    • 20% OOM預留

  • 剩餘 task計算

統一內存管理

  • 300M JVM

  • 75% spark.memory.storageFraction 存儲內存分區

    • RDD存儲+廣播變量

    • shuffle聚合

      二者動態調用

  • 剩餘 執行task

注:spark1.6之後默認統一內存管理,設置spark.memory.useLegacyMode置爲true,修改成靜態內存管理

 

 

任務參數

提交任務的參數 ./spark-submit ...

  • --master MASTER_URL

    spark://host:port mesos://host:port yarn local (默認)

  • --deploy-mode DEPLOY_MODE client(默認)/cluster

  • --class CLASS_NAME 主類(包+類)

  • --name NAME 任務名

  • --jars JARS 依賴jar包,逗號分隔

  • --files FILES 相關文件

  • --conf PROP=VALUE 配置屬性

  • --driver-memory Driver的內存,默認1024M

  • --executor-memory executor的最大內存,默認1G

  • 適用 standalone + cluster

    • --driver-cores driver的核數,默認1

  • 適用 standalone/Mesos + cluster

    • --supervise 失敗重啓Driver

  • 適用 standalone and Mesos

    • --total-executor-cores executor的總核數

  • 適用 standalone 或 YARN

    • --executor-cores 每一個executor的核數,默認1

      經過--total-executor-cores和--executor-cores限定executor的數量

  • 使用yarn

    • --driver-cores driver的核數

    • --queue 資源隊列名,默認default

    • --num-executors 指定executors 數量

SparkShell

Spark自帶的一個快速原型開發工具,支持scala語言交互式編程。

啓動命令 ./spark-shell --master spark://node1:7077 (win/linux)

執行任務 sc爲默認建立的上下文環境

#指定hdfs的文件進程wordcount ,shsxt是hdfs的集羣名
sc.textFile("hdfs://shsxt:9000/spark/aaa.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.foreach(println)

SparkUI

瀏覽器訪問啓動spark的主機 node1:8080 範圍UI頁面

能夠查看當前運行狀況和歷史運行狀況

history+UI

對應須要臨時保存日誌:在shell啓動或命令提交時配置以下屬性

  • --conf spark.eventLog.enabled=true 保存日誌

  • --conf spark.eventLog.dir=hdfs://shsxt/spark/test 指定hdfs存放路徑,shsxt是hdfs集羣名

對於須要對全部任務都須要保存日誌,須要配置spark-default.conf

  • 配置文件中加入,須要複製到全部節點

    #開啓記錄事件日誌的功能
    spark.eventLog.enabled           true
    #設置事件日誌存儲的目錄,shsxt是hdfs的集羣名
    spark.eventLog.dir                 hdfs://shsxt/spark/log
    #日誌優化選項,壓縮日誌
    spark.eventLog.compress         true
    #歷史日誌在hdfs的目錄,shsxt是hdfs的集羣名
    spark.history.fs.logDirectory hdfs://shsxt/spark/log
  • 在sbin中啓動HistoryServer:./start-history-server.sh

  • UI地址爲啓動HistoryServer節點+端口: node4:18080

廣播變量與計數器

廣播變量

  • spark代碼,RDD之外部分由Driver端執行,RDD之內的部分executor端的各task中執行。

  • 爲了不向每一個task重複發送公用的變量,使用廣播變量。

  • Driver將廣播變量發送到executor端中,executor中的task公用一個變量。

//scala
val mybroadcase: Broadcast[String] =sc.broadcast("廣播變量")
sc.textFile("/data").foreach(x=>{
   print( mybroadcase.value )
})
//java
Broadcast<String> xxx = jsc.broadcast("xxx");
jsc.textFile("/data").foreach(new VoidFunction<String>(){
   @Override
   public void call( String line) throws Exception {
       //取值
       tring value = xxx.value();
  }
});

計數器

  • 在廣播變量的基礎上,不借助count算子,只能實現execute層面的變量計數,沒法實現全局的事件計數。

  • 計數器實現:RDD外定義計數器(Driver中),在RDD內進行累加(Executor中),task完成彙總到Driver中實現計數。

  • 注:計數結果必須在Driver端解析,計數器默認從0開始計數,每一個Executor獨立計數後彙總到Driver累加。

//scala
val count = sc.longAccumulator
sc.textFile("/data").foreach(x=>{
   count.add(1)
})
val num: lang.Long =count.value

//java
SparkContext sc = jsc.sc();
LongAccumulator count = sc.longAccumulator();
jsc.textFile("/data").foreach(new VoidFunction<String>(){
   @Override
   public void call( String line) throws Exception {
       //取值
       count.add(1);
}});
Long value = count.value();

shuffle

在寬依賴的RDD之間存在shuffle過程,將父RDD的分區的數據shuffle進入子RDD中不一樣的分區。相似reducebykey算子,將相同ke進入一個分區進行處理。

  • Shuffle Writer:上游stage的map task保證當前分區中相同的key寫入一個分區文件中

  • Shuffle Read:下游stage的reduce task在全部機器中獲取屬於本身分區的分區文件

Spark2以後使用SortShuffle,1.2以前使用HashShuffle,二者之間並用

HashShuffle

  • 普通機制

    • 每一個map task處理後數據經過hash分區器寫入不一樣的buffer(默認32K)

    • 每一個buffer對應一個磁盤小文件,每一個buffer或小磁盤文件對應一個reduce task

    • reduce task拉取對應的磁盤小文件的數據

    • 小文件數量:map task數*reduce task數

    • 小文件過多致使:內存建立過多對象,容易OOM;拉取過多,通信波動和故障易致使拉取失敗(shuffle file cannot find) ,這種失敗須要DAGScheduler重試stager,容易致使任務失敗。

  • 合併機制

    • 一個executor進程中,全部map task公用一組buffer,減小磁盤小文件的數量

    • 小文件數量:executor數*reduce task數

SortShuffle

  • 普通機制

    • map task 將計算結果寫入本身的內存數據結構(默認5M)

    • shuffle設置定時器對內存數據結構的容量進行監控,若監控到大小達到閾值,向內存數據結構分配一倍的容量,直到節點剩餘容量不夠分配。此時啓動內存數據結構溢寫

    • 內存數據結構對內部數據排序分區,寫出到磁盤小文件,溢寫以batch形式去寫,一個batch對應1w條數據,batch做爲寫出緩存

    • maptask完畢後,磁盤小文件合併爲:一個數據文件+一個索引文件

    • reduce task經過索引拉取對應部分的數據

    • 生成 2*map task數的文件

  • bypass

    • 取消排序,直接將數據寫出都小文件中

    • bypass的觸發條件:reduce task數小於 spark.shuffle.sort.bypassMergeThreshold的參數值(默認200),使得小批量數據不排序

Shuffle尋址

  • MapOutputTracker管理磁盤小文件

    • MapOutputTrackerMaster主,driver進程中

    • MapOutputTrackerWorker從,Executor進程中

  • BlockManager 塊管理者

    • 分爲主從架構

      • BlockManagerMaster,driver進程中,在使用廣播變量和緩存數據時,BlockManagerSlave執行

      • BlockManagerWorker,Executor進程中,做爲從節點

    • 通用進程

      • DiskStore 磁盤管理

      • MemoryStore 內存管理

      • ConnectionManager 負責鏈接其餘BlockManagerWorker

      • BlockTransferService 負責數據傳輸

  • 尋址流程

    • 每一個maptask執行完畢將小文件地址封裝到MpStatus對象,經過MapOutputTrackerWorker向Driver的MapOutputTrackerMaster彙報

    • 全部maptask執行完畢,reducetask執行前,Excutor的MapOutputTrackerWorker向Driver端的MapOutputTrackerMaster請求磁盤小文件地址數據

    • Excutor的ConnectionManager鏈接其餘節點的ConnectionManager,再借助BlockTransferService進行數據傳輸

    • BlockTransferService一次啓動5個task拉取數據,一個task拉取最多48M數據

shuffle優化

reduce的OOM優化

  • 減小每次數據的拉取量,spark.reducer.maxSizeInFlight (64M)

  • 增大shuffle的內存分配

  • 提升executor總內存

優化方式

  • 代碼指定配置信息,優先級最高,硬編碼不推薦

  • 任務提交命令中 --conf 後指定參數 (推薦)

  • 在spark-default.conf,適用全部任務,優先級最低,範圍太廣不推薦

全部優化屬性:

  • spark.shuffle.file.buffer 默認32k

    • 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩衝大小。將數據寫到磁盤文件以前,會先寫入buffer緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。

    • 調優建議:內存資源充足能夠適當增長(好比64k),從而減小shuffle write過程當中溢寫次數,減小磁盤IO次數。性能提高範圍1%~5%。

  • spark.reducer.maxSizeInFlight 默認48m

    • 參數說明:設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。

    • 調優建議:內存充足能夠適當增長(好比96m),從而減小拉取數據的次數,減小網絡IO,進而提高性能。性能提高範圍1%~5%。

  • spark.shuffle.io.maxRetries 默認3

    • 參數說明:shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,失敗重試的最大次數。若是3次之後未成功,致使shuffle file not find錯誤和stage執行失敗。

    • 調優建議:對於那些包含了特別耗時的shuffle操做,增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。 shuffle file not find taskScheduler不負責重試task,由DAGScheduler負責重試stage

spark.shuffle.io.retryWait 默認值:5s 參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是5s。 調優建議:建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。

spark.shuffle.memoryFraction 默認值:0.2 參數說明:該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。 調優建議:在資源參數調優中講解過這個參數。若是內存充足,並且不多使用持久化操做,建議調高這個比例,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高10%左右。

spark.shuffle.manager 默認值:sort|hash 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。 調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug。

spark.shuffle.sort.bypassMergeThreshold 默認值:200 參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。 調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。

spark.shuffle.consolidateFiles 默認值:false 參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。 調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。

 

 

案例

PV-UV

數據結構:用戶ip+地址+時間+電話+會話+域名
146.1.30.98 河南 2017-10-10 1512012307080  5263761960810313758    www.mi.com View

pv:每一個網站的當日訪問數

uv:每一個網站的當日獨立訪客,以會話爲準

//配置再也不重複,提供RDD0,值爲字符串格式的數據
val RDD1: RDD[(String, String)] = RDD0.map(x => {
  val y = x.split("\\s+")
  //會話+域名
  (y(5), y(4))
})
//groupByKey取出相同的網站的數據 set去重,count計數
val RDD2: RDD[(String, Iterable[String])] = RDD1.groupByKey()
RDD2.foreach(x => {
  println("網站:"+x._1)
  val it = x._2.iterator
  var count=0
  val set = Set[String]()
  while(it.hasNext){
    count+=1
    set.add(it.next())
  }
  println("PV:"+count+"   UV:"+ set.size )
})

二次排序

  • 設置數據的封裝類,該實現Serializable與Comparable<當前類>

    例子中比較 2個數字組成的數據

    public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {
    	//序列化版本
        private static final long serialVersionUID = 1L;
        private int first;
        private int second;
        //set,get
        public int getFirst() { return first; }
        public void setFirst(int first) { this.first = first; }
        public int getSecond() { return second; }
        public void setSecond(int second) { this.second = second; }
        //構造器
        public SecondSortKey(int first, int second) {
            super(); 
            this.first = first; 
            this.second = second; 
        }
    //重寫比較方法,返回+-0數字
        @Override
        public int compareTo(SecondSortKey o1) {
    		//對兩組數據分別比較得出比價結果
            if (getFirst() - o1.getFirst() == 0) {
                return getSecond() - o1.getSecond();
            } else {
                return getFirst() - o1.getFirst();
            }
    
        }
    }
    
  • 數據載入RDD是,轉爲KV結構的RDD,key爲封裝類,value爲實際數據

  • 使用sortByKey算子排序,獲得所需排序效果

    val result: RDD[(SecondSortKey, String)] =RDD.map(x => {
        val a = x.split(" ")
        val b=new SecondSortKey(a(1).toInt, a(2).toInt)
        new Tuple2(b,x)
    }).sortByKey()
    

分組取topN

//獲取一組數字中最大的3個值,使用一個3元素的數組,比較並寫入數據
Integer[] top3 = new Integer[3];
//數據經過迭代器輸出,分別填入數組的三個位置,其中處理了最早3個置入的數據和後面添加刪除數據的過程
while (iterator.hasNext()) {
    Integer x = iterator.next();
    for (int i = 0; i < top3.length; i++) {
        if(top3[i] == null){
            top3[i] = x;
            break;
        }else if(score > top3[i]){
            for (int j = 2; j > i; j--) {
                top3[j] = top3[j-1];
            }
            top3[i] = x;
            break;
        }
    }
}

源碼分析結論

資源調度

  1. 若設置 --total-executor-core 參數,則使用指定的核數,若未設置則壓榨集羣性能,使用全部剩餘核數

  2. 若沒有設置 --executor-core 參數 ,則每一個worker節點默認爲這個application只開啓一個executor進程。若設置了,一個worker可設置多個executor。集羣會進一步考慮內存因素

任務調度

  • DAGScheduler類的getMessingParentStages()方法是切割job劃分stage,其中使用了遞歸的方式實現

  • DAGScheduler將stage封裝後發送到TaskScheduler中,TaskScheduler遍歷stage中的task併發送到executor中執行

相關文章
相關標籤/搜索