SparkStreaming updateStateByKey: preserving state across batches

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_StateFul {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setMaster("local[2]")
      .setAppName(this.getClass.getSimpleName)
      .set("spark.executor.memory", "2g")
      .set("spark.cores.max", "8")
      .setJars(Array("E:\\ScalaSpace\\Spark_Streaming\\out\\artifacts\\Spark_Streaming.jar"))
    val context = new SparkContext(conf)

    // Sum this batch's counts for a key and add them to the previous state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      // Check whether previous state exists; if it does, use it directly.
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // step1: create streaming context
    val ssc = new StreamingContext(context, Seconds(10))
    ssc.checkpoint(".")

    // step2: create a network input stream on the given ip:port and count
    // the words in the input stream of \n-delimited text
    val lines = ssc.socketTextStream("218.193.154.79", 12345)

    val data = lines.flatMap(_.split(" "))
    val wordDstream = data.map(x => (x, 1))

    // use updateStateByKey to update the per-key state
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
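The state-update logic itself is plain Scala and can be sanity-checked without a cluster: for each key, Spark passes the new values seen in the current batch together with the previous state, and the function returns the new state. A minimal standalone sketch (the object name `UpdateFuncDemo` is just for illustration):

```scala
// Standalone check of the update function's semantics, no Spark required.
object UpdateFuncDemo {
  // Same shape as what updateStateByKey calls per key:
  // the values for this batch, plus the previously stored state.
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  def main(args: Array[String]): Unit = {
    // First batch: key seen twice, no previous state.
    println(updateFunc(Seq(1, 1), None))  // Some(2)
    // Second batch: key seen once more, carrying the state forward.
    println(updateFunc(Seq(1), Some(2)))  // Some(3)
  }
}
```

Because the running total lives in the state, `stateDstream.print()` shows cumulative word counts rather than per-batch counts.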

If the job is run on a cluster with ssc.checkpoint pointing at a local path, the following error is reported:
org.apache.spark.SparkException: Checkpoint RDD ReliableCheckpointRDD[9] at print at SparkStreaming_StateFul.scala:43(0) has different number of partitions from original RDD MapPartitionsRDD[8] at updateStateByKey at SparkStreaming_StateFul.scala:41(2)
	at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:73)
	at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)

This happens because the checkpoint files are not being written to HDFS; on a cluster the checkpoint directory must live on shared, reliable storage such as HDFS rather than an executor-local path.
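A sketch of the fix, assuming a hypothetical namenode address (`hdfs://namenode:9000` is a placeholder for your cluster's actual HDFS URI): point the checkpoint at an HDFS directory so every executor reads and writes the same storage.

```scala
// Checkpoint to a shared HDFS directory instead of the local "." path.
// Replace "hdfs://namenode:9000" with your cluster's namenode address.
ssc.checkpoint("hdfs://namenode:9000/spark/checkpoint/stateful-wordcount")
```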





