假定用戶有某個週末網民網購停留時間的日誌文本,基於某些業務要求,要求開發apache
Spark應用程序實現以下功能:app
一、實時統計連續網購時間超過半個小時的女性網民信息。spa
二、週末兩天的日誌文件第一列爲姓名,第二列爲性別,第三列爲本次停留時間,單日誌
位爲分鐘,分隔符爲「,」。code
數據:server
log1.txt:週六網民停留日誌blog
LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
log2.txt:週日網民停留日誌開發
LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
統計日誌文件中本週末網購停留總時間超過2個小時的女性網民信息。get
一、接收Kafka中數據,生成相應DStream。kafka
二、篩選女性網民上網時間數據信息。
三、彙總在一個時間窗口內每一個女性上網時間。
四、篩選連續上網時間超過閾值的用戶,並獲取結果。
1.啓動zk
./zkServer.sh start
2.啓動Kafka
./kafka-server-start.sh /root/apps/kafka/config/server.properties
3.建立topic
[root@mini3 kafka]# bin/kafka-console-producer.sh --broker-list mini1:9092 --topic sparkhomework-test
4.生產數據
代碼
package org.apache.spark import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka.KafkaUtils /** * Created by Administrator on 2019/6/13. */ object SparkHomeWork { val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => { iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(v => (x, v)) } } def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("SparkHomeWork") val ssc = new StreamingContext(conf, Seconds(5)) //將回滾點寫到hdfs ssc.checkpoint("hdfs://mini1:9000/kafkatest") val Array(zkQuorum, groupId, topics, numThreads) = Array[String]("mini1:2181,mini2:2181,mini3:2181", "g1", "sparkhomework-test", "2") val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap).map(_._2) //篩選女性網民上網時間數據信息 val data = lines.flatMap(_.split(" ")).filter(_.contains("female")) //彙總每一個女性上網時間 val femaleData: DStream[(String, Int)] = data.map { line => val t = line.split(',') (t(0), t(2).toInt) }.reduceByKey(_ + _) //篩選出時間大於兩個小時的女性網民信息,並輸出 val results = femaleData.filter(line => line._2 > 120).updateStateByKey(updateFunction, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) results.print() ssc.start() ssc.awaitTermination() } }
打印結果: