官網:spark.apache.orgjava
Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生於加州大學伯克利分校AMPLab,2010年開源,2013年6月成爲Apache孵化項目,2014年2月成爲Apache頂級項目。目前,Spark生態系統已經發展成爲一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基於內存計算的大數據並行計算框架。Spark基於內存計算,提升了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,容許用戶將Spark部署在大量廉價硬件之上,造成集羣。node
Spark是一個計算框架,而Hadoop中包含計算框架MapReduce和分佈式文件系統HDFS,Hadoop更普遍地說還包括在其生態系統上的其餘系統.linux
Hadoop的MapReduce計算模型存在問題: Hadoop的MapReduce的核心是Shuffle(洗牌).在整個Shuffle的過程當中,至少產生6次I/O流.基於MapReduce計算引擎一般會將結果輸出到次盤上,進行存儲和容錯.另外,當一些查詢(如:hive)翻譯到MapReduce任務是,每每會產生多個Stage,而這些Stage有依賴底層文件系統來存儲每個Stage的輸出結果,而I/O的效率每每較低,從而影響MapReduce的運行速度.算法
快shell
與Hadoop的MapReduce相比,Spark基於內存的運算要快100倍以上,基於硬盤的運算也要快10倍以上。Spark實現了高效的DAG執行引擎,能夠經過基於內存來高效處理數據流。數據庫
易用apache
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶能夠快速構建不一樣的應用。並且Spark支持交互式的Python和Scala的shell,能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法。編程
通用vim
Spark提供了統一的解決方案。Spark能夠用於批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不一樣類型的處理均可以在同一個應用中無縫使用。Spark統一的解決方案很是具備吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減小開發和維護的人力成本和部署平臺的物力成本。bash
兼容性
Spark 能夠很是方便地與其餘的開源產品進行融合。好比,Spark 可使用Hadoop 的 YARN 和 Apache Mesos 做爲它的資源管理和調度器.而且能夠處理全部 Hadoop 支持的數據,包括 HDFS、HBase 和 Cassandra 等。這對於已經部署Hadoop 集羣的用戶特別重要,由於不須要作任何數據遷移就可使用 Spark 的強大處理能力。Spark 也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone 做爲其內置的資源管理和調度框架,這樣進一步下降了 Spark 的使用門檻,使得全部人均可以很是容易地部署和使用 Spark。此外,Spark 還提供了在EC2 上部Standalone 的 Spark 集羣的工具。
實現了 Spark 的基本功能,包含任務調度、內存管理、錯誤恢復、與存儲系統 交互等模塊。Spark Core 中還包含了對彈性分佈式數據集(resilient distributed dataset,簡稱RDD)的 API 定義。
Spark Streaming基於微批量方式的計算和處理,能夠用於處理實時的流數據.它使用DStream,簡單來講是一個彈性分佈式數據集(RDD)系列,處理實時數據.數據能夠從Kafka,Flume,Kinesis或TCP套接字等衆多來源獲取,而且可使用由高級函數(如 map,reduce,join 和 window)開發的複雜算法進行流數據處理。最後,處理後的數據能夠被推送到文件系統,數據庫和實時儀表板。
SPark SQL能夠經過JDBC API將Spark數據集暴露出去,並且還能夠用傳統的BI和可視化工具在Spark數據上執行相似SQL的查詢,用戶哈能夠用Spark SQL對不一樣格式的數據(如Json, Parque以及數據庫等)執行ETl,將其轉化,而後暴露特定的查詢.
MLlib是一個可擴展的Spark機器學習庫,由通用的學習算法和工具組成,包括二元分類、線性迴歸、聚類、協同過濾、梯度降低以及底層優化原語。
GraphX是用於圖計算和並行圖計算的新的(alpha)Spark API。經過引入彈性分佈式屬性圖(Resilient Distributed Property Graph),一種頂點和邊都帶有屬性的有向多重圖,擴展了Spark RDD。爲了支持圖計算,GraphX暴露了一個基礎操做符集合(如subgraph,joinVertices和aggregateMessages)和一個通過優化的Pregel API變體。此外,GraphX還包括一個持續增加的用於簡化圖分析任務的圖算法和構建器集合。
Spark 設計爲能夠高效地在一個計算節點到數千個計算節點之間伸縮計 算。爲了實現這樣的要求,同時得到最大靈活性,Spark 支持在各類集羣管理器(cluster manager)上運行,包括 Hadoop YARN、Apache Mesos,以及 Spark 自帶的一個簡易調度 器,叫做獨立調度器。
Spark獲得了衆多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用於鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了不少生產系統的推薦算法;騰訊Spark集羣達到8000臺的規模,是當前已知的世界上最大的Spark集羣。
咱們大體把Spark的用例分爲兩類:數據科學應用和數據處理應用。也就對應的有兩種人羣:數據科學家和工程師。
數據科學任務
主要是數據分析領域,數據科學家要負責分析數據並建模,具有 SQL、統計、預測建模(機器學習)等方面的經驗,以及必定的使用 Python、 Matlab 或 R 語言進行編程的能力。
數據處理應用
工程師定義爲使用 Spark 開發 生產環境中的數據處理應用的軟件開發者,經過對接Spark的API實現對處理的處理和轉換等任務。
Driver:運行Application的main() 函數並建立SparkContext
Worker:從節點,負責控制計算節點,啓動Ex而粗投入或Driver
SparkContext: 整個應用的上下文,監控應用的生命週期
SparkConf:負責存儲配置信息。
Executor: 執行器,在worker node上執行任務組件,用於啓動線程執行任務.每一個Application擁有獨立的一組Executors
ClusterManager:在standlone模式中即爲Master(主節點),控制整個集羣.監控Worker.在Yarn模式中爲資源管理器.
RDD:彈性分佈式集合,spark的基本計算單元,一組RDD可造成執行的有向無環圖RDD Graph
DAG Scheduler: 根據做業(Job)構建基於Stage的DAG,並交給Stage給TaskScheduler
TaskScheduler:將任務(Task)分發給Executor執行
SparkEnv:線程級別的上下文,存儲運行時的重要組件的引用。SparkEnv內建立幷包含以下一些重要組件的引用。
MapOutPutTracker:負責Shuffle元信息的存儲。
BroadcastManager:負責廣播變量的控制與元信息的存儲。
BlockManager:負責存儲管理、建立和查找塊。
MetricsSystem:監控運行時性能指標信息。
Spark的總體流程:client提交應用,Master找到一個Worker啓動Driver,Driver向Master或者向資源管理器申請資源,以後將應用轉化爲RDD Graph,再由DAGScheduler將RDD Graph轉化爲Stage的有向無環圖提交給TaskScheduler,由TaskScheduler提交任務給Executor執行。在任務執行的過程當中,其餘組件協同工做,確保整個應用順利執行。
Spark的部署模式有Local、Local-Cluster、Standalone、Yarn、Mesos,咱們選擇最具表明性的Standalone集羣部署模式。安裝java環境,Spark自動會把scala SDK打包到Spark中無需安裝scala環境
linux: CentOS-7.5_x64 hadoop: hadoop-3.2.0 spark: spark-2.3.3 zookeeper: zookeeper-3.4.10
主機名 | IP | 安裝軟件 | 運行進程 |
---|---|---|---|
node-1 | 192.168.91.11 | spark | Master |
node-2 | 192.168.91.12 | spark,zookeeper | Worker,QuorumPeerMain |
node-3 | 192.168.91.13 | spark,zookeeper | Worker,QuorumPeerMain |
node-4 | 192.168.91.14 | spark,zookeeper | Worker,QuorumPeerMain |
# 下載對應的Spark安裝包
$ wget http://mirrors.hust.edu.cn/apache/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
# 解壓縮
$ tar -zxvf spark-2.3.3-bin-hadoop2.7.tgz
# 進入spark解壓目錄
$ cd $SPARK_HOME
# 修改Spark的環境配置文件
$ cp conf/spark-env.sh.template spark-env.sh
$ vim conf/spark-env.sh
# 添加以下配置
export JAVA_HOME=/usr/java/jdk1.8.0_191
# 修改slave的配置
$ cp $SPARK_HOME/conf/slaves.template slaves
$ vi slaves
# 在該文件中添加子節點所在的位置(Worker節點)
node-2
node-3
node-4
# 將配置好的spark 複製到其餘機器上(node-2,node-3,node-4)
$ scp -r spark-2.3.2-bin-hadoop2.7 root@node-2:/xxx/xxx
# 啓動spark集羣
$ sbin/start-master.sh
$ sbin/start-slaves.sh
# 也能夠是用這個腳本啓動全部機器
$ sbin/start-all.sh
複製代碼
啓動後執行jps命令,主節點上有Master進程,其餘子節點上有Work進行,登陸Spark管理界面查看集羣狀態(主節點):http://node-1:8080/
主機名 | IP | 安裝軟件 | 運行進程 |
---|---|---|---|
node-1 | 192.168.91.11 | spark | Master |
node-2 | 192.168.91.12 | spark,zookeeper | Master,QuorumPeerMain |
node-3 | 192.168.91.13 | spark,zookeeper | Worker,QuorumPeerMain |
node-4 | 192.168.91.14 | spark,zookeeper | Worker,QuorumPeerMain |
1.安裝配置zk集羣,並啓動zk集羣zookeeper安裝
2.修改spark的配置文件添加以下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node-2:2181,node-3:2181,node-4:2181 -Dspark.deploy.zookeeper.dir=/spark"
複製代碼
3.修改全部節點的slaves文件改成(node-3,node-4)節點
4.在node1上執行 sbin/start-all.sh,而後在 node-2 上啓動第二個 Master(sbin/start-master.sh )
$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master-ip:7077 --executor-memory 1G --total-executor-cores 2 $SPARK_HOME/examples/jars/spark-examples_2.11-2.3.3.jar 100
複製代碼
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。
$SPARK_HOME/bin/spark-shell --master spark://node-1:7077 --executor-memory 2g --total-executor-cores 2
複製代碼
參數說明:
# 指定Master的地址
--master spark://node-1:7077
# 指定每一個worker可用內存爲2G
--executor-memory 2g
# 指定整個集羣使用的cup核數爲2個
--total-executor-cores 2
複製代碼
注意
若是啓動spark shell時沒有指定master地址,可是也能夠正常啓動spark shell和執行spark shell中的程序,實際上是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣創建聯繫。 Spark Shell中已經默認將SparkContext類初始化爲對象sc。用戶代碼若是須要用到,則直接應用sc便可
在spark shell中用scala語言編寫spark程序
# sc是SparkContext對象,該對象時提交spark程序的入口
sc.textFile("file:///root/data/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("file:///root/data/output1")
# 從本地文件系統中讀取數據
textFile("file:///root/data/words.txt")
# 讀取每一行數據並切分
flatMap(_.split(" "))
# 將數據切分映射將單詞和1構成元組
map((_,1))
# 按照key進行reduce,並將value累加
reduceByKey(_+_)
# 將結果寫入到指定位置
saveAsTextFile("file:///root/data/output1")
複製代碼