【轉】Spark,一種快速數據分析替代方案

Spark 是一種與 Hadoop 類似的開源集羣計算環境,可是二者之間還存在一些不一樣之處,這些有用的不一樣之處使 Spark 在某些工做負載方面表現得更加優越,換句話說,Spark 啓用了內存分佈數據集,除了可以提供交互式查詢外,它還能夠優化迭代工做負載。html

Spark 是在 Scala 語言中實現的,它將 Scala 用做其應用程序框架。與 Hadoop 不一樣,Spark 和 Scala 可以緊密集成,其中的 Scala 能夠像操做本地集合對象同樣輕鬆地操做分佈式數據集。git

儘管建立 Spark 是爲了支持分佈式數據集上的迭代做業,可是實際上它是對 Hadoop 的補充,能夠在 Hadoo 文件系統中並行運行。經過名爲 Mesos 的第三方集羣框架能夠支持此行爲。Spark 由加州大學伯克利分校 AMP 實驗室 (Algorithms, Machines, and People Lab) 開發,可用來構建大型的、低延遲的數據分析應用程序。github

Spark 集羣計算架構算法

雖然 Spark 與 Hadoop 有類似之處,但它提供了具備有用差別的一個新的集羣計算框架。首先,Spark 是爲集羣計算中的特定類型的工做負載而設計,即那些在並行操做之間重用工做數據集(好比機器學習算法)的工做負載。爲了優化這些類型的工做負載,Spark 引進了內存集羣計算的概念,可在內存集羣計算中將數據集緩存在內存中,以縮短訪問延遲。shell

Spark 還引進了名爲 彈性分佈式數據集 (RDD) 的抽象。RDD 是分佈在一組節點中的只讀對象集合。這些集合是彈性的,若是數據集一部分丟失,則能夠對它們進行重建。重建部分數據集的過程依賴於容錯機制,該機制能夠維護 「血統」(即充許基於數據衍生過程重建部分數據集的信息)。RDD 被表示爲一個 Scala 對象,而且能夠從文件中建立它;一個並行化的切片(遍及於節點之間);另外一個 RDD 的轉換形式;而且最終會完全改變現有 RDD 的持久性,好比請求緩存在內存中。express

Spark 中的應用程序稱爲驅動程序,這些驅動程序可實如今單一節點上執行的操做或在一組節點上並行執行的操做。與 Hadoop 相似,Spark 支持單節點集羣或多節點集羣。對於多節點操做,Spark 依賴於 Mesos 集羣管理器。Mesos 爲分佈式應用程序的資源共享和隔離提供了一個有效平臺(參見 圖 1)。該設置充許 Spark 與 Hadoop 共存於節點的一個共享池中。編程


圖 1. Spark 依賴於 Mesos 集羣管理器實現資源共享和隔離。
圖片顯示了資源共享和隔離中 Mesos 和 Spark 之間的關係  

Spark 編程模式緩存

驅動程序能夠在數據集上執行兩種類型的操做:動做和轉換。動做 會在數據集上執行一個計算,並向驅動程序返回一個值;而轉換會從現有數據集中建立一個新的數據集。動做的示例包括執行一個 Reduce 操做(使用函數)以及在數據集上進行迭代(在每一個元素上運行一個函數,相似於 Map 操做)。轉換示例包括 Map 操做和 Cache 操做(它請求新的數據集存儲在內存中)。安全

咱們隨後就會看看這兩個操做的示例,可是,讓咱們先來了解一下 Scala 語言。bash

回頁首

Scala 簡介

Scala 多是 Internet 上鮮爲人知的祕密之一。您能夠在一些最繁忙的 Internet 網站(如 Twitter、LinkedIn 和 Foursquare,Foursquare 使用了名爲 Lift 的 Web 應用程序框架)的製做過程當中看到 Scala 的身影。還有證據代表,許多金融機構已開始關注 Scala 的性能(好比 EDF Trading 公司將 Scala 用於衍生產品訂價)。

Scala 是一種多範式語言,它以一種流暢的、讓人感到舒服的方法支持與命令式、函數式和麪向對象的語言相關的語言特性。從面向對象的角度來看,Scala 中的每一個值都是一個對象。一樣,從函數觀點來看,每一個函數都是一個值。Scala 也是屬於靜態類型,它有一個既有表現力又很安全的類型系統。

此外,Scala 是一種虛擬機 (VM) 語言,而且能夠經過 Scala 編譯器生成的字節碼,直接運行在使用 Java Runtime Environment V2 的 Java™ Virtual Machine (JVM) 上。該設置充許 Scala 運行在運行 JVM 的任何地方(要求一個額外的 Scala 運行時庫)。它還充許 Scala 利用大量現存的 Java 庫以及現有的 Java 代碼。

最後,Scala 具備可擴展性。該語言(它實際上表明瞭可擴展語言)被定義爲可直接集成到語言中的簡單擴展。

Scala 的起源

Scala 語言由 Ecole Polytechnique Federale de Lausanne(瑞士洛桑市的兩所瑞士聯邦理工學院之一)開發。它是 Martin Odersky 在開發了名爲 Funnel 的編程語言以後設計的,Funnel 集成了函數編程和 Petri net 中的創意。在 2011 年,Scala 設計團隊從歐洲研究委員會 (European Research Council) 那裏得到了 5 年的研究經費,而後他們成立新公司 Typesafe,從商業上支持 Scala,接收籌款開始相應的運做。

舉例說明 Scala

讓咱們來看一些實際的 Scala 語言示例。Scala 提供自身的解釋器,充許您以交互方式試用該語言。Scala 的有用處理已超出本文所涉及的範圍,可是您能夠在 參考資料 中找到更多相關信息的連接。

清單 1 經過 Scala 自身提供的解釋器開始了快速瞭解 Scala 語言之旅。啓用 Scala 後,系統會給出提示,經過該提示,您能夠以交互方式評估表達式和程序。咱們首先建立了兩個變量,一個是不可變變量(即vals,稱做單賦值),另外一個變量是可變變量 (vars)。注意,當您試圖更改 b(您的 var)時,您能夠成功地執行此操做,可是,當您試圖更改 val 時,則會返回一個錯誤。


清單 1. Scala 中的簡單變量
$ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.
 
scala> val a = 1 a: Int = 1
 
scala> var b = 2 b: Int = 2
 
scala> b = b + a b: Int = 3
 
scala> a = 2 <console>6: error: reassignment to val
       a = 2
         ^

接下來,建立一個簡單的方法來計算和返回 Int 的平方值。在 Scala 中定義一個方法得先從 def 開始,後跟方法名稱和參數列表,而後,要將它設置爲語句的數量(在本示例中爲 1)。無需指定任何返回值,由於能夠從方法自己推斷出該值。注意,這相似於爲變量賦值。在一個名爲 3 的對象和一個名爲 res0 的結果變量(Scala 解釋器會自動爲您建立該變量)上,我演示了這個過程。這些都顯示在 清單 2 中。


清單 2. Scala 中的一個簡單方法
scala> def square(x: Int) = x*x square: (x: Int)Int
 
scala> square(3) res0: Int = 9

scala> square(res0) res1: Int = 81

接下來,讓咱們看一下 Scala 中的一個簡單類的構建過程(參見 清單 3)。定義一個簡單的 Dog 類來接收一個 String 參數(您的名稱構造函數)。注意,這裏的類直接採用了該參數(無需在類的正文中定義類參數)。還有一個定義該參數的方法,可在調用參數時發送一個字符串。您要建立一個新的類實例,而後調用您的方法。注意,解釋器會插入一些豎線:它們不屬於代碼。


清單 3. Scala 中的一個簡單的類
scala> class Dog( name: String ) {      |   def bark() = println(name + " barked")      | } defined class Dog
 
scala> val stubby = new Dog("Stubby") stubby: Dog = Dog@1dd5a3d
 
scala> stubby.bark Stubby barked
 
scala>

完成上述操做後,只需輸入 :quit 便可退出 Scala 解釋器。

回頁首

安裝 Scala 和 Spark

第一步是下載和配置 Scala。清單 4 中顯示的命令闡述了 Scala 安裝的下載和準備工做。使用 Scala v2.8,由於這是通過證明的 Spark 所需的版本。


清單 4. 安裝 Scala
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz $ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/

要使 Scala 可視化,請將下列行添加至您的 .bashrc 中(若是您正使用 Bash 做爲 shell):

export SCALA_HOME=/opt/scala-2.8.1.final
export PATH=$SCALA_HOME/bin:$PATH

接着能夠對您的安裝進行測試,如 清單 5 所示。這組命令會將更改加載至 bashrc 文件中,接着快速測試 Scala 解釋器 shell。


清單 5. 配置和運行交互式 Scala
$ scala Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> println("Scala is installed!") Scala is installed!

scala> :quit $

如清單中所示,如今應該看到一個 Scala 提示。您能夠經過輸入 :quit 執行退出。注意,Scala 要在 JVM 的上下文中執行操做,因此您會須要 JVM。我使用的是 Ubuntu,它在默認狀況下會提供 OpenJDK。

接下來,請獲取最新的 Spark 框架副本。爲此,請使用 清單 6 中的腳本。


清單 6. 下載和安裝 Spark 框架
wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/
mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

接下來,使用下列行將 spark 配置設置在 Scala 的根目錄 ./conf/spar-env.sh 中:

export SCALA_HOME=/opt/scala-2.8.1.final

設置的最後一步是使用簡單的構建工具 (sbt) 更新您的分佈。sbt 是一款針對 Scala 的構建工具,用於 Spark 分佈中。您能夠在 mesos-spark-c86af80 子目錄中執行更新和變異步驟,以下所示:

$ sbt/sbt update compile

注意,在執行此步驟時,須要鏈接至 Internet。當完成此操做後,請執行 Spark 快速檢測,如 清單 7 所示。在該測試中,須要運行 SparkPi 示例,它會計算 pi 的估值(經過單位平方中的任意點採樣)。所顯示的格式須要樣例程序 (spark.examples.SparkPi) 和主機參數,該參數定義了 Mesos 主機(在此例中,是您的本地主機,由於它是一個單節點集羣)和要使用的線程數量。注意,在 清單 7 中,執行了兩個任務,並且這兩個任務被序列化(任務 0 開始和結束以後,任務 1 再開始)。


清單 7. 對 Spark 執行快速檢測
$ ./run spark.examples.SparkPi local[1] 11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.SparkContext: Starting job...
11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s
Pi is roughly 3.14952
$

經過增長線程數量,您不只能夠增長線程執行的並行化,還能夠用更少的時間執行做業(如 清單 8 所示)。


清單 8. 對包含兩個線程的 Spark 執行另外一個快速檢測
$ ./run spark.examples.SparkPi local[2] 11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.SparkContext: Starting job...
11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s
Pi is roughly 3.14052
$

回頁首

使用 Scala 構建一個簡單的 Spark 應用程序

要構建 Spark 應用程序,您須要單一 Java 歸檔 (JAR) 文件形式的 Spark 及其依賴關係。使用 sbt 在 Spark 的頂級目錄中建立該 JAR 文件,以下所示:

$ sbt/sbt assembly

結果產生一個文件 ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar"。將該文件添加至您的 CLASSPATH 中,以即可以訪問它。在本示例中,不會用到此 JAR 文件,由於您將會使用 Scala 解釋器運行它,而不是對其進行編譯。

在本示例中,使用了標準的 MapReduce 轉換(如 清單 9 所示)。該示例從執行必要的 Spark 類導入開始。接着,須要定義您的類 (SparkTest) 及其主方法,用它解析稍後使用的參數。這些參數定義了執行 Spark 的環境(在本例中,該環境是一個單節點集羣)。接下來,要建立 SparkContext 對象,它會告知 Spark 如何對您的集羣進行訪問。該對象須要兩個參數:Mesos 主機名稱(已傳入)以及您分配給做業的名稱 (SparkTest)。解析命令行中的切片數量,它會告知 Spark 用於做業的線程數量。要設置的最後一項是指定用於 MapReduce 操做的文本文件。

最後,您將瞭解 Spark 示例的實質,它是由一組轉換組成。使用您的文件時,可調用 flatMap 方法返回一個 RDD(經過指定的函數將文本行分解爲標記)。而後經過 map 方法(該方法建立了鍵值對)傳遞此 RDD ,最終經過 ReduceByKey 方法合併鍵值對。合併操做是經過將鍵值對傳遞給 _ + _ 匿名函數來完成的。該函數只採用兩個參數(密鑰和值),並返回將二者合併所產生的結果(一個String 和一個 Int)。接着以文本文件的形式發送該值(到輸出目錄)。


清單 9. Scala/Spark 中的 MapReduce (SparkTest.scala)
import spark.SparkContext
import SparkContext._
 
object SparkTest {
 
  def main( args: Array[String]) {
 
    if (args.length == 0) {
      System.err.println("Usage: SparkTest <host> [<slices>]")
      System.exit(1)
    }
 
    val spark = new SparkContext(args(0), "SparkTest")
    val slices = if (args.length > 1) args(1).toInt else 2
 
    val myFile = spark.textFile("test.txt")
    val counts = myFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)
 
    counts.saveAsTextFile("out.txt")
 
  }
 
}
 
SparkTest.main(args)

要執行您的腳本,只須要執行如下命令:

$ scala SparkTest.scala local[1]

您能夠在輸出目錄中找到 MapReduce 測試文件(如 output/part-00000)。

回頁首

其餘的大數據分析框架

自從開發了 Hadoop 後,市場上推出了許多值得關注的其餘大數據分析平臺。這些平臺範圍廣闊,從簡單的基於腳本的產品到與 Hadoop 相似的生產環境。

名爲 bashreduce 的平臺是這些平臺中最簡單的平臺之一,顧名思義,它充許您在 Bash 環境中的多個機器上執行 MapReduce 類型的操做。bashreduce 依賴於您計劃使用的機器集羣的 Secure Shell(無密碼),並以腳本的形式存在,經過它,您可使用 UNIX®-style 工具(sortawknetcat 等)請求做業。

GraphLab 是另外一個受人關注的 MapReduce 抽象實現,它側重於機器學習算法的並行實現。在 GraphLab 中,Map 階段會定義一些可單獨(在獨立主機上)執行的計算指令,而 Reduce 階段會對結果進行合併。

最後,大數據場景的一個新成員是來自 Twitter 的 Storm(經過收購 BackType 得到)。Storm 被定義爲 「實時處理的 Hadoop」,它主要側重於流處理和持續計算(流處理能夠得出計算的結果)。Storm 是用 Clojure 語言(Lisp 語言的一種方言)編寫的,但它支持用任何語言(好比 Ruby 和 Python)編寫的應用程序。Twitter 於 2011 年 9 月以開源形式發佈 Storm。

請參閱 參考資料 得到有關的更多信息。

回頁首

結束語

Spark 是不斷壯大的大數據分析解決方案家族中備受關注的新增成員。它不只爲分佈數據集的處理提供一個有效框架,並且以高效的方式(經過簡潔的 Scala 腳本)處理分佈數據集。Spark 和 Scala 都處在積極發展階段。不過,因爲關鍵 Internet 屬性中採用了它們,二者彷佛都已從受人關注的開源軟件過渡成爲基礎 Web 技術。

相關文章
相關標籤/搜索