做者:楊思義,2014年6月至今工做於北京亞信智慧數據科技有限公司 BDX大數據事業部,從2014年9月開始從事項目spark相關應用開發。mysql
來源:數盟sql
Spark簡介shell
Spark是整個BDAS的核心組件,是一個大數據分佈式編程框架,不只實現了MapReduce的算子map 函數和reduce函數及計算模型,還提供更爲豐富的算子,如filter、join、groupByKey等。是一個用來實現快速而同用的集羣計算的平臺。數據庫
Spark將分佈式數據抽象爲彈性分佈式數據集(RDD),實現了應用任務調度、RPC、序列化和壓縮,併爲運行在其上的上層組件提供API。其底層採用Scala這種函數式語言書寫而成,而且所提供的API深度借鑑Scala函數式的編程思想,提供與Scala相似的編程接口apache
Sparkon Yarn編程
從用戶提交做業到做業運行結束整個運行期間的過程分析。設計模式
1、客戶端進行操做緩存
根據yarnConf來初始化yarnClient,並啓動yarnClient網絡
建立客戶端Application,並獲取Application的ID,進一步判斷集羣中的資源是否知足executor和ApplicationMaster申請的資源,若是不知足則拋出IllegalArgumentException;app
設置資源、環境變量:其中包括了設置Application的Staging目錄、準備本地資源(jar文件、log4j.properties)、設置Application其中的環境變量、建立Container啓動的Context等;
設置Application提交的Context,包括設置應用的名字、隊列、AM的申請的Container、標記該做業的類型爲Spark;
申請Memory,並最終經過yarnClient.submitApplication向ResourceManager提交該Application。
看成業提交到YARN上以後,客戶端就沒事了,甚至在終端關掉那個進程也沒事,由於整個做業運行在YARN集羣上進行,運行的結果將會保存到HDFS或者日誌中。
2、提交到YARN集羣,YARN操做
運行ApplicationMaster的run方法;
設置好相關的環境變量。
建立amClient,並啓動;
在Spark UI啓動以前設置Spark UI的AmIpFilter;
在startUserClass函數專門啓動了一個線程(名稱爲Driver的線程)來啓動用戶提交的Application,也就是啓動了Driver。在Driver中將會初始化SparkContext;
等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(默認爲10),若是等待了的次數超過了配置的,程序將會退出;不然用SparkContext初始化yarnAllocator;
當SparkContext、Driver初始化完成的時候,經過amClient向ResourceManager註冊ApplicationMaster
分配並啓動Executeors。在啓動Executeors以前,先要經過yarnAllocator獲取到numExecutors個Container,而後在Container中啓動Executeors。
那麼這個Application將失敗,將Application Status標明爲FAILED,並將關閉SparkContext。其實,啓動Executeors是經過ExecutorRunnable實現的,而ExecutorRunnable內部是啓動CoarseGrainedExecutorBackend的。
最後,Task將在CoarseGrainedExecutorBackend裏面運行,而後運行情況會經過Akka通知CoarseGrainedScheduler,直到做業運行完成。
Spark節點的概念
1、Spark驅動器是執行程序中的main()方法的進程。它執行用戶編寫的用來建立SparkContext(初始化)、建立RDD,以及運行RDD的轉化操做和行動操做的代碼。
驅動器節點driver的職責:
把用戶程序轉爲任務task(driver)
Spark驅動器程序負責把用戶程序轉化爲多個物理執行單元,這些單元也被稱之爲任務task(詳解見備註)
爲執行器節點調度任務(executor)
有了物理計劃以後,Spark驅動器在各個執行器節點進程間協調任務的調度。Spark驅動器程序會根據當前的執行器節點,把全部任務基於數據所在位置分配給合適的執行器進程。當執行任務時,執行器進程會把緩存的數據存儲起來,而驅動器進程一樣會跟蹤這些緩存數據的位置,並利用這些位置信息來調度之後的任務,以儘可能減小數據的網絡傳輸。(就是所謂的移動計算,而不移動數據)。
2、執行器節點
做用:
負責運行組成Spark應用的任務,並將結果返回給驅動器進程;
經過自身的塊管理器(blockManager)爲用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在執行器進程內的,所以任務能夠在運行時充分利用緩存數據加快運算。
驅動器的職責:
全部的Spark程序都遵循一樣的結構:程序從輸入數據建立一系列RDD,再使用轉化操做派生成新的RDD,最後使用行動操做手機或存儲結果RDD,Spark程序實際上是隱式地建立出了一個由操做組成的邏輯上的有向無環圖DAG。當驅動器程序執行時,它會把這個邏輯圖轉爲物理執行計劃。
這樣 Spark就把邏輯計劃轉爲一系列步驟(stage),而每一個步驟又由多個任務組成。這些任務會被打包送到集羣中。
Spark初始化
每一個Spark應用都由一個驅動器程序來發起集羣上的各類並行操做。驅動器程序包含應用的main函數,而且定義了集羣上的分佈式數據集,以及對該分佈式數據集應用了相關操做。
驅動器程序經過一個SparkContext對象來訪問spark,這個對象表明對計算集羣的一個鏈接。(好比在sparkshell啓動時已經自動建立了一個SparkContext對象,是一個叫作SC的變量。(下圖,查看變量sc)
一旦建立了sparkContext,就能夠用它來建立RDD。好比調用sc.textFile()來建立一個表明文本中各行文本的RDD。(好比vallinesRDD = sc.textFile(「yangsy.text」),val spark = linesRDD.filter(line=>line.contains(「spark」),spark.count())
執行這些操做,驅動器程序通常要管理多個執行器,就是咱們所說的executor節點。
在初始化SparkContext的同時,加載sparkConf對象來加載集羣的配置,從而建立sparkContext對象。
從源碼中能夠看到,在啓動thriftserver時,調用了spark- daemon.sh文件,該文件源碼如左圖,加載spark_home下的conf中的文件。
(在執行後臺代碼時,須要首先建立conf對象,加載相應參數, val sparkConf = newSparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory","1g"), val sc: SparkContext = new SparkContext(sparkConf))
RDD工做原理:
RDD(Resilient DistributedDatasets)[1] ,彈性分佈式數據集,是分佈式內存的一個抽象概念,RDD提供了一種高度受限的共享內存模型,即RDD是隻讀的記錄分區的集合,只能經過在其餘RDD執行肯定的轉換操做(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。對開發者而言,RDD能夠看做是Spark的一個對象,它自己運行於內存中,如讀文件是一個RDD,對文件計算是一個RDD,結果集也是一個RDD ,不一樣的分片、數據之間的依賴、key-value類型的map數據均可以看作RDD。
主要分爲三部分:建立RDD對象,DAG調度器建立執行計劃,Task調度器分配任務並調度Worker開始運行。
SparkContext(RDD相關操做)→經過(提交做業)→(遍歷RDD拆分stage→生成做業)DAGScheduler→經過(提交任務集)→任務調度管理(TaskScheduler)→經過(按照資源獲取任務)→任務調度管理(TaskSetManager)
Transformation返回值仍是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算後,變換成另一個RDD,而後這個RDD又能夠進行另一次轉換。這個過程是分佈式的。
Action返回值不是一個RDD。它要麼是一個Scala的普通集合,要麼是一個值,要麼是空,最終或返回到Driver程序,或把RDD寫入到文件系統中
轉換(Transformations)(如:map, filter, groupBy, join等),Transformations操做是Lazy的,也就是說從一個RDD轉換生成另外一個RDD的操做不是立刻執行,Spark在遇到Transformations操做時只會記錄須要這樣的操做,並不會去執行,須要等到有Actions操做的時候纔會真正啓動計算過程進行計算。
操做(Actions)(如:count, collect, save等),Actions操做會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啓動計算的動因。
它們本質區別是:Transformation返回值仍是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算後,變換成另一個RDD,而後這個RDD又能夠進行另一次轉換。這個過程是分佈式的。Action返回值不是一個RDD。它要麼是一個Scala的普通集合,要麼是一個值,要麼是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。關於這兩個動做,在Spark開發指南中會有就進一步的詳細介紹,它們是基於Spark開發的核心。
RDD基礎
Spark中的RDD就是一個不可變的分佈式對象集合。每一個RDD都被分爲多個分區,這些分區運行在集羣的不一樣節點上。建立RDD的方法有兩種:一種是讀取一個外部數據集;一種是在羣東程序裏分發驅動器程序中的對象集合,不如剛纔的示例,讀取文本文件做爲一個字符串的RDD的示例。
建立出來後,RDD支持兩種類型的操做:轉化操做和行動操做
轉化操做會由一個RDD生成一個新的RDD。(好比剛纔的根據謂詞篩選)
行動操做會對RDD計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(好比HDFS)中。好比first()操做就是一個行動操做,會返回RDD的第一個元素。
注:轉化操做與行動操做的區別在於Spark計算RDD的方式不一樣。雖然你能夠在任什麼時候候定義一個新的RDD,但Spark只會惰性計算這些RDD。它們只有第一個在一個行動操做中用到時,纔會真正的計算。之因此這樣設計,是由於好比剛纔調用sc.textFile(...)時就把文件中的全部行都讀取並存儲起來,就會消耗不少存儲空間,而咱們立刻又要篩選掉其中的不少數據。
這裏還須要注意的一點是,spark會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個RDD,那麼可使用RDD.persist()或RDD.collect()讓Spark把這個RDD緩存下來。(能夠是內存,也能夠是磁盤)
Spark會使用譜系圖來記錄這些不一樣RDD之間的依賴關係,Spark須要用這些信息來按需計算每一個RDD,也能夠依靠譜系圖在持久化的RDD丟失部分數據時用來恢復所丟失的數據。(以下圖,過濾errorsRDD與warningsRDD,最終調用union()函數)
RDD計算方式
RDD的寬窄依賴
窄依賴 (narrowdependencies) 和寬依賴 (widedependencies) 。窄依賴是指 父 RDD 的每一個分區都只被子 RDD 的一個分區所使用 。相應的,那麼寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴。例如, map 就是一種窄依賴,而 join 則會致使寬依賴
這種劃分有兩個用處。首先,窄依賴支持在一個結點上管道化執行。例如基於一對一的關係,能夠在 filter 以後執行 map 。其次,窄依賴支持更高效的故障還原。由於對於窄依賴,只有丟失的父 RDD 的分區須要從新計算。而對於寬依賴,一個結點的故障可能致使來自全部父 RDD 的分區丟失,所以就須要徹底從新執行。所以對於寬依賴,Spark 會在持有各個父分區的結點上,將中間數據持久化來簡化故障還原,就像 MapReduce 會持久化 map 的輸出同樣。
SparkExample
步驟 1 :建立 RDD 。上面的例子除去最後一個 collect 是個動做,不會建立 RDD 以外,前面四個轉換都會建立出新的 RDD 。所以第一步就是建立好全部 RDD( 內部的五項信息 ) 。
步驟 2 :建立執行計劃。Spark 會盡量地管道化,並基因而否要從新組織數據來劃分 階段 (stage) ,例如本例中的 groupBy() 轉換就會將整個執行計劃劃分紅兩階段執行。最終會產生一個 DAG(directedacyclic graph ,有向無環圖 ) 做爲邏輯執行計劃。
步驟 3 :調度任務。 將各階段劃分紅不一樣的 任務 (task) ,每一個任務都是數據和計算的合體。在進行下一階段前,當前階段的全部任務都要執行完成。由於下一階段的第一個轉換必定是從新組織數據的,因此必須等當前階段全部結果數據都計算出來了才能繼續。
假設本例中的 hdfs://names 下有四個文件塊,那麼 HadoopRDD 中 partitions 就會有四個分區對應這四個塊數據,同時 preferedLocations 會指明這四個塊的最佳位置。如今,就能夠建立出四個任務,並調度到合適的集羣結點上。
Spark數據分區
Spark的特性是對數據集在節點間的分區進行控制。在分佈式系統中,通信的代價是巨大的,控制數據分佈以得到最少的網絡傳輸能夠極大地提高總體性能。Spark程序能夠經過控制RDD分區方式來減小通信的開銷。
Spark中全部的鍵值對RDD均可以進行分區。確保同一組的鍵出如今同一個節點上。好比,使用哈希分區將一個RDD分紅了100個分區,此時鍵的哈希值對100取模的結果相同的記錄會被放在一個節點上。
(可以使用partitionBy(newHashPartitioner(100)).persist()來構造100個分區)
Spark中的許多操做都引入了將數據根據鍵跨界點進行混洗的過程。(好比:join(),leftOuterJoin(),groupByKey(),reducebyKey()等)對於像reduceByKey()這樣只做用於單個RDD的操做,運行在未分區的RDD上的時候會致使每一個鍵的全部對應值都在每臺機器上進行本地計算。
SparkSQL的shuffle過程
Spark SQL的核心是把已有的RDD,帶上Schema信息,而後註冊成相似sql裏的」Table」,對其進行sql查詢。這裏面主要分兩部分,一是生成SchemaRD,二是執行查詢。
若是是spark-hive項目,那麼讀取metadata信息做爲Schema、讀取hdfs上數據的過程交給Hive完成,而後根據這倆部分生成SchemaRDD,在HiveContext下進行hql()查詢。
SparkSQL結構化數據
首先說一下ApacheHive,Hive能夠在HDFS內或者在其餘存儲系統上存儲多種格式的表。SparkSQL能夠讀取Hive支持的任何表。要把Spark SQL鏈接已有的hive上,須要提供Hive的配置文件。hive-site.xml文件複製到spark的conf文件夾下。再建立出HiveContext對象(sparksql的入口),而後就可使用HQL來對錶進行查詢,並以由行足證的RDD的形式拿到返回的數據。
建立Hivecontext並查詢數據
importorg.apache.spark.sql.hive.HiveContext
valhiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
valrows = hiveCtx.sql(「SELECT name,age FROM users」)
valfitstRow – rows.first()
println(fitstRow.getSgtring(0)) //字段0是name字段
經過jdbc鏈接外部數據源更新與加載
Class.forName("com.mysql.jdbc.Driver")
val conn =DriverManager.getConnection(mySQLUrl)
val stat1 =conn.createStatement()
stat1.execute("UPDATE CI_LABEL_INFO set DATA_STATUS_ID = 2 , DATA_DATE ='" + dataDate +"' where LABEL_ID in ("+allCreatedLabels.mkString(",")+")")
stat1.close()
//加載外部數據源數據到內存
valDIM_COC_INDEX_MODEL_TABLE_CONF =sqlContext.jdbc(mySQLUrl,"DIM_COC_INDEX_MODEL_TABLE_CONF").cache()
val targets =DIM_COC_INDEX_MODEL_TABLE_CONF.filter("TABLE_DATA_CYCLE ="+TABLE_DATA_CYCLE).collect
SparkSQL解析
首先說下傳統數據庫的解析,傳統數據庫的解析過程是按Rusult、Data Source、Operation的次序來解析的。傳統數據庫先將讀入的SQL語句進行解析,分辨出SQL語句中哪些詞是關鍵字(如select,from,where),哪些是表達式,哪些是Projection,哪些是Data Source等等。進一步判斷SQL語句是否規範,不規範就報錯,規範則按照下一步過程綁定(Bind)。過程綁定是將SQL語句和數據庫的數據字典(列,表,視圖等)進行綁定,若是相關的Projection、Data Source等都存在,就表示這個SQL語句是能夠執行的。在執行過程當中,有時候甚至不須要讀取物理表就能夠返回結果,好比從新運行剛運行過的SQL語句,直接從數據庫的緩衝池中獲取返回結果。在數據庫解析的過程當中SQL語句時,將會把SQL語句轉化成一個樹形結構來進行處理,會造成一個或含有多個節點(TreeNode)的Tree,而後再後續的處理政對該Tree進行一系列的操做。
Spark SQL對SQL語句的處理和關係數據庫對SQL語句的解析採用了相似的方法,首先會將SQL語句進行解析,而後造成一個Tree,後續如綁定、優化等處理過程都是對Tree的操做,而操做方法是採用Rule,經過模式匹配,對不一樣類型的節點採用不一樣的操做。SparkSQL有兩個分支,sqlContext和hiveContext。sqlContext如今只支持SQL語法解析器(Catalyst),hiveContext支持SQL語法和HiveContext語法解析器。
版權聲明:
轉載文章均來自公開網絡,僅供學習使用,不會用於任何商業用途,若是出處有誤或侵犯到原做者權益,請與咱們聯繫刪除或受權事宜,聯繫郵箱:holly0801@163.com。轉載大數據公衆號文章請註明原文連接和做者,不然產生的任何版權糾紛與大數據無關。