Spark Streaming

Spark Streaming 近實時數據處理

 Spark Streaming是Spark Core API的一種擴展,它能夠用於進行大規模、高吞吐量、容錯的實時數據流的處理。它支持從不少種數據源中讀取數據,好比Kafka、Flume、Twitter、ZeroMQ、Kinesis或者是TCP Socket。而且可以使用相似高階函數的複雜算法來進行數據處理,好比map、reduce、join和window。處理後的數據能夠被保存到文件系統、數據庫、Dashboard等存儲中。html

 

在內部,它的工做原理以下。Spark Streaming接收實時輸入數據流,並將數據分紅批(batch),而後由Spark引擎處理數據,以分批生成最終的結果流。java

Spark-Streaming提供了一種高級的抽象,叫作DStream,英文全稱爲Discretized Stream,中文翻譯爲「離散流」,它表明了一個持續不斷的數據流。DStream能夠經過輸入數據源來建立,好比Kafka、Flume和Kinesis;也能夠經過對其餘DStream應用高階函數來建立,好比map、reduce、join、window。 
DStream的內部,其實一系列持續不斷產生的RDD。RDD是Spark Core的核心抽象,即,不可變的,分佈式的數據集。DStream中的每一個RDD都包含了一個時間段內的數據。git

本指南介紹如何開始使用DStreams編寫Spark Streaming程序。您能夠在Scala,Java或Python中編寫Spark Streaming程序(在Spark 1.2中引入),全部這些都在本指南中介紹。您能夠在本指南中找到標籤,讓您能夠選擇不一樣語言的代碼段。github

 

 

Spark Streaming發揮流式處理的優點,選擇一次處理一批數據而不是一次處理一個事件。這統一了Spark對批處理和實時處理的編程模型。算法

micro batching

 micro batching定義爲流數據切分爲幾組小批量的程序,有助於提升流處理的性能,同時低延遲處理每條信息。數據庫

spark streaming中mirco batch基於時間建立而不是數據量,必定週期(一般是毫秒)的數據彙集爲一批次。保證了數據處理的低延遲。apache

另外一個優點是保證處理的數據量在控制範圍內。假設處理引擎須要在每條數據消費後發送回覆(ack),批處理的狀況下,回覆將在批次處理完後發送而不是每條信息處理後都發送。編程

缺點是,有錯誤發生時,批次的數據都須要從新處理。vim

 

Getting Started

SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> StreamingLines = stream
JavaDStream<String> words = StreamingLines.flatMap( str -> Arrays.asList(str.split(" ")).iterator() );
JavaPairDStream<String, Integer> wordCounts = words.mapT
wordCounts.print();
streamingContext.start();
streamingContext.awaitTermination();

1.建立Streaming Context,Java中是JavaSteamingContext網絡

2.StreamingContext接受兩個參數,SparkConf,batchDuration決定小批次的數據間隔

3.調用start方法啓動Spark Streaming Job

4.調用awaitTermination或stop 顯示終止Job

 

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>

 

Streaming Source

Kafka

1.基於Receiver

這種方式使用Receiver接收數據。Receiver使用Kafka的高階API實現。與全部接收器同樣,經過接收器從Kafka接收的數據存儲在Spark Executor中,而後由Spark Streaming啓動的做業處理數據。

然而,默認配置下,這種方法在故障時可能形成數據丟失。能夠同步地將全部接收到的Kafka數據同時保存在分佈式文件系統(例如HDFS)上的寫入日誌中,以便在故障時能夠恢復全部數據。

 

2.Direct

在Spark 1.3中引入了這種新的無接收器「直接」方法,以確保更強的端到端保證。該方法不是使用接收器來接收數據,而是按期查詢Kafka中每一個主題分區中的最新偏移量,並相應地定義每一個批處理中要處理的偏移範圍。當啓動處理數據的做業時,Kafka的簡單消費者API用於讀取Kafka定義的偏移範圍(相似於從文件系統讀取文件)。請注意,這是Spark 1.3中爲Scala和Java API引入的實驗功能。Spark 1.4添加了一個Python API,但它尚未徹底同步。

 

Streaming Transformations

場景:

航空公司須要處理航班傳遞的實時溫度數據:

  1.僅僅打印或保存實時溫度,無狀態的流處理(stateless)。

  2.每隔十分鐘,統計前一個小時的平均溫度。每十分鐘需肯定一小時的時間窗口,屬於固定時間窗的有狀態流處理(stateful within a window time frame )。

  3.統計航班整個行程的平均溫度,非特定時間週期,屬於spark streaming job全程的有狀態處理。

 

數據以JSON格式回傳:

{"flightId":"tz302","timestamp":1494423926816,"temperature":21.12,"landed":false}

where flightId = unique id of the flight
timestamp = time at which event is generated
temperature = temperature reading at the time of event generation
landed = boolean flag which defines whether flight has been landed or still on the way

Following POJO will be used to map messages to Java Object for further processing

public class FlightDetails implements Serializable {
private String flightId;
private double temp;
private boolean landed;
private long temperature;
}

 

Stateless transformation

1.map 

JavaReceiverInputDStream<String> inStream= jssc.socketTextStream("localhost", 9999);
JavaDStream<FlightDetails> flightDetailsStream = inStream.map(x -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(x, FlightDetails.class);
});

flightDetailsStream.foreachRDD(rdd->rdd.saveAsTextFile());

 

2.flatMap

3.filter

4.join    DStream由鍵值對RDD組成,(K,V)及(K,X),返回新的鍵值對DStream (K,(V,X))

5.union

6.count

7.reduce

8.reduceByKey

 

 

Stateful transformation

在必定週期內(大於批處理間隔或整個job session內),維護狀態值。經過建立檢查點實現,嚴格來說,windowing不須要檢查點,但windowing期間的其餘操做須要。

 

Window Operations

Spark Streaming還提供了窗口計算,容許您在數據的滑動窗口上應用轉換。下圖說明了這個滑動窗口。

如圖所示,每當窗口滑過源DStream時,落在窗口內的源RDD被組合並運行,以產生窗口DStream的RDD。在這種具體狀況下,操做應用於最近3個時間單位的數據,並以2個時間單位滑動。這代表任何窗口操做都須要指定兩個參數。

  window length - 窗口的持續時間(圖中的3)。

  sliding interval - 執行窗口操做的間隔(圖中的2)。

 這兩個參數必須是源DStream的批間隔的倍數(圖中的1)。

 

咱們以一個例子來講明窗口操做。假設您但願經過在過去30秒的數據中每10秒產生一個字數來擴展早期的示例。爲此,咱們必須在最近30秒的數據中對(word,1)對的對DStream應用reduceByKey操做。這是使用reduceByKeyAndWindow操做完成的。

 

———————————————————————————————————————————————————————————————————————————————

cache和checkpoint的區別

Caching / Persistence

 與RDD相似,DStreams還容許開發人員將流的數據保留在內存中。也就是說,在DStream上使用persist()方法將自動保留該DStream的每一個RDD在內存中。若是DStream中的數據將被屢次計算(例如,相同數據上的多個操做),這是有用的。對於基於窗口的操做,如reduceByWindow和reduceByKeyAndWindow以及基於狀態的操做,如updateStateByKey,這是隱含的。所以,基於窗口的操做生成的DStream會自動保存在內存中,即便開發人員不調用persist()。

對於經過網絡接收數據(例如Kafka,Flume,套接字等)的輸入流,默認持久性級別被設置爲將數據複製到兩個節點進行容錯。

請注意,與RDD不一樣,DStream的默認持久性級別將數據序列化在內存中。這在「性能調優」部分進一步討論。有關不一樣持久性級別的更多信息,請參見「Spark編程指南」。

Checkpointing

每個Spark Streaming應用,正常來講,都是要7*24小時運轉的,這就是實時計算程序的特色,由於要持續不斷地對數據進行計算,所以,對實時計算應用的要求,應該是必需要可以對應用程序邏輯無關的失敗,進行容錯,若是要實現這個目標,Spark-Streaming程序就必須將足夠的信息checkpoint到容錯的存儲系統上(HDFS),從而讓它可以在失敗中進行恢復。檢查點有兩種類型的數據:

1.元數據checkpoint —-—將定義了流式計算邏輯的信息,報錯到容錯的存儲系統上,好比HDFS。當運行Spark-Streaming應用程序的Driver進程所在的節點失敗時,該信息能夠用於進行恢復。 

  元數據信息包括了: 
    1.1:配置信息—建立Spark-Streaming應用程序的配置信息,好比SparkConf 
    1.2:DStream的操做信息—-定義了Spark-Stream應用程序的計算邏輯的DStream操做信息 
    1.3:未處理的batch信息—-哪些job正在排隊,還沒處理的batch信息。

 

2.數據checkpoint ———將實時計算過程當中產生的RDD的數據保存到可靠的存儲系統中。對於一些將多個batch的數據進行聚合的,有狀態的transformation操做,這是很是有用的。在這種tranformation操做中,生成的RDD是依賴與以前的batch的,這會致使隨着時間的推移,Rdd的依賴鏈條愈來愈長,要避免因爲依賴鏈條愈來愈長,致使一塊兒變得愈來愈長的失敗恢復時間,有狀態的transformation  操做執行過程當中間產生的RDD,會按期的被checkpoint盜可靠的存儲系統上,好比HDFS,從而削減RDD的依賴鏈條,進而縮短失敗恢復時, RDD的恢復時間。

總而言之,元數據檢查點主要用於從驅動程序故障恢復,而數據或RDD檢查點是必要的,尤爲是使用基本功能的有狀態轉換(stateful transformation)時。

什麼時候啓用checkpoint機制 

1.使用了有狀態的transformation操做          ———好比updateStateByKey,或者reduceByKeyAndWindow操做被使用了, 那麼checkpoint目錄要求是必須提供的,也就必須開啓checkpoint機制,從而進行週期性的RDD checkpoint 
2.要保證能夠從Driver失敗中進行恢復         ———元數據checkpoint須要啓用,來進行這種狀況的恢復。

要注意的是,並非說全部的Spark-Streaming應用程序,都要啓用checkpoint機制,若是不強制要求從Driver 失敗中自動進行恢復,有沒有使用有狀態的transformation操做,那麼就不須要啓用checkpoint,事實上這麼作反而是用利於提高性能的。

如何對dstream作checkpoint?

要配置該機制,首先要調用StreamingContext的checkpoint()方法設置一個checkpoint目錄,而後須要將spark.streaming.receiver.writeAheadLog.enable參數設置爲true。這將容許您使用上述有狀態轉換。此外,若是要使應用程序從驅動程序故障中恢復,您應該重寫流式應用程序以具備如下行爲:

1.當程序第一次啓動時,它將建立一個新的StreamingContext,設置全部流,而後調用start()。

2.當程序在失敗後從新啓動時,它將從checkpoint目錄中的檢查點數據從新建立一個StreamingContext。

// Create a factory object that can create a and setup a new JavaStreamingContext JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); // new context JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams ... jssc.checkpoint(checkpointDirectory); // set checkpoint directory return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start(); context.awaitTermination();

若是checkpointDirectory存在,則將從檢查點數據從新建立上下文。若是目錄不存在(即第一次運行),則將調用函數contextFactory來建立新的上下文並設置DStream。請參閱Java示例JavaRecoverableNetworkWordCount。此示例將網絡數據的字數添加到文件中。

除了使用getOrCreate以外,還須要確保在失敗時自動從新啓動驅動程序進程。這隻能由用於運行應用程序的部署基礎結構完成。這在「部署」部分進一步討論。

請注意,RDD的檢查點會增長保存到可靠存儲的成本。這可能會致使須要保存check point的RDD的批處理時間增長。所以,須要仔細設置檢查點的間隔。在小批量(例如1秒)的狀況下,每一個批次的檢查點可能會顯着下降操做吞吐量。相反,檢查點太少會致使譜系和任務大小增加,這可能會產生不利影響。對於須要RDD檢查點的狀態轉換,默認間隔是至少10秒的批間隔的倍數。它能夠經過使用dstream.checkpoint(checkpointInterval)進行設置。一般,DStream的5到10個滑動間隔的檢查點間隔是一個很好的設置。

 

 

啓動預寫日誌機制 
預寫日誌機制,簡寫爲WAL,全稱爲Write Ahead Log,從Spark 1.2版本開始,就引入了基於容錯的文件系統的WAL機制,若是啓用該機制,Receiver接收到的全部數據都會被寫入配置的checkpoint目錄中的預寫日誌這種機制可讓driver在恢復的時候,避免數據丟失,而且能夠確保整個實時計算過程當中,零數據丟失

然而這種極強的可靠性機制,會致使Receiver的吞吐量大幅度降低,由於單位時間內有至關一部分時間須要將數據寫入預寫日誌,若是又但願開啓預寫日誌機制,確保數據零損失,又不但願影響系統的吞吐量,那麼能夠建立多個輸入DStream啓動多個Reciver,而後將這些receiver接收到的數據使用ssc.union()方法將這些dstream中的數據進行合併 此外在啓用了預寫日誌機制以後,推薦將複製持久化機制禁用掉,由於全部數據已經保存在容錯的文件系統中了,不須要再用複製機制進行持久化,保存一份副本了,只要將輸入的DStream的持久化機制設置一下便可

相關文章
相關標籤/搜索