[Spark] 06 - Structured Streaming

基本瞭解

響應更快,對過去的架構進行了全新的設計和處理。sql

核心思想:將實時數據流視爲一張正在不斷添加數據的表,參見Spark SQL's DataFrame。編程

 

1、微批處理(默認)

寫日誌操做 保證一致性。json

由於要寫入日誌操做,每次進行微批處理以前,都要先把當前批處理的數據的偏移量要先寫到日誌裏面去。架構

如此,就帶來了微小的延遲。app

數據到達 和 獲得處理 並輸出結果 之間的延時超過100毫秒。 異步

 

2、持續批處理

例如:"欺詐檢測",在100ms以內判斷盜刷行爲,並給予制止。socket

由於 「異步」 寫入日誌,因此致使:至少處理一次,不能保證「僅被處理一次」。oop

Spark SQL 只能處理靜態處理。測試

Structured Streaming 能夠處理數據流。ui

 

 

 

Structured Streaming 編程

1、基本步驟

 

2、demo 示範

代碼展現

統計每一個單詞出現的頻率。

from pyspark.sql import SparkSession
from pyspark.sql.functions import split from pyspark.sql.functions import explode if __name__ == "__main__":
  spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()   spark.sparkContext.setLogLevel('WARN')
  
# 建立一個輸入數據源,相似"套接子流",只是「相似」。   lines = spark.readStream.format("socket").option("host」, 「localhost").option("port", 9999).load()
  # Explode獲得一個DataFrame,一個單詞變爲一行;   # 再給DataFrame這列的title設置爲 "word";   # 根據word這一列進行分組詞頻統計,獲得「每一個單詞到底出現了幾回。
  words = lines.select( explode( split( lines.value, " " ) ).alias("word") )
  wordCounts
= words.groupBy("word").count()   # <--- 獲得結果
  # 啓動流計算並輸出結果
  query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start()
  query.awaitTermination()

程序要依賴於Hadoop HDFS。

$ cd /usr/local/hadoop
$ sbin/start-dfs.sh

 

新建」數據源「終端

$ nc -lk 9999

 

新建」流計算「終端

$ /usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py

 

 

 

輸入源

1、File 輸入源

(1) 建立程序生成JSON格式的File源測試數據

例如,對Json格式文件進行內容統計。目錄下面有1000 json files,格式以下:

 

(2) 建立程序對數據進行統計

import os
import shutil
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField from pyspark.sql.types import TimestampType, StringType

TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'

if __name__ == "__main__":

  # 定義模式
  schema = StructType([
    StructField("eventTime" TimestampType(), True), 
    StructField("action",   StringType(),    True),
    StructField("district", StringType(),    True) ])

  spark = SparkSession.builder.appName("StructuredEMallPurchaseCount").getOrCreate()
  spark.sparkContext.setLogLevel("WARN")

  lines = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", 100).load(TEST_DATA_DIR_SPARK)

  # 定義窗口
  windowDuration = '1 minutes'
  windowedCounts = lines.filter("action = 'purchase'")  \
            .groupBy('district', window('eventTime', windowDuration))  \
            .count()  \
            .sort(asc('window''))
   
# 啓動流計算   query = windowedCounts  \     .writeStream  \     .outputMode("complete")  \     .format("console")  \     .option('truncate', 'false')  \     .trigger(processingTime = "10 seconds")  \  # 每隔10秒,執行一次流計算     .start()   query.awaitTermination()

 

(3) 測試運行程序

a. 啓動 HDFS

$ cd /usr/local/hadoop
$ sbin/start-dfs.sh

b. 運行數據統計程序

/usr/local/spark/bin/spark-submit spark_ss_filesource.py

c. 運行結果

 

 

2、Socket源和 Rate源

(由於只能r&d,不能生產時間,故,這裏暫時略)

通常不用於生產模式,實驗測試模式卻是能夠。

from pyspark.sql import SparkSession

if __name__ == "__main__":

  spark = SparkSession.builder.appName("TestRateStreamSource").getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

緊接着是下面的程序:

# 每秒鐘發送五行,屬於rate源;

# query 表明了流計算啓動模式;

 

運行程序

$ /usr/local/spark/bin/spark-submit spark_ss_rate.py

 

 

  

輸出操做

1、啓動流計算

writeStream()方法將會返回DataStreamWrite接口。

query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start() 

 

輸出 outputMode 模式

 

接收器 format 類型

系統內置的輸出接收器包括:File, Kafka, Foreach, Console (debug), Memory (debug), etc。

 

生成parquet文件

能夠考慮讀取後轉化爲DataFrame;或者使用strings查看文件內容。

代碼展現:StructuredNetworkWordCountFileSink.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import split from pyspark.sql.functions import explode from pyspark.sql.functions import length

只要長度爲5的dataframe,也就是單詞長度都是5。

 

"數據源" 終端

# input string to simulate stream.
nc -lk 9999

  

"流計算" 終端

/usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py

 

End.

相關文章
相關標籤/搜索