【Spark深刻學習 -12】Spark程序設計與企業級應用案例02

----本節內容-------java

1.遺留問題答疑node

1.1 典型問題解答mysql

1.2 知識點回顧es6

2.Spark編程基礎web

2.1 Spark開發四部曲面試

2.2 RDD典型實例sql

2.3 非RDD典型實例shell

3.問題解答apache

4.參考資料編程

---------------------

 

 

每一次答疑階段,我都會站在老師的角度去思考一下,若是是我,我應該怎麼回答,往往如此,不由嚇出一身冷汗。有些問題看答案確實挺容易的,但當本身做爲一個答疑者去思考,可能不同,由於快速確認一個答案的同時,你得否定不少的東西,腦海裏閃過不少的不肯定,都要快速否決。董先生無疑是值得敬佩的,這方面體現的很內行,可見內功深厚,他是一個優秀的架構師,也是一個很好的傳道者。

1.遺留問題答疑

 

1.1 典型問題解答

1)spark一般和hadoop與yarn一同使用,在學spark過程當中補充哪些知識點,能夠整理列一下

HDFS 我以爲必須都得清楚,HDFS的東西其實不少,向老師說的文件基本的存儲策略,讀取操做,基本命令行、API等等;

YARN基本架構,ResoureManger,NodeManger,ApplicationMaster,container等等

其實我想說的是,最好是都過一遍吧,這些都是內功,都是相互依賴的,誰也離不了誰,還有好比Linux知識,Java基本知識,網絡基本知識等等,都是實際工做問題中要用到的知識點,我保證,都會用的到。

2)一個partition只會在一個節點上把,老師沒有將container的概念麼?老師能把partion,container,executor,task再細講一下麼?

臨時有事,這裏沒有聽。這個其實挺基礎的,可是回答出來比較考驗基本功。我把問題衍生一下,包含以下問題:

a).partion的基本概念是什麼,partition和block是什麼關係?

partition是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被劃分爲多個分片,每個分片叫作一個partition。block是HDFS存儲的最小單元

其實這裏還有一個問題,就是Spark爲何要進行分區,出於什麼緣由要搞一個分區的概念?,我以爲對partiion的理解很是有幫助,答案請參考博文《Spark你媽喊你回家吃飯-08 說說Spark分區原理及優化方法》

b).container的概念是什麼,yarn是如何基於container進行資源分配的?

c).Spark應用程序的執行過程是什麼?

還有兩個問題都是任務提交模式問題,瞭解spark程序運行的各類模式基本都能解答,這幾個問題在前面其實都已經有很明確的講解過。

 

1.2 知識點回顧

1)核心概念RDD

彈性分佈式數據集,把握如下幾點

· RDD是什麼數據集,他是一個描述數據在哪裏,對數據作什麼操做,以及操做之間的依賴關係的一個數據集

· 爲何是彈性,主要是說他的存儲,既能夠在內存,也能夠在磁盤

· 分佈式:分佈在集羣上

· 自動重構:失效夠能夠自動重構

 

2) 程序架構

application = 1個driver +多個executor

Excecutor = 多個task+cache

搞清楚driver作了什麼,Executor作了什麼,Task又作了什麼,如何配合

 

3)Yarn分佈式模式

a.client發送資源申請請求

b.RM發送通知NodeManger要調用資源,

c.NodeManger啓動AppAplicationMaster

d.AppAplicationMaster通知nodeManager啓動各個Executor

e.nodeManager啓動Executor

f.nodeManager向Driver回報實時執行狀況,也會告知AppAplicationMaster

 

 

2.Spark編程基礎

2.1 Spark開發四部曲

每個spark程序都有一個main,咱們稱之爲driver,driver將程序分解成多個task, task分發到多個executor,從而完成並行化。Spark程序開發的四部曲總結起來以下:

· 建立SparkContext對象

封裝了spark執行環境信息

· 建立RDD

可從scala結合或hadoop數據集上建立

· 在RDD上執行轉換和action

spark提供了多種轉換和action函數

· 返回結果

保存到HDFS中,或者直接打印出來

 

2.2 RDD典型實例

啓動spark shell:

bin/spark-shell --master spark://master01:7077 --driver-class-path/usr/local/tdr_hadoop/spark/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.40-bin.jar

 

1) 設置conf

val conf=new org.apache.spark.SparkConf().setAppName("xx");

conf.set("spark.app.name","test");

conf.set("spark.driver.allowMultipleContexts","true");

val sc= new org.apache.spark.SparkContext(conf);

#讀取hdfs上的文件,若是你在hdfs-site.xml中配置hdfs集羣的話

val a=sc.textFile("/tmp/test/core-site.xml");

a.collect().foreach(println);

#讀取hdfs上的文件

val a = sc.textFile("hdfs:///tmp/test/core-site.xml");

a.collect().foreach(println);

#讀取hdfs上的文件 ,這裏的端口是9000

vala=sc.textFile("hdfs://master02:9000/tmp/test/core-site.xml");

a.collect().foreach(println);

 

#讀取本地文件,這裏要注意,driver是在哪裏運行,若是driver不在本地運行,執行會報錯,找不到文件哦

val a=sc.textFile("file:///data/aa.txt");

a.collect().foreach(println);

 

報錯1:netty是spark通訊框架,通訊超時了。因此產生問題。把::1也就是ip6先註釋掉。可是仍是沒有解決問題,後來把master HA給回退過去了,又恢復了,多是HA的配置沒有配好致使

 

報錯2:不能出現多個sc

設置參數:conf.set("spark.driver.allowMultipleContexts","true");

 

2)建立RDD

· 從HDFS中讀取數據

inputRdd=sc.textFile("hdfs://master01:8020/xx/xx");

· 從本地讀取數據

inputRdd=sc.textFile("file://data/input")

· 從Hbase讀取數據

 

· 從自定義文件格式讀取數據

 

 

 

2.3 RDD典型實例

1.回憶經典概念

再次提一下RDD的transformation與Action,如今假設你去面試,面試官問你,簡單說說你對Transformation和Action的理解,我我的以爲應該回答如下幾個知識點,可能你有不少要說的,可是要整理好思路,一個個知識點回答。

1).先說概念

先說RDD概念,RDD彈性分佈式數據集。它記錄了2類東西一個是數據,一個是操做。數據,RDD記錄了數據是保存在內存仍是磁盤,並且能控制數據分區。操做,RDD記錄了數據之上的各類操做,以及操做之間的依賴關係。最後說一下特色,RDD具備容錯,並行,是spark的核心概念。

接着引入Transformation,Transformation是根據特定規則將一個RDD變換爲另外一個RDD,記錄了RDD的演變過程,最後結果是RDD。

Action:將數據彙集起來執行實際的操做,觸發全部job,而且保存計算結果,最後結果是數據。

2).後說聯繫

Transformation記錄RDD之間的轉換關係,爲Action的觸發記錄RDD邏輯關係。

3).最後說區別

·Transformation不觸發job,返回RDD。

·Action觸發job,執行計算,返回數據。

 

2.典型例子

1)Transformation例子

例子1:

------------------------

val nums =sc.parallelize(List(1,2,3),3);

nums.collect();

//返回前k個

nums.take(2);

nums.count();

//累加求和

nums.reduce(_+_);

//寫數據到hdfs

nums.saveAsTextFile("/tmp/dxc/week2/output1");

import org.apache.spark.rdd._;

nums.saveAsSequenceFile("/tmp/dxc/week2/output2");

nums.saveAsSequenceFile("/tmp/dxc/week2/output2");

//讀取sequenceFile格式文件

lines = sc.sequenceFile("/tmp/dxc/week2/output2/") ;

---------

說明:

collection:轉爲數組,單機概念,保存到driver裏面,很是危險,若是很是大

10G數據,2g內存,極可能打爆driver的內存,慎重

 

 

從這裏能夠看出,啓動了一個Executor,Executor上面起了2個task,由於是指定了2個分區,分區的個數決定了task的個數。

 

val nums =sc.parallelize(List(1,2,3),3);

nums.collect();

 

這裏指定3個分區,啓動了3個task。

 

保存到hdfs也是3個part,若是指定分區爲2,那就保存爲2個數據塊。

 

------------------------

例子2:

--------

valpets=sc.parallelize([("cat",1),("dog",2),("dog",2)]);

pets.reduceByKey(_+_);

pets.groupByKey();

pets.sortByKey();

pets.collect();

---------

說明:reduceByKey自動在本地進行combine

------------------------

例子3:

val line =sc.textFile("/tmp/test/core-site.xml");

val count=line.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_);

count.collect();

line.foreach(println);

 

 

例子4:

val left = sc.parallelize(List(("spark",1),("hadoop",1),("storm",1)))

val right=sc.parallelize(List(("scala",1),("hadoop",1),("spark",1)))

val result = left.join(right);

val result = left.cogroup(right);

result.collect();

result.foreach(println);

---------

說明:

cogroup:保證key惟一,key只佔一行,可用作一個笛卡爾積,cogroup結果以下

(scala,(CompactBuffer(),CompactBuffer(1)))

(spark,(CompactBuffer(1),CompactBuffer(1)))

(hadoop,(CompactBuffer(1),CompactBuffer(1)))

(storm,(CompactBuffer(1),CompactBuffer()))

------------------------

還有其餘的RDD操做能夠參考以前寫的博客。

2.3 非RDD典型實例

1.Accumulator

Accumulator是spark提供的累加器,顧名思義,該變量只可以增長。 只有driver能獲取到Accumulator的值(使用value方法),Task只能對其作增長操做(使用 +=)。你也能夠在爲Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,能夠幫助你瞭解程序運行的狀況。用於監控和調試,記錄符合某類特徵的數據數據等。

-----------------

//在driver中定義

val accum = sc.accumulator(0, "Example Accumulator") 

//在task中進行累加 

sc.parallelize(1 to10,5).foreach(x=> accum += 1) 

//在driver中輸出 accum.value 

//結果將返回10 res: 10

 

 

 

---

說明:

在web ui中能夠看到Accumulators在task進行累加的具體狀況,driver將accumulator收集過來彙總

 

-----------------

3.廣播變量

Spark有兩種共享變量——累加器、廣播變量。廣播變量可讓程序高效地向全部工做節點發送一個較大的只讀值,以供一個或多個Spark操做使用。高效分發大對象,好比字典map,集合set,每一個executor一份,而不是每一個task一份.

Spark中分佈式執行的代碼須要傳遞到各個Executor的Task上運行。對於一些只讀、固定的數據(好比從DB中讀出的數據),每次都須要Driver廣播到各個Task上,這樣效率低下。廣播變量容許將變量只廣播(提早廣播)給各個Executor。該Executor上的各個Task再從所在節點的BlockManager獲取變量,而不是從Driver獲取變量,從而提高了效率。

一個Executor只須要在第一個Task啓動時,得到一份Broadcast數據,以後的Task都從本節點的BlockManager中獲取相關數據。

 

 

場景1:

val data = Set(1,2,3,4,5,6,7)

val rdd=sc.parallelize(1 to 6,2)

val result =rdd.map(_=>data.size)

result.collect();

場景2:

val data = Set(1,2,3,4,5,6,7)

val bddata=sc.broadcast(data)

val rdd=sc.parallelize(1 to 6,2)

val result =rdd.map(_=>data.size)

result.collect();

區別是:data數值的獲取方式,場景1 executor 每次都要從driver那裏獲取,要和交互7次,而場景2使用廣播變量,將data分發到executor,那麼driver和executor只須要交互一次。

4.cache的使用

val data=sc.textFile("/tmp/tbMonitordataResultHbase");

val a = data.count

println(a);

 

 

 

執行了29秒

 

 

val data=sc.textFile("/tmp/tbMonitordataResultHbase");

data.cache()

val a = data.count

println(a);

 

 

cache了100多M

val a = data.count

println(a);

data.persist(org.apache.spark.storage.StorageLevel. MEMORY_ONLY)

 

 

3.問題解答

1).spark計算時,數據會寫入磁盤麼,什麼條件下寫入磁盤

 shuffle的時候都會寫到磁盤,帶shuffle的時候會寫入到磁盤,有哪算子呢?

2).報錯日誌裏的錯誤是,ERRORCoarsegRainedExcecutorBackend:RECEIVED SIGNAL TERM ,這個多是什麼緣由

虛擬內存超了,被yarn的nodemanager殺了,配置這個參數, 若是是1G內存,能夠達到1.1G的使用內存

<property>

<name>yarn.nodemanager.vmem-pmem-ratio</name>

<value>10</value>

</property>

 

3).中文亂碼問題,出現中文亂碼和spark沒有關係

4).resourceManager的工做負擔重不重,實際部署時候是否有必要單獨到一臺機器上?

  機器的規模,50或者100,能夠nodemanager合併

5).hbase預分區個數和spark過程當中reduce個數相同麼

設置,map階段和region同樣,reduce取決於task

 

 

 

4.參考資料

1.董西成ppt

相關文章
相關標籤/搜索