歡迎你們前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~node
本文首發在雲+社區,未經許可,不得轉載。sql
做者:張國鵬 | 騰訊 運營開發工程師mongodb
Spark做爲大數據計算引擎,憑藉其快速、穩定、簡易等特色,快速的佔領了大數據計算的領域。本文主要爲做者在搭建使用計算平臺的過程當中,對於Spark的理解,但願能給讀者一些學習的思路。文章內容爲介紹Spark在DataMagic平臺扮演的角色、如何快速掌握Spark以及DataMagic平臺是如何使用好Spark的。docker
整套架構的主要功能爲日誌接入、查詢(實時和離線)、計算。離線計算平臺主要負責計算這一部分,系統的存儲用的是COS(公司內部存儲),而非HDFS。apache
下面將主要介紹Spark on Yarn這一架構,抽取出來即圖2-2所示,能夠看到Spark on yarn的運行流程。編程
對於理解Spark,我以爲掌握下面4個步驟就能夠了。bash
對於入門,學習Spark能夠經過其架構圖,快速瞭解其關鍵術語,掌握了關鍵術語,對Spark基本上就有認識了,分別是結構術語Shuffle、Patitions、MapReduce、Driver、Application Master、Container、Resource Manager、Node Manager等。API編程術語關鍵RDD、DataFrame,結構術語用於瞭解其運行原理,API術語用於使用過程當中編寫代碼,掌握了這些術語以及背後的知識,你就也知道Spark的運行原理和如何編程了。架構
Spark在運行的時候,不少運行信息是經過配置文件讀取的,通常在spark-defaults.conf,要把Spark使用好,須要掌握一些關鍵配置,例如跟運行內存相關的,spark.yarn.executor.memoryOverhead、spark.executor.memory,跟超時相關的spark.network.timeout等等,Spark不少信息均可以經過配置進行更改,所以對於配置須要有必定的掌握。可是使用配置時,也要根據不一樣的場景,這個舉個例子,例如spark.speculation配置,這個配置主要目的是推測執行,當worker1執行慢的狀況下,Spark會啓動一個worker2,跟worker1執行相同的任務,誰先執行完就用誰的結果,從而加快計算速度,這個特性在通常計算任務來講是很是好的,可是若是是執行一個出庫到Mysql的任務時,同時有兩個同樣的worker,則會致使Mysql的數據重複。所以咱們在使用配置時,必定要理解清楚,直接google spark conf就會列出不少配置了。併發
咱們之因此使用Spark進行計算,緣由就是由於它計算快,可是它快的緣由很大在於它的並行度,掌握Spark是如何提供並行服務的,從而是咱們更好的提升並行度。app
對於提升並行度,對於RDD,須要從幾個方面入手,一、配置num-executor。二、配置executor-cores。三、配置spark.default.parallelism。三者之間的關係通常爲spark.default.parallelism=num-executors*executor-cores的2~3倍較爲合適。對於Spark-sql,則設置spark.sql.shuffle.partitions、num-executor和executor-cores。
新手而言,特別是須要對Spark進行優化或者修改時,感到很迷茫,其實咱們能夠首先聚焦於局部,而Spark確實也是模塊化的,不須要以爲Spark複雜而且難以理解,我將從修改Spark代碼的某一角度來進行分析。
首先,Spark的目錄結構如圖3-1所示,能夠經過文件夾,快速知道sql、graphx等代碼所在位置,而Spark的運行環境主要由jar包支撐,如圖3-2所示,這裏截取部分jar包,實際上遠比這多,全部的jar包均可以經過Spark的源代碼進行編譯,當須要修改某個功能時,僅須要找到相應jar包的代碼,修改以後,編譯該jar包,而後進行替換就好了。
而對於編譯源代碼這塊,其實也很是簡單,安裝好maven、scala等相關依賴,下載源代碼進行編譯便可,掌握修改源碼技巧對於使用好開源項目十分重要。
Spark在DataMagic中使用,也是在邊使用邊探索的過程,在這過程當中,列舉了其比較重要的特色。
在計算中,計算任務的數量以及數據的量級天天都會發生變化,所以對於Spark平臺,須要有快速部署的特性,在實體機上,有一鍵部署腳本,只要運行一個腳本,則能夠立刻上線一個擁有128G內存、48cores的實體機,可是實體機一般須要申請報備才能得到,所以還會有docker來支持計算資源。
Spark大多數屬性都是經過配置來實現的,所以能夠經過配置動態修改Spark的運行行爲,這裏舉個例子,例如經過配置自動調整exector的數量。
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>複製代碼
spark.dynamicAllocation.minExecutors 1 #最小Executor數
spark.dynamicAllocation.maxExecutors 100 #最大Executor數複製代碼
經過這種配置,能夠達到自動調整exector的目的。
做爲一個平臺,其計算任務確定不是固定的,有的數據量多,有的數據量少,所以須要合理分配資源,例若有些千萬、億級別的數據,分配20覈計算資源就足夠了。可是有些數據量級達到百億的,就須要分配更多的計算資源了。參考第三章節的第3點。
計算的目的其實就是爲了服務業務,業務的需求也理應是平臺的追求,當業務產生合理需求時,平臺方也應該儘可能去知足。如爲了支持業務高併發、高實時性查詢的需求下,Spark在數據出庫方式上,支持了Cmongo的出庫方式。
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
database = d = dict((l.split('=') for l in dbparameter.split()))
parquetFile = sqlContext.read.parquet(file_name)
parquetFile.registerTempTable(tempTable)
result = sqlContext.sql(sparksql)
url = "mongodb://"+database['user']+":"+database['password']+"@"+database['host']+":"+database['port'] result.write.format("com.mongodb.spark.sql").mode('overwrite').options(uri=url,database=database['dbname'],collection=pg_table_name).save()複製代碼
Spark在計算任務失敗時候,須要去定位失敗緣由,當Job失敗是,能夠經過yarn logs -applicationId application 來合併任務log,打開log,定位到Traceback,通常能夠找到失敗緣由。通常而言,失敗能夠分紅幾類。
a. 代碼問題,寫的Sql有語法問題,或者Spark代碼有問題。
b. Spark問題,舊Spark版本處理NULL值等。
c. 任務長時間Running狀態,則多是數據傾斜問題。
d. 任務內存越界問題。
Spark集羣在平常使用中,也是須要運營維護的,從而運營維護,發現其存在的問題,不斷的對集羣進行優化,這裏從如下幾個方面進行介紹,經過運營手段來保障集羣的健壯性和穩定性,保證任務順利執行。
a. 定時查看是否有lost node和unhealthy node,能夠經過腳原本定時設置告警,若存在,則須要進行定位處理。
b. 定時掃描hdfs的運行log是否滿了,須要定時刪除過時log。
c. 定時掃描集羣資源是否知足計算任務使用,可以提早部署資源。
本文主要是經過做者在搭建使用計算平臺的過程當中,寫出對於Spark的理解,而且介紹了Spark在當前的DataMagic是如何使用的,當前平臺已經用於架平離線分析,天天計算分析的數據量已經達到千億~萬億級別。
問答
相關閱讀
此文已由做者受權騰訊雲+社區發佈,原文連接:https://cloud.tencent.com/developer/article/1092587?fromSource=waitui