響應更快,對過去的架構進行了全新的設計和處理。sql
核心思想:將實時數據流視爲一張正在不斷添加數據的表,參見Spark SQL's DataFrame。編程
寫日誌操做 保證一致性。json
由於要寫入日誌操做,每次進行微批處理以前,都要先把當前批處理的數據的偏移量要先寫到日誌裏面去。架構
如此,就帶來了微小的延遲。app
數據到達 和 獲得處理 並輸出結果 之間的延時超過100毫秒。 異步
例如:"欺詐檢測",在100ms以內判斷盜刷行爲,並給予制止。socket
由於 「異步」 寫入日誌,因此致使:至少處理一次,不能保證「僅被處理一次」。oop
Spark SQL 只能處理靜態處理。測試
Structured Streaming 能夠處理數據流。ui
統計每一個單詞出現的頻率。
from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode if __name__ == "__main__":
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate() spark.sparkContext.setLogLevel('WARN')
# 建立一個輸入數據源,相似"套接子流",只是「相似」。 lines = spark.readStream.format("socket").option("host」, 「localhost").option("port", 9999).load()
# Explode獲得一個DataFrame,一個單詞變爲一行; # 再給DataFrame這列的title設置爲 "word"; # 根據word這一列進行分組詞頻統計,獲得「每一個單詞到底出現了幾回。
words = lines.select( explode( split( lines.value, " " ) ).alias("word") )
wordCounts = words.groupBy("word").count() # <--- 獲得結果
# 啓動流計算並輸出結果 query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start() query.awaitTermination()
程序要依賴於Hadoop HDFS。
$ cd /usr/local/hadoop
$ sbin/start-dfs.sh
$ nc -lk 9999
$ /usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py
(1) 建立程序生成JSON格式的File源測試數據
例如,對Json格式文件進行內容統計。目錄下面有1000 json files,格式以下:
(2) 建立程序對數據進行統計
import os import shutil from pprint import pprint from pyspark.sql import SparkSession from pyspark.sql.functions import window, asc from pyspark.sql.types import StructType, StructField from pyspark.sql.types import TimestampType, StringType TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/' if __name__ == "__main__": # 定義模式 schema = StructType([ StructField("eventTime" TimestampType(), True), StructField("action", StringType(), True), StructField("district", StringType(), True) ]) spark = SparkSession.builder.appName("StructuredEMallPurchaseCount").getOrCreate() spark.sparkContext.setLogLevel("WARN") lines = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", 100).load(TEST_DATA_DIR_SPARK) # 定義窗口 windowDuration = '1 minutes' windowedCounts = lines.filter("action = 'purchase'") \ .groupBy('district', window('eventTime', windowDuration)) \ .count() \ .sort(asc('window''))
# 啓動流計算 query = windowedCounts \ .writeStream \ .outputMode("complete") \ .format("console") \ .option('truncate', 'false') \ .trigger(processingTime = "10 seconds") \ # 每隔10秒,執行一次流計算 .start() query.awaitTermination()
(3) 測試運行程序
a. 啓動 HDFS
$ cd /usr/local/hadoop
$ sbin/start-dfs.sh
b. 運行數據統計程序
/usr/local/spark/bin/spark-submit spark_ss_filesource.py
c. 運行結果
(由於只能r&d,不能生產時間,故,這裏暫時略)
通常不用於生產模式,實驗測試模式卻是能夠。
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName("TestRateStreamSource").getOrCreate() spark.sparkContext.setLogLevel('WARN')
緊接着是下面的程序:
# 每秒鐘發送五行,屬於rate源;
# query 表明了流計算啓動模式;
運行程序
$ /usr/local/spark/bin/spark-submit spark_ss_rate.py
writeStream()方法將會返回DataStreamWrite接口。
query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start()
輸出 outputMode 模式
接收器 format 類型
系統內置的輸出接收器包括:File, Kafka, Foreach, Console (debug), Memory (debug), etc。
生成parquet文件
能夠考慮讀取後轉化爲DataFrame;或者使用strings查看文件內容。
代碼展現:StructuredNetworkWordCountFileSink.py
from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode from pyspark.sql.functions import length
只要長度爲5的dataframe,也就是單詞長度都是5。
"數據源" 終端
# input string to simulate stream.
nc -lk 9999
"流計算" 終端
/usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py
End.