Apache Spark是一個圍繞速度、易用性和複雜分析構建的大數據處理框架。最初在2009年由加州大學伯克利分校的AMPLab開發,並於2010年成爲Apache的開源項目之一。html
與Hadoop和Storm等其餘大數據和MapReduce技術相比,Spark有以下優點。java
首先,Spark爲咱們提供了一個全面、統一的框架用於管理各類有着不一樣性質(文本數據、圖表數據等)的數據集和數據源(批量數據或實時的流數據)的大數據處理的需求。python
Spark能夠將Hadoop集羣中的應用在內存中的運行速度提高100倍,甚至可以將應用在磁盤上的運行速度提高10倍。web
Spark讓開發者能夠快速的用Java、Scala或Python編寫程序。它自己自帶了一個超過80個高階操做符集合。並且還能夠用它在shell中以交互式地查詢數據。算法
除了Map和Reduce操做以外,它還支持SQL查詢,流數據,機器學習和圖表數據處理。開發者能夠在一個數據管道用例中單獨使用某一能力或者將這些能力結合在一塊兒使用。sql
在這個Apache Spark文章系列的第一部分中,咱們將瞭解到什麼是Spark,它與典型的MapReduce解決方案的比較以及它如何爲大數據處理提供了一套完整的工具。shell
Hadoop這項大數據處理技術大概已有十年曆史,並且被看作是首選的大數據集合處理的解決方案。MapReduce是一路計算的優秀解決方案,不 過對於須要多路計算和算法的用例來講,並不是十分高效。數據處理流程中的每一步都須要一個Map階段和一個Reduce階段,並且若是要利用這一解決方案, 須要將全部用例都轉換成MapReduce模式。數據庫
在下一步開始以前,上一步的做業輸出數據必需要存儲到分佈式文件系統中。所以,複製和磁盤存儲會致使這種方式速度變慢。另外Hadoop解決方案中 一般會包含難以安裝和管理的集羣。並且爲了處理不一樣的大數據用例,還須要集成多種不一樣的工具(如用於機器學習的Mahout和流數據處理的Storm)。express
若是想要完成比較複雜的工做,就必須將一系列的MapReduce做業串聯起來而後順序執行這些做業。每個做業都是高時延的,並且只有在前一個做業完成以後下一個做業才能開始啓動。apache
而Spark則容許程序開發者使用有向無環圖(DAG)開發複雜的多步數據管道。並且還支持跨有向無環圖的內存數據共享,以便不一樣的做業能夠共同處理同一個數據。
Spark運行在現有的Hadoop分佈式文件系統基礎之上(HDFS)提供額外的加強功能。它支持將Spark應用部署到現存的Hadoop v1集羣(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集羣甚至是Apache Mesos之中。
咱們應該將Spark看做是Hadoop MapReduce的一個替代品而不是Hadoop的替代品。其意圖並不是是替代Hadoop,而是爲了提供一個管理不一樣的大數據用例和需求的全面且統一的解決方案。
Spark經過在數據處理過程當中成本更低的洗牌(Shuffle)方式,將MapReduce提高到一個更高的層次。利用內存數據存儲和接近實時的處理能力,Spark比其餘的大數據處理技術的性能要快不少倍。
Spark還支持大數據查詢的延遲計算,這能夠幫助優化大數據處理流程中的處理步驟。Spark還提供高級的API以提高開發者的生產力,除此以外還爲大數據解決方案提供一致的體系架構模型。
Spark將中間結果保存在內存中而不是將其寫入磁盤,當須要屢次處理同一數據集時,這一點特別實用。Spark的設計初衷就是既能夠在內存中又可 以在磁盤上工做的執行引擎。當內存中的數據不適用時,Spark操做符就會執行外部操做。Spark能夠用於處理大於集羣內存容量總和的數據集。
Spark會嘗試在內存中存儲儘量多的數據而後將其寫入磁盤。它能夠將某個數據集的一部分存入內存而剩餘部分存入磁盤。開發者須要根據數據和用例評估對內存的需求。Spark的性能優點得益於這種內存中的數據存儲。
Spark的其餘特性包括:
支持比Map和Reduce更多的函數。
優化任意操做算子圖(operator graphs)。
能夠幫助優化總體數據處理流程的大數據查詢的延遲計算。
提供簡明、一致的Scala,Java和Python API。
提供交互式Scala和Python Shell。目前暫不支持Java。
Spark是用Scala程序設計語言編寫而成,運行於Java虛擬機(JVM)環境之上。目前支持以下程序設計語言編寫Spark應用:
Scala
Java
Python
Clojure
R
除了Spark核心API以外,Spark生態系統中還包括其餘附加庫,能夠在大數據分析和機器學習領域提供更多的能力。
這些庫包括:
Spark Streaming:
Spark Streaming基於微批量方式的計算和處理,能夠用於處理實時的流數據。它使用DStream,簡單來講就是一個彈性分佈式數據集(RDD)系列,處理實時數據。
Spark SQL:
Spark SQL能夠經過JDBC API將Spark數據集暴露出去,並且還能夠用傳統的BI和可視化工具在Spark數據上執行相似SQL的查詢。用戶還能夠用Spark SQL對不一樣格式的數據(如JSON,Parquet以及數據庫等)執行ETL,將其轉化,而後暴露給特定的查詢。
Spark MLlib:
MLlib是一個可擴展的Spark機器學習庫,由通用的學習算法和工具組成,包括二元分類、線性迴歸、聚類、協同過濾、梯度降低以及底層優化原語。
Spark GraphX:
GraphX是用於圖計算和並行圖計算的 新的(alpha)Spark API。經過引入彈性分佈式屬性圖(Resilient Distributed Property Graph),一種頂點和邊都帶有屬性的有向多重圖,擴展了Spark RDD。爲了支持圖計算,GraphX暴露了一個基礎操做符集合(如subgraph,joinVertices和aggregateMessages) 和一個通過優化的Pregel API變體。此外,GraphX還包括一個持續增加的用於簡化圖分析任務的圖算法和構建器集合。
除了這些庫之外,還有一些其餘的庫,如BlinkDB和Tachyon。
BlinkDB是一個近似查詢引擎,用於在海量數據上執行交互式SQL查詢。BlinkDB能夠經過犧牲數據精度來提高查詢響應時間。經過在數據樣本上執行查詢並展現包含有意義的錯誤線註解的結果,操做大數據集合。
Tachyon是一個之內存爲中心的 分佈式文件系統,可以提供內存級別速度的跨集羣框架(如Spark和MapReduce)的可信文件共享。它將工做集文件緩存在內存中,從而避免到磁盤中 加載須要常常讀取的數據集。經過這一機制,不一樣的做業/查詢和框架能夠之內存級的速度訪問緩存的文件。
此外,還有一些用於與其餘產品集成的適配器,如Cassandra(Spark Cassandra 鏈接器)和R(SparkR)。Cassandra Connector可用於訪問存儲在Cassandra數據庫中的數據並在這些數據上執行數據分析。
下圖展現了在Spark生態系統中,這些不一樣的庫之間的相互關聯。
圖1. Spark框架中的庫
咱們將在這一系列文章中逐步探索這些Spark庫
Spark體系架構包括以下三個主要組件:
數據存儲
API
管理框架
接下來讓咱們詳細瞭解一下這些組件。
數據存儲:
Spark用HDFS文件系統存儲數據。它可用於存儲任何兼容於Hadoop的數據源,包括HDFS,HBase,Cassandra等。
API:
利用API,應用開發者能夠用標準的API接口建立基於Spark的應用。Spark提供Scala,Java和Python三種程序設計語言的API。
下面是三種語言Spark API的網站連接。
資源管理:
Spark既能夠部署在一個單獨的服務器也能夠部署在像Mesos或YARN這樣的分佈式計算框架之上。
下圖2展現了Spark體系架構模型中的各個組件。
圖2 Spark體系架構
彈性分佈式數據集(基於Matei的研究論文)或RDD是Spark框架中的核心概念。能夠將RDD視做數據庫中的一張表。其中能夠保存任何類型的數據。Spark將數據存儲在不一樣分區上的RDD之中。
RDD能夠幫助從新安排計算並優化數據處理過程。
此外,它還具備容錯性,由於RDD知道如何從新建立和從新計算數據集。
RDD是不可變的。你能夠用變換(Transformation)修改RDD,可是這個變換所返回的是一個全新的RDD,而原有的RDD仍然保持不變。
RDD支持兩種類型的操做:
變換(Transformation)
行動(Action)
變換:變換的返回值是一個新的RDD集合,而不是單個值。調用一個變換方法,不會有任何求值計算,它只獲取一個RDD做爲參數,而後返回一個新的RDD。
變換函數包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。
行動:行動操做計算並返回一個新的值。當在一個RDD對象上調用行動函數時,會在這一時刻計算所有的數據處理查詢並返回結果值。
行動操做包括:reduce,collect,count,first,take,countByKey以及foreach。
安裝和使用Spark有幾種不一樣方式。你能夠在本身的電腦上將Spark做爲一個獨立的框架安裝或者從諸如Cloudera,HortonWorks或MapR之類的供應商處獲取一個Spark虛擬機鏡像直接使用。或者你也可使用在雲端環境(如Databricks Cloud)安裝並配置好的Spark。
在本文中,咱們將把Spark做爲一個獨立的框架安裝並在本地啓動它。最近Spark剛剛發佈了1.2.0版本。咱們將用這一版本完成示例應用的代碼展現。
當你在本地機器安裝了Spark或使用了基於雲端的Spark後,有幾種不一樣的方式能夠鏈接到Spark引擎。
下表展現了不一樣的Spark運行模式所需的Master URL參數。
Spark啓動並運行後,能夠用Spark shell鏈接到Spark引擎進行交互式數據分析。Spark shell支持Scala和Python兩種語言。Java不支持交互式的Shell,所以這一功能暫未在Java語言中實現。
能夠用spark-shell.cmd和pyspark.cmd命令分別運行Scala版本和Python版本的Spark Shell。
不論Spark運行在哪種模式下,均可以經過訪問Spark網頁控制檯查看Spark的做業結果和其餘的統計數據,控制檯的URL地址以下:
http://localhost:4040
Spark控制檯以下圖3所示,包括Stages,Storage,Environment和Executors四個標籤頁
(點擊查看大圖)
圖3. Spark網頁控制檯
Spark提供兩種類型的共享變量能夠提高集羣環境中的Spark程序運行效率。分別是廣播變量和累加器。
廣播變量:廣播變量能夠在每臺機器上緩存只讀變量而不須要爲各個任務發送該變量的拷貝。他們可讓大的輸入數據集的集羣拷貝中的節點更加高效。
下面的代碼片斷展現瞭如何使用廣播變量。
// // Broadcast Variables // val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
累加器:只有在使用相關操做時纔會添加累加器,所以它能夠很好地支持並行。累加器可用於實現計數(就像在MapReduce中那樣)或求和。能夠用add方法將運行在集羣上的任務添加到一個累加器變量中。不過這些任務沒法讀取變量的值。只有驅動程序纔可以讀取累加器的值。
下面的代碼片斷展現瞭如何使用累加器共享變量:
// // Accumulators // val accum = sc.accumulator(0, "My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) accum.value
本篇文章中所涉及的示例應用是一個簡單的字數統計應用。這與學習用Hadoop進行大數據處理時的示例應用相同。咱們將在一個文本文件上執行一些數 據分析查詢。本示例中的文本文件和數據集都很小,不過無須修改任何代碼,示例中所用到的Spark查詢一樣能夠用到大容量數據集之上。
爲了讓討論儘可能簡單,咱們將使用Spark Scala Shell。
首先讓咱們看一下如何在你本身的電腦上安裝Spark。
前提條件:
爲了讓Spark可以在本機正常工做,你須要安裝Java開發工具包(JDK)。這將包含在下面的第一步中。
一樣還須要在電腦上安裝Spark軟件。下面的第二步將介紹如何完成這項工做。
注:下面這些指令都是以Windows環境爲例。若是你使用不一樣的操做系統環境,須要相應的修改系統變量和目錄路徑已匹配你的環境。
I. 安裝JDK
1)從Oracle網站上下載JDK。推薦使用JDK 1.7版本。
將JDK安裝到一個沒有空格的目錄下。對於Windows用戶,須要將JDK安裝到像c:\dev這樣的文件夾下,而不能安裝到「c: \Program Files」文件夾下。「c:\Program Files」文件夾的名字中包含空格,若是軟件安裝到這個文件夾下會致使一些問題。
注:不要在「c:\Program Files」文件夾中安裝JDK或(第二步中所描述的)Spark軟件。
2)完成JDK安裝後,切換至JDK 1.7目錄下的」bin「文件夾,而後鍵入以下命令,驗證JDK是否正確安裝:
java -version
若是JDK安裝正確,上述命令將顯示Java版本。
II. 安裝Spark軟件:
從Spark網站上下載最新版本 的Spark。在本文發表時,最新的Spark版本是1.2。你能夠根據Hadoop的版本選擇一個特定的Spark版本安裝。我下載了與Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz。
將安裝文件解壓到本地文件夾中(如:c:\dev)。
爲了驗證Spark安裝的正確性,切換至Spark文件夾而後用以下命令啓動Spark Shell。這是Windows環境下的命令。若是使用Linux或Mac OS,請相應地編輯命令以便可以在相應的平臺上正確運行。
c: cd c:\dev\spark-1.2.0-bin-hadoop2.4 bin\spark-shell
若是Spark安裝正確,就可以在控制檯的輸出中看到以下信息。
…. 15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server 15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) Type in expressions to have them evaluated. Type :help for more information. …. 15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager 15/01/17 23:17:53 INFO SparkILoop: Created spark context.. Spark context available as sc.
能夠鍵入以下命令檢查Spark Shell是否工做正常。
sc.version
(或)
sc.appName
完成上述步驟以後,能夠鍵入以下命令退出Spark Shell窗口:
:quit
若是想啓動Spark Python Shell,須要先在電腦上安裝Python。你能夠下載並安裝Anaconda,這是一個免費的Python發行版本,其中包括了一些比較流行的科學、數學、工程和數據分析方面的Python包。
而後能夠運行以下命令啓動Spark Python Shell:
c: cd c:\dev\spark-1.2.0-bin-hadoop2.4 bin\pyspark
完成Spark安裝並啓動後,就能夠用Spark API執行數據分析查詢了。
這些從文本文件中讀取並處理數據的命令都很簡單。咱們將在這一系列文章的後續文章中向你們介紹更高級的Spark框架使用的用例。
首先讓咱們用Spark API運行流行的Word Count示例。若是尚未運行Spark Scala Shell,首先打開一個Scala Shell窗口。這個示例的相關命令以下所示:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ val txtFile = "README.md" val txtData = sc.textFile(txtFile) txtData.cache()
咱們能夠調用cache函數將上一步生成的RDD對象保存到緩存中,在此以後Spark就不須要在每次數據查詢時都從新計算。須要注意的 是,cache()是一個延遲操做。在咱們調用cache時,Spark並不會立刻將數據存儲到內存中。只有當在某個RDD上調用一個行動時,纔會真正執 行這個操做。
如今,咱們能夠調用count函數,看一下在文本文件中有多少行數據。
txtData.count()
而後,咱們能夠執行以下命令進行字數統計。在文本文件中統計數據會顯示在每一個單詞的後面。
val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) wcData.collect().foreach(println)
若是想查看更多關於如何使用Spark核心API的代碼示例,請參考網站上的Spark文檔。
在後續的系列文章中,咱們將從Spark SQL開始,學習更多關於Spark生態系統的其餘部分。以後,咱們將繼續瞭解Spark Streaming,Spark MLlib和Spark GraphX。咱們也會有機會學習像Tachyon和BlinkDB等框架。
在本文中,咱們瞭解了Apache Spark框架如何經過其標準API幫助完成大數據處理和分析工做。咱們還對Spark和傳統的MapReduce實現(如Apache Hadoop)進行了比較。Spark與Hadoop基於相同的HDFS文件存儲系統,所以若是你已經在Hadoop上進行了大量投資和基礎設施建設,可 以一塊兒使用Spark和MapReduce。
此外,也能夠將Spark處理與Spark SQL、機器學習以及Spark Streaming結合在一塊兒。關於這方面的內容咱們將在後續的文章中介紹。
利用Spark的一些集成功能和適配器,咱們能夠將其餘技術與Spark結合在一塊兒。其中一個案例就是將Spark、Kafka和Apache Cassandra結合在一塊兒,其中Kafka負責輸入的流式數據,Spark完成計算,最後Cassandra NoSQL數據庫用於保存計算結果數據。
不過須要牢記的是,Spark生態系統仍不成熟,在安全和與BI工具集成等領域仍然須要進一步的改進。
Srini Penchikala目 前是一家金融服務機構的軟件架構師,這個機構位於德克薩斯州的奧斯汀。他在軟件系統架構、設計和開發方面有超過20年的經驗。Srini目前正在撰寫一本 關於NoSQL數據庫模式的書。他仍是曼寧出版社出版的《Spring Roo in Action》一書的合著者(http://www.manning.com/SpringRooinAction)。他還曾經出席各類會議,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini還在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等網站上發表過不少關於軟件系統架構、安全和風險管理以及NoSQL數據庫等方面的文章。他仍是InfoQ NoSQL數據庫社區的責任編輯。
查看英文原文:Big Data Processing with Apache Spark – Part 1: Introduction