[Spark] 04 - What is Spark Streaming

前言


Ref: 一文讀懂 Spark 和 Spark Streaming【簡明扼要的概覽】html

在講解 "流計算" 以前,先作一個簡單的回顧,親!python

 

1、MapReduce 的問題所在

MapReduce 模型的誕生是大數據處理從無到有的飛躍。但隨着技術的進步,對大數據處理的需求也變得愈來愈複雜,MapReduce 的問題也日漸凸顯。mysql

一般,咱們將 MapReduce 的輸入和輸出數據保留在 HDFS 上,不少時候,複雜的 ETL、數據清洗等工做沒法用一次 MapReduce 完成,因此須要將多個 MapReduce 過程鏈接起來:sql

Figure,上圖中只有兩個 MapReduce 串聯,實際上可能有幾十個甚至更多,依賴關係也更復雜。數據庫

這種方式下,每次中間結果都要寫入 HDFS 落盤保存,代價很大(別忘了,HDFS 的每份數據都須要冗餘若干份拷貝)。編程

另外,因爲本質上是屢次 MapReduce 任務,調度也比較麻煩,實時性無從談起。併發

 

2、Spark 與 RDD 模型

Fault-tolerance

通常來講,想作到 fault-tolerance 只有兩個方案:app

    1. 要麼存儲到外部(例如 HDFS),
    2. 要麼拷貝到多個副本。
    3. Spark 大膽地提出了第三種——重算一遍。

可是之因此能作到這一點,是依賴於一個額外的假設:全部計算過程都是肯定性的(deterministic)。框架

Spark 借鑑了函數式編程思想,提出了 RDD(Resilient Distributed Datasets),譯做「彈性分佈式數據集」。less

Figure,上圖演示了 RDD 分區的恢復。爲了簡潔並無畫出分區,實際上恢復是以分區爲單位的。

 

只讀的、分區的 數據集合

RDD 的數據由多個分區(partition)構成,這些分區能夠分佈在集羣的各個機器上,這也就是 RDD 中 「distributed」 的含義。

熟悉 DBMS(數據庫管理系統)的同窗能夠把 RDD 理解爲邏輯執行計劃,partition 理解爲物理執行計劃。 

 

窄依賴 & 寬依賴 

以前有涉及到一部份內容:[Spark] 01 - What is Spark

簡單的說,寬依賴就是 "一對多「。

 

在執行時,窄依賴能夠很容易的按流水線(pipeline)的方式計算:對於每一個分區從前到後依次代入各個算子便可。

然而,寬依賴須要等待前繼 RDD 中全部分區計算完成;

換句話說,寬依賴就像一個柵欄(barrier)會阻塞到以前的全部計算完成。整個計算過程被寬依賴分割成多個階段(stage),如上右圖所示。

 

聲明式編程 - Spark SQL

命令式 & 聲明式 編程:聲明式的要簡潔的多!但聲明式編程依賴於執行者產生真正的程序代碼,因此除了上面這段程序,還須要把數據模型(即 schema)一併告知執行者。

聲明式編程最廣爲人知的形式就是 SQL。Spark SQL 就是這樣一個基於 SQL 的聲明式編程接口。

你能夠將它看做在 Spark 之上的一層封裝,在 RDD 計算模型的基礎上,提供了 DataFrame API 以及一個內置的 SQL 執行計劃優化器 Catalyst。

 

代碼生成(codegen)轉化成直接對 RDD 的操做

DataFrame 就像數據庫中的表,除了數據以外它還保存了數據的 schema 信息。

Catalyst 是一個內置的 SQL 優化器,負責把用戶輸入的 SQL 轉化成執行計劃。

Catelyst 強大之處是它利用了 Scala 提供的代碼生成(codegen)機制,物理執行計劃通過編譯,產出的執行代碼效率很高,和直接操做 RDD 的命令式代碼幾乎沒有分別。

上圖是 Catalyst 的工做流程,與大多數 SQL 優化器同樣是一個 Cost-Based Optimizer (CBO),但最後使用代碼生成(codegen)轉化成直接對 RDD 的操做。

 

 

 

流計算框架:Spark Streaming 


基本認識

1、批處理 & 流計算

"批處理"和"流計算"被看做大數據系統的兩個方面。

    • 以 Kafka、Storm 爲表明的流計算框架用於實時計算,
    • 而 Spark 或 MapReduce 則負責天天、每小時的數據批處理。

在 ETL 等場合,這樣的設計經常致使一樣的計算邏輯被實現兩次,耗費人力不說,保證一致性也是個問題。

Spark Streaming 基於 Spark,另闢蹊徑提出了 D-Stream(Discretized Streams)方案:將流數據切成很小的批(micro-batch),用一系列的短暫、無狀態、肯定性的批處理實現流處理。

開發者只須要維護一套 ETL 邏輯便可同時用於批處理和流計算

 

更快速的失敗恢復 D-Stream

The figure,

    • 左圖,爲了在持續算子模型的流計算系統中保證一致性,不得不在主備機之間使用同步機制,致使性能損失,Spark Streaming 徹底沒有這個問題;
    • 右圖,D-Stream 的原理示意圖。

實際上,新的狀態 RDD 老是不斷生成,而舊的 RDD 並不會被「替代」,而是做爲新 RDD 的前繼依賴。

對於底層的 Spark 框架來講,並無時間步的概念,有的只是不斷擴張的 DAG 圖和新的 RDD 節點。

 

Spark SQL 之 流處理 Spark Structured Streaming

Spark 經過 Spark Streaming 擁有了流計算能力,那 Spark SQL 是否也能具備相似的流處理能力呢?答案是確定的。

只要將數據流建模成一張不斷增加、沒有邊界的表,在這樣的語義之下,不少 SQL 操做等就能直接應用在流數據上。

 

出人意料的是,Spark Structured Streaming 的流式計算引擎並無複用 Spark Streaming,而是在 Spark SQL 上設計了新的一套引擎。

所以,從 Spark SQL 遷移到 Spark Structured Streaming 十分容易,但從 Spark Streaming 遷移過來就要困可貴多。

 

基於這樣的模型,Spark SQL 中的大部分接口、實現都得以在 Spark Structured Streaming 中直接複用。

將用戶的 SQL 執行計劃轉化成流計算執行計劃的過程被稱爲 增量化(incrementalize),這一步是由 Spark 框架自動完成的。

對於用戶來講只要知道:每次計算的輸入是某一小段時間的流數據,而輸出是對應數據產生的計算結果。

 

2、窗口(window)

窗口(window)是對過去某段時間的定義。

批處理中,查詢一般是全量的(例如:總用戶量是多少);而流計算中,咱們一般關心近期一段時間的數據(例如:最近24小時新增的用戶量是多少)。

用戶經過選用合適的窗口來得到本身所需的計算結果,常見的窗口有滑動窗口(Sliding Window)、滾動窗口(Tumbling Window)等。

 

3、水位(watermark)

水位(watermark)用來丟棄過早的數據

在流計算中,上游的輸入事件可能存在不肯定的延遲,而流計算系統的內存是有限的、只能保存有限的狀態,必定時間以後必須丟棄歷史數據。

以雙流 A JOIN B 爲例,假設窗口爲 1 小時,那麼 A 中比當前時間減 1 小時更早的數據(行)會被丟棄;若是 B 中出現 1 小時前的事件,由於沒法處理只能忽略。

 

4、三個角色

Spark 中有三個角色:Driver, Worker 和 Cluster Manager。

  • 驅動程序(Driver)即用戶編寫的程序,對應一個 SparkContext,負責任務的構造、調度、故障恢復等。驅動程序能夠直接運行在客戶端,例如用戶的應用程序中;也能夠託管在 Master 上,這被稱爲集羣模式(cluster mode),一般用於流計算等長期任務。
  • Cluster Manager 顧名思義負責集羣的資源分配,Spark 自帶的 Spark Master 支持任務的資源分配,幷包含一個 Web UI 用來監控任務運行情況。多個 Master 能夠構成一主多備,經過 ZooKeeper 進行協調和故障恢復。一般 Spark 集羣使用 Spark Master 便可,但若是用戶的集羣中不只有 Spark 框架、還要承擔其餘任務,官方推薦使用 Mesos 做爲集羣調度器。
  • Worker節點 負責執行計算任務,上面保存了 RDD 等數據。

 

5、總結

Spark 是一個同時支持批處理流計算的分佈式計算系統。Spark 的全部計算均構建於 RDD 之上,RDD 經過算子鏈接造成 DAG 的執行計劃,RDD 的肯定性及不可變性是 Spark 實現故障恢復的基礎。Spark Streaming 的 D-Stream 本質上也是將輸入數據分紅一個個 micro-batch 的 RDD。

Spark SQL 是在 RDD 之上的一層封裝,相比原始 RDD,DataFrame API 支持數據表的 schema 信息,從而能夠執行 SQL 關係型查詢,大幅下降了開發成本。

Spark Structured Streaming 是 Spark SQL 的流計算版本,它將輸入的數據流看做不斷追加的數據行。

 

 

 

"廈大" 流計算

至此,經過 一文讀懂 Spark 和 Spark Streaming 瞭解了大概框架和概念,下面繼續」廈大「課程的學習,goto: 流計算概述

 

1、概述、特徵

流計算特徵

  • 快速到達,大小不定;
  • 來源衆多,格式複雜;
  • 一次性處理,也能夠以後丟棄;
  • 整體價值大於個別數據;
  • 順序以及完整性不過重要;

 

流計算應用

對時間比較敏感,過去過久的數據,其價值迅速下降。好比:用戶點擊流

根據大數據,進行秒級響應進行商品推薦。

 

流計算框架

  • 高性能
  • 海量式
  • 實時性
  • 分佈式
  • 可靠性

 

  • 商業級流計算平臺:IBM StreamBase, IBM InfoSphere Streams
  • 開源流計算框架:Twitter Storm, Yahoo! S4
  • 定製流計算框架:Dstream (Baidu), Puma (Facebook)

 

  1. 數據實時採集(每秒數百MB的數據採集,such as Facebook Scribe, Linkedin Kafka, Chukwa, Flume
  2. 數據實時計算 
  3. 數據實時服務

既然是實時結果,那天然是主動推送給用戶,也就是實時推薦。

 

2、SparkStreaming 解析

DStream 角色

本質是「tiny batch processing",切分到秒級響應。

DStream就是一堆Tiny RDD的集合,使用」微小「批處理模擬流計算。

 

Receiver 角色

每一個receiver 單獨負責一個 Input DStream。

    • 套接字流(基本輸入源)
    • 文件流(基本輸入源)
    • RDD 隊列流(基本輸入源)
    • 從Kafka中讀取的輸入流(高級輸入源)

Receiver監控這幾種流,而後交給流計算組件去處理。

 

3、SparkStreaming 編程

SparkStream 的指揮官

編寫Streaming程序的套路,指揮官:streamingContext。

(1) 建立DStream,也就定義了輸入源

(2) 對DStream進行一些 「轉換操做」 "輸出操做"

(3) 啓動流計算,接收數據:streamingContext.start()

(4) 結束流計算,streamingContext.awaitTermination()

(5) 手動結束流計算進程:streamingContext.stop()


交互環境

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

 

獨立程序

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 1)

 

基本輸入源編程

* 文件流

實時自動監控文件內容、目錄內容。文件夾中新的文件添加進來,就會造成流,讀入。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# 定義輸入源 ssc
= StreamingContext(sc, 10) lines = ssc.textFileStream('file:///usr/local/spark/mycode/streaming/logfile')  # <---- 這是文件夾!
# 轉換和輸出操做 words
= lines.flatMap(lambda line: line.split(' '))  # 拍扁了一個個可樂罐,獲得單詞集合 wordCounts = words.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)  # 詞頻彙總
wordCounts.pprint() ssc.start() ssc.awaitTermination()

運行代碼:

/usr/local/spark/bin/spark-submit FileStreaming.py

 

* 套接字流 

 

客戶端,接收

客戶端進行刺頻統計,並顯示結果。

#!/usr/bin/env python3
# NetworkWordCount.py

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
  if len(sys.argv) != 3:
    print("Usage: NetworkWordCount.py <hostname> <port>", file=sys.stderr)
    exit(-1)

  sc = SparkContext(appName = "PythonStreamingNetworkWordCount")
  ssc = StreamingContext(sc, 1)
  lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

  counts = lines.flatMap(lambda line: line.split(" "))  \
      .map(lambda word: (word, 1))  \
      .reduceByKey(lambda a,b: a+b)

  counts.pprint()
  ssc.start()
  ssc.awaitTermination()

客戶端從服務端接收流數據:

# 用客戶端向服務端發送流數據
$ /usr/local/spark/bin/spark-submit NetworkWordCount.py localhost <端口>

 

服務端,發送

(a) 系統自帶服務端 nc。

# 打開服務端
$nc -lk <端口號>

(b) 自定義服務端:有鏈接則發送一段字符串過去。

#!/usr/bin/env python3
# DataSourceSocket.py import socket server =socket.socket() server.bind('localhost', 9999) server.listen(1) while 1:
# step 1   
print("I'm waiting for the connect")   conn, addr = server.accept()   print("Connect success! Connection is from %s " % addr[0])
# step 2,發送的內容做爲測試例子   
print("Sending data ...")   conn.send('I love hadoop I love spark hadoop is good spark is fast'.encode())
# step 3   conn.close()   
print('Connection is broken.')

啓動服務端發送流數據:

# 用客戶端向服務端發送流數據
$ /usr/local/spark/bin/spark-submit DataSourceSocket.py

 

* RDD隊列流

#!/usr/bin/env python3
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__name__":
  sc = SparkContext(appName="PythonStreamingQueueStream")
  ssc = StreamingContext(sc, 2)

  rddQueue = []
  for i in range(5):
    # 每一個rdd包含這1000個數字。
    rddQueue += [ssc.sparkContext.parallelize( [j for j in range(1, 1001) ], 10)]
    time.sleep(1)

  
# 建立了一個RDD隊列流   inputStream = ssc.queueStream(rddQueue)   # 轉換和輸出操做:統計每一個餘數出現的頻率   mappedStream = inputStream.map(lambda x: (x % 10, 1))   reducedStream = mappedStream.reduceByKey(lambda a, b: a+b)   reducedStream.pprint()   ssc.start()   ssc.stop(stopSparkContext = True, stopGraceFully = True)  # 等待全部的數據處理完後優雅地退出

  

 

 

 

DStream轉換操做


Ref: Spark入門:DStream轉換操做

1、DStream "無狀態" 轉換操做

何爲「無狀態」?

咱們以前 「套接字流」 部分介紹的詞頻統計,就是採用無狀態轉換;

每次統計,都是隻統計當前批次到達的單詞的詞頻,和以前批次無關,不會進行累計

 

下面給出一些無狀態轉換操做的含義:

* map(func) :對源DStream的每一個元素,採用func函數進行轉換,獲得一個新的DStream;
* flatMap(func): 與map類似,可是每一個輸入項可用被映射爲0個或者多個輸出項;
* filter(func): 返回一個新的DStream,僅包含源DStream中知足函數func的項;
* repartition(numPartitions): 經過建立更多或者更少的分區改變DStream的並行程度;
* union(otherStream): 返回一個新的DStream,包含源DStream和其餘DStream的元素;
* count():統計源DStream中每一個RDD的元素數量;
* reduce(func):利用函數func彙集源DStream中每一個RDD的元素,返回一個包含單元素RDDs的新DStream;
* countByValue():應用於元素類型爲K的DStream上,返回一個(K,V)鍵值對類型的新DStream,每一個鍵的值是在原DStream的每一個RDD中的出現次數;
* reduceByKey(func, [numTasks]):當在一個由(K,V)鍵值對組成的DStream上執行該操做時,返回一個新的由(K,V)鍵值對組成的DStream,每個key的值均由給定的recuce函數(func)彙集起來;
* join(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, (V, W))鍵值對的新DStream;
* cogroup(otherStream, [numTasks]):當應用於兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, Seq[V], Seq[W])的元組;
* transform(func):經過對源DStream的每一個RDD應用RDD-to-RDD函數,建立一個新的DStream。支持在新的DStream中作任何RDD操做。

 

 

2、DStream "有狀態" 轉換操做 

基於滑動窗口的轉換

下面給給出一些窗口轉換操做的含義:

* window(windowLength, slideInterval):基於源DStream產生的窗口化的批數據,計算獲得一個新的DStream;
* countByWindow(windowLength, slideInterval):返回流中元素的一個滑動窗口數;
* reduceByWindow(func, windowLength, slideInterval):返回一個單元素流。利用函數func彙集滑動時間間隔的流的元素建立這個單元素流。函數func必須知足結合律,從而能夠支持並行計算;
* reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):應用到一個(K,V)鍵值對組成的DStream上時,會返回一個由(K,V)鍵值對組成的新的DStream。每個key的值均由給定的reduce函數(func函數)進行聚合計算。注意:在默認狀況下,這個算子利用了Spark默認的併發任務數去分組。能夠經過numTasks參數的設置來指定不一樣的任務數;
* reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):更加高效的reduceByKeyAndWindow,每一個窗口的reduce值,是基於先前窗口的reduce值進行增量計算獲得的;它會對進入滑動窗口的新數據進行reduce操做,並對離開窗口的老數據進行「逆向reduce」操做。可是,只能用於「可逆reduce函數」,即那些reduce函數都有一個對應的「逆向reduce函數」(以InvFunc參數傳入);
* countByValueAndWindow(windowLength, slideInterval, [numTasks]):當應用到一個(K,V)鍵值對組成的DStream上,返回一個由(K,V)鍵值對組成的新的DStream。每一個key的值都是它們在滑動窗口中出現的頻率。

 

逆操做,如何減小計算量

參考:http://www.javashuo.com/article/p-ghrpdumz-mv.html

問:reduceByWindow() 和 reduceByKeyAndWindow() 讓咱們能夠對每一個窗口更高效地進行歸約操做

它們接收一個歸約函數,在整個窗口上執行,好比 +。除此之外,它們還有一種特殊形式,經過只考慮新進入窗口的數據和離開窗 口的數據,讓 Spark 增量計算歸約結果

這種特殊形式須要提供歸約函數的一個逆函數,比 如 + 對應的逆函數爲 -。對於較大的窗口,提供逆函數能夠大大提升執行效率。

[左圖] 本來的意圖是」從新計算這個窗口」,但沒有必要。

[右圖] 只計算「增量」。

 

代碼展現:

歸約操做在以下代碼中的體現:30表明窗口大小,10是步長;

兩個lambda中的y的理解,第一個表明「新增」,第二個表明「淘汰出窗口「的內容。

 

「數據源」終端:

nc -lk 9999

 

「流計算」終端:

/usr/local/spark/bin/spark-submit  \
> WindowedNetworkWordCount.py localhost 9999

 

追蹤狀態變化的轉換

經過 updateStateByKey() 操做實現歷史狀態不斷地累加,第二個參數last_sum至關於static變量。

經過 ssc.checkpoint() 聲明一個檢查點,防止數據丟失

 

代碼展現:

from __future__ import print_function
 
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
sc
= SparkContext(appName="PythonStreamingStatefulNetworkWordCount") ssc = StreamingContext(sc, 1)
ssc.checkpoint(
"file:///usr/local/spark/mycode/streaming/")
# RDD with initial state (key, value) pairs initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0)  # 第一次時last_sum不存在的,就使用0
# 建立「套接字流」 lines
= ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts
= lines.flatMap(lambda line: line.split(" "))\ .map(lambda word: (word, 1))\   .updateStateByKey(updateFunc, initialRDD=initialStateRDD)  # <--- 跨批次的狀態累加   
# map以後,按正常來說用reduceByKey(),是「無狀態」模式;
# (hadoop,1), (hadoop,1), (spark,1), (spark,1)
# (hadoop,2), (spark,2)

# 將結果保存在文本或打印出
running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")
running_counts.pprint()
 
    ssc.start()
    ssc.awaitTermination()

 

"數據源" 終端:

nc -lk 9999

測試「跨批次」狀態維護,因此前後輸入若干句子,詞頻結果可以「跨批次」累計。

 

「流計算」 終端:

/usr/local/spark/bin/spark-submit  \
NetworkWordCountStateful.py localhost 9999

 

 

 

  

DStream輸出操做


1、輸出到文本

流計算的結果保存到文本文件中去,放在updateStateByKey以後。

running_counts.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/output.txt")

 

 

2、寫入數據庫

數據庫方面的操做。

service mysql start
mysql -u root -p
# 提示輸入密碼
mysql
> use spark mysql> create table wordcount(word char(20), count int(4));

爲了讓MySQL支持Python,須要安裝 PyMySQL

 

流計算的結果保存到數據庫中去,放在running_counts.pprint() 以後。

# 寫入操做: each partition
def
dbfunc(records):   db = pymysql.connect("localhost", "root", "123456", "spark")   cursor = db.cursor()
  
def doinsert(p):     sql = "insert into wordcount(word,count) values ('%s','%s')" % (str(p)[0]),str(p[1]))     try:       cursor.execute(sql)       db.commit()     except:       db.rollback()
# 每個分區中的每一條記錄,進行sql操做   
for item in records:     doinsert(item) 

def func(rdd):
# 分區太多,可能會併發鏈接數據庫;故,這裏改成3個分區   repartitionedRDD = rdd.repartition(3)
# 處理每個分區   repartitionedRDD.foreachPartition(dbfunc)


# DStream 類型:running_counts,本質上就是一堆rdd的集合,這裏天然就可以挨個遍歷rdd running_counts.foreachRDD(func) ssc.start() ssc.awaitTermination()

  

End.

相關文章
相關標籤/搜索