SparkStructuredStreaming 的例子

在spark2.X版本後,新增了一個更高級的接口結構化流。html

Structured Streaming (結構化流)是一種基於 Spark SQL 引擎構建的可擴展且容錯的 stream processing engine (流處理引擎)。您能夠以靜態數據表示批量計算的方式來表達 streaming computation (流式計算)。 Spark SQL 引擎將隨着 streaming data 持續到達而增量地持續地運行,並更新最終結果。您能夠使用 Scala , Java , Python 或 R 中的 Dataset/DataFrame API 來表示 streaming aggregations (流聚合), event-time windows (事件時間窗口), stream-to-batch joins (流到批處理鏈接) 等。在同一個 optimized Spark SQL engine (優化的 Spark SQL 引擎)上執行計算。最後,系統經過 checkpointing (檢查點) 和 Write Ahead Logs (預寫日誌)來確保 end-to-end exactly-once (端到端的徹底一次性) 容錯保證。簡而言之,Structured Streaming 提供快速,可擴展,容錯,end-to-end exactly-once stream processing (端到端的徹底一次性流處理),而無需用戶理解 streaming 。sql

理論就很少說了,直接上樣例:apache

測試數據:demo_data1 json

{"name":"xiaoming","age":25}
{"name":"xiaohong","age":18}
{"name":"xiaobai","age":19}
{"name":"xiaoan","age":17}
/Users/dongdong/Desktop/spark_contact/contact/data_test/structured_data/  這是目錄

代碼:windows

package structstreaming
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
  * Created by dongdong on 18/1/25.
  */
object StructuredStreamingDemo {

  def main(args: Array[String]): Unit = {

    //    建立sparksession
    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredStreamingDemo")
      .getOrCreate()

    import spark.implicits._

    //  定義schema
    val userSchema = new StructType().add("name", "string").add("age", "integer")

    // 讀取數據
    val dataFrameStream = spark
      .readStream
      .schema(userSchema)
      .format("json")
      //只能是監控目錄,當新的目錄進來時,再進行計算
      .load("/Users/dongdong/Desktop/spark_contact/contact/data_test/structured_data/")


    //dataFrameStream.isStreaming
    //註冊成一張表
    dataFrameStream.createOrReplaceTempView("Person")

    // 對錶進行agg 若是沒有agg的話,直接報錯
    val aggDataFrame = spark.sql(
      """
        |select
        |name,
        |age,
        |count(1) as cnt
        |from
        |Person
        |group by
        |name,age
      """.stripMargin
    )

    //在控制檯監控
    val query = aggDataFrame.writeStream
      //complete,append,update。目前只支持前面兩種
      .outputMode("complete")
      //console,parquet,memory,foreach 四種
      .format("console")
      //這裏就是設置定時器了
      .trigger(ProcessingTime(100))
      .start()

    query.awaitTermination()

  }
}

sparkstructuredstreaming 監控file的時候,只能是有新的文件進來後再計算,無法計算相同的文件,即便這個文件發生了改變session

最後,執行程序的時候,我在那個路徑下,再添加一個文件app

demo_data2ide

{"name":"xiaoming","age":25}
{"name":"xiaohong","age":18}
{"name":"xiaobai","age":19}
{"name":"xiaoan","age":17}

執行以下:測試

第一個batch是計算第一文件,優化

第二個batch是計算兩個完整的文件的數量,而後cnt出現2次

相關文章
相關標籤/搜索