大數據技術之_26_交通狀態預測項目_01_數據模擬 + 數據採集 + 數據建模 + 數據預測 + 項目總結

一 項目背景二 項目架構與環境2.1 項目架構2.2 項目環境三 項目實現3.1 數據模擬3.1.1 數據結構3.1.2 編寫代碼3.1.3 測試3.2 數據採集3.2.1 編寫代碼3.2.2 測試3.3 數據建模3.3.1 編寫代碼3.3.2 測試3.4 數據預測3.4.1 編寫代碼3.4.2 測試四 項目總結php


小段子分享:有我的姓鐵,他不長頭髮,請問他得了什麼病?答:老鐵沒毛病。java

一 項目背景

該項目以車輛預測爲基礎,學習業務解決的方法論。
學習完本項目後,能夠解決以下問題或適用於以下業務場景:
  一、公路堵車預測
  二、地鐵人流量預測
  三、共享單車匯集點預測等等redis

擴展知識:算法

spark-shell --master yarn --conf spark.eventLog.dir=hdfs://hadoop/tmp/spark2 --jars /home/hadoop-SNAPSHOT.jar

智能判斷:
    軌跡:將某一輛車在全部監測點留下的蹤影聚合連線,則爲該車的軌跡。
    跟車:判斷某一輛車是否被跟蹤(此處「跟蹤」爲中性詞彙)等。好比:婚車(判斷是否屬於一個車隊)
    碰撞:這裏不是撞車分析,而是在幾個電子圍欄內(好比,監測點1,監測點2),同一輛車,在某一個時間範圍內,檢測出該車出如今不一樣的監測點。電子圍欄:好比OA打卡。

技偵支持:
    布控:警方輸入布控的車輛信息(車牌號,車型,顏色等等)
    實時報警:符合布控標準,則報警
    套牌分析:相同車牌號,車輛信息不一致
    落腳點:車輛在哪一個區域停留時間長

統計分析:
    流量統計:哪幾個監測點的車流量比較高
    外來車輛統計等等

數據結構示例:
日期            關卡id       攝像id       車輛          發生時間          速度    公路id   區域id(維度=特徵值向量)

2017-04-25      0001        09203       京W47147     2017-04-25 20:58:17     138     49      04
2017-04-25      0005        06975       京W47147     2017-04-25 20:12:39     50      10      06
2017-04-25      0001        02846       京W47147     2017-04-25 20:20:20     214     21      00
2017-04-25      0003        06044       京W47147     2017-04-25 20:15:58     78      47      01
2017-04-25      0000        01599       京W47147     2017-04-25 20:40:58     59      32      01
2017-04-25      0002        09260       京M91266     2017-04-25 09:09:57     105     15      00

一個 Event(事件)至少包含一行數據。
由於 Kafka 是基於事件的。

爲何一個 Event(事件)包含多行數據?
答:咱們將多行數據封裝(打包)成一個 Event,發送給 Kafka,這樣的好處是減小網絡IO。
如何打包呢?
答:使用 json 格式,以下:

{
    "monitor_arr":
    [
        {
            "time""2017-04-25",
            monitor_id:"0001",
            ...
        },
        {
            "time""2017-04-25",
            monitor_id:"0005",
            ...
        },
        ......
    ]
}

有監督學習:有標籤(label)的訓練 --> 建模的過程 --> 求通項公式的過程 --> 求擬合函數的過程 --> 求參數的過程 --> 連續數據,經常使用算法:迴歸算法 --> 線性迴歸、邏輯斯特迴歸
無監督學習:沒有標籤(label)的訓練 --> 離散數據 --> 好比歸類問題,經常使用算法:聚類算法 --> 支持向量機、隨機森林(起源於決策樹,萬能藥)、K-means 算法
半監督學習:一部分有標籤,一部分無標籤。

老羅的錘子手機不賺錢,爲了交個朋友--情懷,賣配件、T恤等賺錢。

平民化的最接近科學/科研 --> 計算機

維度認知:shell

二 項目架構與環境

2.1 項目架構

2.2 項目環境

如下環境爲本次項目使用的情景,並不是只有在此環境下才能夠完成總體業務需求。請靈活變更。(本例已在如下環境中完成測試)數據庫

三 項目實現

咱們新建 java 項目 tf,以後的每個項目模塊都創建於該項目之下。而後刪除掉 src 目錄。apache

3.1 數據模擬

  請確保 zookeeper 和 kafka 的正確配置。
  若是以前安裝的 scala 版本不是 2.11.8 請替換之:json

$ tar -zxf /opt/software/scala-2.11.8.tgz -C /opt/module/

使用 root 用戶,配置環境變量:
[atguigu@hadoop102 module]$ sudo vim /etc/profile

#SCALA_HOME
export SCALA_HOME=/opt/module/scala/scala-2.11.8
export PATH=$PATH:$SCALA_HOME/bin

使環境變量生效:
[atguigu@hadoop102 module]$ sudo source /etc/profile

  咱們須要產生一些監測點的模擬車速數據,並將這些數據實時的發送給 kafka,保存到 traffic 主題中,以供後續的 Spark 讀取數據並加工以後存放於 redis。bootstrap

3.1.1 數據結構

3.1.2 編寫代碼

思路:
  a) 新建模塊 maven 工程:tf_producer
  b) 配置 maven 依賴。
  c) 由於要把數據發送給 kafka,因此配置 kafka 屬性,保存於某個配置文件中。
  d) 編寫 kafka 加載屬性的工具類。
  e) 每隔 5 分鐘,切換一次模擬狀態,例如第一個五分鐘,車速都在 30km/h 以上,下一個五分鐘,車速都在 10km/h 如下,往復模擬公路一會堵車,一會不堵車的狀況。
  f) 啓動 zookeeper 集羣和 kafka 集羣,並建立 kafka 主題,檢查主題存在性。
  g) 將數據發送至 kafka 並使用 kafka console-consumer 進行檢測。vim

1) 新建項目:tf_producer

2) maven 的 pom.xml 文件配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
    </dependencies>
</project>

3) kafka 屬性配置文件:kafka.properties(生產者)

# 設置 kafka 的 brokerlist
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 生產者序列化
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

acks=all
retries=0

# 設置消費者所屬的消費組
group.id=g_traffic1

# 設置是否自動確認 offset
enable.auto.commit=true

# 設置自動確認 offset 的時間間隔
auto.commit.interval.ms=30000

# 設置本次消費的主題
kafka.topics=traffic

# 設置 zookeeper 中 follower 和 leader 之間的關於 kafka 的信息同步時間間隔
zookeeper.sync.time.ms=250
num.io.threads=12
batch.size=65536
buffer.memory=524288

# kafka 中消息保存的時間(單位是小時),企業開發中是 7 天
log.retention.hours=2

4) 編寫 kafka 屬性加載工具類:PropertyUtil.scala

package com.atguigu.utils

import java.util.Properties

object PropertyUtil {
  val properties = new Properties()
  // 加載配置屬性
  try {
    val inputStream = ClassLoader.getSystemResourceAsStream("kafka.properties")
    properties.load(inputStream)
  } catch {
    case ex: Exception => println(ex)
  } finally {

  }

  // 定義經過鍵獲得屬性值的方法
  def getProperty(key: String): String = properties.getProperty(key)
}

5) 開始模擬數據,每隔 5 分鐘切換一次模擬狀態,文件:Producer.scala

package com.atguigu.producer

import java.text.DecimalFormat
import java.util.Calendar

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import com.atguigu.utils.PropertyUtil

import scala.util.Random
import java._

import com.alibaba.fastjson.JSON

/**
  * 模擬產生數據,同時把數據實時的發送到 kafka
  * 隨機產生 監測點id 以及 速度
  * 序列化爲 json
  * 發送給 kafka
  */

object Producer {

  def main(args: Array[String]): Unit = {
    // 讀取配置文件信息
    val properties = PropertyUtil.properties
    // 建立 kafka 生產者對象
    val producer = new KafkaProducer[String, String](properties)

    // 模擬產生實時數據,單位爲:秒
    var startTime = Calendar.getInstance().getTimeInMillis() / 1000

    // 數據模擬,堵車狀態切換的週期單位爲:秒
    val trafficCycle = 300

    val df = new DecimalFormat("0000")
    // 開始不停的實時產生數據
    while (true) {
      // 模擬產生監測點 id:1~20
      val randomMonitorId = df.format(Random.nextInt(20) + 1)
      // 模擬車速
      var randomSpeed = "000"

      // 獲得本條數據產生時的當前時間,單位爲:秒
      val currentTime = Calendar.getInstance().getTimeInMillis() / 1000
      // 每 5 分鐘切換一次公路狀態
      if (currentTime - startTime > trafficCycle) {
        randomSpeed = new DecimalFormat("000").format(Random.nextInt(16))
        if (currentTime - startTime > trafficCycle * 2) {
          startTime = currentTime
        }
      } else {
        randomSpeed = new DecimalFormat("000").format(Random.nextInt(31) + 30)
      }

      // 該 Map 集合用於存放生產出來的數據
      val jsonmMap = new util.HashMap[String, String]()
      jsonmMap.put("monitor_id", randomMonitorId)
      jsonmMap.put("speed", randomSpeed)

      // 由於 kafka 是基於事件的,在此,咱們每一條產生的數據都序列化爲一個 json 事件
      val event = JSON.toJSON(jsonmMap)

      // 發送事件到 kafka 集羣中
      producer.send(new ProducerRecord[String, String](PropertyUtil.getProperty("kafka.topics"), event.toString))

      Thread.sleep(500)

      // 測試
      // println("監測點id:" + randomMonitorId + "," + "車速:" + randomSpeed)
      println(event)
    }
  }
}

6) 啓動集羣中的其餘相關節點(zookeeper,hadoop 等),啓動 kafka,並建立 kafka 主題,檢查主題存在性

[atguigu@hadoop102 ~]$ start-cluster.sh 

Linux 集羣服務羣起腳本
(1) 啓動腳本:start-cluster.sh

#!/bin/bash
echo "================        開始啓動全部節點服務      ==========="
echo "================        正在啓動 Zookeeper      ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh start'
done

echo "================        正在啓動 HDFS           ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/start-dfs.sh'

echo "================        正在啓動 YARN           ==========="
ssh atguigu@hadoop103 '/opt/module/hadoop-2.7.2/sbin/start-yarn.sh'

echo "================    hadoop102 節點正在啓動 JobHistoryServer   ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver'

(2) 中止腳本:stop-cluster.sh

#!/bin/bash
echo "================        開始中止全部節點服務      ==========="
echo "================    hadoop102 節點正在中止 JobHistoryServer ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver'

echo "================        正在中止 YARN           ==========="
ssh atguigu@hadoop103 '/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh'

echo "================        正在中止 HDFS           ==========="
ssh atguigu@hadoop102 '/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh'

echo "================        正在中止 Zookeeper      ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop'
done

(3) 查看進程腳本:util.sh

#!/bin/bash
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    echo "================      $i 的全部進程       ==========="
    ssh $i '/opt/module/jdk1.8.0_144/bin/jps'
done

尖叫提示:腳本學會以後,若是後續再有新的節點須要添加到羣起任務中,能夠自行解決之。
尖叫提示:啓動與中止注意腳本的執行順序,並且中止腳本的中止過程應該是啓動過程的倒序。


zookeeper 集羣羣起腳本:

[atguigu@hadoop102 ~]$ zkstart.sh

(1) 啓動腳本:zkstart.sh

#!/bin/bash
echo "==========  正在啓動 zookeeper 集羣  =========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh start'
done

(2) 中止腳本:zkstop.sh

#!/bin/bash
echo "==========  正在中止 zookeeper 集羣  =========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop'
done

(3) 狀態腳本:zkstatus.sh

#!/bin/bash
echo "==========  正在查看 zookeeper 集羣狀態  =========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh status'
done

kafka 集羣腳本:

[atguigu@hadoop102 ~]$ kafka-start.sh

(1) 啓動腳本:kafka-start.sh

#!/bin/bash
echo "================        正在啓動 Kafka 集羣       ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
done

(2) 中止腳本:kafka-stop.sh

#!/bin/bash
echo "================        正在中止 Kafka 集羣       ==========="
for i in atguigu@hadoop102 atguigu@hadoop103 atguigu@hadoop104
do
    ssh $i 'source /etc/profile;/opt/module/kafka/bin/kafka-server-stop.sh -daemon'
done

建立 kafka 主題:traffic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 1 --partitions 3 --topic traffic

刪除 kafka 主題:traffic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181 --topic traffic

檢查 kafka 的 traffic 主題是否正常:

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --list --zookeeper hadoop102:2181

3.1.3 測試

將數據發送至 kafka 並使用 kafka console-consumer 進行檢測,持續運行若干分鐘後,查看數據是否穩定輸入輸出。
啓動 kafka 控制檯消費者:

// kafka-console-consumer
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic trafffic

kafka 控制檯消費者消費數據以下圖所示;

3.2 數據採集

  咱們將實時模擬出來的數據,放置於 redis 中。

3.2.1 編寫代碼

思路:
  a) 新建工程:tf_consumer
  b) 配置 maven 依賴並添加 scala 框架的支持。
  c) 配置 redis 並測試。
  d) 將剛纔 kafka.properties 以及 PropertyUtil 拷貝過來,並進行相應的修改。
  e) 編寫 redis 操做工具類:RedisUtil
  f) 讀取 kafka 中的數據,實時保存到 redis 中,而且按照分鐘和監測點聚合車速和車輛個數。

1) 新建工程:tf_consumer

2) 配置 maven 的 pom.xml 文件以及 kafka.properties:
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>

3) 修改 kafka.properties 配置文件(消費者):

# 設置 kafka 的 brokerlist
bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 消費者反序列化
key.deserializer=org.apache.kafka.common.serialization.StringDeSerializer
value.deserializer=org.apache.kafka.common.serialization.StrinDegSerializer

acks=all
retries=0

# Kafka 老版本中的元數據服務列表
metadata.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 設置消費者所屬的消費組
group.id=g_traffic1

# 設置是否自動確認 offset
enable.auto.commit=true

# 設置自動確認 offset 的時間間隔
auto.commit.interval.ms=30000

# 設置本次消費的主題
kafka.topics=traffic

# 設置 zookeeper 中 follower 和 leader 之間的關於 kafka 的信息同步時間間隔
zookeeper.sync.time.ms=250
num.io.threads=12
batch.size=65536
buffer.memory=524288

# kafka 中消息保存的時間(單位是小時),企業開發中是 7 天
log.retention.hours=2

3) 配置 Redis(單節點)環境並測試

// 經過 wget 下載 Redis 的源碼
[atguigu@hadoop102 software]$ wget http://download.redis.io/releases/redis-4.0.2.tar.gz

// 將源代碼解壓到指定目錄 /opt/module 下
[atguigu@hadoop102 software]$ tar -zxf redis-4.0.2.tar.gz -C /opt/module

// 進入 Redis 源代碼目錄,編譯安裝(由於 redis 是用 C 語言寫的)
[atguigu@hadoop102 module]$ cd redis-4.0.2/

// 安裝 GCC
[atguigu@hadoop102 module]$ sudo yum install gcc

// 編譯源代碼
[atguigu@hadoop102 redis-4.0.2]$ make MALLOC=libc

若是報錯
zmalloc.h:50:31: error: jemalloc/jemalloc.h: No such file or directory
zmalloc.h:55:2: error: #error "Newer version of jemalloc required"
make[1]: *** [adlist.o] Error 1
make[1]: Leaving directory `/opt/module/redis-4.0.2/src'
make: *** [all] Error 2
解決辦法是:
make MALLOC=libc

注意:Redis 並無本身實現內存池,沒有在標準的系統內存分配器上再加上本身的東西。
redis-2.4 以上自帶 jemalloc,你不須要加任何參數,經過 zmalloc.c 源碼中咱們能夠看到,Redis 在編譯時,會先判斷是否使用 tcmalloc,若是是,會用 tcmalloc 對應的函數替換掉標準的 libc 中的函數實現。其次會判斷 jemalloc 是否使用,最後若是都沒有使用纔會用標準的 libc 中的內存管理函數。因此用 tcmalloc 優化請謹慎使用,這兩個分配器碎片率相差不大,建議用自帶 jemalloc。

若是要安裝 tcmalloc 能夠這樣:
make USE_TCMALLOC=yes

// 編譯安裝(注意:要使用 root 用戶權限)
[atguigu@hadoop102 redis-4.0.2]$ sudo make install

// 建立配置文件,放入指定的目錄
[atguigu@hadoop102 redis-4.0.2]$ sudo cp /opt/module/redis-4.0.2/redis.conf /opt/module/redis-4.0.2/myredis

// 修改配置文件中如下內容(注意 redis 新版的 4.x 與 老版本 3.x 上配置的細微差異)
[atguigu@hadoop102 redis-4.0.2]$ sudo vim /opt/module/redis-4.0.2/myredis/redis.conf

bind 0.0.0.0                                            #69行       #綁定主機 IP,默認值爲127.0.0.1,咱們是跨機器運行,因此須要更改,表示任意機器集羣都可訪問,實際開發是中不建議這樣改
daemonize yes                                           #136行      #是否之後臺 daemon 方式運行,默認不是後臺運行
pidfile /var/run/redis/redis_6379.pid                   #158行      #redis 的 PID 文件路徑(可選)
logfile "/opt/module/redis-4.0.2/myredis/redis.log"     #171行      #定義 log 文件位置,模式 log 信息定向到 stdout,輸出到 /dev/null(可選)
dir "/opt/module/redis-4.0.2/myredis"                   #263行      #本地數據庫存放路徑,默認爲./(可選)

// 編譯安裝默認存在在 /usr/local/bin 目錄下,以下
[atguigu@hadoop102 redis-4.0.2]$ cd /usr/local/bin/
[atguigu@hadoop102 bin]$ ll
總用量 9572
-rw-r--r-- 1 root root      83 5月   8 01:27 dump6379.rdb
-rw-r--r-- 1 root root      83 5月   8 01:27 dump6380.rdb
-rw-r--r-- 1 root root      83 5月   8 01:27 dump6381.rdb
lrwxrwxrwx 1 root root       6 4月  28 17:17 nc -> netcat
-rwxr-xr-x 1 root root  103479 4月  28 17:17 netcat
-rwxr-xr-x 1 root root  290454 5月  23 12:37 redis-benchmark
-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-check-aof
-rwxr-xr-x 1 root root   45443 5月   6 17:27 redis-check-dump
-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-check-rdb
-rwxr-xr-x 1 root root  419907 5月  23 12:37 redis-cli
lrwxrwxrwx 1 root root      12 5月  23 12:37 redis-sentinel -> redis-server
-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-server

在安裝完 Redis 以後,啓動 Redis

// 啓動 Redis 服務器
[atguigu@hadoop102 redis-4.0.2]$ redis-server /opt/module/redis-4.0.2/myredis/redis.conf

// 鏈接 Redis 服務器
[atguigu@hadoop102 redis-4.0.2]$ redis-cli -h 192.168.25.102 -p 6379
192.168.25.102:6379set k1 123
OK
192.168.25.102:6379get k1
"123"
192.168.25.102:6379> keys *
1"k1"
2"uid:2"
192.168.25.102:6379> lrange uid:2 0 -1      #查看列表的某個範圍的數據
1"150:5.0"
2"144:3.0"
3"110:4.0"
192.168.25.102:6379> lpush uid:1 3671:3.0 2968:1.0 2455:2.5     #存一組列表數據
192.168.25.102:6379> flushall       #清空全部數據
192.168.25.102:6379select 1       #選擇數據庫

// 查看 Redis 的啓動狀況
[atguigu@hadoop102 redis-4.0.2]$ ps -ef | grep redis
atguigu    6033      1  0 13:08 ?        00:00:00 redis-server 0.0.0.0:6379                              
atguigu    6046   4336  0 13:12 pts/0    00:00:00 grep redis 

// 中止 Redis 服務器
[atguigu@hadoop102 redis-4.0.2]$ redis-cli shutdown

4) 將剛纔 kafka.properties 以及 PropertyUtil 拷貝過來,kafka.properties 須要進行相應的修改
5) 編寫 redis 操做工具類:RedisUtil.scala

package com.atguigu.utils

import redis.clients.jedis._

// 代碼寫在半生對象中,這些代碼會在類加載的時候,自動的進行初始化
object RedisUtil {
  // 配置 redis 基本鏈接參數
  val host = "192.168.25.102"
  val port = 6379
  val timeout = 30000

  val config = new JedisPoolConfig

  // 設置鏈接池容許最大的鏈接個數
  config.setMaxTotal(200)
  // 設置最大空閒鏈接數
  config.setMaxIdle(50)
  // 設置最小空閒鏈接數
  config.setMinIdle(8)

  // 設置鏈接時的最大等待的毫秒數
  config.setMaxWaitMillis(10000)
  // 設置在獲取鏈接時,檢查鏈接的有效性
  config.setTestOnBorrow(true)
  // 設置在釋放鏈接時,檢查鏈接的有效性
  config.setTestOnReturn(true)

  // 設置在鏈接空閒時,檢查鏈接的有效性
  config.setTestWhileIdle(true)

  // 設置兩次掃描之間的時間間隔毫秒數
  config.setTimeBetweenEvictionRunsMillis(30000)
  // 設置每次掃描的最多的對象數
  config.setNumTestsPerEvictionRun(10)
  // 設置逐出鏈接的最小時間間隔,默認是 1800000 毫秒 = 30 分鐘
  config.setMinEvictableIdleTimeMillis(60000)

  //  鏈接池
  lazy val pool = new JedisPool(config, host, port, timeout)

  // 釋放資源
  lazy val hook = new Thread{ // 鉤子函數:執行一些善後操做,正常退出
    override def run() = {
      pool.destroy()
    }
  }

  sys.addShutdownHook(hook.run())
}

6) 在 SparkConsumer.scala 中讀取 kafka 中的數據,實時保存到 redis 中,而且按照分鐘和監測點聚合車速和車輛個數。用到 Spark Streaming 的時間窗口函數進行聚合。

package com.atguigu.consumer

import java.text.SimpleDateFormat
import java.util.Calendar

import com.alibaba.fastjson.{JSON, TypeReference}
import com.atguigu.utils.{PropertyUtil, RedisUtil}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 堵車預測:處理實時數據,消費數據到 redis
  */

object SparkConsumer {
  def main(args: Array[String]): Unit = {
    // 初始化 Spark
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // 設置檢查點目錄
    ssc.checkpoint("./ssc/checkpoint")

    // 配置 kafka 參數,使用的是 spark 爲咱們封裝的一套操做 kafka coonsumer 的工具包
    val kafkaParam = Map("metadata.broker.list" -> PropertyUtil.getProperty("metadata.broker.list"))

    // 配置 kafka 主題
    val topics = Set(PropertyUtil.getProperty("kafka.topics"))

    // 讀取 kafka 主題 中的每個事件 event
    val kafkaLineDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
      .map(_._2) // 因爲咱們 event 中的鍵是 null,因此咱們須要把值映射出來

    // 解析 json 字符串
    val event = kafkaLineDStream.map(line => { // {"monitor_id":"0001","speed":"038"}
      // 使用 fastjson 來解析當前事件中封裝的數據信息,因爲該 json 字符串不支持 Scala Map,因此須要先將 json 字符串解析爲 Java Map
      val lineJavaMap = JSON.parseObject(line, new TypeReference[java.util.Map[String, String]]() {})
      // 將 Java Map 轉換成 Scala Map
      import scala.collection.JavaConverters._
      val lineScalaMap: collection.mutable.Map[String, String] = mapAsScalaMapConverter(lineJavaMap).asScala
      println(lineScalaMap) // Map[String, String] = ("monitor_id" -> "0001", "speed" -> "038")
      lineScalaMap
    })

    // 將每一條數據根據 monitor_id 聚合,聚合每一條數據中的 「車輛速度」 疊加
    // 例如:聚合好的數據形式:(monitor_id, (speed, 1))  ("0001", (038, 1))
    // 最終結果舉例:("0001", (1365, 30))
    val sumOfSpeedAndCount 
= event
      .map(e => (e.get("monitor_id").get, e.get("speed").get)) // ("0001", "038")、("0001", "048")、("0002", "015")
      .mapValues(s => (s.toInt, 1)) // ("0001", (038, 1))、("0001", (048, 1))、("0002", (015, 1))
      .reduceByKeyAndWindow( // reduce 表示從左邊開始執行將獲得的結果返回給第一個參數
      (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2),
      Seconds(60), // 滑動窗口大小 60 秒,偏差最大 59 秒,即上一分鐘的數據當成下一分鐘的數據來用了。
      Seconds(60)) // 滑動步長 60 秒,對咱們實際建模的影響忽略不計,由於:實際中,不可能1分鐘內就形成大量擁堵,或者堵車不可能1分鐘以內就緩解了!!!後面建模的時候會進行線性濾波。

    // 定義 redis 數據庫中的數據庫索引 index
    val dbIndex = 1
    // 將採集到的數據,按照每分鐘放置於redis 中,將用於後邊的數據建模
    sumOfSpeedAndCount.foreachRDD(rdd => {
      rdd
        .foreachPartition(partitionRecords => {
          partitionRecords
            .filter((tuple: (String, (Int, Int))) => tuple._2._1 > 0// 過濾掉元組數據中的速度小於0的數據
            .foreach(pair => {
            // 開始取出這 60 秒的 windows 中全部的聚合數據進行封裝,準備存入 redis 數據庫
            val jedis = RedisUtil.pool.getResource

            val monitorId = pair._1
            val sumOfCarSpeed = pair._2._1
            val sumOfCarCount = pair._2._2

            // 模擬數據爲實時流入
            // 兩種狀況:
            // 一、數據生產時,會產生時間戳字段,流入到 kafka 的事件中
            // 二、數據消費時,數據消費的時間,就當作數據的生產時間(會有一些小小偏差),本業務選擇這種方式

            val dateSDF = new SimpleDateFormat("yyyyMMdd"// 用於 redis 中的 key
            val hourMinuteSDF = new SimpleDateFormat("HHmm"// 用於 redis 中的 fields

            val currentTime = Calendar.getInstance().getTime

            val dateTime = dateSDF.format(currentTime) // 20190528
            val hourMinuteTime = hourMinuteSDF.format(currentTime) // 1617

            // 選擇存入的數據庫
            jedis.select(dbIndex)
            jedis.hset(dateTime + "_" + monitorId, hourMinuteTime, sumOfCarSpeed + "_" + sumOfCarCount)

            println(dateTime + "_" + monitorId, hourMinuteTime, sumOfCarSpeed + "_" + sumOfCarCount)

            // RedisUtil.pool.returnResource(jedis) // 老的 API
            jedis.close() // 新的 API
          })
        })
    })

    // Spark 開始工做
    ssc.start()
    ssc.awaitTermination()
  }
}

// 複習 Scala 中 Map 的取值方式:

// 方式1-使用 map(key)
//   一、若是 key 存在,則返回對應的值。
//   二、若是 key 不存在,則拋出異常 [java.util.NoSuchElementException]。
//   三、在 Java 中,若是 key 不存在則返回 null。
// 方式2-使用 contains 方法檢查是否存在 key
//  使用 containts 先判斷再取值,能夠防止異常,並加入相應的處理邏輯。
//   一、若是 key 存在,則返回 true。
//   二、若是 key 不存在,則返回 false。
// 方式3-使用 map.get(key).get 取值
//   一、若是 key 存在,則 map.get(key) 就會返回 Some(值),而後 Some(值).get 就能夠取出。
//   二、若是 key 不存在,則 map.get(key) 就會返回 None。
// 方式4-使用 map.getOrElse(key, defaultvalue) 取值
//   底層是:def getOrElse[V1 >: V](key: K, default: => V1)
//   一、若是 key 存在,則返回 key 對應的值。
//   二、若是 key 不存在,則返回默認值。在 java 中底層有不少相似的操做。
// 如何選擇取值方式建議
//   若是咱們肯定 map 有這個 key,則應當使用 map(key),速度快。
//   若是咱們不能肯定 map 是否有 key,並且有不一樣的業務邏輯,使用 map.contains() 先判斷再加入邏輯。
//   若是隻是簡單的但願獲得一個值,使用 map4.getOrElse("ip", "127.0.0.1")

3.2.2 測試

咱們使用集羣的羣起腳本:

開啓 zookeeper 集羣:

[atguigu@hadoop102 ~]$ zkstart.sh

開啓 kafka 集羣:

[atguigu@hadoop102 ~]$ kafka-start.sh

開啓 redis,在 redis 根目錄執行:

// 啓動 Redis 服務器
[atguigu@hadoop102 redis-4.0.2]$ redis-server /opt/module/redis-4.0.2/myredis/redis.conf

運行數據生產
運行數據消費
查看運行結果:
在 redis 根目錄中,舉個例子依次執行:

[atguigu@hadoop102 redis-4.0.2]$ redis-cli -h 192.168.25.102 -p 6379
192.168.25.102:6379> select 1
OK
192.168.25.102:6379[1]> keys *
 1"20190528_0014"
 2"20190528_0005"
 3"20190528_0019"
 4"20190528_0009"
 5"20190528_0004"
 6"20190528_0013"
 7"20190528_0016"
 8"20190528_0020"
 9"20190528_0015"
10"20190528_0010"
11"20190528_0018"
12"20190528_0008"
13"20190528_0001"
14"20190528_0003"
15"20190528_0007"
16"20190528_0012"
17"20190528_0002"
18"20190528_0011"
19"20190528_0017"
20"20190528_0006"
192.168.25.102:6379[1]> hgetall 20190528_0001
 1"1646"
 2"279_7"
 3"1647"
 4"239_6"
 5"1648"
 6"240_5"
 7"1649"
 8"318_7"
 9"1650"
10"184_6"
11"1651"
12"54_8"
13"1652"
14"81_10"
15"1653"
16"69_9"
17"1654"
18"69_9"
19"1655"
20"57_8"
21"1656"
22"262_6"
23"1657"
24"149_3"
25"1659"
26"168_4"
27"1700"
28"134_4"
29"1701"
30"65_8"
31"1702"
32"81_10"

注意:不要直接複製,每次操做有些內容是有變更的。好比時間相關的,好比 IP 相關的。


小結:

堵車內容回顧:
1、數據生產
    目的:可以讓咱們清楚數據結構是什麼樣子的,實際開發中這部分不是咱們作;實際開發中:已有數據結構,已有目標,要作的就是目前手中已有的資料如何實現目標
    數據結構:卡口id,車速(沒有包含數據生產時的時間戳)
    堵車狀態的轉換邏輯(if else),爲的是生產的數據儘量的貼近現實狀況

2、數據消費
    kafka(高級 API,spark 提供的工具包) --> redis
    時間窗口的大小爲 60 秒
    時間窗口的滑動步長爲 60 秒
    數據存儲在 redis 中,使用的是數據類型是 Hash(即 Map 類型):KV 模式不變,可是 V 也是一個鍵值對
        key : 20190528_0001
        field : 1754
        value : 1365_30

天貓雙十一(使用 Storm + Flink 實現)
1、若是咱們使用 SparkStreaming 實現,時間窗口的寬度不能設置太大,可能會出現內存溢出。
25秒內聚合的數據該如何處理呢?答:保存到 redis 中(即落盤)。
3、那麼下一個時間窗口的新的數據該如何處理呢?答:先將 redis 中前一個 5 秒的數據讀出來,而後和此次的 5 秒數據進行相加後,再放回到 redis 中(即落盤)。小結:全部的流式框架都是這樣作的。

流式框架的根本的哲學意義是:僅僅處理中間邏輯,便是進行運算(計算)的,不負責數據存儲的。
若是在內存中想進行長期的數據累加,就至關於一個不斷微分再積分的過程,把時間微分到足夠細,細到不會致使內存溢出爲止,而後再微分的基礎上求和,再把全部的微分結果進行積分。

某一個小時間段內的數據量越小,則時間窗口的寬度就能夠設置的越大,那麼數據展現的延遲就會變長,可是總體數據處理的效率就會變得越高。--> 不像流式處理了!

3.3 數據建模

  在此咱們選擇經過有監督學習中的手段創建能夠預測下一時刻堵車狀態的模型。
擬牛頓圖解:

線性濾波圖解:

目標卡口與相關卡口關係:

模型圖解:

3.3.1 編寫代碼

思路:
  a) 肯定要對哪一個監測點進行建模,咱們稱之爲目標監測點。
  b) 找到目標監測點的其餘相關監測點(好比相關監測點與目標監測點屬於一條公路的)。
  c) 從 redis 中訪問獲得以上全部監測點若干小時內的歷史數據信息(一部分做爲訓練數據,一部分做爲測試數據)。
  d) 提取組裝特徵向量與目標向量,訓練參數集,訓練模型。
  e) 測試模型吻合度,將符合吻合度的模型保存到 HDFS 中,同時將模型的保存路徑放置於 redis 中。

1) 新建 module:tf_modeling
2) 編寫 maven 的 pom.xml 文件,添加 scala 框架支持

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_modeling</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>

3) 建立 Train.scala 實現上述思路:

package com.atguigu.train

import java.io.{File, PrintWriter}
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.atguigu.utils.RedisUtil
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * 堵車預測:建模,不一樣的卡口不一樣的模型(函數)
  */

object Train {
  def main(args: Array[String]): Unit = {
    // 寫入文件的輸出流,將本次評估結果保存到下面這個文件中
    val writer = new PrintWriter(new File("model_train.txt"))

    // 初始化 Spark
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficTrainModel")
    val sc = new SparkContext(sparkConf)

    // 定義 redis 的數據庫相關參數
    val dbIndex = 1
    // 獲取 redis 鏈接
    val jedis = RedisUtil.pool.getResource
    jedis.select(dbIndex)

    // 設定 目標監測點:你要對哪幾個監測點進行建模(本例中對 2 個檢測點進行建模)
    val targetMonitorIDs = List("0005""0015")
    // 取出 目標監測點的相關監測點:算法工程師告訴咱們的(本例中咱們隨意寫幾個)
    val relationMonitors = Map[String, Array[String]](
      "0005" -> Array("0003""0004""0005""0006""0007"),
      "0015" -> Array("0013""0014""0015""0016""0017")
    )

    // 訪問 redis 取出 目標監測點的相關監測點 的數據

    // 遍歷 目標監測點的相關監測點 的 Map 集合
    targetMonitorIDs.map(targetMonitorID => { // 這個 map 執行 2 次
      // 初始化時間
      // 獲取當前時間
      val currentDate = Calendar.getInstance().getTime

      // 格式化 當前時間 爲 年月日 對象
      val dateSDF = new SimpleDateFormat("yyyyMMdd")
      // 格式化 當前時間 爲 小時分鐘數 對象
      val hourMinuteSDF = new SimpleDateFormat("HHmm")

      // 格式化當前時間
      val dateSDFString = dateSDF.format(currentDate) // 20190528

      // 獲取 當前目標監測點的相關監測點
      val relationMonitorArray = relationMonitors(targetMonitorID)
      // 根據 當前目標監測點的相關監測點,取出當前時間的全部相關監測點的平均車速
      val relationMonitorInfo = relationMonitorArray.map(relationMonitorID => { // 這個 map 執行 5 次
        (relationMonitorID, jedis.hgetAll(dateSDFString + "_" + relationMonitorID))
        // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})
      })

      // 建立 3 個數組:由於要使用 擬牛頓法(LBFGS)進行建模,該方法須要
      // 第一個數組放 特徵因子數據集,
      // 第二個數組放 label 標籤向量(特徵因子對應的結果數據集),
      // 第三個數組放 前二者之間的關聯(即真正的特徵向量)
      val dataX = ArrayBuffer[Double]() // 實際的每一分鐘的平均車速
      val dataY = ArrayBuffer[Double]() // 第 4 分鐘的平均車速

      // 用於存放 特徵因子數據集 和 特徵因子對應的結果數據集 的映射關係
      val dataTrain = ArrayBuffer[LabeledPoint]()

      // 肯定使用多少時間內的數據進行建模(本例中取 1 小時)
      val hours = 1

      // 將時間回退到當前時間的 1 小時以前,時間單位:分鐘
      // 遍歷 目標監測點的數據(外循環)
      for (i <- Range(60 * hours, 2, -1)) { // 本例中是 60 到 2(不包括2),步長是 -1,即 60, 59, 58, ..., 5, 4,
        dataX.clear()
        dataY.clear()

        // 遍歷 目標監測點的全部相關監測點 的數據(內循環)
        for (index <- 0 to 2) {
          // 當前for循環 的時間 = 當前時間的毫秒數 - 1 個小時的毫秒數 + 0分鐘的毫秒數,1分鐘的毫秒數,2分鐘的毫秒數  (第3分鐘做爲監督學習的結果向量--label 向量)
          val oneMoment = currentDate.getTime - 60 * i * 1000 + 60 * index * 1000
          // 獲取 當前for循環 的時間的小時分鐘數
          val oneHM = hourMinuteSDF.format(new Date(oneMoment))

          // 獲取當前小時分鐘數的數據
          for ((k, v) <- relationMonitorInfo) { // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})

            // hours 個小時前的後 3 分鐘的數據,組裝到 dataX 中
            if (v.containsKey(oneHM)) { // 判斷本次時刻的數據是否存在,若是存在,則取值,不然,則取 -1(表示數據缺失)
              val speedAndCarCount = v.get(oneHM).split("_")
              val valueX = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat // 獲得當前這一分鐘的平均車速
              dataX += valueX
            } else {
              dataX += -59.0F
            }

            // 若是 index == 2,說明已經獲得 hours 個小時前的後 3 分鐘的數據,並組裝到了 dataX 中;若是是目標卡口,則說明下一分鐘數據是 label 向量的數據,ze存放 dataY 中
            if (index == 2 && targetMonitorID == k) {
              val nextMoment = oneMoment + 60 * 1000
              val nextHM = hourMinuteSDF.format(new Date(nextMoment))
              if (v.containsKey(nextHM)) { // 判斷本次時刻的數據是否存在,若是存在,則取值,不然,則無論它(有默認值 0)
                val speedAndCarCount = v.get(nextHM).split("_")
                val valueY = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat // 獲得第 4 分鐘的平均車速
                dataY += valueY
              }
            }

          }
        }

        // 準備訓練模型
        // 先將 dataX 和 dataY 映射到一個 LabeledPoint 對象中
        if (dataY.toArray.length == 1) { // 說明結果集中有數據了
          val label = dataY.toArray.head
          val record = LabeledPoint(
            // 由於使用的是 擬牛頓法(LBFGS) 進行建模,該方法須要 特徵結果 有幾種狀況(不能是無窮種狀況)
            // label 範圍爲 0~6(7個類別),越大則道路越通暢
            if (label / 10 < 6) (label / 10).toInt else 6, Vectors.dense(dataX.toArray)
          )
          dataTrain += record
        }
      }

      // 將特徵數據集寫入到文件中方便查看,至此,咱們的特徵數據集已經封裝完畢
      dataTrain.foreach(record => {
        println(record)
        writer.write(record.toString() + "\r\n")
      })

      // 將特徵數據集轉爲 rdd 數據集
      val rddData = sc.parallelize(dataTrain)
      // 隨機封裝訓練集和測試集
      val randomSplits = rddData.randomSplit(Array(0.60.4), 11L)
      val trainData = randomSplits(0)
      val testData = randomSplits(1)

      if (!rddData.isEmpty()) {
        // 使用訓練數據集進行訓練模型
        val model = new LogisticRegressionWithLBFGS().setNumClasses(7).run(trainData)

        // 使用測試數據集測試訓練好的模型
        val predictAndLabel = testData.map {
          case LabeledPoint(label, feature) =>
            val predict = model.predict(feature)
            (predict, label)
        }

        // 獲得當前 目標監測點 的評估值
        val metrics = new MulticlassMetrics(predictAndLabel)
        val accuracy = metrics.accuracy
        println("評估值:" + accuracy)
        writer.write(accuracy.toString + "\r\n")

        // 設置評估閾值,評估值範圍爲[0.0, 1.0],越大 model 越優秀,咱們保存評估值大於 0 的評估模型
        if (accuracy > 0.6) 
{
          // 將模型保存到 hdfs 中,並將模型路徑保存到 redis 中
          val hdfsPath = "hdfs://hadoop102:9000/traffic/model/" + targetMonitorID + "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date
          (currentDate.getTime))
          model.save(sc, hdfsPath)

          jedis.hset("model", targetMonitorID, hdfsPath)
        }
      }
    })

    // 釋放 redis 鏈接
    // RedisUtil.pool.returnResource(jedis) // 老的 API
    jedis.close() // 新的 API
    writer.close()
  }
}

3.3.2 測試

  運行數據模擬與數據採集,等待一會以後,開始進行預測,查看 http://hadoop102:50070 中是否產生對應的模型樣本。同時查看 redis 中是否有保存訓練好的模型存放路徑。

3.4 數據預測

3.4.1 編寫代碼

思路:
  a) 用戶傳入想要進行預測的時間節點,讀取該時間節點以前 3 分鐘,2 分鐘和 1 分鐘的數據。
  b) 此時應該已經獲得了歷史數據集,經過該歷史數據集預測傳入時間點的車流狀態。
  尖叫提示:爲了方便觀察測試,建議傳一個歷史時間點,這樣能夠很直觀的看到預測結果是否符合指望值。

1) 新建 module:tf_prediction
2) 配置 maven 的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>tf_prediction</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
</project>

3) 新建 Prediction.scala 文件,實現上述思路

package com.atguigu.predict

import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.utils.RedisUtil
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * 堵車預測:根據訓練出來的模型進行堵車預測
  */

object Prediction {
  def main(args: Array[String]): Unit = {
    // 初始化 Spark
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficPrediction")
    val sc = new SparkContext(sparkConf)

    // 時間設置:爲了拼湊出 redis 中的 key 和 field 的字段

    // 格式化 時間 爲 年月日 對象
    val dateSDF = new SimpleDateFormat("yyyyMMdd")
    // 格式化 時間 爲 小時分鐘數 對象
    val hourMinuteSDF = new SimpleDateFormat("HHmm")

    // 2019-05-29 13:00
    val userSDF = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    // 定義用戶傳入的日期:想要預測是否堵車的日期
    val inputDateString = "2019-05-29 10:29"
    val inputDate = userSDF.parse(inputDateString)

    // 獲得 redis 中的 key
    val dateSDFString = dateSDF.format(inputDate) // 20180529

    val dbIndex = 1
    val jedis = RedisUtil.pool.getResource
    jedis.select(dbIndex)

    // 想要預測的監測點
    // 設定 目標監測點:你要對哪幾個監測點進行建模(本例中對 2 個檢測點進行建模)
    val targetMonitorIDs = List("0005""0015")
    // 取出 目標監測點的相關監測點:算法工程師告訴咱們的(本例中咱們隨意寫幾個)
    val relationMonitors = Map[String, Array[String]](
      "0005" -> Array("0003""0004""0005""0006""0007"),
      "0015" -> Array("0013""0014""0015""0016""0017")
    )

    // 遍歷 目標監測點的相關監測點 的 Map 集合
    targetMonitorIDs.map(targetMonitorID => { // 這個 map 執行 2 次
      // 獲取 當前目標監測點的相關監測點
      val relationMonitorArray = relationMonitors(targetMonitorID)
      // 根據 當前目標監測點的相關監測點,取出當前時間的全部相關監測點的平均車速
      val relationMonitorInfo = relationMonitorArray.map(relationMonitorID => { // 這個 map 執行 5 次
        (relationMonitorID, jedis.hgetAll(dateSDFString + "_" + relationMonitorID))
        // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})
      })

      // 裝載目標時間點以前的 3 分鐘的歷史數據
      val dataX = ArrayBuffer[Double]() // 實際的每一分鐘的平均車速

      // 組裝數據
      for (index <- Range(30, -1)) {
        val oneMoment = inputDate.getTime - 60 * index * 1000
        val oneHM = hourMinuteSDF.format(new Date(oneMoment)) // 1257

        for ((k, v) <- relationMonitorInfo) {
          if (v.containsKey(oneHM)) {
            val speedAndCarCount = v.get(oneHM).split("_")
            val valueX = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat
            dataX += valueX
          } else {
            dataX += -59.0F
          }
        }
      }

      // 加載模型
      val modelPath = jedis.hget("model", targetMonitorID)
      val model = LogisticRegressionModel.load(sc, modelPath)

      // 進行預測
      val predict = model.predict(Vectors.dense(dataX.toArray))

      // 打印展現
      println(targetMonitorID + ",堵車評估值:" + predict + ",是否通暢:" + (if (predict > 3"通暢" else "擁堵"))

      // 結果保存
      jedis.hset(inputDateString, targetMonitorID, predict.toString)
    })

    // 釋放 redis 鏈接
    // RedisUtil.pool.returnResource(jedis) // 老的 API
    jedis.close() // 新的 API
  }
}

3.4.2 測試

  預測任務執行完畢後,進入redis,經過查看對應監測點,對應傳入時間節點的具體車速值,來驗證預測結果是否正確。

四 項目總結

  與該項目相似的需求還有不少不少,涵蓋了生活各個方面。不一樣的業務,不一樣的邏輯,不一樣的思路,不一樣的數學模型,須要具體狀況具體分析。有相似的、但不徹底同樣的需求,就須要多思考,靈活處理了。

相關文章
相關標籤/搜索