相較MR快的緣由:其任務中間結果存在內存中,在迭代運算中尤其明顯,DAG的設置。java
架構說明:node
Dirver:負責節點通信,task分發,結果回收linux
Worker:資源管理的從節點shell
Master:資源調度的主節點apache
RDD由一系列partition組成編程
函數做用在partition上vim
RDD之間存在相互依賴數組
分區器做用在KV格式的RDD上瀏覽器
RDD提供了一系列最佳的計算位置,數據本地化,計算向數據移動緩存
RDD自己實際不存儲數據,爲了便於理解暫時認爲是存數據的
RDD的彈性來源於:partition的大小和數量可變
RDD的容錯來源於:RDD之間的依賴關係
RDD的分佈式來源於:partation分佈在不一樣的節點
KV格式的數據,RDD中的數據以二元組對象的形式存儲
scala:經過SparkConf對象設置參數,SparkContext接收SparkConf對象,生成上下文context,context的textFile方法載入數據源,返回第一個RDD,基於算子對RDD進行處理
val conf = new SparkConf()
conf.setMaster("local").setAppName("wc")
val sc = new SparkContext(conf)
//sc可以設置checkpiont目錄,日誌打印級別等
sc.setlogLevel("WARN");
sc.checkpoint("hdfs://node1:9000/spark/checkpoint");
val RDD: RDD[String] = sc.textFile("data")
java:SparkConf類設置參數,JavaSparkContext接收SparkConf對象,生成上下文context,context的textFile方法載入數據源,返回第一個RDD,基於算子對RDD進行處理
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("location").setAppName("wc");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
//可以將jsc對象轉爲sc對象,執行sc的方法
SparkContext sc = jsc.sc();
JavaRDD<String> data = jsc.textFile("data");
建立RDD的主要方法
textFile
JavaRDD<String> data = jsc.textFile("data");
parallelize 將容器轉爲RDD,可以執行分區數
JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1,2,3),3);
parallelizePair
JavaPairRDD<String, Integer> rdd1 =
jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2)
));
安裝
tar -zxvf spark-2.3.1-bin-hadoop2.6.tgz
mv spark-2.3.1-bin-hadoop2.6.tgz spark-2.3.1 更名
配置
cd /opt/sxt/spark-2.3.1/conf 進入目錄
mv slaves.template slaves 修改slaves文件
vim slaves
node1 node2 node3
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
JAVA_HOME=/usr/java/jdk1.8.0_11 配置java_home路徑
SPARK_MASTER_HOST=node1 master的ip
SPARK_MASTER_PORT=7077 提交任務的端口,默認是7077
SPARK_WORKER_CORES=1 每一個worker從節點可以支配的核數
SPARK_WORKER_MEMORY=2g 每一個worker從節點可以支配的內存
拷貝到其餘節點
cd /opt/sxt
scp -r spark-2.3.1 node2:`pwd`
scp -r spark-2.3.1 node3:`pwd`
啓動(在主節點上)
cd /opt/sxt/spark-2.3.1/sbin
./start-all.sh (關閉: ./stop-all.sh)
UI鏈接:node1:8080 (修改端口:start-master.sh)
修改spark配置文件spark-env.sh
添加hadoop配置路徑 HADOOP_CONF_DIR=/opt/sxt/hadoop-2.6.5/etc/hadoop
修改hadoop的配置,處理兼容
vim /opt/sxt/hadoop-2.6.5/etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
啓動
yarn:start-all.sh
spark:cd /opt/sxt/spark/sbin ./start-all.sh
修改spark-env.sh配置文件
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181
-Dspark.deploy.zookeeper.dir=/sparkmaster0821"
配置到各節點 scp spark-env.sh node2://opt/sxt/spark/conf
配置備用master節點,修改該節點的 spark-env.sh配置中的SPARK_MASTER_HOST
SPARK_MASTER_HOST=node2
HA啓動
主節點啓動進程:./start-all.sh
備用節點啓動master進程:./start-master.sh
cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master spark://node1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
執行流程
集羣啓動時,worker向master彙報資源
client提交任務,客戶端中啓動Driver進程
Driver向Master申請啓動Application的資源
Master啓動worker節點的excutor進程,excutor進程反向註冊到Driver上
Driver將task發送到worker的excutor進程中執行
Driver監控task,接收worker的執行結果
適用測試環境,多任務執行時Driver網卡壓力過大
cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
#須要jar包存放值hdfs中,或者每臺節點上
執行過程
集羣啓動時,worker向master彙報資源
客戶端將任務提交到集羣,向Master請求啓動Driver
Master選擇隨機節點建立Driver
Driver啓動後向Master請求application的資源
Master啓動worker節點的excutor進程,excutor進程反向註冊到Driver上
Driver將task發送到worker的excutor進程中執行
Driver監控task,接收worker的執行結果
適用生產環境,客戶端只負責提交任務,Driver均衡分佈在集羣中下降單節點網卡壓力
cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master yarn–client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
執行流程
client將任務提交到RM(ResourceManager)中,在客戶端內建立Driver進程
RM隨機選取NM節點(NodeManager),建立AM進程(applicationMaster)
AM進程向RM請求applictaion的資源,RM響應一批container資源
AM根據資源啓動NM節點中的excutor進程
excutor反向註冊到Driver進程上,接收Driver發出的task並執行
Driver監控task,接收worker的執行結果
使用測試環境,客戶端過多Driver進程致使網卡壓力大
注:AM只執行啓動任務,不負責監控
cd /opt/sxt/spark-2.3.1/bin
#指定任務提交地址,指定提交方式,指定主類,指定jar包位置 ,其餘參數
./spark-submit
--master yarn–cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
執行流程
client向RM節點提交任務
RM選擇隨機NM節點建立AM(ApplicationMaster)
AM向RM請求application執行的資源,並接收響應的資源
AM根據響應,將請求發送到NM中啓動Excutor進程
Excutor反向註冊到AM所在節點,從AM接收task並執行
AM監控task,接收worker的執行結果
適用於生產環境,執行Driver功能的AM隨機分佈在NM中,單點網卡壓力下降
中止集羣任務命令:yarn application -kill applicationID
形容RDD之間的依賴關係,基於父子RDD之間的partition關聯來判斷
窄依賴
父RDD中的partition與子RDD中的partition爲一對一或一對多
不須要shuffle
寬依賴
父RDD中的partition指向多個子RDD中的partition,呈多對一關係
須要執行shuffle
一個application包括若干並行的job,一個觸發算子對應一個job,每一個job會被拆分爲多組相互關聯的任務組,這些任務組就是stage
stage劃分流程
spark根據RDD之間依賴關係構建DAG有向無環圖,並提交給DAGScheduler
DAGScheduler將DAG劃分若干相互依賴的stage,劃分依據是RDD之間的寬窄依賴
逆向切分,沿DAG從後向前,遇到寬依賴劃分一個出stage
stage之間存在並行和串行兩種關聯
stage內部由一組並行的task構成,stage內部的並行度由最後一個RDD的分區數決定
task被送到executor上執行的工做單元
RDD的分區數主要在如下狀況中能夠指定
讀取數據時指定
具有寬依賴RDD位置
stage內部計算採用管道計算模式
每一個task至關於一個管道,同一時間一次處理一條數據
task以高階函數 f4(f3(f2(f1(x)))) 的形式處理stage內多個RDD的代碼邏輯
一個stage內部的task能夠具有不一樣邏輯
管道中數據落地的環節
指定持久化的RDD節點
執行shuffle write 的過程當中
spark集羣啓動,worker節點向Master節點彙報資源,Master掌握了集羣的資源。
客戶端向spark提交application,Master根據app的RDD依賴關係構建DAG有向無環圖。
Master建立Driver進程,Driver進程中建立DAGScheduler和TaskScheduler調度器。
TaskScheduler建立後向Master節點請求app的資源,Master根據請求在worker節點上啓動Executor進程,Executor進程反向註冊到Driver進程中。
DAGScheduler根據DAG的寬窄依賴劃分stage,將stage封裝TaskSet爲交給TaskScheduler,TaskSet中封裝了stage中並行的task。
TaskScheduler遍歷TaskSet,將task分配給Executor執行,即發送到Executor中的線程池ThreadPool中
TaskScheduler監控task執行,Executor的ThreadPool狀態會響應給TaskScheduler。
監控過程
若task執行失敗,TaskScheduler從新發送task到Executor中,默認重試3次
若task重試失敗,則對應stage執行失敗,DAGScheduler從新發送stage到TaskScheduler中從新執行,上述重試默認4次。若重試失敗,則job失敗,app失敗
TaskScheduler還不然重試執行緩慢(straggling)的task,TaskScheduler會發送新的task並行執行。關於執行結果採用推測執行機制,兩個task以先執行完的結果爲準,默認是關閉的,配置屬性爲spark.speculation。推測執行機制不適應ETL等數據插入的操做(數據衝恢復插入)和數據傾斜的狀況
app的資源申請是粗粒度的,application申請的資源,須要等待所有task執行完畢纔會釋放。優勢是:不須要每一個task反覆請求資源,任務執行效率高;缺點:資源沒法充分利用。
注:MR使用細粒度資源申請方法,task本身申請資源執行任務,每一個task執行完畢釋放資源,資源充分利用,但task啓動變慢。
靜態內存管理
60% spark.storage.memoryFraction 存儲內存分區
90%存儲+序列化
80% RDD存儲+廣播變量
20% 解壓序列化
10% 預留OOM
20% spark.shuffle.memoryFraction shuffle內存分區
80% shuffle聚合內存
20% OOM預留
剩餘 task計算
統一內存管理
300M JVM
75% spark.memory.storageFraction 存儲內存分區
RDD存儲+廣播變量
shuffle聚合
二者動態調用
剩餘 執行task
注:spark1.6之後默認統一內存管理,設置spark.memory.useLegacyMode置爲true,修改成靜態內存管理
提交任務的參數 ./spark-submit ...
--master MASTER_URL
spark://host:port mesos://host:port yarn local (默認)
--deploy-mode DEPLOY_MODE client(默認)/cluster
--class CLASS_NAME 主類(包+類)
--name NAME 任務名
--jars JARS 依賴jar包,逗號分隔
--files FILES 相關文件
--conf PROP=VALUE 配置屬性
--driver-memory Driver的內存,默認1024M
--executor-memory executor的最大內存,默認1G
適用 standalone + cluster
--driver-cores driver的核數,默認1
適用 standalone/Mesos + cluster
--supervise 失敗重啓Driver
適用 standalone and Mesos
--total-executor-cores executor的總核數
適用 standalone 或 YARN
--executor-cores 每一個executor的核數,默認1
經過--total-executor-cores和--executor-cores限定executor的數量
使用yarn
--driver-cores driver的核數
--queue 資源隊列名,默認default
--num-executors 指定executors 數量
Spark自帶的一個快速原型開發工具,支持scala語言交互式編程。
啓動命令 ./spark-shell --master spark://node1:7077 (win/linux)
執行任務 sc爲默認建立的上下文環境
#指定hdfs的文件進程wordcount ,shsxt是hdfs的集羣名
sc.textFile("hdfs://shsxt:9000/spark/aaa.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.foreach(println)
瀏覽器訪問啓動spark的主機 node1:8080 範圍UI頁面
能夠查看當前運行狀況和歷史運行狀況
history+UI
對應須要臨時保存日誌:在shell啓動或命令提交時配置以下屬性
--conf spark.eventLog.enabled=true 保存日誌
--conf spark.eventLog.dir=hdfs://shsxt/spark/test 指定hdfs存放路徑,shsxt是hdfs集羣名
對於須要對全部任務都須要保存日誌,須要配置spark-default.conf
配置文件中加入,須要複製到全部節點
#開啓記錄事件日誌的功能
spark.eventLog.enabled true
#設置事件日誌存儲的目錄,shsxt是hdfs的集羣名
spark.eventLog.dir hdfs://shsxt/spark/log
#日誌優化選項,壓縮日誌
spark.eventLog.compress true
#歷史日誌在hdfs的目錄,shsxt是hdfs的集羣名
spark.history.fs.logDirectory hdfs://shsxt/spark/log
在sbin中啓動HistoryServer:./start-history-server.sh
UI地址爲啓動HistoryServer節點+端口: node4:18080
廣播變量
spark代碼,RDD之外部分由Driver端執行,RDD之內的部分executor端的各task中執行。
爲了不向每一個task重複發送公用的變量,使用廣播變量。
Driver將廣播變量發送到executor端中,executor中的task公用一個變量。
//scala
val mybroadcase: Broadcast[String] =sc.broadcast("廣播變量")
sc.textFile("/data").foreach(x=>{
print( mybroadcase.value )
})
//java
Broadcast<String> xxx = jsc.broadcast("xxx");
jsc.textFile("/data").foreach(new VoidFunction<String>(){
計數器
在廣播變量的基礎上,不借助count算子,只能實現execute層面的變量計數,沒法實現全局的事件計數。
計數器實現:RDD外定義計數器(Driver中),在RDD內進行累加(Executor中),task完成彙總到Driver中實現計數。
注:計數結果必須在Driver端解析,計數器默認從0開始計數,每一個Executor獨立計數後彙總到Driver累加。
//scala
val count = sc.longAccumulator
sc.textFile("/data").foreach(x=>{
count.add(1)
})
val num: lang.Long =count.value
//java
SparkContext sc = jsc.sc();
LongAccumulator count = sc.longAccumulator();
jsc.textFile("/data").foreach(new VoidFunction<String>(){
在寬依賴的RDD之間存在shuffle過程,將父RDD的分區的數據shuffle進入子RDD中不一樣的分區。相似reducebykey算子,將相同ke進入一個分區進行處理。
Shuffle Writer:上游stage的map task保證當前分區中相同的key寫入一個分區文件中
Shuffle Read:下游stage的reduce task在全部機器中獲取屬於本身分區的分區文件
Spark2以後使用SortShuffle,1.2以前使用HashShuffle,二者之間並用
普通機制
每一個map task處理後數據經過hash分區器寫入不一樣的buffer(默認32K)
每一個buffer對應一個磁盤小文件,每一個buffer或小磁盤文件對應一個reduce task
reduce task拉取對應的磁盤小文件的數據
小文件數量:map task數*reduce task數
小文件過多致使:內存建立過多對象,容易OOM;拉取過多,通信波動和故障易致使拉取失敗(shuffle file cannot find) ,這種失敗須要DAGScheduler重試stager,容易致使任務失敗。
合併機制
一個executor進程中,全部map task公用一組buffer,減小磁盤小文件的數量
小文件數量:executor數*reduce task數
普通機制
map task 將計算結果寫入本身的內存數據結構(默認5M)
shuffle設置定時器對內存數據結構的容量進行監控,若監控到大小達到閾值,向內存數據結構分配一倍的容量,直到節點剩餘容量不夠分配。此時啓動內存數據結構溢寫
內存數據結構對內部數據排序分區,寫出到磁盤小文件,溢寫以batch形式去寫,一個batch對應1w條數據,batch做爲寫出緩存
maptask完畢後,磁盤小文件合併爲:一個數據文件+一個索引文件
reduce task經過索引拉取對應部分的數據
生成 2*map task數的文件
bypass
取消排序,直接將數據寫出都小文件中
bypass的觸發條件:reduce task數小於 spark.shuffle.sort.bypassMergeThreshold的參數值(默認200),使得小批量數據不排序
MapOutputTracker管理磁盤小文件
MapOutputTrackerMaster主,driver進程中
MapOutputTrackerWorker從,Executor進程中
BlockManager 塊管理者
分爲主從架構
BlockManagerMaster,driver進程中,在使用廣播變量和緩存數據時,BlockManagerSlave執行
BlockManagerWorker,Executor進程中,做爲從節點
DiskStore 磁盤管理
MemoryStore 內存管理
ConnectionManager 負責鏈接其餘BlockManagerWorker
BlockTransferService 負責數據傳輸
尋址流程
每一個maptask執行完畢將小文件地址封裝到MpStatus對象,經過MapOutputTrackerWorker向Driver的MapOutputTrackerMaster彙報
全部maptask執行完畢,reducetask執行前,Excutor的MapOutputTrackerWorker向Driver端的MapOutputTrackerMaster請求磁盤小文件地址數據
Excutor的ConnectionManager鏈接其餘節點的ConnectionManager,再借助BlockTransferService進行數據傳輸
BlockTransferService一次啓動5個task拉取數據,一個task拉取最多48M數據
reduce的OOM優化
減小每次數據的拉取量,spark.reducer.maxSizeInFlight (64M)
增大shuffle的內存分配
提升executor總內存
優化方式
代碼指定配置信息,優先級最高,硬編碼不推薦
任務提交命令中 --conf 後指定參數 (推薦)
在spark-default.conf,適用全部任務,優先級最低,範圍太廣不推薦
全部優化屬性:
spark.shuffle.file.buffer 默認32k
參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩衝大小。將數據寫到磁盤文件以前,會先寫入buffer緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。
調優建議:內存資源充足能夠適當增長(好比64k),從而減小shuffle write過程當中溢寫次數,減小磁盤IO次數。性能提高範圍1%~5%。
spark.reducer.maxSizeInFlight 默認48m
參數說明:設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。
調優建議:內存充足能夠適當增長(好比96m),從而減小拉取數據的次數,減小網絡IO,進而提高性能。性能提高範圍1%~5%。
spark.shuffle.io.maxRetries 默認3
參數說明:shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,失敗重試的最大次數。若是3次之後未成功,致使shuffle file not find錯誤和stage執行失敗。
調優建議:對於那些包含了特別耗時的shuffle操做,增長重試最大次數(好比60次),以免因爲JVM的full gc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。 shuffle file not find taskScheduler不負責重試task,由DAGScheduler負責重試stage
spark.shuffle.io.retryWait 默認值:5s 參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是5s。 調優建議:建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。
spark.shuffle.memoryFraction 默認值:0.2 參數說明:該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。 調優建議:在資源參數調優中講解過這個參數。若是內存充足,並且不多使用持久化操做,建議調高這個比例,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高10%左右。
spark.shuffle.manager 默認值:sort|hash 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。 調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug。
spark.shuffle.sort.bypassMergeThreshold 默認值:200 參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。 調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。
spark.shuffle.consolidateFiles 默認值:false 參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。 調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。
PV-UV
數據結構:用戶ip+地址+時間+電話+會話+域名 146.1.30.98 河南 2017-10-10 1512012307080 5263761960810313758 www.mi.com View
pv:每一個網站的當日訪問數
uv:每一個網站的當日獨立訪客,以會話爲準
//配置再也不重複,提供RDD0,值爲字符串格式的數據 val RDD1: RDD[(String, String)] = RDD0.map(x => { val y = x.split("\\s+") //會話+域名 (y(5), y(4)) }) //groupByKey取出相同的網站的數據 set去重,count計數 val RDD2: RDD[(String, Iterable[String])] = RDD1.groupByKey() RDD2.foreach(x => { println("網站:"+x._1) val it = x._2.iterator var count=0 val set = Set[String]() while(it.hasNext){ count+=1 set.add(it.next()) } println("PV:"+count+" UV:"+ set.size ) })
二次排序
設置數據的封裝類,該實現Serializable與Comparable<當前類>
例子中比較 2個數字組成的數據
public class SecondSortKey implements Serializable, Comparable<SecondSortKey> { //序列化版本 private static final long serialVersionUID = 1L; private int first; private int second; //set,get public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } //構造器 public SecondSortKey(int first, int second) { super(); this.first = first; this.second = second; } //重寫比較方法,返回+-0數字 @Override public int compareTo(SecondSortKey o1) { //對兩組數據分別比較得出比價結果 if (getFirst() - o1.getFirst() == 0) { return getSecond() - o1.getSecond(); } else { return getFirst() - o1.getFirst(); } } }
數據載入RDD是,轉爲KV結構的RDD,key爲封裝類,value爲實際數據
使用sortByKey算子排序,獲得所需排序效果
val result: RDD[(SecondSortKey, String)] =RDD.map(x => { val a = x.split(" ") val b=new SecondSortKey(a(1).toInt, a(2).toInt) new Tuple2(b,x) }).sortByKey()
分組取topN
//獲取一組數字中最大的3個值,使用一個3元素的數組,比較並寫入數據 Integer[] top3 = new Integer[3]; //數據經過迭代器輸出,分別填入數組的三個位置,其中處理了最早3個置入的數據和後面添加刪除數據的過程 while (iterator.hasNext()) { Integer x = iterator.next(); for (int i = 0; i < top3.length; i++) { if(top3[i] == null){ top3[i] = x; break; }else if(score > top3[i]){ for (int j = 2; j > i; j--) { top3[j] = top3[j-1]; } top3[i] = x; break; } } }
若設置 --total-executor-core 參數,則使用指定的核數,若未設置則壓榨集羣性能,使用全部剩餘核數
若沒有設置 --executor-core 參數 ,則每一個worker節點默認爲這個application只開啓一個executor進程。若設置了,一個worker可設置多個executor。集羣會進一步考慮內存因素
DAGScheduler類的getMessingParentStages()方法是切割job劃分stage,其中使用了遞歸的方式實現
DAGScheduler將stage封裝後發送到TaskScheduler中,TaskScheduler遍歷stage中的task併發送到executor中執行