package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming stateful WordCount using updateStateByKey.
 */
object statefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("updateStateByKey for wordCount")
    val ssc = new StreamingContext(conf, Seconds(4))

    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("/opt/datas/spark_data/updateStateByKey1")

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" ")).map((_, 1))
    val wordCount = words.updateStateByKey[Int](updateFunc _)
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * updateStateByKey update function.
   * currentValues: counts for the key in the current batch
   * preValues: accumulated count from previous batches
   * returns Some(newTotal), the updated state for the key
   */
  def updateFunc(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val newCount = currentValues.sum
    val preCount = preValues.getOrElse(0)
    Some(newCount + preCount)
  }
}
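/**
 * A minimal sketch (an assumption: Spark 1.6+ on the classpath) of the same running
 * count expressed with mapWithState, which only visits keys present in the current
 * batch instead of rescanning the full state. It reuses the `ssc` and `words`
 * DStream from the example above; the checkpoint directory is still required.
 */
object statefulWordCountMapWithStateSketch {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mapWithState wordCount sketch")
    val ssc = new StreamingContext(conf, Seconds(4))
    ssc.checkpoint("/opt/datas/spark_data/mapWithState1")  // hypothetical path

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

    // new total = count in this batch + previously stored count
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    words.mapWithState(StateSpec.function(mappingFunc)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}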
package com.spark.train

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Blacklist filtering with Spark Streaming.
 */
object filterBlackListSparkStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("filter black list")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Turn the blacklist into an RDD of (name, true) pairs
    val blackList = List("ls", "zs")
    val rddBlackList = ssc.sparkContext.parallelize(blackList).map(x => (x, true))

    // Input lines look like "id,name,...": key each record by the name field,
    // left-outer-join against the blacklist RDD and keep only non-blacklisted records
    val logs = ssc.socketTextStream("localhost", 9999)
      .map(x => (x.split(",")(1), x))
      .transform(rdd => {
        rdd.leftOuterJoin(rddBlackList)
          .filter(x => x._2._2.getOrElse(false) != true)
          .map(x => x._2._1)
      })
    logs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
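/**
 * An alternative sketch: when the blacklist is small and static, broadcasting it as a
 * Set and filtering directly avoids the per-batch leftOuterJoin and shuffle. Assumes
 * the same "second comma-separated field is the user name" layout and the same
 * host/port as the example above.
 */
object filterBlackListBroadcastSketch {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("filter black list broadcast sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Broadcast the blacklist once; executors read it locally in the filter
    val blackListBC = ssc.sparkContext.broadcast(Set("ls", "zs"))

    val cleanLogs = ssc.socketTextStream("localhost", 9999)
      .filter(line => !blackListBC.value.contains(line.split(",")(1)))
    cleanLogs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}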
package com.spark.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object flumeSparkStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("flumeDstream")
    val ssc = new StreamingContext(conf, Seconds(5))

    /**
     * Approach 1: push-based receiver (createStream).
     * Start the Spark Streaming job first, then start Flume; the easiest test is to
     * telnet into the Flume source and type words for the word count.
     */
    val flumeDstream = FlumeUtils.createStream(ssc, "bigdata", 3333)
    val wordCnt = flumeDstream.map(x => new String(x.event.getBody.array()).trim())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCnt.print()

    /**
     * Approach 2: pull-based (createPollingStream).
     * Requires the Flume sink to be reconfigured as
     * org.apache.spark.streaming.flume.sink.SparkSink; other settings stay the same.
     * Start Flume first, the sink buffers the data, and the streaming job pulls it.
     */
    // val pollingWordCnt = FlumeUtils.createPollingStream(ssc, "bigdata", 3333)
    //   .map(x => new String(x.event.getBody.array()).trim)
    //   .flatMap(_.split(" "))
    //   .map((_, 1))
    //   .reduceByKey(_ + _)

    ssc.start()
    ssc.awaitTermination()
  }
}
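/**
 * A minimal sketch of the pull-based variant wired up end to end, assuming the Flume
 * agent on "bigdata":3333 uses the org.apache.spark.streaming.flume.sink.SparkSink
 * sink (with the spark-streaming-flume-sink and scala-library jars on Flume's
 * classpath, as the Spark/Flume integration guide requires). Host, port, and storage
 * level are illustrative values, not a fixed setup.
 */
object flumePollingSparkStreamingSketch {
  import java.net.InetSocketAddress

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.flume.FlumeUtils

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("flume polling sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // One or more SparkSink endpoints to pull from
    val addresses = Seq(new InetSocketAddress("bigdata", 3333))
    val pollingStream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)

    pollingStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}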