spark部署

1 節點說明

 
IP

Role

192.168.1.111

ActiveNameNode

192.168.1.112

StandbyNameNode,Master,Worker

192.168.1.113

DataNode,Master,Worker

192.168.1.114

DataNode,Worker

HDFS集羣和Spark集羣之間節點共用。

2 安裝HDFS

    見HDFS2.X和Hive的安裝部署文檔:http://www.cnblogs.com/Scott007/p/3614960.html

3 Spark部署

    Spark經常使用的安裝部署模式有Spark On Yarn和Standalone,能夠同時使用。

3.1 Spark on Yarn

    這種模式,藉助Yarn資源分配的功能,使用Spark客戶端來向Yarn提交任務運行。只需將Spark的部署包放置到Yarn集羣的某個節點上便可(或者是Yarn的客戶端,能讀取到Yarn集羣的配置文件便可)。Spark自己的Worker節點、Master節點不須要啓動。

    可是,Spark的部署包須是基於對應的Yarn版本正確編譯後的,不然會出現Spark和Yarn的兼容性問題。

    on Yarn的兩種運行方式,其運行結束後的日誌不能在Yarn的Application管理界面看到,目前只能在客戶端經過:

yarn logs -applicationId <applicationId>

命令查看每一個Application的日誌。

3.1.1 配置

    部署這種模式,須要修改conf目錄下的spark-env.sh文件。在其中新增以下配置選項:

 
export HADOOP_HOME= /home/hadoop/hadoop-2.0.0-cdh4.5.0

export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop

SPARK_EXECUTOR_INSTANCES=2

SPARK_EXECUTOR_CORES=1

SPARK_EXECUTOR_MEMORY=400M

SPARK_DRIVER_MEMORY=400M

SPARK_YARN_APP_NAME="Spark 1.0.0"
 
 

    其中:

(1) HADOOP_HOME:當前節點中HDFS的部署路徑,由於Spark須要和HDFS中的節點在一塊兒;

(2) HADOOP_CONF_DIR:HDFS節點中的conf配置文件路徑,正常狀況下此目錄爲$HADOOP_HOME/etc/hadoop;

(3) SPARK_EXECUTOR_INSTANCES:在Yarn集羣中啓動的Worker的數目,默認爲2個;

(4) SPARK_EXECUTOR_CORES:每一個Worker所佔用的CPU核的數目;

(5) SPARK_EXECUTOR_MEMORY:每一個Worker所佔用的內存大小;

(6) SPARK_DRIVER_MEMORY:Spark應用程序Application所佔的內存大小,這裏的Driver對應Yarn中的ApplicationMaster;

(7) SPARK_YARN_APP_NAME:Spark Application在Yarn中的名字;

    配置完成後,將Spark部署文件放置到Yarn的節點中便可。這裏,將spark-1.0.0整個目錄放到Yarn集羣的一個節點192.168.1.112的/home/hadoop(設爲spark的安裝路徑的父目錄)路徑下。

 

3.1.2 測試

    在Spark的部署路徑的bin路徑下,執行spark-submit腳原本運行spark-examples包中的例子。執行以下:

 
./bin/spark-submit --master yarn \

--class org.apache.spark.examples.JavaWordCount \

--executor-memory 400M \

--driver-memory 400M \

/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xml
 
 

    這個例子是計算WordCount的,例子被打包在/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar包中,對應的Class爲org.apache.spark.examples.JavaWordCount,./hdfs-site.xml是HDFS中指定路徑下的一個文件,WordCount就是針對它來作的。而--master yarn就是指定運行在Yarn集羣中,以yarn模式運行。

    Spark On Yarn有兩種運行模式,一種是Yarn Cluster方式,一種是Yarn Client方式。

(1) Yarn Cluster: Spark Driver程序將做爲一個ApplicationMaster在YARN集羣中先啓動,而後再由ApplicationMaster向RM申請資源啓動 executor以運行Task。由於Driver程序在Yarn中運行,因此程序的運行結果不能在客戶端顯示,因此最好將結果保存在HDFS上,客戶端 的終端顯示的是做爲Yarn的job的運行狀況。

(2) Yarn Client: Spark Driver程序在客戶端上運行,而後向Yarn申請運行exeutor以運行Task,本地程序負責最後的結果彙總等。客戶端的Driver將應用提交 給Yarn後,Yarn會前後啓動ApplicationMaster和executor,另外ApplicationMaster和executor都 是裝載在container裏運行,container默認的內存是1G,ApplicationMaster分配的內存是driver- memory,executor分配的內存是executor-memory。同時,由於Driver在客戶端,因此程序的運行結果能夠在客戶端顯 示,Driver以進程名爲SparkSubmit的形式存在。

    上面命令中的提交方式「yarn」就是默認按照「Yarn Client」方式運行。用戶可自定義運行方式,經過「--master」指定程序以yarn、yarn-cluster或者yarn-client中的一種方式運行。

    須要重點說明的是最後文件的路徑,是至關於HDFS中的/user/hadoop而言,hadoop是當前命令的用戶。「./hdfs-site.xml」在HDFS中的全路徑爲「hdfs://namespace/user/hadoop/hdfs-site.xml」,其中hadoop是當前的用戶,namespace是HDFS的命名空間;若是寫成「/hdfs-site.xml」則在HDFS中指的是「hdfs://namespace/hdfs-site.xml」;固然也能夠直接傳入「hdfs://namespace/user/hadoop/hdfs-site.xml」用於指定在HDFS中的要進行WordCount計算的文件。

    另外,Spark應用程序須要的CPU Core數目和內存,須要根據當前Yarn的NodeManager的硬件條件相應設置,不能超過NodeManager的硬件條件。

 
./bin/spark-submit --master yarn \

--class org.apache.spark.examples.JavaWordCount \

--executor-memory 400M \

--driver-memory 400M \

/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar hdfs://namespace/user/hadoop/hdfs-site.xml
 
 

在Yarn的ResourceManager對應的Web界面中查看啓動的Application。

Running:

 

Success:

 

同時能夠在啓動腳本的客戶端看到WordCount的運行結果:

 

 

3.2 Spark Standalone

    這種模式,就是把Spark單獨做爲一個集羣來進行部署。集羣中有兩種節點,一種是Master,另外一種是Worker節點。Master負責分配任務給Worker節點來執行,並負責最後的結果合併,Worker節點負責具體的任務執行。

3.2.1 配置

    所需修改的配置文件除了spark-env.sh文件之外,還有slave文件,都位於conf目錄中。

    slave文件中保存的是worker節點host或者IP,此處的配置爲:

192.168.1.112

192.168.1.113

192.168.1.114

    至於spark-env.sh文件,能夠配置以下屬性:

(1) SPARK_MASTER_PORT:Master服務端口,默認爲7077;

(2) SPARK_WORKER_CORES:每一個Worker進程所須要的CPU核的數目;

(3) SPARK_WORKER_MEMORY:每一個Worker進程所須要的內存大小;

(4) SPARK_WORKER_INSTANCES:每一個Worker節點上運行Worker進程的數目;

(5) SPARK_MASTER_WEBUI_PORT:Master節點對應Web服務的端口;

(6)export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark":用於指定Master的HA,依賴於zookeeper集羣;

(7) export SPARK_JAVA_OPTS="-Dspark.cores.max=4":用於限定每一個提交的Spark Application的使用的CPU核的數目,由於缺省狀況下提交的Application會使用全部集羣中剩餘的CPU Core。

    注意在Worker進程的CPU個數和內存大小的時候,要結合機器的實際硬件條件,若是一個Worker節點上的全部Worker進程須要的CPU總數目或者內存大小超過當前Worker節點的硬件條件,則Worker進程會啓動失敗。

將配置好的Spark文件拷貝至每一個Spark集羣的節點上的相同路徑中。爲方便使用spark-shell,能夠在環境變量中配置上SPARK_HOME。

3.2.2 啓動

    配置結束後,就該啓動集羣了。這裏使用Master的HA方式,選取192.168.1.1十二、192.168.1.113節點做爲Master,192.168.1.1十二、192.168.1.11三、192.168.1.114節點上運行兩個Worker進程。

首先在192.168.1.113節點上作此操做:

 

啓動以後,能夠查看當前節點的進程:

 

另外,爲了保證Master的HA,在192.168.1.112節點上只啓動Master:

 

192.168.1.112節點的進程爲:

 

啓動事後,經過Web頁面查看集羣的狀況,這裏訪問的是:

http://192.168.1.113:8090/

 

再看standby節點192.168.1.112的web界面http://192.168.1.112:8090/

 

3.2.3 測試

    Spark的bin子目錄中的spark-submit腳本是用於提交程序到集羣中運行的工具,咱們使用此工具作一個關於pi的計算。命令以下:

 
./bin/spark-submit --master spark://spark113:7077 \ --class org.apache.spark.examples.SparkPi \ --name Spark-Pi --executor-memory 400M \ --driver-memory 512M \ /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar   
 
 

    其中--master參數用於指定Master節點的URI,可是這裏填的是Host,不是IP!

    任務啓動以後,在Spark的Master的Web界面能夠看到運行中的Application。

 

    任務運行結束以後,在Web界面中Completed Applications表格中會看到對應的結果。

 

同時,命令行中會打印出來運行的結果,以下所示:

 

4 spark-submit工具

    上面測試程序的提交都是使用的spark-submit腳本,其位於$SPARK_HOME/bin目錄中,執行時須要傳入的參數說明以下:

Usage: spark-submit [options] <app jar | python file> [app options]

參數名稱

含義

--master MASTER_URL

能夠是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local

--deploy-mode DEPLOY_MODE

Driver程序運行的地方,client或者cluster

--class CLASS_NAME

主類名稱,含包名

--name NAME

Application名稱

--jars JARS

Driver依賴的第三方jar包

--py-files PY_FILES

用逗號隔開的放置在Python應用程序PYTHONPATH上的.zip, .egg, .py文件列表

--files FILES

 用逗號隔開的要放置在每一個executor工做目錄的文件列表

--properties-file FILE

設置應用程序屬性的文件路徑,默認是conf/spark-defaults.conf

--driver-memory MEM

Driver程序使用內存大小

--driver-java-options

 

--driver-library-path

Driver程序的庫路徑

--driver-class-path

Driver程序的類路徑

--executor-memory MEM

executor內存大小,默認1G

--driver-cores NUM

Driver程序的使用CPU個數,僅限於Spark Alone模式

--supervise

失敗後是否重啓Driver,僅限於Spark Alone模式

--total-executor-cores NUM

executor使用的總核數,僅限於Spark Alone、Spark on Mesos模式

--executor-cores NUM

每一個executor使用的內核數,默認爲1,僅限於Spark on Yarn模式

--queue QUEUE_NAME

提交應用程序給哪一個YARN的隊列,默認是default隊列,僅限於Spark on Yarn模式

--num-executors NUM

啓動的executor數量,默認是2個,僅限於Spark on Yarn模式

--archives ARCHIVES

僅限於Spark on Yarn模式

    另外,在執行spark-submit.sh工具進行提交應用以前,可使用以下方式提早定義好當前Spark Application所使用的CPU Core數目和內存大小:

 
SPARK_JAVA_OPTS="-Dspark.cores.max=2 -Dspark.executor.memory=600m" \

./bin/spark-submit --master spark://update113:7077 \ --class org.apache.spark.examples.SparkPi \




 
 

5 Spark HistoryServer

    相似於Mapreduce的JobHistoryServer,Spark也有一個服務能夠保存歷史Application的運行記錄。

    修改$SPARK_HOME/conf下的spark-defaults.conf文件(注意,修改後的配置文件在每一個節點都要有),其中可修改的配置屬性爲:

 

屬性名稱

默認值

含義

spark.history.updateInterval

10

以秒爲單位,更新日誌相關信息的時間間隔

spark.history.retainedApplications

250

保存Application歷史記錄的個數,若是超過這個值,舊的應用程序信息將被刪除

spark.history.ui.port

18080

HistoryServer的web端口

 spark.history.kerberos.enabled

False

是否使用kerberos方式登陸訪問HistoryServer,對於持久層位於安全集羣的HDFS上是有用的,若是設置爲true,就要配置下面的兩個屬性

 spark.history.kerberos.principal

 

用於HistoryServer的kerberos主體名稱

spark.history.kerberos.keytab

 

用於HistoryServer的kerberos keytab文件位置

spark.history.ui.acls.enable

False

受權用戶查看應用程序信息的時候是否檢查acl。若是啓用,只有應用程序全部者和spark.ui.view.acls指定的用戶能夠查看應用程序信息;不然,不作任何檢查

spark.eventLog.enabled

False

是否記錄Spark事件

spark.eventLog.dir

 

保存日誌相關信息的路徑,能夠是hdfs://開頭的HDFS路徑,也能夠是file://開頭的本地路徑,都須要提早建立

spark.yarn.historyServer.address

 

Server端的URL:Ip:port 或者host:port

    此處的設置以下:

spark.eventLog.enabled true spark.eventLog.dir hdfs://yh/user/hadoop/sparklogs  spark.yarn.historyServer.address    update113:18080
 

    設置完文件以後,進入sbin目錄啓動服務:

 

    運行完成的Application歷史記錄能夠經過訪問上面指定的HistoryServer地址查看,這裏是http://192.168.1.113:18080/。

 

    不管運行時是本地模式,仍是yarn-client、yarn-cluster,運行記錄都可在此頁面查看。

    而且程序運行時的環境變量、系統參數、各個階段的耗時都可在此查看,很強大!

6 Spark可配置參數

Spark參數的配置可經過三種方式:SparkConf方式 > 命令行參數方式 >文件配置方式。

6.1 應用屬性

 

屬性名

默認值

含義

 spark.app.name

 

應用程序名稱

spark.master

 

要鏈接的Spark集羣Master的URL

spark.executor.memory

512 m

每一個executor使用的內存大小

spark.serializer

org.apache.spark

.serializer.JavaSerializer

序列化方式,官方建議使用org.apache.spark.serializer.KryoSerializer,固然也能夠任意是定義爲org.apache.spark.Serializer子類的序化器

spark.kryo.registrator

 

若是要使用 Kryo序化器,須要建立一個繼承KryoRegistrator的類並設置系統屬性spark.kryo.registrator指向該類

 spark.local.dir

/tmp

用於保存map輸出文件或者轉儲RDD。能夠多個目錄,之間以逗號分隔。在Spark 1.0 及更高版本此屬性會被環境變量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替

spark.logConf

False

SparkContext 啓動時是否記錄有效 SparkConf信息

 

6.2 運行環境變量

 

屬性名

默認值

含義

spark.executor.extraJavaOptions

 

傳遞給executor的額外JVM 選項,可是不能使用它來設置Spark屬性或堆空間大小

spark.executor.extraClassPath

 

追加到executor類路徑中的附加類路徑

spark.executor.extraLibraryPath

 

啓動executor JVM 時要用到的特殊庫路徑

spark.files.userClassPathFirst

False

executor在加載類的時候是否優先使用用戶自定義的JAR包,而不是Spark帶有的JAR包,目前,該屬性只是一項試驗功能

 

6.3 Shuffle操做相關屬性

屬性名

默認值

含義

spark.shuffle.consolidateFiles

False

若是爲true,在shuffle時就合併中間文件,對於有大量Reduce任務的shuffle來講,合併文件可 以提升文件系統性能,若是使用的是ext4 或 xfs 文件系統,建議設置爲true;對於ext3,因爲文件系統的限制,設置爲true反而會使內核>8的機器下降性能

 spark.shuffle.spill

True

若是爲true,在shuffle期間經過溢出數據到磁盤來下降了內存使用總量,溢出閾值是由spark.shuffle.memoryFraction指定的

spark.shuffle.spill.compress

True

是否壓縮在shuffle期間溢出的數據,若是壓縮將使用spark.io.compression.codec。

 spark.shuffle.compress

True

是否壓縮map輸出文件,壓縮將使用spark.io.compression.codec。

spark.shuffle.file.buffer.kb

100

每一個shuffle的文件輸出流內存緩衝區的大小,以KB爲單位。這些緩衝區能夠減小磁盤尋道的次數,也減小建立shuffle中間文件時的系統調用

spark.reducer.maxMbInFlight

48

每一個reduce任務同時獲取map輸出的最大大小 (以兆字節爲單位)。因爲每一個map輸出都須要一個緩衝區來接收它,這表明着每一個 reduce 任務有固定的內存開銷,因此要設置小點,除非有很大內存

 

6.4 SparkUI相關屬性

屬性名

默認值

含義

spark.ui.port

4040

應用程序webUI的端口

spark.ui.retainedStages

1000

在GC以前保留的stage數量

 spark.ui.killEnabled

True

容許在webUI將stage和相應的job殺死

 spark.eventLog.enabled

False

是否記錄Spark事件,用於應用程序在完成後重構webUI

spark.eventLog.compress

False

是否壓縮記錄Spark事件,前提spark.eventLog.enabled爲true

spark.eventLog.dir

file:///tmp/spark-events

若是spark.eventLog.enabled爲 true,該屬性爲記錄spark事件的根目錄。在此根目錄中,Spark爲每一個應用程序建立分目錄,並將應用程序的事件記錄到在此目錄中。能夠將此屬性 設置爲HDFS目錄,以便history server讀取歷史記錄文件

 

6.5 壓縮和序列化相關屬性

屬性名

默認值

含義

spark.broadcast.compress

True

是否在發送以前壓縮廣播變量

spark.rdd.compress

False

是否壓縮RDD分區

spark.io.compression.codec

org.apache.spark.io.

LZFCompressionCodec

用於壓縮內部數據如 RDD分區和shuffle輸出的編碼解碼器, org.apache.spark.io.LZFCompressionCodec和 org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的壓縮和解壓縮,而LZF提供了 更好的壓縮比

 spark.io.compression.snappy

.block.size

32768

使用Snappy編碼解碼器時,編碼解碼器使用的塊大小 (以字節爲單位)

 spark.closure.serializer

org.apache.spark.serializer.

JavaSerializer

用於閉包的序化器,目前只有支持Java序化器

spark.serializer.
objectStreamReset

10000

org.apache.spark.serializer.JavaSerializer序列化時,會緩存對象以防 止寫入冗餘數據,此時會中止這些對象的垃圾收集。經過調用重置序化器,刷新該信息就能夠收集舊對象。若要關閉這重按期重置功能將其設置爲< = 0 。默認狀況下每10000個對象將重置序化器

spark.kryo.referenceTracking

True

當使用Kryo序化數據時,是否跟蹤對同一對象的引用。若是你的對象圖有迴路或者同一對象有多個副本,有必要設置爲true;其餘狀況下能夠禁用以提升性能

 spark.kryoserializer.buffer.mb

2

在Kryo 裏容許的最大對象大小(Kryo會建立一個緩衝區,至少和序化的最大單個對象同樣大)。每一個worker的每一個core只有一個緩衝區

 

6.6 執行時相關屬性

屬性名

默認值

含義

spark.default.parallelism

本地模式:機器核數

Mesos:8

其餘:max(executor的core,2)

若是用戶不設置,系統使用集羣中運行shuffle操做的默認任務數(groupByKey、 reduceByKey等)

spark.broadcast.factory

org.apache.spark.broadcast.

HttpBroadcastFactory

廣播的實現類

spark.broadcast.blockSize

4096

TorrentBroadcastFactory塊大小(以kb爲單位)。過大會下降廣播速度;太小會使印象BlockManager性能

spark.files.overwrite

Fale

經過 SparkContext.addFile() 添加的文件在目標中已經存在而且內容不匹配時,是否覆蓋目標文件

spark.files.fetchTimeout

False

在獲取由driver經過SparkContext.addFile() 添加的文件時,是否使用通訊時間超時

spark.storage.memoryFraction

0.6

Java堆用於cache的比例

 spark.tachyonStore.baseDir

System.getProperty("java.io.tmpdir")

用於存儲RDD的techyon目錄,tachyon文件系統的URL由spark.tachyonStore.url設置,也能夠是逗號分隔的多個techyon目錄

spark.storage.

memoryMapThreshold

8192

以字節爲單位的塊大小,用於磁盤讀取一個塊大小時進行內存映射。這能夠防止Spark在內存映射時使用很小塊,通常狀況下,對塊進行內存映射的開銷接近或低於操做系統的頁大小

spark.tachyonStore.url

tachyon://localhost:19998

基於techyon文件的URL

spark.cleaner.ttl

 

spark記錄任何元數據(stages生成、task生成等)的持續時間。按期清理能夠確保將超期的元數據丟棄,這在運行長時間任務是頗有用的,如運行7*24的sparkstreaming任務。RDD持久化在內存中的超期數據也會被清理

 

6.7 網絡相關屬性

屬性名

默認值

含義

spark.driver.host

 

運行driver的主機名或 IP 地址

spark.driver.port

隨機

driver偵聽的端口

spark.akka.frameSize

10

以MB爲單位的driver和executor之間通訊信息的大小,設置值越大,driver能夠接受更大的計算結果

spark.akka.threads

4

用於通訊的actor線程數,在大型集羣中擁有更多CPU內核的driver能夠增長actor線程數

spark.akka.timeout

100

以秒爲單位的Spark節點之間超時時間

spark.akka.heartbeat.pauses

600

下面3個參數是用於設置Akka自帶的故障探測器。啓用的話,以秒爲單位設置以下這三個參數,有助於對惡意的executor的定位,而對於因爲GC暫停或網絡滯後引發的狀況下,不須要開啓故障探測器;另外故障探測器的開啓會致使因爲心跳信息的頻繁交換而引發的網絡氾濫。

本參數是設置可接受的心跳停頓時間

spark.akka.failure-detector.threshold

300.0

對應Akka的akka.remote.transport-failure-detector.threshold

spark.akka.heartbeat.interval

1000

心跳間隔時間

 

6.8 調度相關屬性

屬性名

默認值

含義

spark.task.cpus

1

爲每一個任務分配的內核數

spark.task.maxFailures

4

Task的最大重試次數

spark.scheduler.mode

FIFO

Spark的任務調度模式,還有一種Fair模式

spark.cores.max

 

當應用程序運行在Standalone集羣或者粗粒度共享模式Mesos集羣時,應用程序向集羣請求的最大CPU內 核總數(不是指每臺機器,而是整個集羣)。若是不設置,對於Standalone集羣將使用spark.deploy.defaultCores中數值, 而Mesos將使用集羣中可用的內核

spark.mesos.coarse

 False

若是設置爲true,在Mesos集羣中運行時使用粗粒度共享模式

 spark.speculation

False

如下幾個參數是關於Spark推測執行機制的相關參數。此參數設定是否使用推測執行機制,若是設置爲true則spark使用推測執行機制,對於Stage中拖後腿的Task在其餘節點中從新啓動,並將最早完成的Task的計算結果最爲最終結果

spark.speculation.interval

100

Spark多長時間進行檢查task運行狀態用以推測,以毫秒爲單位

 spark.speculation.quantile

0.75

推測啓動前,Stage必需要完成總Task的百分比

spark.speculation.multiplier

1.5

比已完成Task的運行速度中位數慢多少倍才啓用推測

 spark.locality.wait

3000

如下幾個參數是關於Spark數據本地性的。本參數是以毫秒爲單位啓動本地數據task的等待時間,若是超出就啓動 下一本地優先級別的task。該設置一樣能夠應用到各優先級別的本地性之間(本地進程 -> 本地節點 -> 本地機架 -> 任意節點 ),固然,也能夠經過spark.locality.wait.node等參數設置不一樣優先級別的本地性

 spark.locality.wait.process

spark.locality.wait

本地進程級別的本地等待時間

spark.locality.wait.node

spark.locality.wait

本地節點級別的本地等待時間

spark.locality.wait.rack

spark.locality.wait

本地機架級別的本地等待時間

spark.scheduler.revive.interval

1000

復活從新獲取資源的Task的最長時間間隔(毫秒),發生在Task由於本地資源不足而將資源分配給其餘Task運行後進入等待時間,若是這個等待時間內從新獲取足夠的資源就繼續計算

 

6.9 安全相關屬性

屬性名

默認值

含義

spark.authenticate

False

是否啓用內部身份驗證

spark.authenticate.secret

 

設置組件之間進行身份驗證的密鑰。若是不是YARN上運行而且spark.authenticate爲true時,須要設置密鑰

spark.core.connection. auth.wait.timeout

30

進行身份認證的超時時間

spark.ui.filters

 

Spark web UI 要使用的以逗號分隔的篩選器名稱列表。篩選器要符合javax servlet Filter標準,每一個篩選器的參數能夠經過設置java系統屬性來指定:

spark.<class name of filter>.params='param1=value1,param2=value2'

例如:

-Dspark.ui.filters=com.test.filter1

-Dspark.com.test.filter1.params='param1=foo,param2=testing'

 spark.ui.acls.enable

False

Spark webUI存取權限是否啓用。若是啓用,在用戶瀏覽web界面的時候會檢查用戶是否有訪問權限

spark.ui.view.acls

 

以逗號分隔Spark webUI訪問用戶的列表。默認狀況下只有啓動Spark job的用戶纔有訪問權限

 

6.10 SparkStreaming相關屬性

屬性名

默認值

含義

spark.streaming.blockInterval

200

Spark Streaming接收器將接收數據合併成數據塊並存儲在Spark裏的時間間隔,毫秒

spark.streaming.unpersist

True

若是設置爲true,強迫將SparkStreaming持久化的RDD數據從Spark內存中清理,一樣 的,SparkStreaming接收的原始輸入數據也會自動被清理;若是設置爲false,則容許原始輸入數據和持久化的RDD數據可被外部的 Streaming應用程序訪問,由於這些數據不會自動清理

6.11 Standalone模式特有屬性

能夠在文件conf/spark-env.sh中來設置此模式的特有相關屬性:

(1)SPARK_MASTER_OPTS:配置master使用的屬性

(2)SPARK_WORKER_OPTS:配置worker使用的屬性

(3)SPARK_DAEMON_JAVA_OPTS:配置master和work都使用的屬性

配置的時候,使用相似的語句:

export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"

其中x表明屬性,y表明屬性值。

SPARK_MASTER_OPTS所支持的屬性有:

屬性名

默認值

含義

spark.deploy.spreadOut

True

Standalone集羣管理器是否自由選擇節點仍是固定到儘量少的節點,前者會有更好的數據本地性,後者對於計算密集型工做負載更有效

spark.worker.timeout

60

master由於沒有收到心跳信息而認爲worker丟失的時間(秒)

spark.deploy.defaultCores

 

若是沒有設置spark.cores.max,該參數設置Standalone集羣分配給應用程序的最大內核數,若是不設置,應用程序獲取全部的有效內核。注意在一個共享的集羣中,設置一個低值防止攫取了全部的內核,影響他人的使用

 

SPARK_WORKER_OPTS所支持的屬性有

屬性名

默認值

含義

spark.worker.cleanup.enabled

False

是否認期清理worker的應用程序工做目錄,只適用於Standalone模式,清理的時候將無視應用程序是否在運行

 spark.worker.cleanup.interval

1800

清理worker本地過時的應用程序工做目錄的時間間隔(秒)

spark.worker.cleanup.appDataTtl

7*24*3600

worker保留應用程序工做目錄的有效時間。該時間由磁盤空間、應用程序日誌、應用程序的jar包以及應用程序的提交頻率來設定

 

SPARK_DAEMON_JAVA_OPTS所支持的屬性有:

屬性名

含義

spark.deploy.recoveryMode

下面3個參數是用於配置zookeeper模式的master HA。設置爲ZOOKEEPER表示啓用master備用恢復模式,默認爲NONE

spark.deploy.zookeeper.url

zookeeper集羣URL

 spark.deploy.zookeeper.dir

zooKeeper保存恢復狀態的目錄,缺省爲/spark

spark.deploy.recoveryMode

設成FILESYSTEM啓用master單節點恢復模式,缺省值爲NONE

spark.deploy.recoveryDirectory

Spark保存恢復狀態的目錄

 

6.12 Spark on Yarn特有屬性

屬性名

默認值

含義

spark.yarn.applicationMaster.waitTries

10

RM等待Spark AppMaster啓動重試次數,也就是SparkContext初始化次數。超過這個數值,啓動失敗

spark.yarn.submit.file.replication

3

應用程序上傳到HDFS的文件的副本數

spark.yarn.preserve.staging.files

False

若爲true,在job結束後,將stage相關的文件保留而不是刪除

spark.yarn.scheduler.heartbeat.interval-ms

5000

Spark AppMaster發送心跳信息給YARN RM的時間間隔

spark.yarn.max.executor.failures

2倍於executor數

致使應用程序宣告失敗的最大executor失敗次數

spark.yarn.historyServer.address

 

Spark history server的地址(不要加http://)。這個地址會在Spark應用程序完成後提交給YARN RM,而後RM將信息從RM UI寫到history server UI上。

 

7 示例配置

    主要的配置文件均位於$SPARK_HOME/conf中,包括slave、spark-env.sh、spark-defaults.conf文件等。

7.1 slave文件

192.168.1.112 192.168.1.113 192.168.1.114
 

7.2 spark-env.sh文件

 
export JAVA_HOME="/export/servers/jdk1.6.0_25" #yarn

export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0 export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

SPARK_EXECUTOR_INSTANCES=2 SPARK_EXECUTOR_CORES=1 SPARK_EXECUTOR_MEMORY=400M

SPARK_DRIVER_MEMORY=400M

SPARK_YARN_APP_NAME="Spark 1.0.0" #alone

SPARK_MASTER_WEBUI_PORT=8090 SPARK_WORKER_MEMORY=400M

SPARK_WORKER_CORES=1 SPARK_WORKER_INSTANCES=2 #Master HA

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark"
 
 

7.3 spark-defaults.conf文件

 
#history server

spark.eventLog.enabled true spark.eventLog.dir hdfs://namespace/user/hadoop/sparklogs  spark.yarn.historyServer.address    spark113:18080 #shuffle

spark.shuffle.consolidateFiles true #task

spark.task.cpus 1 spark.task.maxFailures 3 #scheduler type

spark.scheduler.mode FAIR

#security

park.authenticate true spark.authenticate.secret hadoop

spark.core.connection.auth.wait.timeout 1500 spark.ui.acls.enable true spark.ui.view.acls root,hadoop

#each executor used max memory

spark.executor.memory 400m

#spark on yarn

spark.yarn.applicationMaster.waitTries 5 spark.yarn.submit.file.replication 3 spark.yarn.preserve.staging.files false spark.yarn.scheduler.heartbeat.interval-ms 5000 #park standalone and on mesos

spark.cores.max 4
 
 

8 Spark SQL

    Spark支持Scala、Python等語言寫的腳本直接在Spark環境執行,更重要的是支持對Hive語句進行包裝後在Spark上運行。這就是Spark SQL。

8.1 相關配置

    配置的步驟比較簡單,把Hive的配置文件hive-site.xml直接放置到$SPARK_HOME的conf路徑下便可。若是是想在Spark集羣本地執行SQL的話,每一個對應的節點都要作一樣的配置。

8.2 運行SQL

    啓動bin目錄下的spark-shell腳本,依次執行以下語句:

 
val sc: SparkContext

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

import hiveContext._

hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

hql("LOAD DATA LOCAL INPATH '/examples /data.txt' INTO TABLE src")

hql("FROM src SELECT key, value").collect().foreach(println)
 
 

    上面的命令,分別是聲明SparkContext對象,利用hql方法執行Hive的SQL語句,在執行SQL語句的過程當中,能夠經過Hive的Cli客戶端進行查看相應操做的結果。

8.3 on yarn模式

    因爲spark-shell腳本是在本地執行的,若是想放到Yarn上去執行的話,可使用上面第4節中的spark-submit工具,這時候須要對須要輸入的sql語句進行包裝,將包裝類打包成jar文件,再提交。

    包裝類的代碼以下:

 
 1 package spark;  2  3 import java.util.List;  4  5 import org.apache.spark.SparkConf;  6 import org.apache.spark.api.java.JavaSparkContext;  7 import org.apache.spark.sql.api.java.Row;  8 import org.apache.spark.sql.hive.api.java.JavaHiveContext;  9 10 /** 11  * Description: 12  * Author: ITScott@163.com 13  * Date: 2014/7/15 14 */ 15 public class SparkSQL { 16 17 public static void main(String[] args) { 18 if(args.length != 2){ 19 System.out.println("usage: <applicationName> <sql statments>"); 20 System.exit(1); 21  } 22 23 String applicationName = args[0]; 24 String sql = args[1]; 25 26 SparkConf conf = new SparkConf().setAppName(applicationName); 27 JavaSparkContext sc = new JavaSparkContext(conf); 28 JavaHiveContext hiveContext = new JavaHiveContext(sc); 29 List<Row> results = hiveContext.hql(sql).collect(); 30 31 System.out.println("Sql is:" + sql + ", has been executed over."); 32 System.out.println("The result size is " + results.size() + ", they are:"); 33 for(int i=0; i<results.size(); i++){ 34  System.out.println(results.get(i).toString()); 35  } 36 37 System.out.println("Execute over ..."); 38  sc.stop(); 39 System.out.println("Stop over ..."); 40  } 41 42 }
 
 

    將其打包成jar文件spark-0.0.1-SNAPSHOT.jar,再使用spark-submit工具進行任務的提交,命令以下:

 
./spark-submit \ --class spark.SparkSQL \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 400m --executor-memory 400m --executor-cores 1 \ --jars /home/hadoop/spark-1.0.0/examples/libs/spark-core_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/examples/libs/spark-hive_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-core-3.2.2.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-rdbms-3.2.1.jar,/home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar --files /home/hadoop/spark-1.0.0/conf/hive-site.xml \ /home/hadoop/spark-1.0.0/examples/libs/spark-0.0.1-SNAPSHOT.jar "hiveTest" "CREATE TABLE IF NOT EXISTS test4 (key INT, value STRING)"
 
 

其中,--master參數指定的是yarn-cluster模式,固然也可使用yarn-client模式,至於區別,已經在上文說了;--class指定的是咱們包裝類的主類,見上文源碼;--jars是依賴的四個jar包;--files是指定的hive-site.xml配置文件,提交到Yarn中的Application在執行的時候,須要把此配置文件分發到每一個Executor上;最後的兩個參數,一個是Application的名稱,一個是運行的SQL語句。

    運行結束後,能夠到Spark HistoryServer中查看運行結果。html

相關文章
相關標籤/搜索