spark 已經成爲廣告、報表以及推薦系統等大數據計算場景中首選系統,因效率高,易用以及通用性愈來愈獲得你們的青睞,我本身最近半年在接觸spark以及spark streaming以後,對spark技術的使用有一些本身的經驗積累以及心得體會,在此分享給你們。html
本文依次從spark生態,原理,基本概念,spark streaming原理及實踐,還有spark調優以及環境搭建等方面進行介紹,但願對你們有所幫助。java
運行速度快 => Spark擁有DAG執行引擎,支持在內存中對數據進行迭代計算。官方提供的數據代表,若是數據由磁盤讀取,速度是Hadoop MapReduce的10倍以上,若是數據從內存中讀取,速度能夠高達100多倍。node
適用場景普遍 => 大數據分析統計,實時數據處理,圖計算及機器學習git
易用性 => 編寫簡單,支持80種以上的高級算子,支持多種語言,數據源豐富,可部署在多種集羣中github
容錯性高。Spark引進了彈性分佈式數據集RDD (Resilient Distributed Dataset) 的抽象,它是分佈在一組節點中的只讀對象集合,這些集合是彈性的,若是數據集一部分丟失,則能夠根據「血統」(即充許基於數據衍生過程)對它們進行重建。另外在RDD計算時能夠經過CheckPoint來實現容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶能夠控制採用哪一種方式來實現容錯。apache
目前大數據處理場景有如下幾個類型:swift
複雜的批量處理(Batch Data Processing),偏重點在於處理海量數據的能力,至於處理速度可忍受,一般的時間多是在數十分鐘到數小時;設計模式
基於歷史數據的交互式查詢(Interactive Query),一般的時間在數十秒到數十分鐘之間api
基於實時數據流的數據處理(Streaming Data Processing),一般在數百毫秒到數秒之間緩存
目前大數據在互聯網公司主要應用在廣告、報表、推薦系統等業務上。在廣告業務方面須要大數據作應用分析、效果分析、定向優化等,在推薦系統方面則須要大數據優化相關排名、個性化推薦以及熱點點擊分析等。這些應用場景的廣泛特色是計算量大、效率要求高。
騰訊 / yahoo / 淘寶 / 優酷土豆
spark基礎運行架構以下所示:
spark結合yarn集羣背後的運行流程以下所示:
spark 運行流程:
Spark架構採用了分佈式計算中的Master-Slave模型。Master是對應集羣中的含有Master進程的節點,Slave是集羣中含有Worker進程的節點。
Master做爲整個集羣的控制器,負責整個集羣的正常運行;
Worker至關於計算節點,接收主節點命令與進行狀態彙報;
Executor負責任務的執行;
Client做爲用戶的客戶端負責提交應用;
Driver負責控制一個應用的執行。
Spark集羣部署後,須要在主節點和從節點分別啓動Master進程和Worker進程,對整個集羣進行控制。在一個Spark應用的執行過程當中,Driver和Worker是兩個重要角色。Driver 程序是應用邏輯執行的起點,負責做業的調度,即Task任務的分發,而多個Worker用來管理計算節點和建立Executor並行處理任務。在執行階段,Driver會將Task和Task所依賴的file和jar序列化後傳遞給對應的Worker機器,同時Executor對相應數據分區的任務進行處理。
Excecutor /Task 每一個程序自有,不一樣程序互相隔離,task多線程並行
集羣對Spark透明,Spark只要能獲取相關節點和進程
Driver 與Executor保持通訊,協做處理
三種集羣模式:
1.Standalone 獨立集羣
2.Mesos, apache mesos
3.Yarn, hadoop yarn
基本概念:
Application =>Spark的應用程序,包含一個Driver program和若干Executor
SparkContext => Spark應用程序的入口,負責調度各個運算資源,協調各個Worker Node上的Executor
Driver Program => 運行Application的main()函數而且建立SparkContext
Executor => 是爲Application運行在Worker node上的一個進程,該進程負責運行Task,而且負責將數據存在內存或者磁盤上。每一個Application都會申請各自的Executor來處理任務
Cluster Manager =>在集羣上獲取資源的外部服務 (例如:Standalone、Mesos、Yarn)
Worker Node => 集羣中任何能夠運行Application代碼的節點,運行一個或多個Executor進程
Task => 運行在Executor上的工做單元
Job => SparkContext提交的具體Action操做,常和Action對應
Stage => 每一個Job會被拆分不少組task,每組任務被稱爲Stage,也稱TaskSet
RDD => 是Resilient distributed datasets的簡稱,中文爲彈性分佈式數據集;是Spark最核心的模塊和類
DAGScheduler => 根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler
TaskScheduler => 將Taskset提交給Worker node集羣運行並返回結果
Transformations => 是Spark API的一種類型,Transformation返回值仍是一個RDD,全部的Transformation採用的都是懶策略,若是隻是將Transformation提交是不會執行計算的
Action => 是Spark API的一種類型,Action返回值不是一個RDD,而是一個scala集合;計算只有在Action被提交的時候計算才被觸發。
Transformation返回值仍是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算後,變換成另一個RDD,而後這個RDD又能夠進行另一次轉換。這個過程是分佈式的。 Action返回值不是一個RDD。它要麼是一個Scala的普通集合,要麼是一個值,要麼是空,最終或返回到Driver程序,或把RDD寫入到文件系統中。
Action是返回值返回給driver或者存儲到文件,是RDD到result的變換,Transformation是RDD到RDD的變換。
只有action執行時,rdd纔會被計算生成,這是rdd懶惰執行的根本所在。
Job => 包含多個task的並行計算,一個action觸發一個job
stage => 一個job會被拆爲多組task,每組任務稱爲一個stage,以shuffle進行劃分
以reduceByKey爲例解釋shuffle過程。
在沒有task的文件分片合併下的shuffle過程以下:(spark.shuffle.consolidateFiles=false
)
fetch 來的數據存放到哪裏?
剛 fetch 來的 FileSegment 存放在 softBuffer 緩衝區,通過處理後的數據放在內存 + 磁盤上。這裏咱們主要討論處理後的數據,能夠靈活設置這些數據是「只用內存」仍是「內存+磁盤」。若是spark.shuffle.spill = false就只用內存。因爲不要求數據有序,shuffle write 的任務很簡單:將數據 partition 好,並持久化。之因此要持久化,一方面是要減小內存存儲空間壓力,另外一方面也是爲了 fault-tolerance。
shuffle之因此須要把中間結果放到磁盤文件中,是由於雖然上一批task結束了,下一批task還須要使用內存。若是所有放在內存中,內存會不夠。另一方面爲了容錯,防止任務掛掉。
存在問題以下:
產生的 FileSegment 過多。每一個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。通常 Spark job 的 M 和 R 都很大,所以磁盤上會存在大量的數據文件。
緩衝區佔用內存空間大。每一個 ShuffleMapTask 須要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 MR 個 bucket。雖然一個 ShuffleMapTask 結束後,對應的緩衝區能夠被回收,但一個 worker node 上同時存在的 bucket 個數能夠達到 cores R 個(通常 worker 同時能夠運行 cores 個 ShuffleMapTask),佔用的內存空間也就達到了cores× R × 32 KB。對於 8 核 1000 個 reducer 來講,佔用內存就是 256MB。
爲了解決上述問題,咱們可使用文件合併的功能。
在進行task的文件分片合併下的shuffle過程以下:(spark.shuffle.consolidateFiles=true
)
能夠明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 能夠共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 造成 ShuffleBlock i,後執行的 ShuffleMapTask 能夠將輸出數據直接追加到 ShuffleBlock i 後面,造成 ShuffleBlock i',每一個 ShuffleBlock 被稱爲 FileSegment。下一個 stage 的 reducer 只須要 fetch 整個 ShuffleFile 就好了。這樣,每一個 worker 持有的文件數降爲 cores× R。FileConsolidation 功能能夠經過spark.shuffle.consolidateFiles=true
來開啓。
val rdd1 = ... // 讀取hdfs數據,加載成RDD rdd1.cache val rdd2 = rdd1.map(...) val rdd3 = rdd1.filter(...) rdd2.take(10).foreach(println) rdd3.take(10).foreach(println) rdd1.unpersist
cache和unpersisit兩個操做比較特殊,他們既不是action也不是transformation。cache會將標記須要緩存的rdd,真正緩存是在第一次被相關action調用後才緩存;unpersisit是抹掉該標記,而且馬上釋放內存。只有action執行時,rdd1纔會開始建立並進行後續的rdd變換計算。
cache其實也是調用的persist持久化函數,只是選擇的持久化級別爲MEMORY_ONLY
。
persist支持的RDD持久化級別以下:
須要注意的問題:
Cache或shuffle場景序列化時, spark序列化不支持protobuf message,須要java 能夠serializable的對象。一旦在序列化用到不支持java serializable的對象就會出現上述錯誤。
Spark只要寫磁盤,就會用到序列化。除了shuffle階段和persist會序列化,其餘時候RDD處理都在內存中,不會用到序列化。
spark程序是使用一個spark應用實例一次性對一批歷史數據進行處理,spark streaming是將持續不斷輸入的數據流轉換成多個batch分片,使用一批spark應用實例進行處理。
從原理上看,把傳統的spark批處理程序變成streaming程序,spark須要構建什麼?
須要構建4個東西:
一個靜態的 RDD DAG 的模板,來表示處理邏輯;
一個動態的工做控制器,將連續的 streaming data 切分數據片斷,並按照模板複製出新的 RDD ;
DAG 的實例,對數據片斷進行處理;
Receiver進行原始數據的產生和導入;Receiver將接收到的數據合併爲數據塊並存到內存或硬盤中,供後續batch RDD進行消費;
對長時運行任務的保障,包括輸入數據的失效後的重構,處理任務的失敗後的重調。
具體streaming的詳細原理能夠參考廣點通出品的源碼解析文章:
對於spark streaming須要注意如下三點:
內存管理:
Executor的內存主要分爲三塊:
第一塊是讓task執行咱們本身編寫的代碼時使用,默認是佔Executor總內存的20%;
第二塊是讓task經過shuffle過程拉取了上一個stage的task的輸出後,進行聚合等操做時使用,默認也是佔Executor總內存的20%;
第三塊是讓RDD持久化時使用,默認佔Executor總內存的60%。
每一個task以及每一個executor佔用的內存須要分析一下。每一個task處理一個partiiton的數據,分片太少,會形成內存不夠。
其餘資源配置:
具體調優能夠參考美團點評出品的調優文章:
http://tech.meituan.com/spark-tuning-basic.html
http://tech.meituan.com/spark-tuning-pro.html
spark tdw以及tdbank api文檔:
http://git.code.oa.com/tdw/tdw-spark-common/wikis/api
其餘學習資料:
http://km.oa.com/group/2430/articles/show/257492