本人並未從事Spark相關的工做,但因爲項目須要使用了Spark將算法實現並行化,因此本篇博客更多的是一些簡單、直白的Spark用法與優化。適合看本篇博客的人應該是與我同樣因爲課題須要臨時使用Spark或者說出於興趣探索Spark,這篇博客能夠給予一個基礎的介紹。若是你從事使用Spark編程的工做,那麼我更建議你簡單看一看本篇博客後前往官網或者找基本承認度較高的書籍去系統的學習,由於本篇博客不可避免的會出現——對內容的淺嘗輒止、一些其餘經常使用方法的缺失、甚至是一些理解上的誤差。html
Apache Spark™ is a fast and general engine for large-scale data processing.node
Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,它是爲了實現大數據的快速計算而設計的。Spark擁有Hadoop MapReduce所具備的優勢;但不一樣於MapReduce的是Job中間輸出結果能夠保存在內存中,從而再也不須要讀寫HDFS,所以Spark能更好地適用於數據挖掘與機器學習等須要迭代的MapReduce的算法。python
Spark自己是由Scala編寫的。Scala(http://www.scala-lang.org/ )是一門集成了函數式編程和麪向對象編程特性的語言,同時它與Java同樣編譯成class文件後運行於JVM中,它支持很是多的Java模塊。算法
爲了方便數據科學家們使用Spark去驗證算法,Spark同時提供了Python與R的編程接口。固然,Spark也支持Scala甚至Java,並且它們有比Python或R更好的計算性能。shell
Spark能夠不依賴於任何第三方框架單獨運行,這時其部集羣模式爲standalone模式。與此同時,Spark還支持Meos與Yarn模式,它們都依賴於Apache Hadoop框架。在將來,Spark還將支持Kubernetes平臺。數據庫
關於三種模式的使用,具體能夠前往官方網站(http://spark.apache.org/docs/...查閱。apache
因爲本人嘗試在集羣上安裝部署Spark失敗了,因此這裏不打算介紹如何在集羣上部署Spark,並且更多的狀況下,你們主要工做是去使用集羣工做,部署的事情交給運維們就行了。編程
Spark既可使用shell進行腳本編程來編寫一些簡單的代碼熟悉接口或測試環境,也能夠編寫代碼文件提交應用到集羣運行。數據結構
你能夠在terminal中輸入spark-shell指令來啓動scala的shell,也能夠輸入pyspark指令來啓動python的shell。在啓動shell時,將默認初始化SparkContext對象(SparkContext是全部Spark接口的入口,它包含一個與Spark集羣的鏈接),你能夠直接在腳本中使用sc變量來訪問到它。若是你想要向SparkConf中添加其餘設置(SparkConf是一個Spark應用的設置,它是SparkContext初始化的可選參數),你能夠在輸入啓動指令的同時在後面添加形如--options value的指令來啓動shell,具體設置參數的名字與取值能夠前往 spark.apache.org/docs/latest/configuration.html 查詢。多線程
當運行結構複雜的代碼時,你便須要使用spark-submit參數來提交你的應用。以python和scala爲例,對於python代碼編寫的應用,你能夠直接提交程序入口的py文件,但還須要添加--files來向Spark提交程序須要訪問的本地文件(不一樣文件名之間用逗號隔開),以及添加--py-files來向Spark提交主文件須要調用到的你本身編寫的模塊(不一樣文件名之間用逗號隔開);對於scala代碼編寫的應用,你須要使用工具(例如sbt)來打包你的scala代碼與資源文件,而後直接提交jar文件便可。
撇開Spark中的DAG框架GraphX不談,Spark中的兩種分佈式數據結構即是RDD與DataFrame。RDD與DataFrame都是模板類,其自己是一個集合,它們的集合元素會在計算時分發到不一樣的節點上。DataFrame是相似SQL中表格格式的存儲,而RDD只是單純對集合元素的封裝。除非你的應用須要與SQL數據庫交互,或者你的程序須要用到ml等僅支持DataFrame類型參數的模塊,不然具有優秀靈活性的RDD毫無疑問是更好的選擇。
在RDD數據上Spark支持Transformation與Action兩種類型的操做。
常見的Transformation操做是map、flatmap、distinct等函數,它們在RDD上執行分佈式數據的映射,轉換先後的數據一一對應。注意,Spark中的全部Transformation函數都是lazy的,當你調用它們時,Spark僅作了映射規則以及變量的一些記錄,僅當Transformation後的RDD變量執行Action操做時,以前的Transformation纔會被執行,而這一Transformation的執行結果除去用於Action操做外不會被記錄,換言之,對於相似下面的代碼
mapData = rddData.map(func) result1 = mapData.collect() result2 = mapData.count()
對rdd執行func的amp會運行兩次,而不是想象中的map結果被保存下來並用於兩次Action計算。若是想避免這種狀況,可使用persist函數保留Transformation的結果。
常見的Action操做則包括collect、count等函數,Action函數返回的再也不是分佈式數據對象而是原生數據結構,如RDD[T]對象collect會返回Array[T],而count會返回一個符合條件的集合元素數量的Int值。
因爲涉及到重要的參數設置,這三個概念的關係必需要先理清。
node就是物理意義上集羣裏的一臺機器,worker運行於node上,executor運行於worker中。worker能夠調度的資源受集羣啓動時node設置的限制,executor能夠調度的資源受應用啓動時worker設置的限制。一個node上能夠有多個worker,一個worker中能夠有多個executor。一個應用使用的node數受集羣啓動時設置限制,沒法少用或多用節點;一個應用老是使用每一個node上的一個worker;一個應用老是使用每一個worker中的全部executor。
executor是spark中任務執行的最小單元,manager會在一個stage中一次將分佈式數據的一個切片分發給一個executor進行計算。當計算完「一片」數據後,若RDD還有數據切片未被分發到executor,則manager會將下一切片數據分發給計算完畢的executor。一個executor可使用多個CPU核心,計算「一片」數據時會根據一個executor能夠調用的CPU核心數進行多線程的並行計算。
那麼假如獨佔集羣,一個node僅有一個worker並能夠調用node上全部的CPU核心、而一個worker僅有一個executor並能夠調用全部的worker核心時,集羣利用率應該是最高。
在代碼、提交指令的選項與環境配置文件中均可以對應用的Spark屬性進行設置。假如對相同屬性設置了不一樣的值,其優先級按上文順序依次降低。
不管是使用SparkConf仍是直接使用SparkContext來啓動與集羣的鏈接,master都是必須設置的一個參數,其默認值爲"local[*]",表示以僞集羣模式按driver的物理核心數啓動僞worker。若是僅是在一臺電腦上想要體驗Spark的並行計算,能夠經過調整local中的參數值來設置worker的個數。若是想要使用到集羣,必須將其設置爲集羣的master機IP"spark://xxxx:xxxx"。對於yarn模式直接設置爲"yarn"便可。
driver是應用的「起點」,代碼、資源文件都被提交到driver,同時主函數中的變量與分佈式數據Action的返回值都會存儲於driver的內存。這一參數用於設置driver機器的可用內存上限,即便設置值超過機器的實際內存大小Spark也會進行內存交換來知足應用要求,可是若是應用在driver上使用的內存超過了設置值則spark不會進行內存交換而是會直接報錯。注意,雖然spark文檔上說道SparkConf能夠設置spark.driver.memory,但實際上driver的啓動先於提交的應用代碼運行,因此如需設置請寫在指令或配置文件中。
上面提到,Action返回值會存儲在driver中。這一參數用於設置Action返回值的數據大小上限,超過該值Spark應用會報錯並終止運行。設置爲0表示不設置上限。
該參數用於設置一個worker中的executor數量。注意,該參數僅在yarn模式中生效,假若出於測試加速比等緣由須要在standalone模式中設置executor數量,可使用cores max參數配合executor cores參數來間接設置。
該參數用於設置一個executor所用的CPU核心數,啓動的每一個executor都必須知足這一設置,假如設置值大於worker cores或者因爲在同一worker上已有其餘executor致使核心數不知足條件,則executor不會啓動。
應用可使用的最大核心數。配合executor參數,能夠啓動(cores max)/(executor cores)個executor。
該參數用於設置分佈式數據默認的切片數量或者說並行度,並行度自己也能夠在parallelize等建立分佈式數據的函數中做爲參數傳入。理論上,該值等於應用總核心數,即應用同一時間運行的最大線程數時,計算效率最高。可是因爲並行度低時切片數據更大,容易出現超出內存空間而致使發生磁盤讀寫的狀況。相比屢次分發數據,磁盤讀寫是一個極爲耗時的操做。因此實際中,應該經過估計或者實驗,選擇一個最佳的並行度,它應該是總核心數的整數倍。
除了須要用你本身的數據集合初始化RDD或其餘分佈數據類、以及將對數據的操做封裝成函數或寫成lambda表達式並傳入Transformation或Action的接口中外,Spark的編程與普通編程並無什麼不一樣。更多的工做在於瞭解各個接口的含義來完成並行化,以及瞭解Spark的底層機制來進行應用性能的優化。不過這是一篇入門文章,因此我仍是放一段官方example代碼來解釋一下
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf) lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
這一段代碼足夠簡單,它調用了Transformation中的map與Action中的reduce,它們也是並行編程中最經常使用的操做之一。前兩行代碼裏進行了Spark的初始化設置;第三行進行了對象初始化並返回RDD[String]類型的數據(固然python中不指定數據類型,這只是方便理解),textFile將文件中的數據按行讀取並切片,它也支持傳入第二個參數來設定切片數量;第四行中進行了map操做,建立了一個新的RDD對象存儲文件中每一行的長度(記住以前說的,這裏其實尚未真正執行map),同時這裏說明一下RDD仍是immutable的,即便真正執行了map也並非將原RDD的數據進行了轉換而是建立了新的RDD對象並存儲原RDD數據映射的結果;最後一行裏對RDD進行reduce操做,將每個集合元素的值累加並返回,reduce的傳入函數或lambda表達式要求有兩個與返回值數據類型相同的參數,它將RDD中元素兩兩之間進行計算並做爲下一次計算的傳入參數,底層中多是並行化操做因此假如n個數據只須要logn次計算的時間(沒有去查資料驗證,感興趣本身去查MapReduce的論文)。
Spark自己遠不止這些應用,ml與mllib機器學習庫、更高效的GraphX有向圖框架、更多靈活方便的API,甚至業界主流的基於Spark的機器學習框架BigDL,都是有用的工具。但願這一篇博客能夠幫助你快速上手Spark。若是你想要更深刻地瞭解Spark,除了官網的program guide外能夠幫助你瞭解API的用法與一些底層理論外,Jacek Laskowski的"Mastering Apache Spark 2"也能夠幫助你瞭解Spark的框架結構,有興趣的話能夠去查閱。