----本節內容-------java
1.遺留問題答疑node
1.1 典型問題解答mysql
1.2 知識點回顧es6
2.Spark編程基礎web
2.1 Spark開發四部曲面試
2.2 RDD典型實例sql
2.3 非RDD典型實例shell
3.問題解答apache
4.參考資料編程
---------------------
每一次答疑階段,我都會站在老師的角度去思考一下,若是是我,我應該怎麼回答,往往如此,不由嚇出一身冷汗。有些問題看答案確實挺容易的,但當本身做爲一個答疑者去思考,可能不同,由於快速確認一個答案的同時,你得否定不少的東西,腦海裏閃過不少的不肯定,都要快速否決。董先生無疑是值得敬佩的,這方面體現的很內行,可見內功深厚,他是一個優秀的架構師,也是一個很好的傳道者。
1.遺留問題答疑
1.1 典型問題解答
1)spark一般和hadoop與yarn一同使用,在學spark過程當中補充哪些知識點,能夠整理列一下
HDFS 我以爲必須都得清楚,HDFS的東西其實不少,向老師說的文件基本的存儲策略,讀取操做,基本命令行、API等等;
YARN基本架構,ResoureManger,NodeManger,ApplicationMaster,container等等
其實我想說的是,最好是都過一遍吧,這些都是內功,都是相互依賴的,誰也離不了誰,還有好比Linux知識,Java基本知識,網絡基本知識等等,都是實際工做問題中要用到的知識點,我保證,都會用的到。
2)一個partition只會在一個節點上把,老師沒有將container的概念麼?老師能把partion,container,executor,task再細講一下麼?
臨時有事,這裏沒有聽。這個其實挺基礎的,可是回答出來比較考驗基本功。我把問題衍生一下,包含以下問題:
a).partion的基本概念是什麼,partition和block是什麼關係?
partition是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被劃分爲多個分片,每個分片叫作一個partition。block是HDFS存儲的最小單元
其實這裏還有一個問題,就是Spark爲何要進行分區,出於什麼緣由要搞一個分區的概念?,我以爲對partiion的理解很是有幫助,答案請參考博文《Spark你媽喊你回家吃飯-08 說說Spark分區原理及優化方法》
b).container的概念是什麼,yarn是如何基於container進行資源分配的?
c).Spark應用程序的執行過程是什麼?
還有兩個問題都是任務提交模式問題,瞭解spark程序運行的各類模式基本都能解答,這幾個問題在前面其實都已經有很明確的講解過。
1.2 知識點回顧
1)核心概念RDD
彈性分佈式數據集,把握如下幾點
· RDD是什麼數據集,他是一個描述數據在哪裏,對數據作什麼操做,以及操做之間的依賴關係的一個數據集
· 爲何是彈性,主要是說他的存儲,既能夠在內存,也能夠在磁盤
· 分佈式:分佈在集羣上
· 自動重構:失效夠能夠自動重構
2) 程序架構
application = 1個driver +多個executor
Excecutor = 多個task+cache
搞清楚driver作了什麼,Executor作了什麼,Task又作了什麼,如何配合
3)Yarn分佈式模式
a.client發送資源申請請求
b.RM發送通知NodeManger要調用資源,
c.NodeManger啓動AppAplicationMaster
d.AppAplicationMaster通知nodeManager啓動各個Executor
e.nodeManager啓動Executor
f.nodeManager向Driver回報實時執行狀況,也會告知AppAplicationMaster
2.Spark編程基礎
2.1 Spark開發四部曲
每個spark程序都有一個main,咱們稱之爲driver,driver將程序分解成多個task, task分發到多個executor,從而完成並行化。Spark程序開發的四部曲總結起來以下:
· 建立SparkContext對象
封裝了spark執行環境信息
· 建立RDD
可從scala結合或hadoop數據集上建立
· 在RDD上執行轉換和action
spark提供了多種轉換和action函數
· 返回結果
保存到HDFS中,或者直接打印出來
2.2 RDD典型實例
啓動spark shell:
bin/spark-shell --master spark://master01:7077 --driver-class-path/usr/local/tdr_hadoop/spark/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.40-bin.jar
1) 設置conf
val conf=new org.apache.spark.SparkConf().setAppName("xx");
conf.set("spark.app.name","test");
conf.set("spark.driver.allowMultipleContexts","true");
val sc= new org.apache.spark.SparkContext(conf);
#讀取hdfs上的文件,若是你在hdfs-site.xml中配置hdfs集羣的話
val a=sc.textFile("/tmp/test/core-site.xml");
a.collect().foreach(println);
#讀取hdfs上的文件
val a = sc.textFile("hdfs:///tmp/test/core-site.xml");
a.collect().foreach(println);
#讀取hdfs上的文件 ,這裏的端口是9000
vala=sc.textFile("hdfs://master02:9000/tmp/test/core-site.xml");
a.collect().foreach(println);
#讀取本地文件,這裏要注意,driver是在哪裏運行,若是driver不在本地運行,執行會報錯,找不到文件哦
val a=sc.textFile("file:///data/aa.txt");
a.collect().foreach(println);
報錯1:netty是spark通訊框架,通訊超時了。因此產生問題。把::1也就是ip6先註釋掉。可是仍是沒有解決問題,後來把master HA給回退過去了,又恢復了,多是HA的配置沒有配好致使
報錯2:不能出現多個sc
設置參數:conf.set("spark.driver.allowMultipleContexts","true");
2)建立RDD
· 從HDFS中讀取數據
inputRdd=sc.textFile("hdfs://master01:8020/xx/xx");
· 從本地讀取數據
inputRdd=sc.textFile("file://data/input")
· 從Hbase讀取數據
· 從自定義文件格式讀取數據
2.3 RDD典型實例
1.回憶經典概念
再次提一下RDD的transformation與Action,如今假設你去面試,面試官問你,簡單說說你對Transformation和Action的理解,我我的以爲應該回答如下幾個知識點,可能你有不少要說的,可是要整理好思路,一個個知識點回答。
1).先說概念
先說RDD概念,RDD彈性分佈式數據集。它記錄了2類東西一個是數據,一個是操做。數據,RDD記錄了數據是保存在內存仍是磁盤,並且能控制數據分區。操做,RDD記錄了數據之上的各類操做,以及操做之間的依賴關係。最後說一下特色,RDD具備容錯,並行,是spark的核心概念。
接着引入Transformation,Transformation是根據特定規則將一個RDD變換爲另外一個RDD,記錄了RDD的演變過程,最後結果是RDD。
Action:將數據彙集起來執行實際的操做,觸發全部job,而且保存計算結果,最後結果是數據。
2).後說聯繫
Transformation記錄RDD之間的轉換關係,爲Action的觸發記錄RDD邏輯關係。
3).最後說區別
·Transformation不觸發job,返回RDD。
·Action觸發job,執行計算,返回數據。
2.典型例子
1)Transformation例子
例子1:
------------------------
val nums =sc.parallelize(List(1,2,3),3);
nums.collect();
//返回前k個
nums.take(2);
nums.count();
//累加求和
nums.reduce(_+_);
//寫數據到hdfs
nums.saveAsTextFile("/tmp/dxc/week2/output1");
import org.apache.spark.rdd._;
nums.saveAsSequenceFile("/tmp/dxc/week2/output2");
nums.saveAsSequenceFile("/tmp/dxc/week2/output2");
//讀取sequenceFile格式文件
lines = sc.sequenceFile("/tmp/dxc/week2/output2/") ;
---------
說明:
collection:轉爲數組,單機概念,保存到driver裏面,很是危險,若是很是大
10G數據,2g內存,極可能打爆driver的內存,慎重
從這裏能夠看出,啓動了一個Executor,Executor上面起了2個task,由於是指定了2個分區,分區的個數決定了task的個數。
val nums =sc.parallelize(List(1,2,3),3);
nums.collect();
這裏指定3個分區,啓動了3個task。
保存到hdfs也是3個part,若是指定分區爲2,那就保存爲2個數據塊。
------------------------
例子2:
--------
valpets=sc.parallelize([("cat",1),("dog",2),("dog",2)]);
pets.reduceByKey(_+_);
pets.groupByKey();
pets.sortByKey();
pets.collect();
---------
說明:reduceByKey自動在本地進行combine
------------------------
例子3:
val line =sc.textFile("/tmp/test/core-site.xml");
val count=line.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_);
count.collect();
line.foreach(println);
例子4:
val left = sc.parallelize(List(("spark",1),("hadoop",1),("storm",1)))
val right=sc.parallelize(List(("scala",1),("hadoop",1),("spark",1)))
val result = left.join(right);
val result = left.cogroup(right);
result.collect();
result.foreach(println);
---------
說明:
cogroup:保證key惟一,key只佔一行,可用作一個笛卡爾積,cogroup結果以下
(scala,(CompactBuffer(),CompactBuffer(1)))
(spark,(CompactBuffer(1),CompactBuffer(1)))
(hadoop,(CompactBuffer(1),CompactBuffer(1)))
(storm,(CompactBuffer(1),CompactBuffer()))
------------------------
還有其餘的RDD操做能夠參考以前寫的博客。
2.3 非RDD典型實例
1.Accumulator
Accumulator是spark提供的累加器,顧名思義,該變量只可以增長。 只有driver能獲取到Accumulator的值(使用value方法),Task只能對其作增長操做(使用 +=)。你也能夠在爲Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,能夠幫助你瞭解程序運行的狀況。用於監控和調試,記錄符合某類特徵的數據數據等。
-----------------
//在driver中定義
val accum = sc.accumulator(0, "Example Accumulator")
//在task中進行累加
sc.parallelize(1 to10,5).foreach(x=> accum += 1)
//在driver中輸出 accum.value
//結果將返回10 res: 10
---
說明:
在web ui中能夠看到Accumulators在task進行累加的具體狀況,driver將accumulator收集過來彙總
-----------------
3.廣播變量
Spark有兩種共享變量——累加器、廣播變量。廣播變量可讓程序高效地向全部工做節點發送一個較大的只讀值,以供一個或多個Spark操做使用。高效分發大對象,好比字典map,集合set,每一個executor一份,而不是每一個task一份.
Spark中分佈式執行的代碼須要傳遞到各個Executor的Task上運行。對於一些只讀、固定的數據(好比從DB中讀出的數據),每次都須要Driver廣播到各個Task上,這樣效率低下。廣播變量容許將變量只廣播(提早廣播)給各個Executor。該Executor上的各個Task再從所在節點的BlockManager獲取變量,而不是從Driver獲取變量,從而提高了效率。
一個Executor只須要在第一個Task啓動時,得到一份Broadcast數據,以後的Task都從本節點的BlockManager中獲取相關數據。
場景1:
val data = Set(1,2,3,4,5,6,7)
val rdd=sc.parallelize(1 to 6,2)
val result =rdd.map(_=>data.size)
result.collect();
場景2:
val data = Set(1,2,3,4,5,6,7)
val bddata=sc.broadcast(data)
val rdd=sc.parallelize(1 to 6,2)
val result =rdd.map(_=>data.size)
result.collect();
區別是:data數值的獲取方式,場景1 executor 每次都要從driver那裏獲取,要和交互7次,而場景2使用廣播變量,將data分發到executor,那麼driver和executor只須要交互一次。
4.cache的使用
val data=sc.textFile("/tmp/tbMonitordataResultHbase");
val a = data.count
println(a);
執行了29秒
val data=sc.textFile("/tmp/tbMonitordataResultHbase");
data.cache()
val a = data.count
println(a);
cache了100多M
val a = data.count
println(a);
data.persist(org.apache.spark.storage.StorageLevel. MEMORY_ONLY)
3.問題解答
1).spark計算時,數據會寫入磁盤麼,什麼條件下寫入磁盤
shuffle的時候都會寫到磁盤,帶shuffle的時候會寫入到磁盤,有哪算子呢?
2).報錯日誌裏的錯誤是,ERRORCoarsegRainedExcecutorBackend:RECEIVED SIGNAL TERM ,這個多是什麼緣由
虛擬內存超了,被yarn的nodemanager殺了,配置這個參數, 若是是1G內存,能夠達到1.1G的使用內存
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>10</value>
</property>
3).中文亂碼問題,出現中文亂碼和spark沒有關係
4).resourceManager的工做負擔重不重,實際部署時候是否有必要單獨到一臺機器上?
機器的規模,50或者100,能夠nodemanager合併
5).hbase預分區個數和spark過程當中reduce個數相同麼
設置,map階段和region同樣,reduce取決於task
4.參考資料
1.董西成ppt