Spark Streaming鏈接TCP Socket

1.Spark Streaming是什麼

Spark Streaming是在Spark上創建的可擴展的高吞吐量實時處理流數據的框架,數據能夠是來自多種不一樣的源,例如kafka,Flume,Twitter,ZeroMQ或者TCP Socket等。在這個框架下,支持對流數據的各類運算,好比map,reduce,join等。處理事後的數據能夠存儲到文件系統或數據庫。算法

利用Spark Streaming,你可使用與批量加載數據相同的API來建立數據管道,並經過數據管道處理流式數據。此外,Spark Steaming的「micro-batching」方式提供至關好的彈性來應對某些緣由形成的任務失敗。數據庫

2. Spark Streaming的基本原理

Spark Streaming對數據的處理方式主要採用的方法是對Stream數據進行時間切片,分紅小的數據片斷,經過相似批處理的方式處理數據片斷。框架

Spark Streaming把實時輸入數據流以時間片Δt (如1秒)爲單位切分紅塊。Spark Streaming會把每塊數據做爲一個RDD,並使用RDD操做處理每一小塊數據。socket

Spark Streaming將流式計算分解成一系列短小的批處理做業。Spark Streaming的輸入數據分紅一段一段的數據(DStreaming),每一段數據都轉換成Spark中的RDD,而後將Spark Streaming中對DStream的操做變爲針對Spark中對RDD的操做,將RDD通過操做變成中間結果保存在內存中。分佈式

3. DStream

上面提到了DStreaming,那麼DStreaming究竟是什麼呢:函數

DStreaming至關於在Streaming的框架下對RDD進行封裝,表示的是咱們處理的一個實時數據流。相似於RDD,DStream提供了轉換操做,窗口轉換操做和輸出操做三種操做方法。spa

4.Spark Streaming的優點

Spark Streaming是一種構建在Spark上的實時計算框架,它擴展了Spark處理大規模流式數據的能力。線程

實時性:能運行在100+的結點上,並達到秒級延遲。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark的任務集的調度過程。其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),因此Spark Streaming可以知足除對實時性要求很是高的全部流式準實時計算場景。code

高效和容錯的特性:對於流式計算來講,容錯性相當重要。在spark中每個RDD都是一個不可變的分佈式可重算的數據集,其記錄着肯定性的操做,只要輸入數據是可容錯的,那麼任意一個RDD的分區出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。而spark Streaming使用基於內存的Spark做爲執行引擎, 其容錯性天然很好。orm

吞吐量:Spark Streaming能集成Spark的批處理和交互查詢,其吞吐量比Storm至少高2~5倍。而且它爲實現複雜的算法提供了和批處理相似的簡單接口。

 

接下來用Spark  Streaming鏈接TCP Socket來講明如何使用Spark  Streaming:

1 建立StreamingContext對象

首先使用StreamingContext模塊,這個模塊的做用是提供全部的流數據處理的功能:

1 from pyspark import SparkContext
2 from pyspark.streaming import StreamingContext
3 
4 sc = SparkContext("local[2]", "streamwordcount")
5 # 建立本地的SparkContext對象,包含2個執行線程
6 
7 ssc = StreamingContext(sc, 2)
8 # 建立本地的StreamingContext對象,處理的時間片間隔時間,設置爲2s

2 建立DStream對象

咱們須要鏈接一個打開的 TCP 服務端口,從而獲取流數據,這裏使用的源是TCP Socket,因此使用socketTextStream()函數:

lines = ssc.socketTextStream("localhost", 8888)
# 建立DStream,指明數據源爲socket:來自localhost本機的8888端口

3 對DStream進行操做

咱們開始對lines進行處理,首先對當前2秒內獲取的數據進行分割並執行標準的MapReduce流程計算。

words = lines.flatMap(lambda line: line.split(" "))
# 使用flatMap和Split對2秒內收到的字符串進行分割

獲得的words是一系列的單詞,再執行下面的操做:

pairs = words.map(lambda word: (word, 1))
# map操做將獨立的單詞映射到(word,1)元組

wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# reduceByKey操做對pairs執行reduce操做得到(單詞,詞頻)元組

5 輸出數據

將處理後的數據輸出到一個文件中:

outputFile = "/home/feige/streaming/ss"
# 輸出文件夾的前綴,Spark Streaming會自動使用當前時間戳來生成不一樣的文件夾名稱

wordCounts.saveAsTextFiles(outputFile)
# 將結果輸出

6 啓動應用

要使程序在Spark Streaming上運行起來,須要執行Spark Streaming啓動的流程,調用start()函數啓動,awaitTermination()函數等待處理結束的信號。

ssc.start() 
# 啓動Spark Streaming應用
ssc.awaitTermination()

打開終端執行:

nc -lk 8888

nc的-l參數表示建立一個監聽端口,等待新的鏈接。-k參數表示當前鏈接結束後仍然保持監聽,必須與-l參數同時使用。

執行完上面的命令後不關閉終端,咱們將在這個終端中輸入一些處理的數據:

打開一個新的終端來執行咱們的Spark Streaming應用:

 這裏是spark streaming執行的過程

如今咱們來看看程序執行的效果,程序每隔2秒掃描一次監控窗口輸入的內容,咱們查看一下:

結束語:

最近壓力比較大,雜事諸多,相信這段時間事後一切都會好起來的,加油!!!

相關文章
相關標籤/搜索