spark基礎知識(1)

1、大數據架構php

 

併發計算:html

並行計算:python

不多會說併發計算,通常都是說並行計算,可是並行計算用的是併發技術。併發更偏向於底層。併發一般指的是單機上的併發運行,經過多線程來實現。而並行計算的範圍更廣,他是散佈到集羣上的分佈式計算。linux

Spark內存計算比hadoop快100倍,磁盤計算快10倍,在worker節點主要基於內存進行計算,避免了沒必要要的磁盤io。web

2、Spark模塊算法

Spark是沒有分佈式存儲的,必須藉助hadoop的HDFS等。資源管理工具自帶的是Standalone也支持hadoop的YARN。Spark最底層是SparkCore(內核位於執行引擎之上,全部功能都在其上進行構建),SparkCore集成了最原始的計算功能,基於SparkCore這種底層的API上層主要有四個,Spark SQL, Spark streaming, Mlib, Graphxspring

 

 

3、Spark部署三種模式shell

1. standalone獨立模式apache

2. Hadoop yarnubuntu

3. Spark in mapreduce

Spark獨立集羣模式

獨立部署,實質就是手動經過腳本一一啓動所須要的進程,例如master進程,work進程。

在spark/bin目錄下,提供了相應腳本,

start-master.sh // 啓動master進程

start-slave.sh  // 啓動單個work進程

start-slaves.sh // 啓動全部work進程

start-all.sh    // 啓動全部進程,在master上執行

stop-xxx.sh   //  對應的中止進程

Spark獨立集羣配置

① [spark-env.sh]

   Spark/conf/spark-env.sh

② [slaves]

   S02

   S03

   S04

③ spark-default.conf 也不用動

④ 分發文件

   cd /soft

   xsync.sh spark-2.1.0-bin-hadoop2.7

   xsync.sh spark

   su root

   xsync.sh /etc/profile

⑤ 重啓電腦

⑥ 啓動spark集羣

[s201]

   ./start-master.sh

   [s202, s203, s204]

   ./start-slave.sh spark://s201:7077

⑦ 進入webUI界面

http://s201:8080/

Spark-shell鏈接到spark集羣的模式

Spark-shell –master spark://s201:7077  #standalone模式,須要啓動進程(前提是你已經搭建好了集羣)可是缺點是,若是你在master上運行本地文件,必須傳到其餘work節點上,而若是是hadoop的hdfs文件就全部worker均可以訪問

Spark-shell –master spark://ip:port  #mesos模式

Yarn                            #結合hadoop,使用yarn資源調度框架

Spark-shell                 #本地模式

  

4、SparkRDD

Spark版本查看spark-submit –version

單獨在主節點上運行 spark-shell –master spark://s201:7077

1.  RDD基礎

RDD resilient distributed dataset彈性式分佈數據集

rdd裏面是沒有數據的,只是一些計算法則,有每一個切片的計算函數,還有分區列表。你在每一個分區上進行什麼樣的操做,這些操做封裝起來就是rdd.

①是不可變,若是運行rdd的節點故障,driver就能夠重建rdd並指派到新的節點。計算過程當中數據一直是靜態的。(RDD存的是變換過程,變換的函數,上下互相變換依賴的鏈條 )

②是分佈式的。將大的RDD切割成小的rdd,分發到worker節點上運行,每一個worker都有本身環境,最終組裝結果。Spark在系統或其餘運行故障後會有本身的機制進行恢復(可容錯因此叫彈性式)

④駐留在內存。Spark高速的緣由

⑤RDD是強類型

⑥RDD兩種操做類型

Transformations:從前一個RDD,產生一個新的RDD,例如filter操做

Action:基於RDD計算一個結果/返回值給driver/存儲文件導存儲系統上。Count/take/first

⑦使用parallelize()建立RDD

   Sc.parallelize(1 to 10, 3)產生3條切片

2. sparkConf

①設置spark參數,key-value對

②手動設置優先於系統屬性

3. sparkContext

org.apache.spark.SparkContext

SparkContext,spark上下文,驅動程序Driver,是spark程序的主入口,相似於main函數,負責鏈接到spark的集羣,可用建立RDD,在集羣上建立累加器和廣播變量。

每一個虛擬機jvm只能激活一個Spark上下文對象,建立新的spark上下文對象時,必須stop原來的。

val rdd = sc.textFile(「d:/2.txt」)

rdd.flatMap(line => line.split(「 「).map(w=>(w, 1)).reduceByKey((a, b)=>a+b).collect()

4.讀取文件sparkContext.textFile

Spark-shell下

Val lines = sc.textFile(「/user/ubuntu/xx.txt」) #讀取hdfs文件,hadoop集羣啓動起來

Val lines = sc.textFile(「file:///home/ubuntu/xx.txt」) #讀取本地文件

Lines.take(2) #提取前兩行

Lines.first()

Val rdd2 = lines.filter(x => x.contains(「0000」))

每一個spark應用由driver構成,由它啓動各類並行操做。driver含有main函數和分佈式數據集,並對他們應用各類操做。Spark-shell自己就是driver(包含Main)。

Driver經過sparkContext訪問spark對象,表明和spark集羣的鏈接

運行程序時,驅動程序須要管理一些叫作excuter的節點,若是是在分佈式集羣運行,則以下圖spark在分佈式上的執行主鍵所示。(在工做節點上有excutor執行線條,有若干task,task之間還能夠相互通訊)

 

Spark-shell(自己就是驅動)下默認使用local模式運行spark程序,沒有用到spark集羣。相似於Hadoop的本地模式。Spark-shell是能夠帶參的,

Spark-shell –master local[1] #1表示你要打算開啓1個線程來模擬spark集羣

spark-shell --master local[4] #4表示4個線程,併發度

val lines = sc.textFile("file:///D:/borrowCardJson.txt",3) #3表示分區數minpatition

val rdd2 = lines.map(x=>{val tname=Thread.currentThread().getName; println(tname + ":" + x);x})

rdd2.count()

附註:scala命令界面退出 :quit

      中止spark stop()

 

5、使用maven編譯和運行scala的spark程序

①編寫scala源碼

 import org.apache.spark.{ SparkContext, SparkConf}

object wcApp{

  def main(args:Array[String]){

    val conf = new SparkConf().setAppName("wordCount")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("file:///D:/borrowCardJson.txt")

    val words = lines.flatMap(x => x.split(","))

    val counts = words.map(word => (word, 1)).reduceByKey((x, y) => x+y)

    counts.saveAsTextFile("file:///D:/output.txt")

  }

}

 

六. 調度相關核心概念

[Job]

//ActiveJob,在DAG運行job, 有兩種類型,Result job和map-stage job.

Result job計算ResultStage來執行action

Map-stage job在以一個stage提交前,計算shuffleMapStage的output,並能夠用做查詢計劃以及提交給下一階段前查看map輸出的統計內容,可使用finalStage字段進行兩種job類型的區分

[Stage]

在job中計算中間的結果的task集合,每一個task在同一RDD的每一個分區上計算同一函數。Stage是經過shuffle邊界,引入一個障礙(必須等到前一完成後才能提取Output)

有兩種stage,ResultStage和shuffleMapStage

ResultStage是執行action的final的stage

ShuffleMapStage是爲shuffle過程將map的output進行write的

[task]

單獨的工做單元,發送給一臺主機。

[cache tracking]

DAG調度器分析出被緩存的RDD,以免重複計算,也會記住在map階段以及產生了output的shuffle避免重複map過程

[preferred location]

DAG調度器基於底層RDD的首選位置、緩存的配置或者shuffle的數據參數獲得首選位置,並計算task的運行地點。

[cleanup]

Job完成後數據結構被清除防止內存泄漏。

 

taskset  //task集合

stage   //task集合

調度器

三級調度

1. DAGScheduler  //有向五環圖調度器 stage

2.TaskScheduler  //任務調度器.Taskset調度任務,依賴於SchedulerBackend 

                               //TaskSchedulerImp1 任務調度器就這一個 後臺調度器有好幾種實現

3.SchedulerBackend  //後臺調度器

                                      //LocalSchedulerBackend針對本地模式調試

                                     //CoarseGrainedSchedulerBackend粗粒度

                                     //StandaloneSchedulerBackend繼承於CoarseGrainedSchedulerBackend

4. SchedulableBuilder  //調度構建器  FIFO和Fair兩種

例如 RDD.collect()

①首先collect()裏面,啓動SparkContext的runjob方法,最終調用到DAG調度器runjob方法,參數仍然有rdd、分區數、resultHandler(結果處理器),在DAG的runjob方法裏面用了submitJob,DAG調度器把RDD封裝成事件放到隊列裏面(DAGSchedulerEventProcessLoop)。而後有線程開啓輪尋這個隊列。輪尋後再次將結果給DAG調度器,使用裏面封裝的方法handleJobSubmitted,該方法裏面有createResultStage(由於DAG是面向階段的,必須在RDD裏面找出階段),而後再submitStage,而後submitMissingTask(stage),串行化rdd(將任務變成字節數組)以便分發到各節點(廣播走),多個分區映射成多個任務shuffleMapTask。

②接着進入任務調度器TaskScheduler,taskScheduler.submitTasks(TaskSet)提交任務集,將任務集交給任務集管理器TaskSetManager

③進入第三層調度 SchedulerBackend 後臺調度器,走的localEndPoint,走到rpc,分發消息,分發線程輪尋dispathcher.MessageLoop(),開啓分線程運行各個做業threadpool,最終走到spark.executor,使用內部的rpc(遠程過程調用)用於和driver通信。Spark.ececutor啓動launchTask(taskId, taskName, serializedTask…)

④進入分線程處理,反序列化 Task.run,再shuffleMapTask.run.Task()反序列化廣播變量,而後調用shuffleWrite.write()處理咱們程序裏面本身寫的代碼。

 

  

                      

                   

 

7、在spark集羣上對hdfs文件進行單詞統計

1. 啓動zk

[s202, s203, s204]

zkServer.sh start

2. 啓動hadoop集羣

[s201]

只啓動hdfs便可

 

hdfs haadmin -transtionToActive –forcemanul nnl  //強行切換active態

查看下 http:s201:50070

3. 啓動spark集羣

[s201]

Cd soft/spark/sbin目錄下,./start-all.shell

查看下 http:s201:8080

 

4.  準備文件 在HDFS上面放個文件

hdfs dfs -put words /user/centos

5. Spark-shell –master spark://s201:7077

 

6. 在shell環境下編寫單詞統計的wordCount程序

 

 

8、導出spark應用成jar,提交spark集羣上執行

有兩種方法,一種是IDEA 打可執行jar 包,

http://www.javashuo.com/article/p-mfglyzym-mv.html

IntellIJ IDEA 中配置Maven

http://www.javashuo.com/article/p-fddezcgr-dv.html

http://www.javashuo.com/article/p-svlxlsxt-mr.html

另外一種是idea maven 打可執行jar 包

http://www.javashuo.com/article/p-uvfqklwj-dn.html  

在pom.xml文件中<build><plugins>後加入兩個插件org.apache.maven.plugins和net.alchim31.maven

 

 

在你jar包所在文件下,使用spark-submit,後面添加各類參數,若是依賴第三方包用--jars

 

Spark Application基於python編寫之後,如何提交spark運行呢? 同樣用spark-submit, 

把該python文件上傳到hdfs文件系統上去 ,spark安裝目錄相同文件夾

須要修改下spark環境變量爲運行的集羣地址,os.environ['SPARK_HOME'] = '/opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0'

.setMaster就不用指定了,經過程序來指定

bin/spark-submit  spark://so21:7077  /opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0/spark_wordCount.py

相關文章
相關標籤/搜索