Spark Assignment

Suppose we have log files recording how long netizens stayed on an online-shopping site over a certain weekend. Based on the business requirements, develop a Spark application that implements the following:

1. In real time, report the female netizens whose continuous online-shopping time exceeds half an hour.

2. In the two weekend log files, the first column is the name, the second is the gender, and the third is the dwell time for that session; the unit is minutes and the delimiter is ",".
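Given the record layout above, the per-record logic for requirement 1 can be sketched in plain Scala before any Spark code is involved (a sketch; the `Session` case class and function names are illustrative, not from the original job):

```scala
// Parse one "name,gender,minutes" record and test the half-hour rule.
// Plain-Scala sketch; Session and the helper names are illustrative.
case class Session(name: String, gender: String, minutes: Int)

def parse(record: String): Session = {
  val Array(name, gender, minutes) = record.split(",")
  Session(name, gender, minutes.toInt)
}

// Requirement 1: female sessions longer than half an hour (30 minutes)
def overHalfHour(s: Session): Boolean = s.gender == "female" && s.minutes > 30

val sample = Seq("LiuYang,female,20", "CaiXuyu,female,50", "GuoYijun,male,50")
val hits = sample.map(parse).filter(overHalfHour).map(_.name)
println(hits)  // List(CaiXuyu)
```

Note that the male record with 50 minutes is excluded: both conditions (gender and duration) must hold.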

Data:

log1.txt: Saturday dwell log

 

LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

 

log2.txt: Sunday dwell log

 

LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60

 

From the log files, report the female netizens whose total online-shopping dwell time over the weekend exceeds 2 hours.
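With the two sample logs above, the expected answer can be checked in plain Scala before wiring up the streaming job (a sketch; the record strings are copied verbatim from log1.txt and log2.txt):

```scala
// Sum each female netizen's weekend dwell time and keep totals over 2 hours
// (120 minutes). Plain-Scala sketch over the sample data.
val log1 = "LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60"
val log2 = "LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60"

val totals = (log1 + " " + log2)
  .split(" ")
  .filter(_.contains("female"))            // keep female records only
  .map { r => val t = r.split(","); (t(0), t(2).toInt) }
  .groupBy(_._1)
  .map { case (name, recs) => (name, recs.map(_._2).sum) }

val result = totals.filter(_._2 > 120)     // total over 2 hours
println(result)  // CaiXuyu -> 300, FangBo -> 320 (LiuYang's 80 is dropped)
```

This is the same filter/parse/sum pipeline the streaming job performs per batch, just run once over the static data.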

1. Receive the data from Kafka and create the corresponding DStream.

2. Filter for the female netizens' online-time records.

3. Sum each woman's online time within a time window.

4. Filter for the users whose cumulative online time exceeds the threshold, and collect the result.


1. Start ZooKeeper

./zkServer.sh start

2. Start Kafka

./kafka-server-start.sh /root/apps/kafka/config/server.properties

3. Create the topic (the original post omits this command; with the ZooKeeper-based setup above it would typically look like bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 1 --topic sparkhomework-test)

4. Produce data with the console producer, pasting in the log records:

[root@mini3 kafka]# bin/kafka-console-producer.sh --broker-list mini1:9092 --topic sparkhomework-test

Code:

 

package org.apache.spark

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Created by Administrator on 2019/6/13.
  */
object SparkHomeWork {
  // Merge each key's new per-batch sums (Seq[Int]) into its running total
  // (Option[Int]; None when the key has no previous state).
  val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(v => (x, v)) }
  }

  def main(args: Array[String]) {
    // SparkConf and HashPartitioner resolve without imports only because this
    // file sits inside the org.apache.spark package.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkHomeWork")
    val ssc = new StreamingContext(conf, Seconds(5))
    // write checkpoints to HDFS (required by updateStateByKey)
    ssc.checkpoint("hdfs://mini1:9000/kafkatest")

    val Array(zkQuorum, groupId, topics, numThreads) = Array[String]("mini1:2181,mini2:2181,mini3:2181", "g1", "sparkhomework-test", "2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // receive data from Kafka, keeping only the message payloads
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap).map(_._2)

    // filter for the female netizens' records
    val data = lines.flatMap(_.split(" ")).filter(_.contains("female"))
    // sum each woman's online time within the batch
    val femaleData: DStream[(String, Int)] = data.map { line =>
      val t = line.split(',')
      (t(0), t(2).toInt)
    }.reduceByKey(_ + _)
    // accumulate totals across batches first, then keep the women whose total
    // exceeds two hours (120 minutes); filtering before updateStateByKey would
    // wrongly discard per-batch sums under the threshold
    val results = femaleData
      .updateStateByKey(updateFunction, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
      .filter(_._2 > 120)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }

}
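The `updateFunction` used with `updateStateByKey` above merges each key's new per-batch sums into its running state. Because it is a plain function over an iterator, its behaviour can be checked in isolation, without a Spark cluster:

```scala
// Same shape as the job's updateFunction: for each key, add the new batch
// values to the previous state (None on the first batch).
val updateFunction = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
  iter.flatMap { case (k, newVals, state) =>
    Some(newVals.sum + state.getOrElse(0)).map(v => (k, v))
  }
}

// First batch: no prior state for CaiXuyu yet
val batch1 = updateFunction(Iterator(("CaiXuyu", Seq(50, 50), None))).toList
// Later batch: 100 minutes already accumulated in state
val batch2 = updateFunction(Iterator(("CaiXuyu", Seq(50, 50, 50, 50), Some(100)))).toList
println(batch1)  // List((CaiXuyu,100))
println(batch2)  // List((CaiXuyu,300))
```

This is why the job filters on 120 only after the state update: the 300-minute total is only reached once several batches have been folded into the state.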

 

Printed output:
