Storm以及離線數據平臺的MapReduce和Hive構成了Hadoop生態對實時和離線數據處理的一套完整處理解決方案。除了此套解決方案以外,還有一種很是流行的並且完整的離線和java
實時數據處理方案。這種方案就是Spark。Spark本質上是對Hadoop特別是MapReduce的補充、優化和完善,尤爲是數據處理速度、易用性、迭代計算和複雜數據分析等方面。算法
Spark Streaming 做爲Spark總體解決方案中實時數據處理部分,本質上仍然是基於Spark的彈性分佈式數據集(Resilient Distributed Datasets :RDD)概念。Spark Streaming將源頭shell
數據劃分爲很小的批,並以相似於離線批的方式來處理這部分微批數據。數據庫
相對於Storm這種原生的實時處理框架,Spark Streaming基於微批的的方案帶來了吞吐量的提高,可是也致使了數據處理延遲的增長---基於Spark Streaming實時數據處理方案的數據apache
延遲一般在秒級甚至分鐘級。編程
Spark誕生於美國伯克利大學的AMPLab,它最初屬於伯克利大學的研究性項目,與2010年正式開源,於2013年成爲Apache基金項目,冰雨2014年成爲Apache基金的頂級項目。api
Spark用了不到5年的時間就成了Apache的頂級項目,目前已被國內外的衆多互聯網公司使用,包括Amazon、EBay、淘寶、騰訊等。數組
Spark的流行和它解決了Hadoop的不少不足密不可分。緩存
傳統Hadoop基於MapReduce的方案適用於大多數的離線批處理場景,可是對於實時查詢、迭代計算等場景很是不適合,這是有其內在侷限決定的。服務器
一、MapReduce只提供Map和Reduce兩個操做,抽象程度低,可是複雜的計算一般須要不少操做,並且操做之間有複雜的依賴關係。
二、MapReduce的中間處理結果是放在HDFS文件系統中的,每次的落地和讀取都消耗大量的時間和資源。
三、固然,MapReduce也不支持高級數據處理API、DAG(有向五環圖)計算、迭代計算等。
Spark則較好地解決了上述這些問題。
一、Spark經過引入彈性分佈式數據集(Resilient Distributed Datasets:RDD)以及RDD豐富的動做操做API,很是好地支持了DGA的計算和迭代計算。
二、Spark經過內存計算和緩存數據很是好地支持了迭代計算和DAG計算的數據共享、減小了數據讀取的IO開銷、大大提升了數據處理速度。
三、Spark爲批處理(Spark Core)、流式處理(Spark Streaming)、交互分析(Spark SQL)、機器學習(MLLib)和圖計算(GraphX)提供了一個同一的平臺和API,很是便於使用。
四、Spark很是容易使用、Spark支持java、Python和Scala的API,還支持超過80種高級算法,使得用戶能夠快速構建不一樣的應用。Spark支持交互式的Python和Scala的shell,這意味着
能夠很是方便地在這些shell中使用Spark集羣來驗證解決問題的方法,而不是像之前同樣,須要打包、上傳集羣、驗證等。這對於原型開發尤爲重要。
五、Spark能夠很是方便地與其餘開源產品進行融合:好比,Spark可使用Hadoop的YARN和Apache Mesos做爲它的資源管理和調度器,而且能夠處理全部Hadoop支持的數據,包括HDFS、
HBase和Cassandra等。Spark也能夠不依賴於第三方的資源管理和調度器,它實現了Standalone做爲其內置的資源管理和調度框架,這樣進一步下降了Spark的使用門檻。
六、External Data Source多數據源支持:Spark能夠獨立運行,除了能夠運行在當下的Yarn集羣管理以外,它還能夠讀取已有的Hadoop數據。它能夠運行多種數據源,好比Parquet、Hive、
HBase、HDFS等。
RDD是Spark中最爲核心和重要的概念。RDD,全稱爲 Resilient Distributed Dataset,在Spark官方文檔中被稱爲「一個可並行操做的有容錯機制的數據集合」。實際上RDD就是
一個數據集,並且是分佈式的。同時Spark還對這個分佈式數據集提供了豐富的數據操做和容錯性。
一、RDD建立
Spark中建立RDD最直接的方法是調用SparkContext(SparkContext是Spark集羣環境的訪問入口,Spark Streaming也有本身對應的對象StreamContext)的parallelize方法。
List<Integer> data = Arrays.asList(1,2,3,4,5);
HavaRDD<Integer> distData = sc.parallelize(data);
上述代碼會將數據集合 (data)轉換爲這個分佈式數據集(distData),以後就能夠對此RDD執行各類轉換等。好比調用distData.reduce((a,b) => a+b),將這個數組中的元素項加,
此外,還能夠經過設置parallelize的第二個參數手動設置生成RDD的分區數:sc.parallelize(data,10),若是不設定的話,Spark會自動設定。
但在實際的項目中,RDD通常是從源頭數據建立的。Spark支持從任何一個Hadoop支持的存儲數據建立RDD,包括本地文件系統、HDFS、Cassandna、HBase和Amazon S3等。
另外,Spark也支持從文本文件,SequenceFiles和其它Hadoop InputFormat的格式文件中建立RDD。建立的方法也很簡單,只須要指定源頭文件並調用對應的方法便可:
JavaRDD<String> distFile = sc.textFile("data.txt");
Spark 中轉換SequenceFile的SparkContext方法是sequenceFile,轉換Hadoop InputFormats的SparkContext方法是HadoopRDD。
二、RDD操做
RDD操做分爲轉換(transformation)和行動(action),transformation是根據原有的RDD建立一個新的RDD,action則吧RDD操做後的結果返回給driver。例如map 是一個轉換,
它把數據集中的每一個元素通過一個方法處理的結果返回一個新的RDD,reduce是一個action,它收集RDD的全部數據通過一些方法的處理,最後把結果返回給driver。
Spark對transformation的抽象能夠大大提升性能,這是由於在Spark中,全部transformation操做都是lazy模式,即Spark不會當即計算結果,而只是簡單地記住全部對數據集的
轉換操做邏。這些轉換隻有遇到action操做的時候纔會開始計算。這樣的設計使得Spark更加高效,例如能夠經過map建立一個新數據集在reduce中使用,並僅僅返回reduce的
結果給driver,而不是整個大大的map過的數據集。
三、RDD持久化
Spark最重要的一個功能是它能夠經過各類操做持久化(或緩存)一個集合到內存中。當持久化一個RDD的時候,每個節點都將參與計算的全部分區數據存儲到內存中,
而且這些數據能夠被這個集合(以及這個集合衍生的其餘集合)的動做重複利用。這個能力使後續的動做速度更快(一般快10倍以上)。對應迭代算法和快速的交互應用來講,
緩存是一個關鍵的工具。
能夠經過 persist()或者cache()方法持久化一個RDD。先在action中計算獲得RDD,而後將其保存在每一個節點的內存中。Spark的緩存是一個容錯的技術,也就是說,若是RDD的
任何一個分區丟失,它能夠經過原有的轉換操做自動重複計算而且建立出這個分區。
此外,還能夠利用不一樣的存儲級別存儲每個被持久化的RDD,。例如,它容許持久化集合到磁盤上、將集合做爲序列化的Java對象持久化到內存中、在節點間複製集合或存儲集合
到Tachyon中。能夠經過傳遞一個StorageLevel對象給persist()方式設置這些存儲級別。cache()使用了默認的存儲級別-----StorageLevel.MEMORY_ONLY。
四、Spark生態圈
Spark創建在統一抽象的RDD之上,使得它能夠以基本一致的方式應對不一樣的大數據處理場景,包括批處理,流處理、SQL、Machine Learning以及GraphX等。這就是Spark設計的「
通用的編程抽象」( Unified Programming Abstraction),也正是Spark獨特的地方。
Spark生態圈包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件,其中Spark Core提供內存計算框架、SparkStreaming提供實時處理應用、Spark SQL提供
即席查詢,再加上MLlib的機器學習和GraphX的圖處理,它們能無縫集成並提供Spark一站式的大數據解決平臺和生態圈。
Spark Core:Spark Core實現了Spark的基本功能,包括任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。Spark Core還包括了RDD的API定義,並提供了建立和操做RDD的
豐富API。Spark Core是Spark其它組件的基礎和根本。
Spark Streaming:他是Spark提供的對實時數據進行流計算的組件,提供了用來操做數據流的API,而且與Spark Core中的RDD API高度對應。Spark Streaming支持與Spark Core
同級別的容錯性、吞吐量和伸縮性。
Spark SQL:它是Spark用來操做結構化數據的程序包,經過Spark SQL,可使用SQL或類SQL語言來查詢數據;同時Spark SQL支持多種數據源,好比Hive表、Parquet以及
JSON等,除了爲Spark提供一個SQL接口,Spark SQL還支持開發者將SQL和傳統的RDD編程的數據操做方式向結合,不管是使用Python、Java仍是Scala,開發者均可以在
單個應用中同時使用SQL和複雜的數據分析。
MLLib:Spark提供了常見的機器學習功能的程序庫,叫作MLlib,MLlib提供了多種機器學習算法,包括分類、迴歸、聚類、協同過濾等,還提供了模型評估、數據導入等額外的
支持功能。此外,MLLib還提供了一些更底層的機器學習原語,包括一個通用的梯度降低優化算法,全部這些方法都被設計爲能夠在集羣上輕鬆伸縮的架構。
GraphX:GraphX是用來操做圖(如社交網絡的朋友圈)的程序庫,能夠進行並行的圖計算。與Spark Streaming和Spark SQL相似,GraphX也擴展了Spark的RDD API,
能用來建立一個頂點和邊都包含任意屬性的有向圖。GraphX還支持針對圖的各類操做(如進行圖分割的subgraph和操做全部頂點的mapVertices),以及一些經常使用的圖算法
(如PageRank和三角計算)。
Spark Streaming做爲Spark的核心組件之一,通Storm同樣,主要對數據進行實時的流處理,可是不一樣於Apache Storm(這裏指的是原生Storm,非Trident),在Spark Streaming
中數據處理的單位是是一批而不是一條,Spark會等採集的源頭數據累積到設置的間隔條件後,對數據進行統一的微批處理。這個間隔是Spark Streaming中的核心概念和關鍵參數,
直接決定了Spark Streaming做業的數據處理延遲,固然也決定着數據處理的吞吐量和性能。
相對於Storm的毫秒級延遲來講,Spark Streaming的延遲最多隻能到幾百毫秒,通常是秒級甚至分鐘級,所以對於實時數據處理延遲要很是高的場合,Spark Streaming並不合適。
另外,Spark Streaming底層依賴於Spark Core 的RDD實現,即它和Spark框架總體是綁定在一塊兒的,這是優勢也是缺點。
對於已經採用Spark 做爲大數據處理框架,同時對數據延遲性要求不是很高的場合,Spark Streaming很是適合做爲事實流處理的工具和方案,緣由以下:
一、Spark Streaming內部的實現和調度方式高度依賴於Spark的DAG調度器和RDD,Spark Streaming的離散流(DStream)本質上是RDD在流式數據上的抽象,所以熟悉Spark和
和RDD概念的用戶很是容易理解Spark Streaming已經其DSream。
二、Spark上各個組件編程模型都是相似的,因此若是熟悉Spark的API,那麼對Spark Streaming的API也很是容易上手和掌握。
可是,若是已經採用了其餘諸如Hadoop和Storm的數據處理方案,那麼若是使用Spark Streaming,則面臨着Spark以及Spark Streaming的概念及原理的學習成本。
整體來講,Spark Streaming做爲Spark核心API的一個擴展,它對實時流式數據的處理具備可擴展性、高吞吐量、和能夠容錯性等特色。
同其餘流處理框架同樣,Spark Streaming從Kafka、Flume、Twitter、ZeroMQ、Kinesis等源頭獲取數據,並map、reduce、join、window等組成的複雜算法計算出指望的結果,處理
後的結果數據可被推送到文件系統,數據庫、實時儀表盤中,固然,也能夠將處理後的數據應用到Spark的機器學習算法、圖處理算法中。整個的數據處理流程以下:
Spark Streaming 中基本的抽象是離散流(即DStream).DStream表明一個連續的數據流。在Spark Streaming內部中,DStream其實是由一系列連續RDD組成。每一個RDD包含肯定
時間間隔內的數據,這些離散的RDD連在一塊兒,共同組成了對應的DStream。
實際上任何,任何對DStream的操做都轉換成了對DStream隱含的一系列對應RDD的操做,如上圖中對lines DStream是的flatMap操做,實際上應用於lines對應每一個RDD的操做,並生成了
對應的work DStream的RDD。
也就是上面所說的,Spark Streaming底層依賴於Spark Core的RDD實現。從本質上來講,Spark Streaming只不過是將流式的數據流根據設定的間隔分紅了一系列的RDD,而後在每一個RDD上
應用相應的各類操做和協做,因此Spark Streaming底層的運行引擎其實是Spark Core。
SparkStreaming完整的API包括StreamingContext、DStream輸入、DStream上的各類操做和動做、DStream輸出等。
一、StreamingContext
爲了初始化Spark Streaming程序,必須建立一個StreamingContext對象,該對象是Spark Streaming全部流操做的主要入口。一個StreamingContext對象能夠用SparkConf對象建立:
import org.apache.spark.*;
import org.apache.spark.streaming.api.Java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
二、DStream輸入
DStream輸入表示從數據源獲取的原始數據流。每一個輸入流DStream和一個接收器(receiver)對象相關聯,這個Receiver從源中獲取數據,並將數據存入內存中用於處理。
Spark Streaming有兩類數據源:
基本源(basic source):在StreamingContext API中直接可用的源頭,例如文件系統、套接字鏈接、Akka的actor等。
高級源(advanced source):包括 Kafka、Flume、Kinesis、Tiwtter等,他們須要經過額外的類來使用。
三、DStream的轉換
和RDD相似,transformation用來對輸入DStreams的數據進行轉換、修改等各類操做,固然,DStream也支持不少在Spark RDD的transformation算子。
四、DStream的輸出
和RDD相似,Spark Streaming容許將DStream轉換後的結果發送到數據庫、文件系統等外部系統中。目前,定義了Spark Streaming的輸出操做:
下面用字符計數這個例子來講明 Spark Streming
首先,導入 Spark Streaming的相關類到環境中,這些類(如DStream)提供了流操做不少有用的方法,StreamingContext是Spark全部流操做的主要入口。
其次,建立一個具備兩個執行線程以及1秒批間隔時間(即以秒爲單位分隔數據流)的本地StreamingContext.
import org.apache.spark.{*, SparkConf} import org.apache.spark.api.java.function.* import org.apache.spark.streaming.{*, Duration, Durations} import org.apache.spark.streaming.api.java.{*, JavaDStream, JavaStreamingContext} import scala.Tuple2; object streaming_test { def main(args: Array[String]): Unit = { //建立一個本地的StreamingContext上下文對象,該對象包含兩個工做線程,批處理間隔爲1秒 val conf = new SparkConf().setMaster("local[2]").setAppName("Network-WordCount"); val jssc = new JavaStreamingContext(conf,Durations.seconds(1)); //利用這個上下文,可以建立一個DStream,它表示從TCP源(主機爲localhost,端口爲9999)獲取的流式數據 //建立一個鏈接到hostname:port的DStream對象,相似localhost:9999 val lines =jssc.socketTextStream("localhost",9999); //這個lines變量是一個DStream,表示即將從數據服務器或的數據流,這個DStream的每條記錄都表明一行文本, // 接下來須要將DStream中的每行文本都切分爲單詞 val words =lines.flatMap(x:String => util.Arrays.asList(x.split(" ")).iterator()); val pairs =words.mapToPair<s=>new Tuple2<>(s,1)); val wordCounts =pairs.reduceByKey((i1,i2)=> i1+i2); wordCounts.print(); } }
Spark Streaming做業的調優一般都涉及做業開發的優化、並行度的優化和批大小以及內存等資源的優化。
一、做業開發優化
RDD複用:對於實時做業,尤爲是鏈路較長的做業,要儘可能重複使用RDD,而不是重複建立多個RDD。另外須要屢次使用的中間RDD,能夠將其持久化,以下降每次都須要重複計算的開銷。
使用效率較高的shuffle算子:如同Hadoop中的做業同樣,實時做業的shuffle操做會涉及數據從新分佈,所以會耗費大量的內存、網絡和計算等資源,須要儘可能下降須要shuffle的數據量,
reduceByKey/aggregateByKey相比groupByKey,會在map端先進行預聚合,所以效率較高。
相似於Hive的MapJoin:對於實時做業,join也會涉及數據的從新分佈,所以若是是大數據量的RDD和小數據量的RDD進行join,能夠經過broadcast與map操做實現相似於Hive的MapJoin,
可是須要注意小數量的RDD不能過大,否則廣播數據的開銷也很大。
其它高效的例子:如使用mapPartitions替代普通map,使用foreachPartitions替代foreach,使用repartitionAndSortWithinPartitions替代repartition與 sort類操做等。
二、並行度和批大小
對於Spark Streaming這種基於微批處理和實時處理框架來講,其調優不外乎兩點:
一是儘可能縮短每一批次的處理時間
二是設置合適的batch size(即每批處理的數據量),使得數據處理的速度可以適配數據流入的速度。
第一點一般以設置源頭、處理、輸出的併發度來實現。
源頭併發:若是源頭的輸入任務是實時做業的瓶頸,那麼能夠經過加大源頭的併發度提供性能,來保證數據可以流入後續的處理鏈路。在Spark Streaming,能夠經過以下代碼來實現(
一Kafka源頭爲例):
int numStreams = 5;
List<JavaPairDStream<String,String>> kafkaStreams = new ArrayList<JavaPairDStream<String,String>>(numStreams );
for(int i=0;i<numStreams ;i++){
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String,String> unifiedStream = streamingContext.union( kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
處理併發:處理任務的併發決定了實際做業執行的物理視圖。Spark Streaming做業的默認併發度能夠經過spark.default.parllelism來設置,可是實際中不推薦,建議針對每一個任務單獨設置
併發度進行精細控制。
輸出併發:如圖Hadoop做業同樣,實時做業的shuffle操做會涉及數據從新分佈,所以會耗費大量的內存、網絡和計算等資源,所以須要儘可能減小shuffle操做。
batch size:batch size主要影響系統的吞吐量和延遲。batch size 過小,通常處理延遲會下降,可是系統吞吐量會降低;batch size太大,吞吐量上去了,可是處理延遲會提升,同時要求的
內存也會增長,所以實際中須要找到一個平衡點,既能知足吞吐量也能知足延遲的要求,那麼實際中如何設置batch大小呢?
參考資料:《離線和實時大數據開發實戰》