在spark2.X版本後,新增了一個更高級的接口結構化流。html
Structured Streaming (結構化流)是一種基於 Spark SQL 引擎構建的可擴展且容錯的 stream processing engine (流處理引擎)。您能夠以靜態數據表示批量計算的方式來表達 streaming computation (流式計算)。 Spark SQL 引擎將隨着 streaming data 持續到達而增量地持續地運行,並更新最終結果。您能夠使用 Scala , Java , Python 或 R 中的 Dataset/DataFrame API 來表示 streaming aggregations (流聚合), event-time windows (事件時間窗口), stream-to-batch joins (流到批處理鏈接) 等。在同一個 optimized Spark SQL engine (優化的 Spark SQL 引擎)上執行計算。最後,系統經過 checkpointing (檢查點) 和 Write Ahead Logs (預寫日誌)來確保 end-to-end exactly-once (端到端的徹底一次性) 容錯保證。簡而言之,Structured Streaming 提供快速,可擴展,容錯,end-to-end exactly-once stream processing (端到端的徹底一次性流處理),而無需用戶理解 streaming 。sql
理論就很少說了,直接上樣例:apache
測試數據:demo_data1 json
{"name":"xiaoming","age":25} {"name":"xiaohong","age":18} {"name":"xiaobai","age":19} {"name":"xiaoan","age":17}
/Users/dongdong/Desktop/spark_contact/contact/data_test/structured_data/ 這是目錄
代碼:windows
package structstreaming import org.apache.spark.sql.SparkSession import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} /** * Created by dongdong on 18/1/25. */ object StructuredStreamingDemo { def main(args: Array[String]): Unit = { // 建立sparksession val spark = SparkSession .builder .master("local") .appName("StructuredStreamingDemo") .getOrCreate() import spark.implicits._ // 定義schema val userSchema = new StructType().add("name", "string").add("age", "integer") // 讀取數據 val dataFrameStream = spark .readStream .schema(userSchema) .format("json") //只能是監控目錄,當新的目錄進來時,再進行計算 .load("/Users/dongdong/Desktop/spark_contact/contact/data_test/structured_data/") //dataFrameStream.isStreaming //註冊成一張表 dataFrameStream.createOrReplaceTempView("Person") // 對錶進行agg 若是沒有agg的話,直接報錯 val aggDataFrame = spark.sql( """ |select |name, |age, |count(1) as cnt |from |Person |group by |name,age """.stripMargin ) //在控制檯監控 val query = aggDataFrame.writeStream //complete,append,update。目前只支持前面兩種 .outputMode("complete") //console,parquet,memory,foreach 四種 .format("console") //這裏就是設置定時器了 .trigger(ProcessingTime(100)) .start() query.awaitTermination() } }
sparkstructuredstreaming 監控file的時候,只能是有新的文件進來後再計算,無法計算相同的文件,即便這個文件發生了改變session
最後,執行程序的時候,我在那個路徑下,再添加一個文件app
demo_data2ide
{"name":"xiaoming","age":25} {"name":"xiaohong","age":18} {"name":"xiaobai","age":19} {"name":"xiaoan","age":17}
執行以下:測試
第一個batch是計算第一文件,優化
第二個batch是計算兩個完整的文件的數量,而後cnt出現2次