1、大數據架構php
併發計算:html
並行計算:python
不多會說併發計算,通常都是說並行計算,可是並行計算用的是併發技術。併發更偏向於底層。併發一般指的是單機上的併發運行,經過多線程來實現。而並行計算的範圍更廣,他是散佈到集羣上的分佈式計算。linux
Spark內存計算比hadoop快100倍,磁盤計算快10倍,在worker節點主要基於內存進行計算,避免了沒必要要的磁盤io。web
2、Spark模塊算法
Spark是沒有分佈式存儲的,必須藉助hadoop的HDFS等。資源管理工具自帶的是Standalone也支持hadoop的YARN。Spark最底層是SparkCore(內核位於執行引擎之上,全部功能都在其上進行構建),SparkCore集成了最原始的計算功能,基於SparkCore這種底層的API上層主要有四個,Spark SQL, Spark streaming, Mlib, Graphxspring
3、Spark部署三種模式shell
1. standalone獨立模式apache
2. Hadoop yarnubuntu
3. Spark in mapreduce
Spark獨立集羣模式
獨立部署,實質就是手動經過腳本一一啓動所須要的進程,例如master進程,work進程。
在spark/bin目錄下,提供了相應腳本,
start-master.sh // 啓動master進程
start-slave.sh // 啓動單個work進程
start-slaves.sh // 啓動全部work進程
start-all.sh // 啓動全部進程,在master上執行
stop-xxx.sh // 對應的中止進程
Spark獨立集羣配置
① [spark-env.sh]
Spark/conf/spark-env.sh
② [slaves]
S02
S03
S04
③ spark-default.conf 也不用動
④ 分發文件
cd /soft
xsync.sh spark-2.1.0-bin-hadoop2.7
xsync.sh spark
su root
xsync.sh /etc/profile
⑤ 重啓電腦
⑥ 啓動spark集羣
[s201]
./start-master.sh
[s202, s203, s204]
./start-slave.sh spark://s201:7077
⑦ 進入webUI界面
Spark-shell鏈接到spark集羣的模式
Spark-shell –master spark://s201:7077 #standalone模式,須要啓動進程(前提是你已經搭建好了集羣)可是缺點是,若是你在master上運行本地文件,必須傳到其餘work節點上,而若是是hadoop的hdfs文件就全部worker均可以訪問
Spark-shell –master spark://ip:port #mesos模式
Yarn #結合hadoop,使用yarn資源調度框架
Spark-shell #本地模式
4、SparkRDD
Spark版本查看spark-submit –version
單獨在主節點上運行 spark-shell –master spark://s201:7077
1. RDD基礎
RDD resilient distributed dataset彈性式分佈數據集
rdd裏面是沒有數據的,只是一些計算法則,有每一個切片的計算函數,還有分區列表。你在每一個分區上進行什麼樣的操做,這些操做封裝起來就是rdd.
①是不可變,若是運行rdd的節點故障,driver就能夠重建rdd並指派到新的節點。計算過程當中數據一直是靜態的。(RDD存的是變換過程,變換的函數,上下互相變換依賴的鏈條 )
②是分佈式的。將大的RDD切割成小的rdd,分發到worker節點上運行,每一個worker都有本身環境,最終組裝結果。Spark在系統或其餘運行故障後會有本身的機制進行恢復(可容錯因此叫彈性式)
④駐留在內存。Spark高速的緣由
⑤RDD是強類型
⑥RDD兩種操做類型
Transformations:從前一個RDD,產生一個新的RDD,例如filter操做
Action:基於RDD計算一個結果/返回值給driver/存儲文件導存儲系統上。Count/take/first
⑦使用parallelize()建立RDD
Sc.parallelize(1 to 10, 3)產生3條切片
2. sparkConf
①設置spark參數,key-value對
②手動設置優先於系統屬性
3. sparkContext
org.apache.spark.SparkContext
SparkContext,spark上下文,驅動程序Driver,是spark程序的主入口,相似於main函數,負責鏈接到spark的集羣,可用建立RDD,在集羣上建立累加器和廣播變量。
每一個虛擬機jvm只能激活一個Spark上下文對象,建立新的spark上下文對象時,必須stop原來的。
val rdd = sc.textFile(「d:/2.txt」)
rdd.flatMap(line => line.split(「 「).map(w=>(w, 1)).reduceByKey((a, b)=>a+b).collect()
4.讀取文件sparkContext.textFile
Spark-shell下
Val lines = sc.textFile(「/user/ubuntu/xx.txt」) #讀取hdfs文件,hadoop集羣啓動起來
Val lines = sc.textFile(「file:///home/ubuntu/xx.txt」) #讀取本地文件
Lines.take(2) #提取前兩行
Lines.first()
Val rdd2 = lines.filter(x => x.contains(「0000」))
每一個spark應用由driver構成,由它啓動各類並行操做。driver含有main函數和分佈式數據集,並對他們應用各類操做。Spark-shell自己就是driver(包含Main)。
Driver經過sparkContext訪問spark對象,表明和spark集羣的鏈接
運行程序時,驅動程序須要管理一些叫作excuter的節點,若是是在分佈式集羣運行,則以下圖spark在分佈式上的執行主鍵所示。(在工做節點上有excutor執行線條,有若干task,task之間還能夠相互通訊)
Spark-shell(自己就是驅動)下默認使用local模式運行spark程序,沒有用到spark集羣。相似於Hadoop的本地模式。Spark-shell是能夠帶參的,
Spark-shell –master local[1] #1表示你要打算開啓1個線程來模擬spark集羣
spark-shell --master local[4] #4表示4個線程,併發度
val lines = sc.textFile("file:///D:/borrowCardJson.txt",3) #3表示分區數minpatition
val rdd2 = lines.map(x=>{val tname=Thread.currentThread().getName; println(tname + ":" + x);x})
rdd2.count()
附註:scala命令界面退出 :quit
中止spark stop()
5、使用maven編譯和運行scala的spark程序
①編寫scala源碼
import org.apache.spark.{ SparkContext, SparkConf}
object wcApp{
def main(args:Array[String]){
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
val lines = sc.textFile("file:///D:/borrowCardJson.txt")
val words = lines.flatMap(x => x.split(","))
val counts = words.map(word => (word, 1)).reduceByKey((x, y) => x+y)
counts.saveAsTextFile("file:///D:/output.txt")
}
}
六. 調度相關核心概念
[Job]
//ActiveJob,在DAG運行job, 有兩種類型,Result job和map-stage job.
Result job計算ResultStage來執行action
Map-stage job在以一個stage提交前,計算shuffleMapStage的output,並能夠用做查詢計劃以及提交給下一階段前查看map輸出的統計內容,可使用finalStage字段進行兩種job類型的區分
[Stage]
在job中計算中間的結果的task集合,每一個task在同一RDD的每一個分區上計算同一函數。Stage是經過shuffle邊界,引入一個障礙(必須等到前一完成後才能提取Output)
有兩種stage,ResultStage和shuffleMapStage
ResultStage是執行action的final的stage
ShuffleMapStage是爲shuffle過程將map的output進行write的
[task]
單獨的工做單元,發送給一臺主機。
[cache tracking]
DAG調度器分析出被緩存的RDD,以免重複計算,也會記住在map階段以及產生了output的shuffle避免重複map過程
[preferred location]
DAG調度器基於底層RDD的首選位置、緩存的配置或者shuffle的數據參數獲得首選位置,並計算task的運行地點。
[cleanup]
Job完成後數據結構被清除防止內存泄漏。
taskset //task集合
stage //task集合
調度器
三級調度
1. DAGScheduler //有向五環圖調度器 stage
2.TaskScheduler //任務調度器.Taskset調度任務,依賴於SchedulerBackend
//TaskSchedulerImp1 任務調度器就這一個 後臺調度器有好幾種實現
3.SchedulerBackend //後臺調度器
//LocalSchedulerBackend針對本地模式調試
//CoarseGrainedSchedulerBackend粗粒度
//StandaloneSchedulerBackend繼承於CoarseGrainedSchedulerBackend
4. SchedulableBuilder //調度構建器 FIFO和Fair兩種
例如 RDD.collect()
①首先collect()裏面,啓動SparkContext的runjob方法,最終調用到DAG調度器runjob方法,參數仍然有rdd、分區數、resultHandler(結果處理器),在DAG的runjob方法裏面用了submitJob,DAG調度器把RDD封裝成事件放到隊列裏面(DAGSchedulerEventProcessLoop)。而後有線程開啓輪尋這個隊列。輪尋後再次將結果給DAG調度器,使用裏面封裝的方法handleJobSubmitted,該方法裏面有createResultStage(由於DAG是面向階段的,必須在RDD裏面找出階段),而後再submitStage,而後submitMissingTask(stage),串行化rdd(將任務變成字節數組)以便分發到各節點(廣播走),多個分區映射成多個任務shuffleMapTask。
②接着進入任務調度器TaskScheduler,taskScheduler.submitTasks(TaskSet)提交任務集,將任務集交給任務集管理器TaskSetManager
③進入第三層調度 SchedulerBackend 後臺調度器,走的localEndPoint,走到rpc,分發消息,分發線程輪尋dispathcher.MessageLoop(),開啓分線程運行各個做業threadpool,最終走到spark.executor,使用內部的rpc(遠程過程調用)用於和driver通信。Spark.ececutor啓動launchTask(taskId, taskName, serializedTask…)
④進入分線程處理,反序列化 Task.run,再shuffleMapTask.run.Task()反序列化廣播變量,而後調用shuffleWrite.write()處理咱們程序裏面本身寫的代碼。
7、在spark集羣上對hdfs文件進行單詞統計
1. 啓動zk
[s202, s203, s204]
zkServer.sh start
2. 啓動hadoop集羣
[s201]
只啓動hdfs便可
hdfs haadmin -transtionToActive –forcemanul nnl //強行切換active態
查看下 http:s201:50070
3. 啓動spark集羣
[s201]
Cd soft/spark/sbin目錄下,./start-all.shell
查看下 http:s201:8080
4. 準備文件 在HDFS上面放個文件
hdfs dfs -put words /user/centos
5. Spark-shell –master spark://s201:7077
6. 在shell環境下編寫單詞統計的wordCount程序
8、導出spark應用成jar,提交spark集羣上執行
有兩種方法,一種是IDEA 打可執行jar 包,
http://www.javashuo.com/article/p-mfglyzym-mv.html
IntellIJ IDEA 中配置Maven
http://www.javashuo.com/article/p-fddezcgr-dv.html
http://www.javashuo.com/article/p-svlxlsxt-mr.html
另外一種是idea maven 打可執行jar 包
http://www.javashuo.com/article/p-uvfqklwj-dn.html
在pom.xml文件中<build><plugins>後加入兩個插件org.apache.maven.plugins和net.alchim31.maven
在你jar包所在文件下,使用spark-submit,後面添加各類參數,若是依賴第三方包用--jars
Spark Application基於python編寫之後,如何提交spark運行呢? 同樣用spark-submit,
把該python文件上傳到hdfs文件系統上去 ,spark安裝目錄相同文件夾
須要修改下spark環境變量爲運行的集羣地址,os.environ['SPARK_HOME'] = '/opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0'
.setMaster就不用指定了,經過程序來指定
bin/spark-submit spark://so21:7077 /opt/cdh-5.3.6/spark-1.6.1-bin-2.5.0/spark_wordCount.py