spark生態圈

 

http://www.cnblogs.com/-wangjiannan/p/3671247.htmlhtml

王健男前端

初步瞭解Spark生態系統及Spark Streamingjava

1、        場景python

Spark[4]git

Scope:  a MapReduce-like cluster computing framework designed for low-latency iterativejobs and interactive use from an interpreter(在大規模的特定數據集上的迭代運算或重複查詢檢索)github

正如其目標scopeSpark適用於須要屢次操做特定數據集的應用場合。須要反覆操做的次數越多,所需讀取的數據量越大,受益越大,數據量小可是計算密集度較大的場合,受益就相對較小。web

 

Spark Streaming(構建在Spark上處理Stream數據的框架)算法

可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景[2]sql

 

2、    Spark生態系統包括[1]shell

   Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive同樣的H iveQL命令接口,爲了最大程度的保持和Hive的兼容性,Shark使用了HiveAPI來實現query Parsing Logic Plan generation,最後的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。經過配置Shark參數,Shark能夠自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark 經過UDF用戶自定義函數實現特定的數據分析學習算法,使得SQL數據查詢和運算分析能結合在一塊兒,最大化RDD的重複使用。

Spark streaming: 構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分紅小的時間片段(幾秒),以相似batch批量處理的方式來處理這小部 分數據。Spark Streaming構建在Spark上,一方面是由於Spark的低延遲執行引擎(100ms+)能夠用於實時計算,另外一方面相比基於Record的其它 處理框架(Storm)RDD數據集更容易作高效的容錯處理。此外小批量處理的方式使得它能夠同時兼容批量和實時數據處理的邏輯和算法。方便了一些需 要歷史數據和實時數據聯合分析的特定應用場合。

詳見10、Spark Streaming ...

   Bagel: Pregel on Spark,能夠用Spark進行圖計算,這是個很是有用的小項目。Bagel自帶了一個例子,實現了GooglePageRank算法。

 

3、    運行模式[1]

   本地模式

   Standalone模式

   Mesoes模式

   yarn模式

4、    SparkHadoop的對比[1]

   Spark的中間數據放到內存中,對於迭代運算效率更高。

  Spark更適合於迭代運算比較多的MLDM運算。由於在Spark裏面,有RDD的抽象概念。

   SparkHadoop更通用。

  Spark提供的數據集操做類型有不少種,不像Hadoop只提供了MapReduce兩種操做。好比map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多種操做類型,Spark把這些操做稱爲Transformations。同時還提供Count, collect, reduce, lookup, save等多種actions操做。

  這些多種多樣的數據集操做類型,給給開發上層應用的用戶提供了方便。各個處理節點之間的通訊模型再也不像Hadoop那樣就是惟一的Data Shuffle一種模式。用戶能夠命名,物化,控制中間結果的存儲、分區等。能夠說編程模型比Hadoop更靈活。

  不過因爲RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對於那種增量修改的應用模型不適合。

   容錯性。

  在分佈式數據集計算時經過checkpoint來實現容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯。

   可用性。

  Spark經過提供豐富的Scala, JavaPython API及交互式Shell來提升可用性。

  SparkHadoop的結合

   Spark能夠直接對HDFS進行數據的讀寫,一樣支持Spark on YARNSpark能夠與MapReduce運行於同集羣中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive徹底兼容。

 

5、    在業界的使用[1]

Spark項目在2009年啓動,2010年開源, 如今使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Sparkpython克隆版Dpark

 

6、    Spark核心概念[1]

  Resilient Distributed Dataset (RDD)彈性分佈數據集

   RDDSpark的最基本抽象,是對分佈式內存的抽象使用,實現了以操做本地集合的方式來操做分佈式數據集的抽象實現。RDDSpark最核心的東西,它表示已被分區,不可變的並可以被並行操做的數據集合,不一樣的數據集格式對應不一樣的RDD實現。RDD必須是可序列化的。RDD能夠cache到內存 中,每次對RDD數據集的操做以後的結果,均可以存放到內存中,下一個操做能夠直接從內存中輸入,省去了MapReduce大量的磁盤IO操做。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來講,效率提高比較大。

   RDD的特色:

  它是在集羣節點上的不可變的、已分區的集合對象。

  經過並行轉換的方式來建立如(map, filter, join, etc)

  失敗自動重建。

  能夠控制存儲級別(內存、磁盤等)來進行重用。

  必須是可序列化的。

  是靜態類型的。

   RDD的好處

  RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint

  RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。

  RDD的數據分區特性,能夠經過數據的本地性來提升性能,這與Hadoop MapReduce是同樣的。

  RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce

   RDD的存儲與分區

  用戶能夠選擇不一樣的存儲級別存儲RDD以便重用。

  當前RDD默認是存儲於內存,但當內存不足時,RDDspilldisk

  RDD在須要進行分區把數據分佈於集羣中時會根據每條記錄Key進行分區(Hash 分區),以此保證兩個數據集在Join時能高效。

   RDD的內部表示

  在RDD的內部實現中每一個RDD均可以使用5個方面的特性來表示:

  分區列表(數據塊列表)

  計算每一個分片的函數(根據父RDD計算出此RDD)

  對父RDD的依賴列表

  對key-value RDDPartitioner【可選】

  每一個數據分片的預約義地址列表(HDFS上的數據塊的地址)【可選】

   RDD的存儲級別

  RDD根據useDiskuseMemorydeserializedreplication四個參數的組合提供了11種存儲級別:

  val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new  StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

   RDD定義了各類操做,不一樣類型的數據由不一樣的RDD類抽象表示,不一樣的操做也由RDD進行抽實現。

  RDD的生成

   RDD有兩種建立方式:

  1、從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)建立。

  2、從父RDD轉換獲得新RDD

   下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://...")file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼以下:

  // SparkContext根據文件/目錄及可選的分片數建立RDD, 這裏咱們能夠看到SparkHadoop MapReduce很像 // 須要InputFormat, KeyValue的類型,其實Spark使用的HadoopInputFormat, Writable類型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根據Hadoop配置,及InputFormat等建立HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)

   RDD進行計算時,RDDHDFS讀取數據時與Hadoop MapReduce幾乎同樣的:

  RDD的轉換與操做

   對於RDD能夠有兩種計算方式:轉換(返回值仍是一個RDD)與操做(返回值不是一個RDD)

   轉換(Transformations) (如:map, filter, groupBy, join)Transformations操做是Lazy的,也就是說從一個RDD轉換生成另外一個RDD的操做不是立刻執行,Spark在遇到 Transformations操做時只會記錄須要這樣的操做,並不會去執行,須要等到有Actions操做的時候纔會真正啓動計算過程進行計算。

   操做(Actions) (如:count, collect, save)Actions操做會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啓動計算的動因。

   下面使用一個例子來示例說明TransformationsActionsSpark的使用。

  val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)

 

  Lineage(血統)

   利用內存加快數據加載,在衆多的其它的In-Memory數據庫Cache類系統中也有實現,Spark的主要區別在於它處理分佈式運算環境下的數據容錯性(節點實效/數據丟失)問題時採用的方案 爲了保證RDD中數據的魯棒性,RDD數據集經過所謂的血統關係(Lineage)記住了它是如何從其它RDD中演變過來的。相比其它系統的細顆粒度的內 存數據更新級別的備份或者LOG機制,RDDLineage記錄的是粗顆粒度的特定數據轉換(Transformation)操做(filter, map, join etc.)行爲。當這個RDD的部分分區數據丟失時,它能夠經過Lineage獲取足夠的信息來從新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限 制了Spark的運用場合,但同時相比細顆粒度的數據模型,也帶來了性能的提高。

   RDDLineage依賴方面分爲兩種Narrow DependenciesWide Dependencies用來解決數據容錯的高效性。Narrow Dependencies是指父RDD的每個分區最多被一個子RDD的分區所用 表現爲一個父RDD的分區對應於一個子RDD的分區或多個父RDD的分區對應於一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子 RDD的多個分區。Wide Dependencies是指子RDD的分區依賴於父RDD的多個分區或全部分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。對與 Wide Dependencies,這種計算的輸入和輸出在不一樣的節點上,lineage方法對與輸入節點無缺,而輸出節點宕機時,經過從新計算,這種狀況下,這 種方法容錯是有效的,不然無效,由於沒法重試,須要向上其祖先追溯看是否能夠重試(這就是lineage,血統的意思)Narrow Dependencies對於數據的重算開銷要遠小於Wide Dependencies的數據重算開銷。

 

7、    容錯[1]

   RDD計算,經過checkpint進行容錯,作checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯,默認是logging the updates方式,經過記錄跟蹤全部生成RDD的轉換(transformations)也就是記錄每一個RDDlineage(血統)來從新計算生成 丟失的分區數據。

 

8、    資源管理與做業調度[1]

   Spark對於資源管理與做業調度可使用Standalone(獨立模式)Apache MesosHadoop YARN來實現。 Spark on YarnSpark0.6時引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方規範實現,得益於Spark天生支持多種SchedulerExecutor的良好設計,對YARN的支持也就很是容 易,Spark on Yarn的大體框架圖。

 

Spark運行於YARN上與Hadoop共用集羣資源能夠提升資源利用率。

 

9、    編程接口[1]

   Spark經過與編程語言集成的方式暴露RDD的操做,相似於DryadLINQFlumeJava,每一個數據集都表示爲RDD對象,對數據集的操做就 表示成對RDD對象的操做。Spark主要的編程語言是Scala,選擇Scala是由於它的簡潔性(Scala能夠很方便在交互式下使用)和性能 (JVM上的靜態強類型語言)

   SparkHadoop MapReduce相似,由Master(相似於MapReduceJobtracker)Workers(SparkSlave工做節點)組成。 用戶編寫的Spark程序被稱爲Driver程序,Dirver程序會鏈接master並定義了對各RDD的轉換與操做,而對RDD的轉換與操做經過 Scala閉包(字面量函數)來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操做發送到各Workers節點。 Workers存儲着數據分塊和享有集羣內存,是運行在工做節點上的守護進程,當它收到對RDD的操做時,根據數據分片信息進行本地化數據操做,生成新的 數據分片、返回結果或把RDD寫入存儲系統。

    Scala

  Spark使用Scala開發,默認使用Scala做爲編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,能夠在Spark-Shell測試程序。寫SparK程序的通常步驟就是創 建或使用(SparkContext)實例,使用SparkContext建立RDD,而後就是對RDD進行操做。如:

  val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....

  Java

   Spark支持Java編程,但對於使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是同樣的,由於都是JVM上的語言,ScalaJava能夠互操做,Java編程接口其實就是對Scala封裝。如:

  JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );

  Python

   如今Spark也提供了Python編程接口,Spark使用py4j來實現pythonjava的互操做,從而實現使用python編寫Spark 序。Spark也一樣提供了pyspark,一個Sparkpython shell,能夠以交互式的方式使用Python編寫Spark程序。 如:

from pyspark import SparkContext sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words") words.filter(lambda w: w.startswith("spar")).take(5)

 

10、    使用示例[1]

  Standalone模式

   爲方便Spark的推廣使用,Spark提供了Standalone模式,Spark一開始就設計運行於Apache Mesos資源管理框架上,這是很是好的設計,可是卻帶了部署測試的複雜性。爲了讓Spark能更方便的部署和嘗試,Spark所以提供了 Standalone運行模式,它由一個Spark Master和多個Spark worker組成,與Hadoop MapReduce1很類似,就連集羣啓動方式都幾乎是同樣。

   Standalone模式運行Spark集羣

  下載Scala2.9.3,並配置SCALA_HOME

  下載Spark代碼(可使用源碼編譯也能夠下載編譯好的版本)這裏下載 編譯好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)

  解壓spark-0.7.3-prebuilt-cdh4.tgz安裝包

  修改配置(conf/*) slaves: 配置工做節點的主機名 spark-env.sh:配置環境變量。

  SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1

   Hadoop配置copyconf目錄下

   master主機上對其它機器作ssh無密碼登陸

   把配置好的Spark程序使用scp copy到其它機器

   master啓動集羣

  $SPARK_HOME/start-all.sh

  yarn模式

   Spark-shell如今還不支持Yarn模式,使用Yarn模式運行,須要把Spark程序所有打包成一個jar包提交到Yarn上運行。目錄只有branch-0.8版本才真正支持Yarn

   Yarn模式運行Spark

  下載Spark代碼.

  git clone git://github.com/mesos/spark

   切換到branch-0.8

  cd spark git checkout -b yarn --track origin/yarn

   使用sbt編譯Spark

  $SPARK_HOME/sbt/sbt > package > assembly

   Hadoop yarn配置copyconf目錄下

   運行測試

  SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0- SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone

  使用Spark-shell

   Spark-shell使用很簡單,當SparkStandalon模式運行後,使用$SPARK_HOME/spark-shell進入shell 可,在Spark-shellSparkContext已經建立好了,實例名爲sc能夠直接使用,還有一個須要注意的是,在Standalone模式 下,Spark默認使用的調度器的FIFO調度器而不是公平調度,而Spark-shell做爲一個Spark程序一直運行在Spark上,其它的 Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。

   Spark-shell上寫程序很是簡單,就像在Scala Shell上寫程序同樣。

  scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() // Number of items in this RDD res0: Long = 21374 scala> textFile.first() // First item in this RDD res1: String = # Spark

  編寫Driver程序

   SparkSpark程序稱爲Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是同樣的,不一樣的地方就是SparkContext須要本身建立。如WorkCount程序以下:

import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext args(0)yarn傳入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }

 

 

11、    Spark Streaming:大規模流式數據處理的新貴[2]

1. 前言

提到Spark Streaming,咱們不得不說一下BDASBerkeley Data Analytics Stack),這個伯克利大學提出的關於數據分析的軟件棧。從它的視角來看,目前的大數據處理能夠分爲如如下三個類型。

·         複雜的批量數據處理(batch data processing),一般的時間跨度在數十分鐘到數小時之間。

·         基於歷史數據的交互式查詢(interactive query),一般的時間跨度在數十秒到數分鐘之間。

·         基於實時數據流的數據處理(streaming data processing),一般的時間跨度在數百毫秒到數秒之間。

目前已有不少相對成熟的開源軟件來處理以上三種情景,咱們能夠利用MapReduce來進行批量數據處理,能夠用Impala來進行交互式查詢,對於流式數據處理,咱們能夠採用Storm。對於大多數互聯網公司來講,通常都會同時遇到以上三種情景,那麼在使用的過程當中這些公司可能會遇到以下的不便。

·         三種情景的輸入輸出數據沒法無縫共享,須要進行格式相互轉換。

·         每個開源軟件都須要一個開發和維護團隊,提升了成本。

·         在同一個集羣中對各個系統協調資源分配比較困難。

BDAS就是以Spark爲基礎的一套軟件棧,利用基於內存的通用計算模型將以上三種情景一網打盡,同時支持Batch InteractiveStreaming的處理,且兼容支持HDFSS3等分佈式文件系統,能夠部署在YARNMesos等流行的集羣資源管理器 之上。BDAS的構架如圖1所示,其中Spark能夠替代MapReduce進行批處理,利用其基於內存的特色,特別擅長迭代式和交互式數據處理;Shark處理大規模數據的SQL查詢,兼容HiveHQL。本文要重點介紹的Spark Streaming,在整個BDAS中進行大規模流式處理。



1 BDAS軟件棧

2. Spark Streaming構架

1)  計算流程

Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDDResilient Distributed Dataset),而後將Spark Streaming中對DStreamTransformation操做變爲針對Spark中對RDDTransformation操做,將RDD 過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加,或者存儲到外部設備。圖2顯示了Spark Streaming的整個流程。

 

2 Spark Streaming構架圖

 

2)  容錯性

對於流式計算來講,容錯性相當重要。首先咱們要明確一下SparkRDD的容錯機制。每個RDD都是一個不可 變的分佈式可重算的數據集,其記錄着肯定性的操做繼承關係(lineage),因此只要輸入數據是可容錯的,那麼任意一個RDD的分區 Partition)出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。

3 Spark StreamingRDDlineage關係圖

對於Spark Streaming來講,其RDD的傳承關係如圖3所示,圖中的每個橢圓形表示一個RDD,橢圓形中的每一個圓形表明一個RDD中的一個 Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每個Batch Size所產生的中間結果RDD。咱們能夠看到圖中的每個RDD都是經過lineage相鏈接的,因爲Spark Streaming輸入數據能夠來自於磁盤,例如HDFS(多份拷貝)或是來自於網絡的數據流(Spark Streaming會將網絡輸入數據的每個數據流拷貝兩份到其餘的機器)都能保證容錯性。因此RDD中任意的Partition出錯,均可以並行地在其 他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

3)  實時性

對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark DAG圖分解,以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),因此Spark Streaming可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景。

 

4)  擴展性與吞吐量

Spark目前在EC2上已可以線性擴展到100個節點(每一個節點4Core),能夠以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm25倍,圖4Berkeley利用WordCountGrep兩個用例所作的測試,在Grep這個測試中,Spark Streaming中的每一個節點的吞吐量是670k records/s,而Storm115k records/s

4 Spark StreamingStorm吞吐量比較圖

3. Spark Streaming的編程模型

Spark Streaming的編程和Spark的編程一模一樣,對於編程的理解也很是相似。對於Spark來講,編程就是對於RDD的操做;而對於Spark Streaming來講,就是對DStream的操做。下面將經過一個你們熟悉的WordCount的例子來講明Spark Streaming中的輸入操做、轉換操做和輸出操做。

·         Spark Streaming初始化:在開始進行DStream操做以前,須要對Spark Streaming進行初始化生成StreamingContext。參數中比較重要的是第一個和第三個,第一個參數是指定Spark Streaming運行的集羣地址,而第三個參數是指定Spark Streaming運行時的batch窗口大小。在這個例子中就是將1秒鐘的輸入數據進行一次Spark Job處理。

val ssc = new StreamingContext(「Spark://…」, 「WordCount」, Seconds(1), [Homes], [Jars])

·          Spark Streaming的輸入操做:目前Spark Streaming已支持了豐富的輸入接口,大體分爲兩類:一類是磁盤輸入,如以batch size做爲時間間隔監控HDFS文件系統的某個目錄,將目錄中內容的變化做爲Spark Streaming的輸入;另外一類就是網絡流的方式,目前支持KafkaFlumeTwitterTCP socket。在WordCount例子中,假定經過網絡socket做爲輸入流,監聽某個特定的端口,最後得出輸入DStreamlines)。

val lines = ssc.socketTextStream(「localhost」,8888)

·         Spark Streaming的轉換操做:與Spark RDD的操做極爲相似,Spark Streaming也就是經過轉換操做將一個或多個DStream轉換成新的DStream。經常使用的操做包括mapfilterflatmap join,以及須要進行shuffle操做的groupByKey/reduceByKey等。在WordCount例子中,咱們首先須要將 DStream(lines)切分紅單詞,而後將相同單詞的數量進行疊加, 最終獲得的wordCounts就是每個batch size的(單詞,數量)中間結果。 

val words = lines.flatMap(_.split(「 」))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

另外,Spark Streaming有特定的窗口操做,窗口操做涉及兩個參數:一個是滑動窗口的寬度(Window Duration);另外一個是窗口滑動的頻率(Slide Duration),這兩個參數必須是batch size的倍數。例如以過去5秒鐘爲一個輸入窗口,每1秒統計一下WordCount,那麼咱們會將過去5秒鐘的每一秒鐘的WordCount都進行統計,而後進行疊加,得出這個窗口中的單詞統計。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s)seconds(1))

 

但上面這種方式還不夠高效。若是咱們以增量的方式來計算就更加高效,例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼 咱們能夠將t+3時刻過去5秒的統計量加上[t+3t+4]的統計量,在減去[t-2t-1]的統計量(如圖5所示),這種方法能夠複用中間三秒的統 計量,提升統計的效率。

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s)seconds(1))



5 Spark Streaming中滑動窗口的疊加處理和增量處理

·         Spark Streaming的輸入操做:對於輸出操做,Spark提供了將數據打印到屏幕及輸入到文件中。在WordCount中咱們將DStream wordCounts輸入到HDFS文件中。

wordCounts = saveAsHadoopFiles(「WordCount」)

·         Spark Streaming啓動:通過上述的操做,Spark Streaming尚未進行工做,咱們還須要調用Start操做,Spark Streaming纔開始監聽相應的端口,而後收取數據,並進行統計。

ssc.start()

4. Spark Streaming案例分析

在互聯網應用中,網站流量統計做爲一種經常使用的應用模式,須要在不一樣粒度上對不一樣數據進行統計,既有實時性的需求,又須要涉及到聚合、去重、鏈接等較爲複雜的統計需求。傳統上,如果使用Hadoop MapReduce框架,雖然能夠容易地實現較爲複雜的統計需求,但實時性卻沒法獲得保證;反之如果採用Storm這樣的流式框架,實時性雖能夠獲得保證,但需求的實現複雜度也大大提升了。Spark Streaming在二者之間找到了一個平衡點,可以以準實時的方式容易地實現較爲複雜的統計需求。 下面介紹一下使用KafkaSpark Streaming搭建實時流量統計框架。

·         數據暫存:Kafka做爲分佈式消息隊列,既有很是優秀的吞吐量,又有較高的可靠性和擴展性,在這裏採用Kafka做爲日誌傳遞中間件來接收日誌,抓取客戶端發送的流量日誌,同時接受Spark Streaming的請求,將流量日誌按序發送給Spark Streaming集羣。

·         數據處理:將Spark Streaming集羣與Kafka集羣對接,Spark StreamingKafka集羣中獲取流量日誌並進行處理。Spark Streaming會實時地從Kafka集羣中獲取數據並將其存儲在內部的可用內存空間中。當每個batch窗口到來時,便對這些數據進行處理。 

·         結果存儲:爲了便於前端展現和頁面請求,處理獲得的結果將寫入到數據庫中。

相比於傳統的處理框架,Kafka+Spark Streaming的架構有如下幾個優勢。

·         Spark框架的高效和低延遲保證了Spark Streaming操做的準實時性。

·         利用Spark框架提供的豐富API和高靈活性,能夠精簡地寫出較爲複雜的算法。 

·         編程模型的高度一導致得上手Spark Streaming至關容易,同時也能夠保證業務邏輯在實時處理和批處理上的複用。

在基於Kafka+Spark Streaming的流量統計應用運行過程當中,有時會遇到內存不足、GC阻塞等各類問題。下面介紹一下如何對Spark Streaming應用程序進行調優來減小甚至避免這些問題的影響。

5. 性能調優

1)  優化運行時間

·         增長並行度。確保使用整個集羣的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操做,增長其並行度以確保更爲充分地使用集羣資源。

·         減小數據序列化、反序列化的負擔。Spark Streaming默認將接收到的數據序列化後存儲以減小內存的使用。但序列化和反序列化須要更多的CPU時間,所以更加高效的序列化方式(Kryo)和自定義的序列化接口能夠更高效地使用CPU 

·         設置合理的batch窗口。在Spark Streaming中,Job之間有可能存在着依賴關係,後面的Job必須確保前面的Job執行結束後才能提交。若前面的Job執行時間超出了設置的 batch窗口,那麼後面的Job就沒法按時提交,這樣就會進一步拖延接下來的Job,形成後續Job的阻塞。所以,設置一個合理的batch窗口確保 Job可以在這個batch窗口中結束是必須的。 

·         減小任務提交和分發所帶來的負擔。一般狀況下Akka框架可以高效地確保任務及時分發,但當batch窗口很是小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone模式和Coarse-grained Mesos模式一般會比使用Fine-Grained Mesos模式有更小的延遲。

2)  優化內存使用

·         控制batch sizeSpark Streaming會把batch窗口內接收到的全部數據存放在Spark內部的可用內存區域中,所以必須確保當前節點Spark的可用內存至少可以容納這個batch窗口內全部的數據,不然必須增長新的資源以提升集羣的處理能力。

·         及時清理再也不使用的數據。上面說到Spark Streaming會將接收到的數據所有存儲於內部的可用內存區域中,所以對於處理過的再也不須要的數據應及時清理以確保Spark Streaming有富餘的可用內存空間。經過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據。 

·         觀察及適當調整GC策略。GC會影響Job的正常運行,延長Job的執行時間,引發一系列不可預料的問題。觀察GC的運行狀況,採起不一樣的GC策略以進一步減少內存回收對Job運行的影響。

6. 總結

Spark Streaming提供了一套高效、可容錯的準實時大規模流式處理框架,它能和批處理及即時查詢放在同一個軟件棧中,下降學習成本。若是你學會了Spark編程,那麼也就學會了Spark Streaming編程,若是理解了Spark的調度和存儲,那麼Spark Streaming也相似。對開源軟件感興趣的讀者,咱們能夠一塊兒貢獻社區。目前Spark已在Apache孵化器中。按照目前的發展趨勢,Spark Streaming必定將會獲得更大範圍的使用。 

 

準實時任務(Firm Real-Time Task): 一般是指計算機系統容許任務超時,但若任務超時,該任務的計算結果沒有任何意義)

 

12、    Spark on Yarn的實施過程當中遇到的問題[3]

第一個是多生態做業競爭問題,一樣是好處也是一個壞處,內存消耗很是多的做業,有可能做業等待好久才能遞交上去,比普通的Hadoop更加麻煩,面臨着CPU、內存都申請到才能夠運行

   第二機器內存性能,若是搭建Spark on Yarn集羣,千萬不要找內存比較小的搭集羣,最好96G或者120G,機器能夠相對少一點,好比10020G搭建小集羣,這樣花在私人申請、任務調 度、計算時間很是多,最後讓Spark性能很是差,既然要跑Spark,就不要珍惜內存了,並且內存價格愈來愈白菜價,用好機器,真正體驗Spark給數 據挖掘帶來質的飛躍;

  第三個粗粒度的資源預申請,Spark提交的時候,運行算法以前須要把全部算法資源申請到,可能你提交一個算法運 行十個Stage,中間一個須要1T內存,那麼做業運行過程仲,要一直佔1T內存不放才能成功運行。這個做者會在後面框架作一些改進,可是動做比較大,暫時問題沒有辦法解決,須要把算法跑須要按照最大資源申請須要的內存以及Core才能夠;

  最後一個問題開發人員的內存把控能力,由於 Spark做業目前寫起來有兩個問題,一個是語法問題,一個對內存把控能力。做爲普通的機器學習算法開發人員這方面會比較痛苦一點,由於他們更側重機器學 習算法,而對內存把控不太好,須要學習一些東西,才能在上邊嫺熟開發和運行算法。

 

 

 

 

 

 

 

 

 

 

http://www.cnblogs.com/shishanyuan/p/4700615.html

 

Spark生態圈也稱爲BDAS(伯克利數據分析棧),是伯克利APMLab實驗室打造的,力圖在算法(Algorithms)、機器(Machines)、人(People)之間經過大規模集成來展示大數據應用的一個平臺。伯克利AMPLab運用大數據、雲計算、通訊等各類資源以及各類靈活的技術方案,對海量不透明的數據進行甄別並轉化爲有用的信息,以供人們更好的理解世界。該生態圈已經涉及到機器學習、數據挖掘、數據庫、信息檢索、天然語言處理和語音識別等多個領域。

Spark生態圈以Spark Core爲核心,從HDFS、Amazon S3和HBase等持久層讀取數據,以MESS、YARN和自身攜帶的Standalone爲資源管理器調度Job完成Spark應用程序的計算。 這些應用程序能夠來自於不一樣的組件,如Spark Shell/Spark Submit的批處理、Spark Streaming的實時處理應用、Spark SQL的即席查詢、BlinkDB的權衡查詢、MLlib/MLbase的機器學習、GraphX的圖處理和SparkR的數學計算等等。

2.1 Spark Core

前面介紹了Spark Core的基本狀況,如下總結一下Spark內核架構:

l  提供了有向無環圖(DAG)的分佈式並行計算框架,並提供Cache機制來支持屢次迭代計算或者數據共享,大大減小迭代計算之間讀取數據局的開銷,這對於須要進行屢次迭代的數據挖掘和分析性能有很大提高

l  在Spark中引入了RDD (Resilient Distributed Dataset) 的抽象,它是分佈在一組節點中的只讀對象集合,這些集合是彈性的,若是數據集一部分丟失,則能夠根據「血統」對它們進行重建,保證了數據的高容錯性;

l  移動計算而非移動數據,RDD Partition能夠就近讀取分佈式文件系統中的數據塊到各個節點內存中進行計算

l  使用多線程池模型來減小task啓動開稍

l  採用容錯的、高可伸縮性的akka做爲通信框架

2.2 SparkStreaming

SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,能夠對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行相似Map、Reduce和Join等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。

Spark Streaming構架

l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。

圖Spark Streaming構架

l容錯性:對於流式計算來講,容錯性相當重要。首先咱們要明確一下Spark中RDD的容錯機制。每個RDD都是一個不可變的分佈式可重算的數據集,其記錄着肯定性的操做繼承關係(lineage),因此只要輸入數據是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。  

對於Spark Streaming來講,其RDD的傳承關係以下圖所示,圖中的每個橢圓形表示一個RDD,橢圓形中的每一個圓形表明一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每個Batch Size所產生的中間結果RDD。咱們能夠看到圖中的每個RDD都是經過lineage相鏈接的,因爲Spark Streaming輸入數據能夠來自於磁盤,例如HDFS(多份拷貝)或是來自於網絡的數據流(Spark Streaming會將網絡輸入數據的每個數據流拷貝兩份到其餘的機器)都能保證容錯性,因此RDD中任意的Partition出錯,均可以並行地在其餘機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

Spark Streaming中RDD的lineage關係圖

l實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark DAG圖分解以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),因此Spark Streaming可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景。

l擴展性與吞吐量:Spark目前在EC2上已可以線性擴展到100個節點(每一個節點4Core),能夠以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所作的測試,在Grep這個測試中,Spark Streaming中的每一個節點的吞吐量是670k records/s,而Storm是115k records/s。

Spark Streaming與Storm吞吐量比較圖

2.3 Spark SQL

Shark是SparkSQL的前身,它發佈於3年前,那個時候Hive能夠說是SQL on Hadoop的惟一選擇,負責將SQL編譯成可擴展的MapReduce做業,鑑於Hive的性能以及與Spark的兼容,Shark項目由此而生。

Shark即Hive on Spark,本質上是經過Hive的HQL解析,把HQL翻譯成Spark上的RDD操做,而後經過Hive的metadata獲取數據庫裏的表信息,實際HDFS上的數據和文件,會由Shark獲取並放到Spark上運算。Shark的最大特性就是快和與Hive的徹底兼容,且能夠在shell模式下使用rdd2sql()這樣的API,把HQL獲得的結果集,繼續在scala環境下運算,支持本身編寫簡單的機器學習或簡單分析處理函數,對HQL結果進一步分析計算。

在2014年7月1日的Spark Summit上,Databricks宣佈終止對Shark的開發,將重點放到Spark SQL上。Databricks表示,Spark SQL將涵蓋Shark的全部特性,用戶能夠從Shark 0.9進行無縫的升級。在會議上,Databricks表示,Shark更可能是對Hive的改造,替換了Hive的物理執行引擎,所以會有一個很快的速度。然而,不容忽視的是,Shark繼承了大量的Hive代碼,所以給優化和維護帶來了大量的麻煩。隨着性能優化和先進分析整合的進一步加深,基於MapReduce設計的部分無疑成爲了整個項目的瓶頸。所以,爲了更好的發展,給用戶提供一個更好的體驗,Databricks宣佈終止Shark項目,從而將更多的精力放到Spark SQL上。

Spark SQL容許開發人員直接處理RDD,同時也可查詢例如在 Apache Hive上存在的外部數據。Spark SQL的一個重要特色是其可以統一處理關係表和RDD,使得開發人員能夠輕鬆地使用SQL命令進行外部查詢,同時進行更復雜的數據分析。除了Spark SQL外,Michael還談到Catalyst優化框架,它容許Spark SQL自動修改查詢方案,使SQL更有效地執行。

還有Shark的做者是來自中國的博士生辛湜(Reynold Xin),也是Spark的核心成員,具體信息能夠看他的專訪http://www.csdn.net/article/2013-04-26/2815057-Spark-Reynold

Spark SQL的特色:

l引入了新的RDD類型SchemaRDD,能夠象傳統數據庫定義表同樣來定義SchemaRDD,SchemaRDD由定義了列數據類型的行對象構成。SchemaRDD能夠從RDD轉換過來,也能夠從Parquet文件讀入,也可使用HiveQL從Hive中獲取。

l內嵌了Catalyst查詢優化框架,在把SQL解析成邏輯執行計劃以後,利用Catalyst包裏的一些類和接口,執行了一些簡單的執行計劃優化,最後變成RDD的計算

l在應用程序中能夠混合使用不一樣來源的數據,如能夠未來自HiveQL的數據和來自SQL的數據進行Join操做。

Shark的出現使得SQL-on-Hadoop的性能比Hive有了10-100倍的提升,  那麼,擺脫了Hive的限制,SparkSQL的性能又有怎麼樣的表現呢?雖然沒有Shark相對於Hive那樣矚目地性能提高,但也表現得很是優異,以下圖所示:

爲何sparkSQL的性能會獲得怎麼大的提高呢?主要sparkSQL在下面幾點作了優化:

1. 內存列存儲(In-Memory Columnar Storage) sparkSQL的表數據在內存中存儲不是採用原生態的JVM對象存儲方式,而是採用內存列存儲;

2. 字節碼生成技術(Bytecode Generation) Spark1.1.0在Catalyst模塊的expressions增長了codegen模塊,使用動態字節碼生成技術,對匹配的表達式採用特定的代碼動態編譯。另外對SQL表達式都做了CG優化, CG優化的實現主要仍是依靠Scala2.10的運行時放射機制(runtime reflection);

3. Scala代碼優化 SparkSQL在使用Scala編寫代碼的時候,儘可能避免低效的、容易GC的代碼;儘管增長了編寫代碼的難度,但對於用戶來講接口統一。

2.4 BlinkDB

BlinkDB 是一個用於在海量數據上運行交互式 SQL 查詢的大規模並行查詢引擎,它容許用戶經過權衡數據精度來提高查詢響應時間,其數據的精度被控制在容許的偏差範圍內。爲了達到這個目標,BlinkDB 使用兩個核心思想:

l一個自適應優化框架,從原始數據隨着時間的推移創建並維護一組多維樣本;

l一個動態樣本選擇策略,選擇一個適當大小的示例基於查詢的準確性和(或)響應時間需求。

和傳統關係型數據庫不一樣,BlinkDB是一個頗有意思的交互式查詢系統,就像一個蹺蹺板,用戶須要在查詢精度和查詢時間上作一權衡;若是用戶想更快地獲取查詢結果,那麼將犧牲查詢結果的精度;一樣的,用戶若是想獲取更高精度的查詢結果,就須要犧牲查詢響應時間。用戶能夠在查詢的時候定義一個失誤邊界。

2.5  MLBase/MLlib

MLBase是Spark生態圈的一部分專一於機器學習,讓機器學習的門檻更低,讓一些可能並不瞭解機器學習的用戶也能方便地使用MLbase。MLBase分爲四部分:MLlib、MLI、ML Optimizer和MLRuntime。

l  ML Optimizer會選擇它認爲最適合的已經在內部實現好了的機器學習算法和相關參數,來處理用戶輸入的數據,並返回模型或別的幫助分析的結果;

l  MLI 是一個進行特徵抽取和高級ML編程抽象的算法實現的API或平臺;

l MLlib是Spark實現一些常見的機器學習算法和實用程序,包括分類、迴歸、聚類、協同過濾、降維以及底層優化,該算法能夠進行可擴充; MLRuntime 基於Spark計算框架,將Spark的分佈式計算應用到機器學習領域。

總的來講,MLBase的核心是他的優化器,把聲明式的Task轉化成複雜的學習計劃,產出最優的模型和計算結果。與其餘機器學習Weka和Mahout不一樣的是:

l  MLBase是分佈式的,Weka是一個單機的系統;

l  MLBase是自動化的,Weka和Mahout都須要使用者具有機器學習技能,來選擇本身想要的算法和參數來作處理;

l  MLBase提供了不一樣抽象程度的接口,讓算法能夠擴充

l  MLBase基於Spark這個平臺

2.6 GraphX

GraphX是Spark中用於圖(e.g., Web-Graphs and Social Networks)和圖並行計算(e.g., PageRank and Collaborative Filtering)的API,能夠認爲是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重寫及優化,跟其餘分佈式圖計算框架相比,GraphX最大的貢獻是,在Spark之上提供一棧式數據解決方案,能夠方便且高效地完成圖計算的一整套流水做業。GraphX最早是伯克利AMPLAB的一個分佈式圖計算框架項目,後來整合到Spark中成爲一個核心組件。

GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,有Table和Graph兩種視圖,而只須要一份物理存儲。兩種視圖都有本身獨有的操做符,從而得到了靈活操做和執行效率。如同Spark,GraphX的代碼很是簡潔。GraphX的核心代碼只有3千多行,而在此之上實現的Pregel模型,只要短短的20多行。GraphX的代碼結構總體下圖所示,其中大部分的實現,都是圍繞Partition的優化進行的。這在某種程度上說明了點分割的存儲和相應的計算優化的確是圖計算框架的重點和難點。

GraphX的底層設計有如下幾個關鍵點。

1.對Graph視圖的全部操做,最終都會轉換成其關聯的Table視圖的RDD操做來完成。這樣對一個圖的計算,最終在邏輯上,等價於一系列RDD的轉換過程。所以,Graph最終具有了RDD的3個關鍵特性:Immutable、Distributed和Fault-Tolerant。其中最關鍵的是Immutable(不變性)。邏輯上,全部圖的轉換和操做都產生了一個新圖;物理上,GraphX會有必定程度的不變頂點和邊的複用優化,對用戶透明。

2.兩種視圖底層共用的物理數據,由RDD[Vertex-Partition]和RDD[EdgePartition]這兩個RDD組成。點和邊實際都不是以表Collection[tuple]的形式存儲的,而是由VertexPartition/EdgePartition在內部存儲一個帶索引結構的分片數據塊,以加速不一樣視圖下的遍歷速度。不變的索引結構在RDD轉換過程當中是共用的,下降了計算和存儲開銷。

3.圖的分佈式存儲採用點分割模式,並且使用partitionBy方法,由用戶指定不一樣的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個EdgePartition,頂點Master分配到各個VertexPartition,EdgePartition也會緩存本地邊關聯點的Ghost副本。劃分策略的不一樣會影響到所須要緩存的Ghost副本數量,以及每一個EdgePartition分配的邊的均衡程度,須要根據圖的結構特徵選取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut這四種策略。在淘寶大部分場景下,EdgePartition2d效果最好。

2.7 SparkR

SparkR是AMPLab發佈的一個R開發包,使得R擺脫單機運行的命運,能夠做爲Spark的job運行在集羣上,極大得擴展了R的數據處理能力。

SparkR的幾個特性:

l  提供了Spark中彈性分佈式數據集(RDD)的API,用戶能夠在集羣上經過R shell交互性的運行Spark job。

l  支持序化閉包功能,能夠將用戶定義函數中所引用到的變量自動序化發送到集羣中其餘的機器上。

l  SparkR還能夠很容易地調用R開發包,只須要在集羣上執行操做前用includePackage讀取R開發包就能夠了,固然集羣上要安裝R開發包。

2.8  Tachyon

Tachyon是一個高容錯的分佈式文件系統,容許文件之內存的速度在集羣框架中進行可靠的共享,就像Spark和MapReduce那樣。經過利用信息繼承,內存侵入,Tachyon得到了高性能。Tachyon工做集文件緩存在內存中,而且讓不一樣的 Jobs/Queries以及框架都能內存的速度來訪問緩存文件」。所以,Tachyon能夠減小那些須要常用的數據集經過訪問磁盤來得到的次數。Tachyon兼容Hadoop,現有的Spark和MR程序不須要任何修改而運行。

在2013年4月,AMPLab共享了其Tachyon 0.2.0 Alpha版本的Tachyon,其宣稱性能爲HDFS的300倍,繼而受到了極大的關注。Tachyon的幾個特性以下:

lJAVA-Like File API

Tachyon提供相似JAVA File類的API,

l兼容性

Tachyon實現了HDFS接口,因此Spark和MR程序不須要任何修改便可運行。

l可插拔的底層文件系統

Tachyon是一個可插拔的底層文件系統,提供容錯功能。tachyon將內存數據記錄在底層文件系統。它有一個通用的接口,使得能夠很容易的插入到不一樣的底層文件系統。目前支持HDFS,S3,GlusterFS和單節點的本地文件系統,之後將支持更多的文件系統。

相關文章
相關標籤/搜索