◆ Spark[4]:html
Scope: a MapReduce-like cluster computing framework designed for low-latency iterativejobs and interactive use from an interpreter(在大規模的特定數據集上的迭代運算或重複查詢檢索)前端
正如其目標scope,Spark適用於須要屢次操做特定數據集的應用場合。須要反覆操做的次數越多,所需讀取的數據量越大,受益越大,數據量小可是計算密集度較大的場合,受益就相對較小。java
◆ Spark Streaming(構建在Spark上處理Stream數據的框架)python
可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景[2]。git
◆ Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive同樣的H iveQL命令接口,爲了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現query Parsing和 Logic Plan generation,最後的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。經過配置Shark參數,Shark能夠自動在內存中緩存特定的RDD,實現數據重用,進而加快特定數據集的檢索。同時,Shark 經過UDF用戶自定義函數實現特定的數據分析學習算法,使得SQL數據查詢和運算分析能結合在一塊兒,最大化RDD的重複使用。github
◆ Spark streaming: 構建在Spark上處理Stream數據的框架,基本的原理是將Stream數據分紅小的時間片段(幾秒),以相似batch批量處理的方式來處理這小部 分數據。Spark Streaming構建在Spark上,一方面是由於Spark的低延遲執行引擎(100ms+)能夠用於實時計算,另外一方面相比基於Record的其它 處理框架(如Storm),RDD數據集更容易作高效的容錯處理。此外小批量處理的方式使得它能夠同時兼容批量和實時數據處理的邏輯和算法。方便了一些需 要歷史數據和實時數據聯合分析的特定應用場合。web
詳見10、Spark Streaming ...算法
◆ Bagel: Pregel on Spark,能夠用Spark進行圖計算,這是個很是有用的小項目。Bagel自帶了一個例子,實現了Google的PageRank算法。shell
◆ 本地模式數據庫
◆ Standalone模式
◆ Mesoes模式
◆ yarn模式
◆ Spark的中間數據放到內存中,對於迭代運算效率更高。
Spark更適合於迭代運算比較多的ML和DM運算。由於在Spark裏面,有RDD的抽象概念。
◆ Spark比Hadoop更通用。
Spark提供的數據集操做類型有不少種,不像Hadoop只提供了Map和Reduce兩種操做。好比map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多種操做類型,Spark把這些操做稱爲Transformations。同時還提供Count, collect, reduce, lookup, save等多種actions操做。
這些多種多樣的數據集操做類型,給給開發上層應用的用戶提供了方便。各個處理節點之間的通訊模型再也不像Hadoop那樣就是惟一的Data Shuffle一種模式。用戶能夠命名,物化,控制中間結果的存儲、分區等。能夠說編程模型比Hadoop更靈活。
不過因爲RDD的特性,Spark不適用那種異步細粒度更新狀態的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對於那種增量修改的應用模型不適合。
◆ 容錯性。
在分佈式數據集計算時經過checkpoint來實現容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯。
◆ 可用性。
Spark經過提供豐富的Scala, Java,Python API及交互式Shell來提升可用性。
Spark與Hadoop的結合
◆ Spark能夠直接對HDFS進行數據的讀寫,一樣支持Spark on YARN。Spark能夠與MapReduce運行於同集羣中,共享存儲資源與計算,數據倉庫Shark實現上借用Hive,幾乎與Hive徹底兼容。
◆ Spark項目在2009年啓動,2010年開源, 如今使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。
Resilient Distributed Dataset (RDD)彈性分佈數據集
◆ RDD是Spark的最基本抽象,是對分佈式內存的抽象使用,實現了以操做本地集合的方式來操做分佈式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的並可以被並行操做的數據集合,不一樣的數據集格式對應不一樣的RDD實現。RDD必須是可序列化的。RDD能夠cache到內存 中,每次對RDD數據集的操做以後的結果,均可以存放到內存中,下一個操做能夠直接從內存中輸入,省去了MapReduce大量的磁盤IO操做。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來講,效率提高比較大。
◆ RDD的特色:
它是在集羣節點上的不可變的、已分區的集合對象。
經過並行轉換的方式來建立如(map, filter, join, etc)。
失敗自動重建。
能夠控制存儲級別(內存、磁盤等)來進行重用。
必須是可序列化的。
是靜態類型的。
◆ RDD的好處
RDD只能從持久存儲或經過Transformations操做產生,相比於分佈式共享內存(DSM)能夠更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可從新計算出來,而不須要作特定的Checkpoint。
RDD的不變性,能夠實現類Hadoop MapReduce的推測式執行。
RDD的數據分區特性,能夠經過數據的本地性來提升性能,這與Hadoop MapReduce是同樣的。
RDD都是可序列化的,在內存不足時可自動降級爲磁盤存儲,把RDD存儲於磁盤上,這時性能會有大的降低但不會差於如今的MapReduce。
◆ RDD的存儲與分區
用戶能夠選擇不一樣的存儲級別存儲RDD以便重用。
當前RDD默認是存儲於內存,但當內存不足時,RDD會spill到disk。
RDD在須要進行分區把數據分佈於集羣中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。
◆ RDD的內部表示
在RDD的內部實現中每一個RDD均可以使用5個方面的特性來表示:
分區列表(數據塊列表)
計算每一個分片的函數(根據父RDD計算出此RDD)
對父RDD的依賴列表
對key-value RDD的Partitioner【可選】
每一個數據分片的預約義地址列表(如HDFS上的數據塊的地址)【可選】
◆ RDD的存儲級別
RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別:
val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
◆ RDD定義了各類操做,不一樣類型的數據由不一樣的RDD類抽象表示,不一樣的操做也由RDD進行抽實現。
RDD的生成
◆ RDD有兩種建立方式:
一、從Hadoop文件系統(或與Hadoop兼容的其它存儲系統)輸入(例如HDFS)建立。
二、從父RDD轉換獲得新RDD。
◆ 下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼以下:
// SparkContext根據文件/目錄及可選的分片數建立RDD, 這裏咱們能夠看到Spark與Hadoop MapReduce很像 // 須要InputFormat, Key、Value的類型,其實Spark使用的Hadoop的InputFormat, Writable類型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根據Hadoop配置,及InputFormat等建立HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
◆ 對RDD進行計算時,RDD從HDFS讀取數據時與Hadoop MapReduce幾乎同樣的:
RDD的轉換與操做
◆ 對於RDD能夠有兩種計算方式:轉換(返回值仍是一個RDD)與操做(返回值不是一個RDD)。
◆ 轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操做是Lazy的,也就是說從一個RDD轉換生成另外一個RDD的操做不是立刻執行,Spark在遇到 Transformations操做時只會記錄須要這樣的操做,並不會去執行,須要等到有Actions操做的時候纔會真正啓動計算過程進行計算。
◆ 操做(Actions) (如:count, collect, save等),Actions操做會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啓動計算的動因。
◆ 下面使用一個例子來示例說明Transformations與Actions在Spark的使用。
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
Lineage(血統)
◆ 利用內存加快數據加載,在衆多的其它的In-Memory類數據庫或Cache類系統中也有實現,Spark的主要區別在於它處理分佈式運算環境下的數據容錯性(節點實效/數據丟失)問題時採用的方案。 爲了保證RDD中數據的魯棒性,RDD數據集經過所謂的血統關係(Lineage)記住了它是如何從其它RDD中演變過來的。相比其它系統的細顆粒度的內 存數據更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數據轉換(Transformation)操做(filter, map, join etc.)行爲。當這個RDD的部分分區數據丟失時,它能夠經過Lineage獲取足夠的信息來從新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限 制了Spark的運用場合,但同時相比細顆粒度的數據模型,也帶來了性能的提高。
◆ RDD在Lineage依賴方面分爲兩種Narrow Dependencies與Wide Dependencies用來解決數據容錯的高效性。Narrow Dependencies是指父RDD的每個分區最多被一個子RDD的分區所用, 表現爲一個父RDD的分區對應於一個子RDD的分區或多個父RDD的分區對應於一個子RDD的分區,也就是說一個父RDD的一個分區不可能對應一個子 RDD的多個分區。Wide Dependencies是指子RDD的分區依賴於父RDD的多個分區或全部分區,也就是說存在一個父RDD的一個分區對應一個子RDD的多個分區。對與 Wide Dependencies,這種計算的輸入和輸出在不一樣的節點上,lineage方法對與輸入節點無缺,而輸出節點宕機時,經過從新計算,這種狀況下,這 種方法容錯是有效的,不然無效,由於沒法重試,須要向上其祖先追溯看是否能夠重試(這就是lineage,血統的意思),Narrow Dependencies對於數據的重算開銷要遠小於Wide Dependencies的數據重算開銷。
◆ 在RDD計算,經過checkpint進行容錯,作checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯,默認是logging the updates方式,經過記錄跟蹤全部生成RDD的轉換(transformations)也就是記錄每一個RDD的lineage(血統)來從新計算生成 丟失的分區數據。
◆ Spark對於資源管理與做業調度可使用Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。 Spark on Yarn在Spark0.6時引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方規範實現,得益於Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就很是容 易,Spark on Yarn的大體框架圖。
◆ 讓Spark運行於YARN上與Hadoop共用集羣資源能夠提升資源利用率。
◆ Spark經過與編程語言集成的方式暴露RDD的操做,相似於DryadLINQ和FlumeJava,每一個數據集都表示爲RDD對象,對數據集的操做就 表示成對RDD對象的操做。Spark主要的編程語言是Scala,選擇Scala是由於它的簡潔性(Scala能夠很方便在交互式下使用)和性能 (JVM上的靜態強類型語言)。
◆ Spark和Hadoop MapReduce相似,由Master(相似於MapReduce的Jobtracker)和Workers(Spark的Slave工做節點)組成。 用戶編寫的Spark程序被稱爲Driver程序,Dirver程序會鏈接master並定義了對各RDD的轉換與操做,而對RDD的轉換與操做經過 Scala閉包(字面量函數)來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操做發送到各Workers節點。 Workers存儲着數據分塊和享有集羣內存,是運行在工做節點上的守護進程,當它收到對RDD的操做時,根據數據分片信息進行本地化數據操做,生成新的 數據分片、返回結果或把RDD寫入存儲系統。
Scala
◆ Spark使用Scala開發,默認使用Scala做爲編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,能夠在Spark-Shell測試程序。寫SparK程序的通常步驟就是創 建或使用(SparkContext)實例,使用SparkContext建立RDD,而後就是對RDD進行操做。如:
val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....
Java
◆ Spark支持Java編程,但對於使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是同樣的,由於都是JVM上的語言,Scala與Java能夠互操做,Java編程接口其實就是對Scala的封裝。如:
JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );
Python
◆ 如今Spark也提供了Python編程接口,Spark使用py4j來實現python與java的互操做,從而實現使用python編寫Spark程 序。Spark也一樣提供了pyspark,一個Spark的python shell,能夠以交互式的方式使用Python編寫Spark程序。 如:
from pyspark import SparkContext sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words") words.filter(lambda w: w.startswith("spar")).take(5)
Standalone模式
◆ 爲方便Spark的推廣使用,Spark提供了Standalone模式,Spark一開始就設計運行於Apache Mesos資源管理框架上,這是很是好的設計,可是卻帶了部署測試的複雜性。爲了讓Spark能更方便的部署和嘗試,Spark所以提供了 Standalone運行模式,它由一個Spark Master和多個Spark worker組成,與Hadoop MapReduce1很類似,就連集羣啓動方式都幾乎是同樣。
◆ 以Standalone模式運行Spark集羣
下載Scala2.9.3,並配置SCALA_HOME
下載Spark代碼(可使用源碼編譯也能夠下載編譯好的版本)這裏下載 編譯好的版本(http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)
解壓spark-0.7.3-prebuilt-cdh4.tgz安裝包
修改配置(conf/*) slaves: 配置工做節點的主機名 spark-env.sh:配置環境變量。
SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1
◆ 把Hadoop配置copy到conf目錄下
◆ 在master主機上對其它機器作ssh無密碼登陸
◆ 把配置好的Spark程序使用scp copy到其它機器
◆ 在master啓動集羣
$SPARK_HOME/start-all.sh
yarn模式
◆ Spark-shell如今還不支持Yarn模式,使用Yarn模式運行,須要把Spark程序所有打包成一個jar包提交到Yarn上運行。目錄只有branch-0.8版本才真正支持Yarn。
◆ 以Yarn模式運行Spark
下載Spark代碼.
git clone git://github.com/mesos/spark
◆ 切換到branch-0.8
cd spark git checkout -b yarn --track origin/yarn
◆ 使用sbt編譯Spark並
$SPARK_HOME/sbt/sbt > package > assembly
◆ 把Hadoop yarn配置copy到conf目錄下
◆ 運行測試
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0- SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone
使用Spark-shell
◆ Spark-shell使用很簡單,當Spark以Standalon模式運行後,使用$SPARK_HOME/spark-shell進入shell即 可,在Spark-shell中SparkContext已經建立好了,實例名爲sc能夠直接使用,還有一個須要注意的是,在Standalone模式 下,Spark默認使用的調度器的FIFO調度器而不是公平調度,而Spark-shell做爲一個Spark程序一直運行在Spark上,其它的 Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。
◆ 在Spark-shell上寫程序很是簡單,就像在Scala Shell上寫程序同樣。
scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() // Number of items in this RDD res0: Long = 21374 scala> textFile.first() // First item in this RDD res1: String = # Spark
編寫Driver程序
◆ 在Spark中Spark程序稱爲Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是同樣的,不一樣的地方就是SparkContext須要本身建立。如WorkCount程序以下:
import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext, args(0)由yarn傳入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }
提到Spark Streaming,咱們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於數據分析的軟件棧。從它的視角來看,目前的大數據處理能夠分爲如如下三個類型。
目前已有不少相對成熟的開源軟件來處理以上三種情景,咱們能夠利用MapReduce來進行批量數據處理,能夠用Impala來進行交互式查詢,對於流式數據處理,咱們能夠採用Storm。對於大多數互聯網公司來講,通常都會同時遇到以上三種情景,那麼在使用的過程當中這些公司可能會遇到以下的不便。
BDAS就是以Spark爲基礎的一套軟件棧,利用基於內存的通用計算模型將以上三種情景一網打盡,同時支持Batch、 Interactive、Streaming的處理,且兼容支持HDFS和S3等分佈式文件系統,能夠部署在YARN和Mesos等流行的集羣資源管理器 之上。BDAS的構架如圖1所示,其中Spark能夠替代MapReduce進行批處理,利用其基於內存的特色,特別擅長迭代式和交互式數據處理;Shark處理大規模數據的SQL查詢,兼容Hive的HQL。本文要重點介紹的Spark Streaming,在整個BDAS中進行大規模流式處理。
圖1 BDAS軟件棧
2. Spark Streaming構架
Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD經 過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加,或者存儲到外部設備。圖2顯示了Spark Streaming的整個流程。
圖2 Spark Streaming構架圖
對於流式計算來講,容錯性相當重要。首先咱們要明確一下Spark中RDD的容錯機制。每個RDD都是一個不可 變的分佈式可重算的數據集,其記錄着肯定性的操做繼承關係(lineage),因此只要輸入數據是可容錯的,那麼任意一個RDD的分區 (Partition)出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。
圖3 Spark Streaming中RDD的lineage關係圖
對於Spark Streaming來講,其RDD的傳承關係如圖3所示,圖中的每個橢圓形表示一個RDD,橢圓形中的每一個圓形表明一個RDD中的一個 Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每個Batch Size所產生的中間結果RDD。咱們能夠看到圖中的每個RDD都是經過lineage相鏈接的,因爲Spark Streaming輸入數據能夠來自於磁盤,例如HDFS(多份拷貝)或是來自於網絡的數據流(Spark Streaming會將網絡輸入數據的每個數據流拷貝兩份到其餘的機器)都能保證容錯性。因此RDD中任意的Partition出錯,均可以並行地在其 他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。
對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark DAG圖分解,以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),因此Spark Streaming可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景。
Spark目前在EC2上已可以線性擴展到100個節點(每一個節點4Core),能夠以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所作的測試,在Grep這個測試中,Spark Streaming中的每一個節點的吞吐量是670k records/s,而Storm是115k records/s。
圖4 Spark Streaming與Storm吞吐量比較圖
3. Spark Streaming的編程模型
Spark Streaming的編程和Spark的編程一模一樣,對於編程的理解也很是相似。對於Spark來講,編程就是對於RDD的操做;而對於Spark Streaming來講,就是對DStream的操做。下面將經過一個你們熟悉的WordCount的例子來講明Spark Streaming中的輸入操做、轉換操做和輸出操做。
val ssc = new StreamingContext(「Spark://…」, 「WordCount」, Seconds(1), [Homes], [Jars])
val lines = ssc.socketTextStream(「localhost」,8888)
val words = lines.flatMap(_.split(「 」))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
另外,Spark Streaming有特定的窗口操做,窗口操做涉及兩個參數:一個是滑動窗口的寬度(Window Duration);另外一個是窗口滑動的頻率(Slide Duration),這兩個參數必須是batch size的倍數。例如以過去5秒鐘爲一個輸入窗口,每1秒統計一下WordCount,那麼咱們會將過去5秒鐘的每一秒鐘的WordCount都進行統計,而後進行疊加,得出這個窗口中的單詞統計。
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))
但上面這種方式還不夠高效。若是咱們以增量的方式來計算就更加高效,例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼 咱們能夠將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量(如圖5所示),這種方法能夠複用中間三秒的統 計量,提升統計的效率。
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))
圖5 Spark Streaming中滑動窗口的疊加處理和增量處理
wordCounts = saveAsHadoopFiles(「WordCount」)
ssc.start()
4. Spark Streaming案例分析
在互聯網應用中,網站流量統計做爲一種經常使用的應用模式,須要在不一樣粒度上對不一樣數據進行統計,既有實時性的需求,又須要涉及到聚合、去重、鏈接等較爲複雜的統計需求。傳統上,如果使用Hadoop MapReduce框架,雖然能夠容易地實現較爲複雜的統計需求,但實時性卻沒法獲得保證;反之如果採用Storm這樣的流式框架,實時性雖能夠獲得保證,但需求的實現複雜度也大大提升了。Spark Streaming在二者之間找到了一個平衡點,可以以準實時的方式容易地實現較爲複雜的統計需求。 下面介紹一下使用Kafka和Spark Streaming搭建實時流量統計框架。
相比於傳統的處理框架,Kafka+Spark Streaming的架構有如下幾個優勢。
在基於Kafka+Spark Streaming的流量統計應用運行過程當中,有時會遇到內存不足、GC阻塞等各類問題。下面介紹一下如何對Spark Streaming應用程序進行調優來減小甚至避免這些問題的影響。
5. 性能調優
6. 總結
Spark Streaming提供了一套高效、可容錯的準實時大規模流式處理框架,它能和批處理及即時查詢放在同一個軟件棧中,下降學習成本。若是你學會了Spark編程,那麼也就學會了Spark Streaming編程,若是理解了Spark的調度和存儲,那麼Spark Streaming也相似。對開源軟件感興趣的讀者,咱們能夠一塊兒貢獻社區。目前Spark已在Apache孵化器中。按照目前的發展趨勢,Spark Streaming必定將會獲得更大範圍的使用。
(準實時任務(Firm Real-Time Task): 一般是指計算機系統容許任務超時,但若任務超時,該任務的計算結果沒有任何意義)
第一個是多生態做業競爭問題,一樣是好處也是一個壞處,內存消耗很是多的做業,有可能做業等待好久才能遞交上去,比普通的Hadoop更加麻煩,面臨着CPU、內存都申請到才能夠運行
第二機器內存性能,若是搭建Spark on Yarn集羣,千萬不要找內存比較小的搭集羣,最好96G或者120G,機器能夠相對少一點,好比100臺20G搭建小集羣,這樣花在私人申請、任務調 度、計算時間很是多,最後讓Spark性能很是差,既然要跑Spark,就不要珍惜內存了,並且內存價格愈來愈白菜價,用好機器,真正體驗Spark給數 據挖掘帶來質的飛躍;
第三個粗粒度的資源預申請,Spark提交的時候,運行算法以前須要把全部算法資源申請到,可能你提交一個算法運 行十個Stage,中間一個須要1T內存,那麼做業運行過程仲,要一直佔1T內存不放才能成功運行。這個做者會在後面框架作一些改進,可是動做比較大,暫時問題沒有辦法解決,須要把算法跑須要按照最大資源申請須要的內存以及Core才能夠;
最後一個問題開發人員的內存把控能力,由於 Spark做業目前寫起來有兩個問題,一個是語法問題,一個對內存把控能力。做爲普通的機器學習算法開發人員這方面會比較痛苦一點,由於他們更側重機器學 習算法,而對內存把控不太好,須要學習一些東西,才能在上邊嫺熟開發和運行算法。
[1]. Spark:一個高效的分佈式計算系統. http://soft.chinabyte.com/database/431/12914931.shtml
[2]. Spark Streaming:大規模流式數據處理的新貴.
http://www.csdn.net/article/2014-01-27/2818282-spark-streaming-big-data
[3]. 淘寶:Spark on Yarn 爲"大象"插上翅膀.
http://blog.csdn.net/it_man/article/details/18961067
[4]. Spark 快速理解.
http://blog.csdn.net/colorant/article/details/8255958