大數據開發實戰:Spark Streaming流計算開發

  一、背景介紹

      Storm以及離線數據平臺的MapReduce和Hive構成了Hadoop生態對實時和離線數據處理的一套完整處理解決方案。除了此套解決方案以外,還有一種很是流行的並且完整的離線和java

    實時數據處理方案。這種方案就是Spark。Spark本質上是對Hadoop特別是MapReduce的補充、優化和完善,尤爲是數據處理速度、易用性、迭代計算和複雜數據分析等方面。算法

      Spark Streaming 做爲Spark總體解決方案中實時數據處理部分,本質上仍然是基於Spark的彈性分佈式數據集(Resilient Distributed Datasets :RDD)概念。Spark Streaming將源頭shell

    數據劃分爲很小的批,並以相似於離線批的方式來處理這部分微批數據。數據庫

      相對於Storm這種原生的實時處理框架,Spark Streaming基於微批的的方案帶來了吞吐量的提高,可是也致使了數據處理延遲的增長---基於Spark Streaming實時數據處理方案的數據apache

    延遲一般在秒級甚至分鐘級。編程

  二、Spark生態和核心概念

    2.一、Spark概覽

      Spark誕生於美國伯克利大學的AMPLab,它最初屬於伯克利大學的研究性項目,與2010年正式開源,於2013年成爲Apache基金項目,冰雨2014年成爲Apache基金的頂級項目。api

      Spark用了不到5年的時間就成了Apache的頂級項目,目前已被國內外的衆多互聯網公司使用,包括Amazon、EBay、淘寶、騰訊等。數組

      Spark的流行和它解決了Hadoop的不少不足密不可分。緩存

      傳統Hadoop基於MapReduce的方案適用於大多數的離線批處理場景,可是對於實時查詢、迭代計算等場景很是不適合,這是有其內在侷限決定的。服務器

      一、MapReduce只提供Map和Reduce兩個操做,抽象程度低,可是複雜的計算一般須要不少操做,並且操做之間有複雜的依賴關係。

      二、MapReduce的中間處理結果是放在HDFS文件系統中的,每次的落地和讀取都消耗大量的時間和資源。

      三、固然,MapReduce也不支持高級數據處理API、DAG(有向五環圖)計算、迭代計算等。

      Spark則較好地解決了上述這些問題。

      一、Spark經過引入彈性分佈式數據集(Resilient Distributed Datasets:RDD)以及RDD豐富的動做操做API,很是好地支持了DGA的計算和迭代計算。

      二、Spark經過內存計算和緩存數據很是好地支持了迭代計算和DAG計算的數據共享、減小了數據讀取的IO開銷、大大提升了數據處理速度。

      三、Spark爲批處理(Spark Core)、流式處理(Spark Streaming)、交互分析(Spark SQL)、機器學習(MLLib)和圖計算(GraphX)提供了一個同一的平臺和API,很是便於使用。

      四、Spark很是容易使用、Spark支持java、Python和Scala的API,還支持超過80種高級算法,使得用戶能夠快速構建不一樣的應用。Spark支持交互式的Python和Scala的shell,這意味着

          能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法,而不是像之前同樣,須要打包、上傳集羣、驗證等。這對於原型開發尤爲重要。

      五、Spark能夠很是方便地與其餘開源產品進行融合:好比,Spark可使用Hadoop的YARN和Apache Mesos做爲它的資源管理和調度器,而且能夠處理全部Hadoop支持的數據,包括HDFS、

        HBase和Cassandra等。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻。

      六、External Data Source多數據源支持:Spark能夠獨立運行,除了能夠運行在當下的Yarn集羣管理以外,它還能夠讀取已有的Hadoop數據。它能夠運行多種數據源,好比Parquet、Hive、

        HBase、HDFS等。

   2.二、Spark核心概念

        RDD是Spark中最爲核心和重要的概念。RDD,全稱爲 Resilient Distributed Dataset,在Spark官方文檔中被稱爲「一個可並行操做的有容錯機制的數據集合」。實際上RDD就是

      一個數據集,並且是分佈式的。同時Spark還對這個分佈式數據集提供了豐富的數據操做和容錯性。

      一、RDD建立

        Spark中建立RDD最直接的方法是調用SparkContext(SparkContext是Spark集羣環境的訪問入口,Spark Streaming也有本身對應的對象StreamContext)的parallelize方法。

        List<Integer> data =  Arrays.asList(1,2,3,4,5);

        HavaRDD<Integer> distData = sc.parallelize(data);

        上述代碼會將數據集合 (data)轉換爲這個分佈式數據集(distData),以後就能夠對此RDD執行各類轉換等。好比調用distData.reduce((a,b) => a+b),將這個數組中的元素項加,

      此外,還能夠經過設置parallelize的第二個參數手動設置生成RDD的分區數:sc.parallelize(data,10),若是不設定的話,Spark會自動設定。

        但在實際的項目中,RDD通常是從源頭數據建立的。Spark支持從任何一個Hadoop支持的存儲數據建立RDD,包括本地文件系統、HDFS、Cassandna、HBase和Amazon S3等。

      另外,Spark也支持從文本文件,SequenceFiles和其它Hadoop InputFormat的格式文件中建立RDD。建立的方法也很簡單,只須要指定源頭文件並調用對應的方法便可:

        JavaRDD<String> distFile = sc.textFile("data.txt");

        Spark 中轉換SequenceFile的SparkContext方法是sequenceFile,轉換Hadoop InputFormats的SparkContext方法是HadoopRDD。

      二、RDD操做

        RDD操做分爲轉換(transformation)和行動(action),transformation是根據原有的RDD建立一個新的RDD,action則吧RDD操做後的結果返回給driver。例如map 是一個轉換,

      它把數據集中的每一個元素通過一個方法處理的結果返回一個新的RDD,reduce是一個action,它收集RDD的全部數據通過一些方法的處理,最後把結果返回給driver。

        Spark對transformation的抽象能夠大大提升性能,這是由於在Spark中,全部transformation操做都是lazy模式,即Spark不會當即計算結果,而只是簡單地記住全部對數據集的

      轉換操做邏。這些轉換隻有遇到action操做的時候纔會開始計算。這樣的設計使得Spark更加高效,例如能夠經過map建立一個新數據集在reduce中使用,並僅僅返回reduce的

      結果給driver,而不是整個大大的map過的數據集。

      三、RDD持久化

        Spark最重要的一個功能是它能夠經過各類操做持久化(或緩存)一個集合到內存中。當持久化一個RDD的時候,每個節點都將參與計算的全部分區數據存儲到內存中,

      而且這些數據能夠被這個集合(以及這個集合衍生的其餘集合)的動做重複利用。這個能力使後續的動做速度更快(一般快10倍以上)。對應迭代算法和快速的交互應用來講,

      緩存是一個關鍵的工具。

        能夠經過 persist()或者cache()方法持久化一個RDD。先在action中計算獲得RDD,而後將其保存在每一個節點的內存中。Spark的緩存是一個容錯的技術,也就是說,若是RDD的

      任何一個分區丟失,它能夠經過原有的轉換操做自動重複計算而且建立出這個分區。

        此外,還能夠利用不一樣的存儲級別存儲每個被持久化的RDD,。例如,它容許持久化集合到磁盤上、將集合做爲序列化的Java對象持久化到內存中、在節點間複製集合或存儲集合

      到Tachyon中。能夠經過傳遞一個StorageLevel對象給persist()方式設置這些存儲級別。cache()使用了默認的存儲級別-----StorageLevel.MEMORY_ONLY。

      四、Spark生態圈

        Spark創建在統一抽象的RDD之上,使得它能夠以基本一致的方式應對不一樣的大數據處理場景,包括批處理,流處理、SQL、Machine Learning以及GraphX等。這就是Spark設計的「

      通用的編程抽象」( Unified Programming Abstraction),也正是Spark獨特的地方。

        Spark生態圈包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件,其中Spark Core提供內存計算框架、SparkStreaming提供實時處理應用、Spark SQL提供

      即席查詢,再加上MLlib的機器學習和GraphX的圖處理,它們能無縫集成並提供Spark一站式的大數據解決平臺和生態圈。

      

      Spark Core:Spark Core實現了Spark的基本功能,包括任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。Spark Core還包括了RDD的API定義,並提供了建立和操做RDD的

        豐富API。Spark Core是Spark其它組件的基礎和根本。

      Spark Streaming:他是Spark提供的對實時數據進行流計算的組件,提供了用來操做數據流的API,而且與Spark Core中的RDD API高度對應。Spark Streaming支持與Spark Core

        同級別的容錯性、吞吐量和伸縮性。

      Spark SQL:它是Spark用來操做結構化數據的程序包,經過Spark SQL,可使用SQL或類SQL語言來查詢數據;同時Spark SQL支持多種數據源,好比Hive表、Parquet以及

        JSON等,除了爲Spark提供一個SQL接口,Spark SQL還支持開發者將SQL和傳統的RDD編程的數據操做方式向結合,不管是使用Python、Java仍是Scala,開發者均可以在

        單個應用中同時使用SQL和複雜的數據分析。

      MLLib:Spark提供了常見的機器學習功能的程序庫,叫作MLlib,MLlib提供了多種機器學習算法,包括分類、迴歸、聚類、協同過濾等,還提供了模型評估、數據導入等額外的

        支持功能。此外,MLLib還提供了一些更底層的機器學習原語,包括一個通用的梯度降低優化算法,全部這些方法都被設計爲能夠在集羣上輕鬆伸縮的架構。

      GraphX:GraphX是用來操做圖(如社交網絡的朋友圈)的程序庫,能夠進行並行的圖計算。與Spark Streaming和Spark SQL相似,GraphX也擴展了Spark的RDD API,

        能用來建立一個頂點和邊都包含任意屬性的有向圖。GraphX還支持針對圖的各類操做(如進行圖分割的subgraph和操做全部頂點的mapVertices),以及一些經常使用的圖算法

       (如PageRank和三角計算)。

   三、Spark生態的流計算技術:Spark Streaming

        Spark Streaming做爲Spark的核心組件之一,通Storm同樣,主要對數據進行實時的流處理,可是不一樣於Apache Storm(這裏指的是原生Storm,非Trident),在Spark Streaming

      中數據處理的單位是是一批而不是一條,Spark會等採集的源頭數據累積到設置的間隔條件後,對數據進行統一的微批處理。這個間隔是Spark Streaming中的核心概念和關鍵參數,

      直接決定了Spark Streaming做業的數據處理延遲,固然也決定着數據處理的吞吐量和性能。

        相對於Storm的毫秒級延遲來講,Spark Streaming的延遲最多隻能到幾百毫秒,通常是秒級甚至分鐘級,所以對於實時數據處理延遲要很是高的場合,Spark Streaming並不合適。

        另外,Spark Streaming底層依賴於Spark Core 的RDD實現,即它和Spark框架總體是綁定在一塊兒的,這是優勢也是缺點。

        對於已經採用Spark 做爲大數據處理框架,同時對數據延遲性要求不是很高的場合,Spark Streaming很是適合做爲事實流處理的工具和方案,緣由以下:

        一、Spark Streaming內部的實現和調度方式高度依賴於Spark的DAG調度器和RDD,Spark Streaming的離散流(DStream)本質上是RDD在流式數據上的抽象,所以熟悉Spark和

        和RDD概念的用戶很是容易理解Spark Streaming已經其DSream。

        二、Spark上各個組件編程模型都是相似的,因此若是熟悉Spark的API,那麼對Spark Streaming的API也很是容易上手和掌握。

        可是,若是已經採用了其餘諸如Hadoop和Storm的數據處理方案,那麼若是使用Spark Streaming,則面臨着Spark以及Spark Streaming的概念及原理的學習成本。

        整體來講,Spark Streaming做爲Spark核心API的一個擴展,它對實時流式數據的處理具備可擴展性、高吞吐量、和能夠容錯性等特色。

        同其餘流處理框架同樣,Spark Streaming從Kafka、Flume、Twitter、ZeroMQ、Kinesis等源頭獲取數據,並map、reduce、join、window等組成的複雜算法計算出指望的結果,處理

      後的結果數據可被推送到文件系統,數據庫、實時儀表盤中,固然,也能夠將處理後的數據應用到Spark的機器學習算法、圖處理算法中。整個的數據處理流程以下:

      

 

    3.一、Spark Streaming基本原理

      Spark Streaming 中基本的抽象是離散流(即DStream).DStream表明一個連續的數據流。在Spark Streaming內部中,DStream其實是由一系列連續RDD組成。每一個RDD包含肯定

    時間間隔內的數據,這些離散的RDD連在一塊兒,共同組成了對應的DStream。

    

      實際上任何,任何對DStream的操做都轉換成了對DStream隱含的一系列對應RDD的操做,如上圖中對lines DStream是的flatMap操做,實際上應用於lines對應每一個RDD的操做,並生成了

    對應的work DStream的RDD。

      也就是上面所說的,Spark Streaming底層依賴於Spark Core的RDD實現。從本質上來講,Spark Streaming只不過是將流式的數據流根據設定的間隔分紅了一系列的RDD,而後在每一個RDD上

    應用相應的各類操做和協做,因此Spark Streaming底層的運行引擎其實是Spark Core。

   3.二、Spark Streaming核心API

      SparkStreaming完整的API包括StreamingContext、DStream輸入、DStream上的各類操做和動做、DStream輸出等。

      一、StreamingContext

        爲了初始化Spark Streaming程序,必須建立一個StreamingContext對象,該對象是Spark Streaming全部流操做的主要入口。一個StreamingContext對象能夠用SparkConf對象建立:

        import org.apache.spark.*;

        import org.apache.spark.streaming.api.Java.*;

        SparkConf conf =  new SparkConf().setAppName(appName).setMaster(master);

        JavaStreamingContext ssc =  new JavaStreamingContext(conf, new Duration(1000));

      二、DStream輸入

        DStream輸入表示從數據源獲取的原始數據流。每一個輸入流DStream和一個接收器(receiver)對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。

        Spark Streaming有兩類數據源:

        基本源(basic source):在StreamingContext API中直接可用的源頭,例如文件系統、套接字鏈接、Akka的actor等。

        高級源(advanced source):包括 Kafka、Flume、Kinesis、Tiwtter等,他們須要經過額外的類來使用。

      三、DStream的轉換

        和RDD相似,transformation用來對輸入DStreams的數據進行轉換、修改等各類操做,固然,DStream也支持不少在Spark RDD的transformation算子。

        

 

      四、DStream的輸出

        和RDD相似,Spark Streaming容許將DStream轉換後的結果發送到數據庫、文件系統等外部系統中。目前,定義了Spark Streaming的輸出操做:

        

 

  四、Spark Streaming實時開發實例

      下面用字符計數這個例子來講明 Spark Streming

      首先,導入 Spark Streaming的相關類到環境中,這些類(如DStream)提供了流操做不少有用的方法,StreamingContext是Spark全部流操做的主要入口。

      其次,建立一個具備兩個執行線程以及1秒批間隔時間(即以秒爲單位分隔數據流)的本地StreamingContext.

      

import org.apache.spark.{*, SparkConf}
import org.apache.spark.api.java.function.*
import org.apache.spark.streaming.{*, Duration, Durations}
import org.apache.spark.streaming.api.java.{*, JavaDStream, JavaStreamingContext}

import scala.Tuple2;

object streaming_test {
  def main(args: Array[String]): Unit = {
    //建立一個本地的StreamingContext上下文對象,該對象包含兩個工做線程,批處理間隔爲1秒
    val conf  = new SparkConf().setMaster("local[2]").setAppName("Network-WordCount");

    val jssc = new JavaStreamingContext(conf,Durations.seconds(1));
    //利用這個上下文,可以建立一個DStream,它表示從TCP源(主機爲localhost,端口爲9999)獲取的流式數據
    //建立一個鏈接到hostname:port的DStream對象,相似localhost:9999
    val lines =jssc.socketTextStream("localhost",9999);
    //這個lines變量是一個DStream,表示即將從數據服務器或的數據流,這個DStream的每條記錄都表明一行文本,
    // 接下來須要將DStream中的每行文本都切分爲單詞
    val words =lines.flatMap(x:String => util.Arrays.asList(x.split(" ")).iterator());
    val pairs =words.mapToPair<s=>new Tuple2<>(s,1));
    val wordCounts =pairs.reduceByKey((i1,i2)=> i1+i2);
    wordCounts.print();
  }
}

   四、Spark Streaming調優實踐

    Spark Streaming做業的調優一般都涉及做業開發的優化、並行度的優化和批大小以及內存等資源的優化。

    一、做業開發優化

      RDD複用:對於實時做業,尤爲是鏈路較長的做業,要儘可能重複使用RDD,而不是重複建立多個RDD。另外須要屢次使用的中間RDD,能夠將其持久化,以下降每次都須要重複計算的開銷。

      使用效率較高的shuffle算子:如同Hadoop中的做業同樣,實時做業的shuffle操做會涉及數據從新分佈,所以會耗費大量的內存、網絡和計算等資源,須要儘可能下降須要shuffle的數據量,

      reduceByKey/aggregateByKey相比groupByKey,會在map端先進行預聚合,所以效率較高。

      相似於Hive的MapJoin:對於實時做業,join也會涉及數據的從新分佈,所以若是是大數據量的RDD和小數據量的RDD進行join,能夠經過broadcast與map操做實現相似於Hive的MapJoin,

      可是須要注意小數量的RDD不能過大,否則廣播數據的開銷也很大。

      其它高效的例子:如使用mapPartitions替代普通map,使用foreachPartitions替代foreach,使用repartitionAndSortWithinPartitions替代repartition與 sort類操做等。

    二、並行度和批大小

      對於Spark Streaming這種基於微批處理和實時處理框架來講,其調優不外乎兩點:

      一是儘可能縮短每一批次的處理時間

      二是設置合適的batch size(即每批處理的數據量),使得數據處理的速度可以適配數據流入的速度。

      第一點一般以設置源頭、處理、輸出的併發度來實現。

      源頭併發:若是源頭的輸入任務是實時做業的瓶頸,那麼能夠經過加大源頭的併發度提供性能,來保證數據可以流入後續的處理鏈路。在Spark Streaming,能夠經過以下代碼來實現(

      一Kafka源頭爲例):

      int numStreams = 5;

      List<JavaPairDStream<String,String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams );

      for(int i=0;i<numStreams ;i++){

        kafkaStreams.add(KafkaUtils.createStream(...));

      }

       JavaPairDStream<String,String> unifiedStream = streamingContext.union( kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

      處理併發:處理任務的併發決定了實際做業執行的物理視圖。Spark Streaming做業的默認併發度能夠經過spark.default.parllelism來設置,可是實際中不推薦,建議針對每一個任務單獨設置

      併發度進行精細控制。

      輸出併發:如圖Hadoop做業同樣,實時做業的shuffle操做會涉及數據從新分佈,所以會耗費大量的內存、網絡和計算等資源,所以須要儘可能減小shuffle操做。

      batch size:batch size主要影響系統的吞吐量和延遲。batch size 過小,通常處理延遲會下降,可是系統吞吐量會降低;batch size太大,吞吐量上去了,可是處理延遲會提升,同時要求的

      內存也會增長,所以實際中須要找到一個平衡點,既能知足吞吐量也能知足延遲的要求,那麼實際中如何設置batch大小呢?

    參考資料:《離線和實時大數據開發實戰》

相關文章
相關標籤/搜索