Spark安裝部署| 運行模式

 1. Spark概述

一種基於內存的快速、通用、可擴展的大數據分析引擎;java

內置模塊:node

  Spark Core(封裝了rdd、任務調度、內存管理、錯誤恢復、與存儲系統交互); python

  Spark SQL(處理結構化數據)、算法

  Spark Streaming(對實時數據進行流式計算) 、sql

  Spark Mlib(機器學習程序庫包括分類、迴歸、聚合、協同過濾等)、shell

  Spark GraghX(圖計算);數據庫

  獨立調度器、Yarn、Mesosapache

特色:vim

快( 基於內存(而MR是基於磁盤)、多線程模型(而mapReduce是基於多進程的,每一個MR都是獨立的JVM進程)、可進行迭代計算(而hadoop須要多個mr串行) )api

易用(支持java、scala、python等的API,支持超過80多種算法,支持交互式的 Python 和 Scala 的 shell,可方便地在shell中使用spark集羣來驗證解決問題,而不像之前須要打包上傳驗證)、

通用(spark提供了統一解決方案,可用於批處理、交互式查詢(spark sql)\ 實時流式處理(spark streaming)\機器學習和圖計算,可在同一應用中無縫使用)

兼容性(與其餘開源產品的融合,如hadoop的yarn、Mesos、HDFS、Hbase等)

http://spark.apache.org/   文檔查看地址 https://spark.apache.org/docs/2.1.1/

集羣角色

 Master和Workers

1)Master

Spark特有資源調度系統的Leader。掌管着整個集羣的資源信息,相似於Yarn框架中的ResourceManager,主要功能:

(1)監聽Worker,看Worker是否正常工做;        

(2)Master對Worker、Application等的管理(接收worker的註冊並管理全部的worker,接收client提交的application,(FIFO)調度等待的application並向worker提交)。

2)Worker

Spark特有資源調度系統的Slave,有多個。每一個Slave掌管着所在節點的資源信息,相似於Yarn框架中的NodeManager,主要功能:

(1)經過RegisterWorker註冊到Master;

(2)定時發送心跳給Master;

 (3)根據master發送的application配置進程環境,並啓動StandaloneExecutorBackend(執行Task所需的臨時進程)

Driver和Executor

1)Driver(驅動器)

Spark的驅動器是執行開發程序中的main方法的進程。它負責開發人員編寫的用來建立SparkContext、建立RDD,以及進行RDD的轉化操做和行動操做代碼的執行。若是你是用spark shell,那麼當你啓動Spark shell的時候,系統後臺自啓了一個Spark驅動器程序,就是在Spark shell中預加載的一個叫做 sc的SparkContext對象。若是驅動器程序終止,那麼Spark應用也就結束了。主要負責:

(1)把用戶程序轉爲任務

(2)跟蹤Executor的運行情況

(3)爲執行器節點調度任務

(4)UI展現應用運行情況

2)Executor(執行器)

Spark Executor是一個工做進程,負責在 Spark 做業中運行任務,任務間相互獨立。Spark 應用啓動時,Executor節點被同時啓動,而且始終伴隨着整個 Spark 應用的生命週期而存在。若是有Executor節點發生了故障或崩潰,Spark 應用也能夠繼續執行,會將出錯節點上的任務調度到其餘Executor節點上繼續運行。主要負責:

(1)負責運行組成 Spark 應用的任務,並將狀態信息返回給驅動器進程;

(2)經過自身的塊管理器(Block Manager)爲用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在Executor進程內的,所以任務能夠在運行時充分利用緩存數據加速運算。

總結:Master和Worker是Spark的守護進程,即Spark在特定模式下正常運行所必須的進程。Driver和Executor是臨時進程,當有具體任務提交到Spark集羣纔會開啓的進程。

 1. Local模式-本地單機

Linux中查看有多少核數:
[kris@hadoop101 ~]$ cat /proc/cpuinfo 
...
[kris@hadoop101 ~]$ cat /proc/cpuinfo | grep 'processor' | wc -l
8

 Local模式

在一臺計算機,能夠設置Master; (提交任務時須要指定--master)Local模式又分爲:
Local全部計算都運行在一個線程中(單節點單線程),沒有任何並行計算;
Local[K] ,如local[4]即運行4個Worker線程(單機也能夠並行有多個線程),可指定幾個線程來運行計算,一般CPU有幾個Core就執行幾個線程,最大化利用cpu的計算能力;
Local[*], 直接幫你安裝Cpu最多Cores來設置線程數,這種是默認的;

 

bin/spark-submit \  //提供任務的命令
--class org.apache.spark.examples.SparkPi \  //指定運行jar的主類
--master //它有默認值是local[*] =>spark://host:port, mesos://host:port, yarn, or local.
--executor-memory 1G \ //指定每一個executor可用內存
--total-executor-cores 2 \ 指定executor總核數
./examples/jars/spark-examples_2.11-2.1.1.jar \  \\jar包
100   //main方法中的args參數

./bin/spark-submit 回車可查看全部的參數

 

[kris@hadoop101 spark-local]$ bin/spark-shell 
Spark context Web UI available at http://192.168.1.101:4040
Spark context available as 'sc' (master = local[*], app id = local-1554255531204). ##spark core的入口sc
Spark session available as 'spark'.  ##它是spark sql程序的入口
再起一個spark-shell會報錯:
    spark sql也有一個默認的元數據也是存在derby數據庫裏邊
 Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@63e5b8aa, see the next exception for details.
 Caused by: org.apache.derby.iapi.error.StandardException: Another instance of Derby may have already booted the datab

 查看頁面:hadoop101:4040 
 
scala> sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((Hello,3), (smile,1), (java,2), (world,1), (kris,1))

提交任務(或者開啓spark-shell)的時候會有driver和executor進程,Local模式下它被封裝到了SparkSubmit中

提交任務分析

driver和executor是幹活的;

① Client提交任務--->②起一個Driver ---> ③註冊應用程序,申請資源--資源管理者有 (Master(Standalone模式)ResourceManage(yarn模式))----->④拿到資源後去其餘節點啓動Executor----> ⑤Executor會反向註冊給Driver彙報;

⑥(把提交的jar包作任務切分,把任務發給具體執行的節點Executor)--->Driver會進行初始化sc、任務劃分、任務調度 <===>Executor具體執行任務(負責具體執行任務、textFile、flatMap、map...

 ⑦ Driver把任務發到Executor不必定會執行,有可能資源cpu或內存不夠了或者executor掛了,spark會有一個容錯機制,某一個掛了可轉移到其餘的Executor;

最後任務跑完了,Driver會向資源管理者申請註銷(Executor也會註銷)

數據流程

textFile("input"):讀取本地文件input文件夾數據;

flatMap(_.split(" ")):壓平操做,按照空格分割符將一行數據映射成一個個單詞;

map((_,1)):對每個元素操做,將單詞映射爲元組;

reduceByKey(_+_):按照key將值進行聚合,相加;

collect:將數據收集到Driver端展現。

2. Standalone模式--徹底分佈式

概述

構建一個由Master+Slave構成的Spark集羣,Spark運行在集羣中;它的調度器是其實就是Master

 

提交任務時須要有一個客戶端Client,Master和Worker是守護進程它們是資源管理系統,提交任務(運行spark-shell或者spark-submit)以前它們就已經啓動了;

①提交--->起Driver就是初始化SparkContext,而後啓動Executor時須要資源;②向Master申請資源(即註冊),啓動ExecutorBackend

啓動Executor---->反向註冊給Driver彙報信息;

③ Driver劃分切分任務把Task發送給Executor,若是Executor會有一個容錯機制,Executor運行時會給Driver發送報告Task運行狀態直至結束;

④最後任務運行完以後driver向master申請註銷,Executor也會註銷掉; 

不必定非要在Client中起Driver(SparkContext),cluster模式,具體在哪一個節點起sc由Master決定,隨機的在worker節點上選擇一個一個;

Driver在哪一個節點起的緣由:driver和executor之間是有通信,每一個 executor都要向driver彙報信息,互相通信(消耗內存、資源+cpu數); 全部的executor節點都去跟driver作通信,客戶端的壓力就會特別大;

Client是本地調試用,輸入以後立刻能看到輸入的結果;

  1)修改slave文件,添加work節點:

[kris@hadoop101 conf]$ vim slaves
hadoop101
hadoop102
hadoop103

  2)修改spark-env.sh文件,添加以下配置:

在高可用集羣需把下面內容這給註釋掉:

#SPARK_MASTER_HOST=hadoop101

#SPARK_MASTER_PORT=7077

[kris@hadoop101 conf]$ vim spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144  ##若是遇到JAVA_HOME not set異常時可配置
SPARK_MASTER_HOST=hadoop101 SPARK_MASTER_PORT=7077
#配置歷史服務
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory"
#配置高可用
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=hadoop101,hadoop102,hadoop103 
-Dspark.deploy.zookeeper.dir=/spark"

  3)修改spark-default.conf文件,開啓Log:

[kris@hadoop101 conf]$ vi spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop101:9000/directory

  注意:HDFS上的目錄須要提早存在。  hadoop fs -mkdir /directory

4) 分發spark包 分發的緣由,由於這種模式下的資源調度是master和worker,各個節點須要本身去啓進程;
[kris@hadoop101 module]$ xsync spark/spark-standalone

   5)啓動

 [kris@hadoop101 spark]$ sbin/start-all.sh 網頁查看Master:hadoop101:8080 
可看到Status:ALIVE;Memory in use 等信息

高可用集羣的啓動,要 先啓動zookeeper; 在hadoop102上(也能夠是其餘節點)單獨啓動master節點 [kris@hadoop102 spark]$ sbin
/start-master.sh

啓動歷史服務以前要先啓動 start-dfs.sh
sbin/start-history-server.sh --->HistoryServer

查看歷史服務hadoop101:18080


官方求PI案例  

##運行以前上邊的① ② ③步都要啓動其餘; 默認的是client模式 
[kris@hadoop101 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop101:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
===>>
Pi is roughly 3.1417439141743913
啓動spark shell /opt/module/spark/bin/spark-shell \ --master spark://hadoop101:7077 \ --executor-memory 1g \ --total-executor-cores 2   只要提交了任務就能夠看到driver和executor,driver被封裝在了SparkSubmit裏邊;CoarseGrainedExecutorBackend就是啓動的executor 提交任務提交給哪一個executor都是有可能的
執行WordCount程序 scala
>sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

res0: Array[(String, Int)] = Array((Hello,2), (World,1), (java,2), (sbase,1), (spark,2), (Hi,1))
[kris@hadoop101 ~]$ jpsall -------hadoop101-------
6675 DataNode 5971 Master 6100 Worker 7463 CoarseGrainedExecutorBackend 7895 Jps 7368 SparkSubmit 6527 NameNode 5855 QuorumPeerMain -------hadoop102-------
4647 CoarseGrainedExecutorBackend 4075 QuorumPeerMain 4875 Jps 4380 DataNode 4188 Worker -------hadoop103-------
4432 SecondaryNameNode 4353 DataNode 4085 QuorumPeerMain 4198 Worker 4778 Jps

 在Standalone--cluster模式下

[kris@hadoop101 spark-standalone]$ bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://hadoop101:7077 \ --deploy-mode cluster \ --executor-memory 1G \ --total-executor-cores 2 \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100 任務未執行完時的進程: [kris@hadoop101 spark-standalone]$ jpsall -------hadoop101------- 16740 CoarseGrainedExecutorBackend 16404 HistoryServer 16006 NameNode 15686 Master 16842 Jps 15805 Worker 16127 DataNode -------hadoop102------- 10240 CoarseGrainedExecutorBackend 10021 DataNode 9911  Worker 10334 Jps -------hadoop103------- 9824 DataNode 9714 Worker 9944 SecondaryNameNode 10299 Jps 10093 DriverWrapper ##cluster 模式下的Driver 任務執行完的進程: [kris@hadoop101 spark-standalone]$ jpsall -------hadoop101------- 16404 HistoryServer 16006 NameNode 15686 Master 15805 Worker 17166 Jps 16127 DataNode -------hadoop102------- 10021 DataNode 9911 Worker 10447 Jps -------hadoop103------- 10416 Jps 9824 DataNode 9714 Worker 9944 SecondaryNameNode

spark-shell的 spark HA集羣訪問,前提是另一個Master啓起來了; 

/opt/module/spark/bin/spark-shell \
--master spark://hadoop101:7077,hadoop102:7077 \
--executor-memory 1g \
--total-executor-cores 2

 把其中ACTIVE狀態節點的kill掉,另一個Master的狀態將從standby模式--->active狀態;

可驗證下:

scala>sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((Hello,2), (World,1), (java,2), (sbase,1), (spark,2), (Hi,1))

 

提交任務時: --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). client和cluster的區別: SparkContext的位置不一樣(也就是運行Driver的位置不同),由Master決定,隨機的在其餘節點初始化一個sc Driver和Executor之間會有通訊,通訊須要消耗資源內存cpu等,全部的executor去和客戶端(若是是client模式,Driver是啓在Client上的)去通訊, 客戶端的壓力會很是大,若是有大量的executor再加上提交多個任務就啓動多個Driver,那麼Client單點就掛掉被拖垮; cluster模式,每次提交任務時的sc的位置分散在不一樣節點上,分擔了壓力, Client本地調試時候用,能夠看到輸出的結果,如可看到打印的π
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://hadoop101:7077 \ --deploy-mode cluster \ --executor-memory 1G \ --total-executor-cores 2 \ ##總的是2,默認1個cores/executor--->推導出有2/1個executor;可控制executor的數量; ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100 cluster模式下,driver叫DriverWrapper

3. Yarn模式

概述

以前的standalone模式,是本身Master和worker管理資源,分發是爲了在各個節點啓進程;yarn模式資源由RM、NM來管理

Spark客戶端直接鏈接Yarn,不須要額外構建Spark集羣。有yarn-client和yarn-cluster兩種模式,主要區別在於:Driver程序的運行節點。

yarn-client:Driver程序運行在客戶端,適用於交互、調試,但願當即看到app的輸出;

yarn-cluster:Driver程序運行在由RM(ResourceManager)啓動的AM(APPMaster)適用於生產環境。分擔壓力不會拖垮某個節點;

 

 提交任務以前,客戶端Client、ResourceManager、NodeManager都是要啓動好的;

提交任務,App Submit; RM選擇一個NM啓動AM,AM來啓動Driver(即初始化sc),yarn的cluster模式SparkAppMaster(用來申請資源,啓動driver)和SparkContext在一個進程裏邊; 

AM(SparkAppMaster)向RM申請啓動Executor;(默認狀況下一個節點啓一個executor這樣子負載比較均衡,也能夠啓兩個),executor也是有個反向註冊的過程;

切分分配任務,同時executor上報集羣情況;跑完以後申請註銷;

安裝使用

1)修改hadoop配置文件yarn-site.xml,添加以下內容:

[kris@hadoop101 hadoop]$ vim yarn-site.xml

        <!--是否啓動一個線程檢查每一個任務正使用的物理內存量,若是任務超出分配值,則直接將其殺掉,默認是true -->
        <property>
                <name>yarn.nodemanager.pmem-check-enabled</name>
                <value>false</value>
        </property>
        <!--是否啓動一個線程檢查每一個任務正使用的虛擬內存量,若是任務超出分配值,則直接將其殺掉,默認是true -->
        <property>
                <name>yarn.nodemanager.vmem-check-enabled</name>
                <value>false</value>
        </property>

2)配置歷史服務JobHistoryServer| 配置日誌查看功能

修改spark-env.sh,添加以下配置:

[kris@hadoop101 conf]$ vim spark-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop

# 配置JobHistoryServer  注意:HDFS上的目錄須要提早存在。
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30 
-Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory"

   從這裏看到歷史日誌:http://hadoop102:8088/cluster點擊直接跳轉到spark中  http://hadoop101:18080/history/application_1554294467331_0001/jobs/

[kris@hadoop101 conf]$ vim spark-defaults.conf 

#修改spark-default.conf文件,開啓Log:
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop101:9000/directory
# 日誌查看 
spark.yarn.historyServer.address=hadoop101:18080
spark.history.ui.port=18080

 

提交任務到Yarn執行
[kris@hadoop101 spark-yarn]$ bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100
[kris@hadoop101 spark-yarn]$ bin/spark-shell --master yarn ##shell只能用client模式啓動,默認的也是這種模式;  Spark context Web UI available at http://192.168.1.101:4040 Spark context available as 'sc' (master = yarn, app id = application_1554290192113_0004). Spark session available as 'spark'. -------hadoop101------- 25920 NodeManager 25751 DataNode 28075 SparkSubmit ##Driver仍是被封裝到這裏邊的 28252 Jps 25469 QuorumPeerMain 25630 NameNode -------hadoop102------- 14995 CoarseGrainedExecutorBackend 15076 Jps 13447 DataNode 13672 NodeManager 13369 QuorumPeerMain 13549 ResourceManager 14942 ExecutorLauncher     #Executor啓動器,就是AppMaster,Cluster模式,AM和sc在一個進程裏邊的,這種模式AM的任務是:既能夠申請資源又能夠作任務切分和調度;
                  Client模式它們就不在一個進程了,由RM隨機選擇一個節點來啓動AM,這種模式它的做用僅僅是用來申請資源去啓動Executor;
-------hadoop103------- 13536 DataNode 14610 CoarseGrainedExecutorBackend 14691 Jps 13638 NodeManager 13464 QuorumPeerMain 13710 SecondaryNameNode

yarn--cluster模式

[kris@hadoop101 spark-yarn]$ bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.11-2.1.1.jar 在任務未完成以前的進程: [kris@hadoop101 spark-yarn]$ jpsall -------hadoop101------- 13328 SparkSubmit 12146 NodeManager 13706 CoarseGrainedExecutorBackend 12555 NameNode 12702 DataNode 13951 Jps -------hadoop102------- 6864 ResourceManager 8101 Jps 7403 DataNode 6990 NodeManager -------hadoop103------- 7984 ApplicationMaster  ## Yarn-Cluster模式下SparkAppMaster和Sparkcontext即Driver是在一個進程的 8432 Jps 7560 SecondaryNameNode 8158 CoarseGrainedExecutorBackend 7230 NodeManager 7438 DataNode 任務完成以後的進程: [kris@hadoop101 spark-yarn]$ jpsall -------hadoop101------- 12146 NodeManager 12555 NameNode 12702 DataNode 14031 Jps -------hadoop102------- 6864 ResourceManager 8153 Jps 7403 DataNode 6990 NodeManager -------hadoop103------- 7560 SecondaryNameNode 8537 Jps 7230 NodeManager 7438 DataNode

 [kris@hadoop101 spark-yarn]$ sbin/start-history-server.sh   ##開啓歷史服務

提交任務到Yarn執行
[kris@hadoop101 spark-yarn]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

 

 

Mesos模式

Spark客戶端直接鏈接Mesos;不須要額外構建Spark集羣。國內應用比較少,更多的是運用yarn調度。

幾種模式對比

 

package com.atguigu.spark
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    //1.建立SparkConf並設置App名稱
    val conf = new SparkConf().setAppName("WordCount")
    //2.建立SparkContext,該對象是提交Spark App的入口
    val context = new SparkContext(conf)
    //3.使用sc建立RDD並執行相應的transformation和action
    context.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1))
    //4.關閉鏈接
    context.stop()
  }
}

 /wc.txt必須在HDFS上有這個文件

[kris@hadoop101 spark-yarn]$ hadoop fs -put wc.txt /
[kris@hadoop101 spark-yarn]$ bin/spark-submit --class com.atguigu.spark.WordCount --master yarn --deploy-mode client /opt/module/spark/spark-yarn/WordCount.jar /wc.txt /out

結果:
(Hello,3)
(smile,2)
(kris,2)
(alex,1)
(hi,1)
相關文章
相關標籤/搜索