Spark2.2(三十三):Spark Streaming和Spark Structured Streaming更新broadcast總結(一)

背景:

須要在spark2.2.0更新broadcast中的內容,網上也搜索了很多文章,都在講解spark streaming中如何更新,但沒有spark structured streaming更新broadcast的用法,因而就這幾天進行了反覆測試。通過了一下兩個測試::Spark Streaming更新broadcast、Spark Structured Streaming更新broadcast。html

1)Spark Streaming更新broadcast(可行)

  def sparkStreaming(): Unit = {
    // Create a local StreamingContext with two working thread and batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario.
    val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(15))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream(ipAddr, 19999)
    val mro = lines.map(row => {
      val fields = row.split(",")
      Mro(fields(0), fields(1))
    })

    val cellJoinMro = mro.transform(row => {
      if (1 < 3) {
        println("更新broadcast..." + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date()))
        BroadcastWrapper.update(ssc.sparkContext)
      }
      var broadcastCellRes = BroadcastWrapper.getInstance(ssc.sparkContext)
      row.map(row => {
        val int_id: String = row.int_id
        val rsrp: String = row.rsrp
        val findResult: String = String.join(",", broadcastCellRes.value.get(int_id).get)
        val timeStamps: String = String.join(",", findResult)

        CellJoinMro(int_id, rsrp, timeStamps)
      })
    })

    cellJoinMro.print()

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BroadcastWrapper {
  @volatile private var instance: Broadcast[Map[String, java.util.List[String]]] = null
  private val baseDir = "/user/my/streaming/test/"

  def loadData(): Map[String, java.util.List[String]] = {
    val files = HdfsUtil.getFiles(baseDir)

    var latest: String = null
    for (key <- files.keySet) {
      if (latest == null) latest = key
      else if (latest.compareTo(key) <= 0) latest = key
    }

    val filePath = baseDir + latest

    val map = HdfsUtil.getFileContent(filePath)
    map
  }

  def update(sc: SparkContext, blocking: Boolean = false): Unit = {
    if (instance != null)
      instance.unpersist(blocking)
    instance = sc.broadcast(loadData())
  }

  def getInstance(sc: SparkContext): Broadcast[Map[String, java.util.List[String]]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(loadData)
        }
      }
    }
    instance
  }

}

import java.io.{BufferedReader, InputStreamReader}
import java.text.SimpleDateFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import scala.collection.mutable

object HdfsUtil {
  private val sdf = new SimpleDateFormat("yyyy-MM-dd 00:00:00")

  def getFiles(path: String): mutable.Map[String, String] = {
    val fileItems = new mutable.LinkedHashMap[String, String]
    val fs = FileSystem.get(new Configuration())
    val files = fs.listStatus(new Path(path))
    var pathStr: String = ""
    for (file <- files) {
      if (file.isFile) {
        pathStr = file.getPath().getName()
        fileItems.put(pathStr.split("/")(pathStr.split("/").length - 1), pathStr)
      }
    }

    fs.close()

    fileItems
  }

  def getFileContent(filePath: String): Map[String, java.util.List[String]] = {
    val map = new mutable.LinkedHashMap[String, java.util.List[String]]

    val fs = FileSystem.get(new Configuration())
    val path = new Path(filePath)
    if (fs.exists(path)) {
      val bufferedReader = new BufferedReader(new InputStreamReader(fs.open(path)))
      var line: String = null
      line = bufferedReader.readLine()
      while (line != null) {

        val fields: Array[String] = line.split(",")
        val int_id: String = fields(0)
        val date = new java.util.Date(java.lang.Long.valueOf(fields(2)))
        val time = sdf.format(date)
        System.out.println(line + "(" + time + ")")

        if (!map.keySet.contains(int_id))
          map.put(int_id, new java.util.ArrayList[String])
        map.get(int_id).get.add(time)

        line = bufferedReader.readLine()
      }

      map.toMap
    } else {
      throw new RuntimeException("the file do not exists")
    }
  }
}

測試日誌:java

18/11/19 16:50:15 INFO scheduler.DAGScheduler: Job 2 finished: print at App.scala:59, took 0.080061 s
-------------------------------------------
Time: 1542617415000 ms
-------------------------------------------
CellJoinMro(2,333,2018-11-05 00:00:00)
。。。。
18/11/19 16:50:15 INFO storage.BlockManagerInfo: Removed input-0-1542617392400 on 10.60.0.11:1337 in memory (size: 12.0 B, free: 456.1 MB)
》》》》》》》》》》》》》》》》此時路徑上傳新資源文件》》》》》》》》》》》》》》》》》》》》》》
更新broadcast...2018-11-19 16:50:30
。。。
1,111,1541433600000(2018-11-06 00:00:00)
2,222,1541433600000(2018-11-06 00:00:00)
3,333,1541433600000(2018-11-06 00:00:00)
18/11/19 16:50:30 INFO memory.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 688.0 B, free 456.1 MB)
。。
18/11/19 16:50:30 INFO scheduler.JobScheduler: Starting job streaming job 1542617430000 ms.0 from job set of time 1542617430000 ms
-------------------------------------------
Time: 1542617430000 ms
-------------------------------------------

18/11/19 16:50:30 INFO scheduler.JobScheduler: Finished job streaming job 1542617430000 ms.0 from job set of time 1542617430000 ms
。。。。
18/11/19 16:50:32 WARN storage.BlockManager: Block input-0-1542617432400 replicated to only 0 peer(s) instead of 1 peers
18/11/19 16:50:32 INFO receiver.BlockGenerator: Pushed block input-0-1542617432400
更新broadcast...2018-11-19 16:50:45
1,111,1541433600000(2018-11-06 00:00:00)
2,222,1541433600000(2018-11-06 00:00:00)
3,333,1541433600000(2018-11-06 00:00:00)
18/11/19 16:50:45 INFO memory.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 688.0 B, free 456.1 MB)
。。。。
18/11/19 16:50:45 INFO scheduler.DAGScheduler: Job 3 finished: print at App.scala:59, took 0.066975 s
-------------------------------------------
Time: 1542617445000 ms
-------------------------------------------
CellJoinMro(3,4444,2018-11-06 00:00:00)

18/11/19 16:50:45 INFO scheduler.JobScheduler: Finished job streaming job 1542617445000 ms.0 from job set of time 1542617445000 ms
18/11/19 16:50:45 INFO scheduler.JobScheduler: Total delay: 0.367 s for time 1542617445000 ms (execution: 0.083 s)
18/11/19 16:50:45 INFO rdd.MapPartitionsRDD: Removing RDD 9 from persistence list

日誌分析:sql

每一個batch都執行transform中的更新broadcast代碼,並且也執行了broadcast獲取代碼。所以,每次均可進行更新broadcast內容,而且獲取到broadcast中的內容。apache

2)Spark Structured Streaming更新broadcast(不可行【可行】

目前測試可行請參考《Spark2.3(四十二):Spark Streaming和Spark Structured Streaming更新broadcast總結(二)app

 def sparkStructuredStreaming(): Unit = {
    val spark = SparkSession.builder.appName("Test_Broadcast_ByScala_App").getOrCreate()
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
        println("*************** onQueryStarted  ***************")
      }

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println("*************** onQueryProgress  ***************")
        //        這段代碼能夠把broadcast對象更新成功,可是spark structured streaming內部讀取到的broadcast對象數據依然是老數據。
        //        BroadcastWrapper.update(spark.sparkContext, true)
        println("*************** onQueryProgress update broadcast " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date()))

      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        println("*************** onQueryTerminated  ***************")
      }
    })
    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    val lines = spark.readStream.format("socket").option("host", ipAddr).option("port", 19999).load()

    import spark.implicits._
    val mro = lines.as(Encoders.STRING).map(row => {
      val fields = row.split(",")
      Mro(fields(0), fields(1))
    })

    val cellJoinMro = mro.transform(row => {
      //      這段代碼在第一次觸發時執行,以後觸發就再也不執行。
      println("更新broadcast..." + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date()))
      if (1 < 3) {
        println("------------------------1111-----------------------------")
        BroadcastWrapper.update(spark.sparkContext)
      }
      var broadcastCellRes = BroadcastWrapper.getInstance(spark.sparkContext)
      row.map(row => {
        val int_id: String = row.int_id
        val rsrp: String = row.rsrp
        val findResult: String = String.join(",", broadcastCellRes.value.get(int_id).get)
        val timeStamps: String = String.join(",", findResult)

        CellJoinMro(int_id, rsrp, timeStamps)
      })
    })

    val query = cellJoinMro.writeStream.format("console")
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(15, TimeUnit.SECONDS))
      .start()

    query.awaitTermination()
  }

執行日誌:socket

18/11/19 17:12:49 INFO state.StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
18/11/19 17:12:50 WARN streaming.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
更新broadcast...2018-11-19 17:12:51
------------------------1111-----------------------------
1,111,1541347200000(2018-11-05 00:00:00)
2,222,1541347200000(2018-11-05 00:00:00)
3,333,1541347200000(2018-11-05 00:00:00)
.....
-------------------------------------------
Batch: 0
-------------------------------------------
18/11/19 17:13:03 INFO codegen.CodeGenerator: Code generated in 82.760622 ms
。。。。
18/11/19 17:13:19 INFO scheduler.DAGScheduler: Job 4 finished: start at App.scala:109, took 4.215709 s
+------+----+-------------------+
|int_id|rsrp|          timestamp|
+------+----+-------------------+
|     1|  22|2018-11-05 00:00:00|
+------+----+-------------------+

18/11/19 17:14:00 INFO streaming.StreamExecution: Committed offsets for batch 1. Metadata OffsetSeqMetadata(0,1542618840003,Map(spark.sql.shuffle.partitions -> 600))

此時更新資源文件,附加2018-11-06的資源文件。
-------------------------------------------
Batch: 1
-------------------------------------------
18/11/19 17:14:00 INFO spark.SparkContext: Starting job: start at App.scala:109
。。。
18/11/19 17:14:05 INFO scheduler.DAGScheduler: Job 9 finished: start at App.scala:109, took 3.068106 s
+------+----+-------------------+
|int_id|rsrp|          timestamp|
+------+----+-------------------+
|     2| 333|2018-11-05 00:00:00|
+------+----+-------------------+

日誌分析:ide

測試結論:

Spark Streaming更新broadcast(可行)、Spark Structured Streaming更新broadcast(不可行也可行,若是按照上邊spark streaming的方法是不行的,可是有其餘方案),緣由Spark Streaming的執行引擎是Spark Engine,是代碼執行,在算子的構造函數中能夠訪問SparkContext,SparkSession,並且這些類構造函數是能夠每次都執行的。函數

而Spark Structured Streaming的執行引擎是Spark Sql Engine,是把代碼優化爲Spark Sql Engine但願的格式去執行,不能夠在每次trigger事件觸發都執行執行塊之外的代碼,所以這些類構造函數塊代碼只能執行一次,執行塊相似MapFunction的call()函數內,不容許訪問SparkContext,SparkSession對象,所以無處進行每次trigger都進行broadcast更新。oop

那麼,如何在Spark Struectured Streaming中實現更新broadcast的方案,升級spark版本,從spark2.3.0開始,spark structured streaming支持了stream join stream(請參考《Spark2.3(三十七):Stream join Stream(res文件天天更新一份)》)。post

實際上,@2019-03-27測試結果中能夠獲得新的方案,也是使用broadcast方式更新獲得方案。目前測試可行請參考《Spark2.3(四十二):Spark Streaming和Spark Structured Streaming更新broadcast總結(二)

相關文章
相關標籤/搜索