第1章 項目概述1.1 項目簡介1.2 項目目標1.3 業務需求簡介1.3.1 用戶訪問 session 統計1.3.2 頁面單跳轉化率統計1.3.3 區域熱門商品離線統計1.3.4 廣告流量實時統計第2章 項目主體架構2.1 項目架構2.2 離線日誌採集宏觀流程(參考)2.3 實時日誌採集宏觀流程(參考)2.4 離線/實時日誌採集框架第3章 模擬業務數據源3.1 離線數據3.1.1 數據模型與數聽說明3.2 實時數據3.2.1 數據模型與數聽說明第4章 程序框架解析4.1 mock 模塊(模擬數據產生模塊)4.2 commons 模塊(公共模塊)4.3 analyse 模塊(數據分析模塊)第5章 需求解析5.1 需求一:Session 各範圍訪問步長、訪問時長佔比統計5.1.1 需求解析5.1.2 數據源解析5.1.3 數據結構解析5.1.4 需求實現簡要流程5.1.5 需求實現詳細流程5.1.6 MySQL 存儲結構解析5.1.7 代碼解析5.1.8 需求一實現思路整理5.2 需求二:Session 隨機抽取5.2.1 需求解析5.2.2 數據源解析5.2.3 數據結構解析5.2.4 需求實現簡要流程5.2.5 需求實現詳細流程5.2.6 MySQL 存儲結構解析5.2.7 代碼解析5.2.8 需求二實現思路整理5.3 需求三:Top10 熱門品類統計5.3.1 需求解析5.3.2 數據源解析5.3.3 數據結構解析5.3.4 需求實現簡要流程5.3.5 需求實現詳細流程5.3.6 MySQL 存儲結構解析5.3.7 代碼解析5.3.8 需求三實現思路整理5.4 需求四:Top10 熱門品類的 Top10 活躍 Session 統計5.4.1 需求解析5.4.2 數據源解析5.4.3 數據結構解析5.4.4 需求實現簡要流程5.4.5 需求實現詳細流程5.4.6 MySQL 存儲結構解析5.4.7 代碼解析5.4.8 需求3、四實現思路整理5.5 需求五:頁面單跳轉化率統計5.5.1 需求解析5.5.2 數據源解析5.5.3 數據結構解析5.5.4 需求實現簡要流程5.5.5 需求實現詳細流程5.5.6 MySQL 存儲結構解析5.5.7 代碼解析5.5.8 需求五實現思路整理5.6 需求六:各區域 Top3 商品統計5.6.1 需求解析5.6.2 數據源解析5.6.3 數據結構解析5.6.4 需求實現簡要流程5.6.5 需求實現詳細流程5.6.6 MySQL 存儲結構解析5.6.7 代碼解析5.6.8 需求六實現思路整理5.7 需求七:廣告點擊黑名單實時統計5.7.1 需求解析5.7.2 數據源解析5.7.3 數據結構解析5.7.4 需求實現簡要流程5.7.5 需求實現詳細流程5.7.6 MySQL 存儲結構解析5.7.7 代碼解析5.7.8 需求七實現思路整理5.8 需求八:各省各城市廣告點擊量實時統計5.8.1 需求解析5.8.2 數據源解析5.8.3 數據結構解析5.8.4 需求實現簡要流程5.8.5 需求實現詳細流程5.8.6 MySQL 存儲結構解析5.8.7 代碼解析5.8.8 需求八實現思路整理5.9 需求九:天天每一個省份 Top3 熱門廣告5.9.1 需求解析5.9.2 數據源解析5.9.3 數據結構解析5.9.4 需求實現簡要流程5.9.5 需求實現詳細流程5.9.6 MySQL 存儲結構解析5.9.7 代碼解析5.9.8 需求九實現思路整理5.10 需求十:最近一小時廣告點擊量實時統計5.10.1 需求解析5.10.2 數據源解析5.10.3 數據結構解析5.10.4 需求實現簡要流程5.10.5 需求實現詳細流程5.10.6 MySQL 存儲結構解析5.10.7 代碼解析5.10.8 需求7、8、9、十實現思路整理第6章 項目總結java
電商分析平臺是對用戶訪問電商平臺的行爲進行分析。python
本項目主要講解一個大型電商網站後臺的企業級大數據統計分析平臺,該平臺以 Spark 爲主,對電商網站的流量進行離線和實時的分析。
該大數據分析平臺對電商網站的各類用戶行爲(訪問行爲、購物行爲、廣告點擊行爲等)進行復雜的分析。用統計分析出來的數據,輔助公司中的 PM(產品經理)、數據分析師以及管理人員分析現有產品的狀況,並根據用戶行爲分析結果持續改進產品的設計,以及調整公司的戰略和業務。最終達到用大數據技術來幫助提高公司的業績、營業額以及市場佔有率的目標。
項目主要使用了 Spark 技術生態棧中最經常使用的三個技術框架,Spark Core
、Spark SQL
和Spark Streaming
,進行離線計算和實時計算業務模塊的開發。實現了包括用戶訪問 session 分析、頁面單跳轉化率統計、熱門商品離線統計、廣告流量實時統計 4 個業務模塊。
項目中全部的業務功能模塊都是直接從實際企業項目中抽取出來的,業務複雜度沒有任何縮水,經過合理的將實際業務模塊進行技術整合與改造,該項目幾乎徹底涵蓋了 Spark Core、Spark SQL 和 Spark Streaming 這三個技術框架中大部分的功能點、知識點。mysql
一、掌握電商系統中 Spark 的主要使用場景以及建設流程。
二、掌握企業級的 Spark 項目的複雜性能調優、線上故障解決經驗、數據傾斜全套處理方案。
三、經過項目實戰,徹底將 Spark 全部技術點和知識點都應用在項目中,掌握如何靈活應用 Spark 各項技術來實現各類複雜業務需求。nginx
用戶在電商網站上,一般會有不少的訪問行爲,一般都是進入首頁,而後可能點擊首頁上的一些商品,點擊首頁上的一些品類,也可能隨時在搜索框裏面搜索關鍵詞,還可能將一些商品加入購物車,對購物車中的多個商品下訂單,最後對訂單中的多個商品進行支付。
用戶的每一次操做,其實能夠理解爲一個 action,在本項目中,咱們關注點擊
、搜索
、下單
、支付
這四個用戶行爲。
用戶 session,是在電商平臺的角度定義的會話概念,指的就是,從用戶第一次進入首頁,session 就開始了。而後在必定時間範圍內,直到最後操做完(可能作了幾十次、甚至上百次操做),離開網站,關閉瀏覽器,或者長時間沒有作操做,那麼 session 就結束了。
以上用戶在網站內的訪問過程,就稱之爲一次 session
。簡單理解,session 就是某一天某一個時間段內,某個用戶對網站從打開/進入,到作了大量操做,到最後關閉瀏覽器的過程,就叫作 session。
session 實際上就是一個電商網站中最基本的數據。那麼面向消費者/用戶端的大數據分析(C端),最基本的就是面向用戶訪問行爲/用戶訪問 session 的分析。
該模塊主要是對用戶訪問 session 進行統計分析,包括session 的聚合指標計算、按時間比例隨機抽取 session、獲取天天點擊、下單和購買排名前 10 的品類、並獲取 top10 品類的點擊量排名前 10 的 session
。該模塊可讓產品經理、數據分析師以及企業管理層形象地看到各類條件下的具體用戶行爲以及統計指標,從而對公司的產品設計以及業務發展戰略作出調整。主要使用 Spark Core 實現。web
頁面單跳轉化率是一個很是有用的統計數據。
產品經理,能夠根據這個指標,去嘗試分析整個網站/產品,各個頁面的表現怎麼樣,是否是須要去優化產品的佈局;吸引用戶最終能夠進入最後的支付頁面。
數據分析師,能夠基於此數據,作更深一步的計算和分析。
企業管理層,能夠看到整個公司的網站,各個頁面的之間的跳轉的表現如何,作到內心有數,能夠適當調整公司的經營戰略或策略。
該模塊主要是計算關鍵頁面之間的單步跳轉轉化率
,涉及到頁面切片算法以及頁面流匹配算法。該模塊可讓產品經理、數據分析師以及企業管理層看到各個關鍵頁面之間的轉化率,從而對網頁佈局,進行更好的優化設計。主要使用 Spark Core 實現。算法
該模塊主要
實現天天統計出各個區域的 top3 熱門商品
。
咱們認爲,不一樣地區的經濟發展水平不一樣,地理環境及氣候不一樣,人們的風土人情和風俗習慣不一樣,所以對於不一樣商品的需求不一樣,根據區域熱門商品的統計,可讓公司決策層更好的對不一樣類型商品進行佈局,使商品進入最須要他的區域。
該模塊可讓企業管理層看到公司售賣的商品的總體狀況,從而對公司的商品相關的戰略進行調整。主要使用 Spark SQL 實現。sql
網站/app 中常常會給第三方平臺作廣告,這也是一些互聯網公司的核心收入來源;當廣告位招商完成後,廣告會在 網站/app 的某個廣告位發佈出去,當用戶訪問 網站/app 的時候,會看到相應位置的廣告,此時,有些用戶可能就會去點擊那個廣告。
咱們要獲取用戶點擊廣告的行爲,並針對這一行爲進行計算和統計。
用戶每次點擊一個廣告之後,會產生相應的埋點日誌;在大數據實時統計系統中,會經過某些方式將數據寫入到分佈式消息隊列中(Kafka)。
日誌發送給後臺 web 服務器(nginx),nginx 將日誌數據負載均衡到多個 Tomcat 服務器上,Tomcat 服務器會不斷將日誌數據寫入 Tomcat 日誌文件中,寫入後,就會被日誌採集客戶端(好比 Flume Agent)所採集,隨後寫入到消息隊列中(Kafka),咱們的實時計算程序會從消息隊列中( Kafka)去實時地拉取數據,而後對數據進行實時的計算和統計。
Kafka這個模塊的意義在於,讓產品經理、高管能夠實時地掌握到公司打的各類廣告的投放效果。以便於後期持續地對公司的廣告投放相關的戰略和策略,進行調整和優化;以指望得到最好的廣告收益。
該模塊負責實時統計公司的廣告流量,包括廣告展示流量和廣告點擊流量。實現動態黑名單機制,以及黑名單過濾
;實現滑動窗口內的各城市的廣告展示流量和廣告點擊流量的統計
;實現每一個區域每一個廣告的點擊流量實時統計
;實現每一個區域 top3 點擊量的廣告的統計
。主要使用 Spark Streaming 實現。數據庫
用戶行爲數據在網站上最簡單的存在形式就是日誌。網站在運行過程當中會產生大量的原始日誌 RAW LOG,將其存儲在文件系統中,企業會將多種原始日誌按照用戶行爲彙總成會話日誌 SESSION LOG,每個會話日誌表示用戶的一種反饋。
本項目分爲離線分析系統
與實時分析系統
兩大模塊。
在離線分析系統中,咱們將模擬業務數據寫入 Hive 表中,離線分析系統從 Hive 中獲取數據,並根據實際需求(用戶訪問 Session 分析、頁面單跳轉化率分析、各區域熱門商品統計) 對數據進行處理,最終將分析完畢的統計數據存儲到 MySQL 的對應表格中。
在實時分析系統中,咱們將模擬業務數據寫入 Kafka 集羣中, 實時分析系統從 Kafka broker 中獲取數據,經過 Spark Streaming 的流式處理對廣告點擊流量進行實時分析,最終將統計結果存儲到 MySQL 的對應表格中。express
上圖是一個企業級的日誌處理框架,這一框架實現了對日誌信息進行採集、彙總、清洗、聚合、分析的完整過程,並將日誌數據分別存儲到了離線和實時數據處理模塊中,使得分析系統能夠經過離線和實時兩個角度對數據進行分析統計,並根據統計結果指導業務平臺的改良和優化。apache
舉例
一、user_visit_action
user_visit_action 表,存放網站或者 APP 天天的點擊流數據。通俗地講,就是用戶對 網站/APP 每點擊一下,就會產生一條存放在這個表裏面的數據。
字段名稱 說明
date 日期,表明這個用戶點擊行爲是在哪一天發生的
user_id 用戶 ID,惟一地標識某個用戶
session_id Session ID,惟一地標識某個用戶的一個訪問 session
page_id 頁面 ID,點擊了某些商品/品類,也多是搜索了某個關鍵詞,而後進入了某個頁面,頁面的 id
action_time 動做時間,這個點擊行爲發生的時間點
search_keyword 搜索關鍵詞,若是用戶執行的是一個搜索行爲,好比說在 網站/app 中,搜索了某個關鍵詞,而後會跳轉到商品列表頁面
click_category_id 點擊品類 ID,多是在網站首頁,點擊了某個品類(美食、電子設備、電腦)
click_product_id 點擊商品 ID,多是在網站首頁,或者是在商品列表頁,點擊了某個商品(好比呷哺呷哺火鍋 XX 路店 3 人套餐、iphone 6s)
order_category_ids 下單品類 ID,表明了可能將某些商品加入了購物車,而後一次性對購物車中的商品下了一個訂單,這就表明了某次下單的行爲中,有哪些商品品類,可能有 6 個商品,可是就對應了 2 個品類,好比有 3 根火腿腸(食品品類),3 個電池(日用品品類)
order_product_ids 下單商品 ID,某次下單,具體對哪些商品下的訂單
pay_category_ids 付款品類 ID,對某個訂單,或者某幾個訂單,進行了一次支付的行爲,對應了哪些品類
pay_product_ids 付款商品 ID,支付行爲下,對應的哪些具體的商品
city_id 城市 ID,表明該用戶行爲發生在哪一個城市
二、user_info
user_info 表,是一張普通的用戶基本信息表;這張表中存放了 網站/APP 全部註冊用戶的基本信息。
字段名稱 說明
user_id 用戶 ID,惟一地標識某個用戶
username 用戶登陸名
name 用戶暱稱或真實姓名
age 用戶年齡
professional 用戶職業
city 用戶所在城市
sex 用戶性別
三、product_info
product_info 表,是一張普通的商品基本信息表;這張表中存放了 網站/APP 全部商品的基本信息。
字段名稱 說明
proudct_id 商品 ID,惟一地標識某個商品
product_name 商品名稱
extend_info 額外信息,例如商品爲自營商品仍是第三方商品
程序每 5 秒向 Kafka 集羣寫入數據,格式以下:
格式 :timestamp province city userid adid
在線數據的字段解析以下所示:
字段名稱 取值範圍
timestamp 當前時間毫秒
userId 0 – 99
provice/city 1 – 9((0L," 北京"," 北京"),(1L," 上海"," 上海"),(2L," 南京"," 江蘇省"),(3L,"廣州","廣東省"),(4L,"三亞","海南省"),(5L,"武漢","湖北省"),(6L,"長沙","湖南省"),(7L,"西安","陝西省"),(8L,"成都","四川省"),(9L,"哈爾濱","東北省"))
adid 0 - 19
新建一個 maven 工程 commerce_basic 做爲父 maven 工程,引入依賴 pom.xml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu</groupId>
<artifactId>commerce</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>commons</module>
<module>mock</module>
</modules>
<!-- 聲明子項目公用的配置屬性 -->
<properties>
<spark.version>2.1.1</spark.version>
<scala.version>2.11.8</scala.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.22</slf4j.version>
</properties>
<!-- 聲明並引入子項目共有的依賴 -->
<dependencies>
<!-- 全部子項目的日誌框架 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- 具體的日誌實現 -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<!-- Logging End -->
</dependencies>
<dependencyManagement>
<dependencies>
<!-- 引入 Spark 相關的 Jar 包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!-- provider 若是存在,那麼運行時該 Jar 包不存在,也不會打包到最終的發佈版本中,只是編譯器有效 -->
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</dependencyManagement>
<!-- 聲明構建信息 -->
<build>
<!-- 聲明並引入子項目共有的插件:插件就是負責到 Maven 各個聲明週期的具體實現 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<!-- 全部的編譯都依照 JDK1.8 來搞 -->
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<!-- 僅聲明子項目共有的插件,若是子項目須要此插件,那麼子項目須要聲明 -->
<pluginManagement>
<plugins>
<!-- 該插件用於將 Scala 代碼編譯成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<!-- 聲明綁定到 maven 的 compile 階段 -->
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 用於項目的打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
新建一個模塊 maven 工程 mock 做爲子 maven 工程,引入依賴 pom.xml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>commerce</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>mock</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
類(包)名稱
類(包)結構圖
MockDataGenerate.scala
import java.util.UUID
import commons.model.{ProductInfo, UserInfo, UserVisitAction}
import commons.utils.{DateUtils, StringUtils}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
* 離線模擬數據的生成
*
* date:是當前日期
* age: 0 - 59
* professionals: professional[0 - 99]
* cities: 0 - 99
* sex: 0 - 1
* keywords: ("火鍋", "蛋糕", "重慶辣子雞", "重慶小面", "呷哺呷哺", "新辣道魚火鍋", "國貿大廈", "太古商場", "日本料理", "溫泉")
* categoryIds: 0 - 99
* ProductId: 0 - 99
*/
object MockDataGenerate {
/**
* 模擬用戶的行爲信息
*
* @return
*/
private def mockUserVisitActionData(): Array[UserVisitAction] = {
val rows = ArrayBuffer[UserVisitAction]()
val random = new Random()
val searchKeywords = Array("華爲手機", "聯想筆記本", "小龍蝦", "衛生紙", "吸塵器", "Lamer", "機器學習", "蘋果", "洗面奶", "保溫杯")
// yyyy-MM-dd
val date = DateUtils.getTodayDate()
// 關注四個行爲:搜索、點擊、下單、支付
val actions = Array("search", "click", "order", "pay")
// 一共 100 個用戶(有重複)
for (i <- 0 until 100) {
val userid = random.nextInt(100)
// 每一個用戶產生 10 個 session
for (j <- 0 until 10) {
// 不可變的,全局的,獨一無二的 128bit 長度的標識符,用於標識一個 session,體現一次會話產生的 sessionId 是獨一無二的
val sessionid = UUID.randomUUID().toString().replace("-", "")
// 在 yyyy-MM-dd 後面添加一個隨機的小時時間(0-23)
val baseActionTime = date + " " + random.nextInt(23) // 2019-05-30 12
// 每一個 (userid + sessionid) 生成 0-100 條用戶訪問數據
for (k <- 0 to random.nextInt(100)) {
val pageid = random.nextInt(10)
// 在 yyyy-MM-dd HH 後面添加一個隨機的分鐘時間和秒時間,2019-05-30 12:25:30
val actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59)))
var searchKeyword: String = null
var clickCategoryId: Long = -1L
var clickProductId: Long = -1L
var orderCategoryIds: String = null
var orderProductIds: String = null
var payCategoryIds: String = null
var payProductIds: String = null
val cityid = random.nextInt(10).toLong
// 隨機肯定用戶在當前 session 中的行爲
val action = actions(random.nextInt(4))
// 根據隨機產生的用戶行爲 action 決定對應字段的值
action match {
case "search" => searchKeyword = searchKeywords(random.nextInt(10))
case "click" => clickCategoryId = random.nextInt(100).toLong
clickProductId = String.valueOf(random.nextInt(100)).toLong
case "order" => orderCategoryIds = random.nextInt(100).toString
orderProductIds = random.nextInt(100).toString
case "pay" => payCategoryIds = random.nextInt(100).toString
payProductIds = random.nextInt(100).toString
}
rows += UserVisitAction(date, userid, sessionid,
pageid, actionTime, searchKeyword,
clickCategoryId, clickProductId,
orderCategoryIds, orderProductIds,
payCategoryIds, payProductIds, cityid)
}
}
}
rows.toArray
}
/**
* 模擬用戶信息表
*
* @return
*/
private def mockUserInfo(): Array[UserInfo] = {
val rows = ArrayBuffer[UserInfo]()
val sexes = Array("male", "female")
val random = new Random()
// 隨機產生 100 個用戶的我的信息
for (i <- 0 until 100) {
val userid = i
val username = "user" + i
val name = "name" + i
val age = random.nextInt(60)
val professional = "professional" + random.nextInt(100)
val city = "city" + random.nextInt(100)
val sex = sexes(random.nextInt(2))
rows += UserInfo(userid, username, name, age, professional, city, sex)
}
rows.toArray
}
/**
* 模擬產品數據表
*
* @return
*/
private def mockProductInfo(): Array[ProductInfo] = {
val rows = ArrayBuffer[ProductInfo]()
val random = new Random()
val productStatus = Array(0, 1)
// 隨機產生 100 個產品信息
for (i <- 0 until 100) {
val productId = i
val productName = "product" + i
val extendInfo = "{\"product_status\": " + productStatus(random.nextInt(2)) + "}" // 注意這裏是 json 串
rows += ProductInfo(productId, productName, extendInfo)
}
rows.toArray
}
/**
* 將 DataFrame 插入到 Hive 表中
*
* @param spark SparkSQL 客戶端
* @param tableName 表名
* @param dataDF DataFrame
*/
private def insertHive(spark: SparkSession, tableName: String, dataDF: DataFrame): Unit = {
spark.sql("DROP TABLE IF EXISTS " + tableName)
dataDF.write.saveAsTable(tableName)
}
val USER_VISIT_ACTION_TABLE = "user_visit_action"
val USER_INFO_TABLE = "user_info"
val PRODUCT_INFO_TABLE = "product_info"
/**
* 主入口方法
*
* @param args 啓動參數
*/
def main(args: Array[String]): Unit = {
// 建立 Spark 配置
val sparkConf = new SparkConf().setAppName("MockData").setMaster("local[*]")
// 建立 Spark SQL 客戶端
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// 模擬數據
val userVisitActionData = this.mockUserVisitActionData()
val userInfoData = this.mockUserInfo()
val productInfoData = this.mockProductInfo()
// 將模擬數據轉換爲 RDD
val userVisitActionRdd = spark.sparkContext.makeRDD(userVisitActionData)
val userInfoRdd = spark.sparkContext.makeRDD(userInfoData)
val productInfoRdd = spark.sparkContext.makeRDD(productInfoData)
// 加載 SparkSQL 的隱式轉換支持
import spark.implicits._
// 將用戶訪問數據轉換爲 DF 保存到 Hive 表中
val userVisitActionDF = userVisitActionRdd.toDF()
insertHive(spark, USER_VISIT_ACTION_TABLE, userVisitActionDF)
// 將用戶信息數據轉換爲 DF 保存到 Hive 表中
val userInfoDF = userInfoRdd.toDF()
insertHive(spark, USER_INFO_TABLE, userInfoDF)
// 將產品信息數據轉換爲 DF 保存到 Hive 表中
val productInfoDF = productInfoRdd.toDF()
insertHive(spark, PRODUCT_INFO_TABLE, productInfoDF)
spark.close
}
}
MockRealTimeData.scala
import java.util.Properties
import commons.conf.ConfigurationManager
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
object MockRealTimeData {
def main(args: Array[String]): Unit = {
// 獲取配置文件 commerce.properties 中的 Kafka 配置參數
val broker = ConfigurationManager.config.getString("kafka.broker.list")
val topic = ConfigurationManager.config.getString("kafka.topics")
// 建立 Kafka 生產者
val kafkaProducer = createKafkaProducer(broker)
while (true) {
// 隨機產生實時數據並經過 Kafka 生產者發送到 Kafka 集羣中
for (item <- generateMockData()) {
kafkaProducer.send(new ProducerRecord[String, String](topic, item))
}
Thread.sleep(5000)
}
}
/**
* 實時模擬數據的生成
*
* 時間點: 當前時間毫秒
* userId: 0 - 99
* 省份、城市 ID 相同: 1 - 9
* adid: 0 - 19
* ((0L,"北京","北京"),(1L,"上海","上海"),(2L,"南京","江蘇省"),(3L,"廣州","廣東省"),(4L,"三亞","海南省"),(5L,"武漢","湖北省"),(6L,"長沙","湖南省"),(7L,"西安","陝西省"),(8L,"成都","四川省"),(9L,"哈爾濱","東北省"))
*
* 格式 :timestamp province city userid adid
* 某個時間點 某個省份 某個城市 某個用戶 某個廣告
*/
def generateMockData(): Array[String] = {
val array = ArrayBuffer[String]()
val random = new Random()
// 模擬實時數據:timestamp province city userid adid
for (i <- 0 until 50) {
val timestamp = System.currentTimeMillis()
val province = random.nextInt(10)
val city = province
val adid = random.nextInt(20)
val userid = random.nextInt(100)
// 拼接實時數據
array += timestamp + " " + province + " " + city + " " + userid + " " + adid
}
array.toArray
}
def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
// 建立配置對象
val prop = new Properties()
// 添加配置
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// 根據配置建立 Kafka 生產者
new KafkaProducer[String, String](prop)
}
}
新建一個模塊 maven 工程 commons 做爲子 maven 工程,引入依賴 pom.xml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>commerce</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>commons</artifactId>
<dependencies>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
類(包)名稱
工具類名稱
類(包)結構圖
ConfigurationManager.scala
package commons.conf
import org.apache.commons.configuration2.{FileBasedConfiguration, PropertiesConfiguration}
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder
import org.apache.commons.configuration2.builder.fluent.Parameters
/**
* 配置工具類:新的讀取配置文件信息的方式
*/
object ConfigurationManager {
// 建立用於初始化配置生成器實例的參數對象
private val params = new Parameters()
// FileBasedConfigurationBuilder : 產生一個傳入的類的實例對象
// FileBasedConfiguration : 融合 FileBased 與 Configuration 的接口
// PropertiesConfiguration : 從一個或者多個文件讀取配置的標準配置加載器
// configure() : 經過 params 實例初始化配置生成器
// 向 FileBasedConfigurationBuilder() 中傳入一個標準配置加載器類,生成一個加載器類的實例對象,而後經過 params 參數對其初始化
private val builder = new FileBasedConfigurationBuilder[FileBasedConfiguration](classOf[PropertiesConfiguration])
.configure(params.properties().setFileName("commerce.properties"))
// 經過 getConfiguration 獲取配置對象
val config = builder.getConfiguration()
}
Constants.scala
package commons.constant
/**
* 常量接口
*/
object Constants {
/**
* 項目配置相關的常量
*/
val JDBC_DATASOURCE_SIZE = "jdbc.datasource.size"
val JDBC_URL = "jdbc.url"
val JDBC_USER = "jdbc.user"
val JDBC_PASSWORD = "jdbc.password"
val KAFKA_BROKERS = "kafka.broker.list"
val KAFKA_TOPICS = "kafka.topics"
/**
* Spark 做業相關的常量
*/
val SPARK_APP_NAME_SESSION = "UserVisitSessionAnalyzeSpark"
val SPARK_APP_NAME_PAGE = "PageOneStepConvertRateSpark"
/**
* user_visit_action、user_info、product_info 表中字段對應的字段名常量
*/
val FIELD_SESSION_ID = "sessionid"
val FIELD_SEARCH_KEYWORDS = "searchKeywords"
val FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds"
val FIELD_AGE = "age"
val FIELD_PROFESSIONAL = "professional"
val FIELD_CITY = "city"
val FIELD_SEX = "sex"
val FIELD_VISIT_LENGTH = "visitLength"
val FIELD_STEP_LENGTH = "stepLength"
val FIELD_START_TIME = "startTime"
val FIELD_CLICK_COUNT = "clickCount"
val FIELD_ORDER_COUNT = "orderCount"
val FIELD_PAY_COUNT = "payCount"
val FIELD_CATEGORY_ID = "categoryId"
/**
* Spark 累加器 Key 名稱常量
*/
val SESSION_COUNT = "session_count"
val TIME_PERIOD_1s_3s = "1s_3s"
val TIME_PERIOD_4s_6s = "4s_6s"
val TIME_PERIOD_7s_9s = "7s_9s"
val TIME_PERIOD_10s_30s = "10s_30s"
val TIME_PERIOD_30s_60s = "30s_60s"
val TIME_PERIOD_1m_3m = "1m_3m"
val TIME_PERIOD_3m_10m = "3m_10m"
val TIME_PERIOD_10m_30m = "10m_30m"
val TIME_PERIOD_30m = "30m"
val STEP_PERIOD_1_3 = "1_3"
val STEP_PERIOD_4_6 = "4_6"
val STEP_PERIOD_7_9 = "7_9"
val STEP_PERIOD_10_30 = "10_30"
val STEP_PERIOD_30_60 = "30_60"
val STEP_PERIOD_60 = "60"
/**
* task.params.json 中限制條件對應的常量字段
*/
val TASK_PARAMS = "task.params.json"
val PARAM_START_DATE = "startDate"
val PARAM_END_DATE = "endDate"
val PARAM_START_AGE = "startAge"
val PARAM_END_AGE = "endAge"
val PARAM_PROFESSIONALS = "professionals"
val PARAM_CITIES = "cities"
val PARAM_SEX = "sex"
val PARAM_KEYWORDS = "keywords"
val PARAM_CATEGORY_IDS = "categoryIds"
val PARAM_TARGET_PAGE_FLOW = "targetPageFlow"
}
DataModel.scala
package commons.model
//***************** 輸入表 *********************
/**
* 用戶訪問動做表
*
* @param date 用戶點擊行爲的日期
* @param user_id 用戶的 ID
* @param session_id Session 的 ID
* @param page_id 某個頁面的 ID
* @param action_time 點擊行爲的時間點
* @param search_keyword 用戶搜索的關鍵詞
* @param click_category_id 某一個商品品類的 ID
* @param click_product_id 某一個商品的 ID
* @param order_category_ids 一次訂單中全部品類的 ID 集合
* @param order_product_ids 一次訂單中全部商品的 ID 集合
* @param pay_category_ids 一次支付中全部品類的 ID 集合
* @param pay_product_ids 一次支付中全部商品的 ID 集合
* @param city_id 城市 ID
*/
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long)
/**
* 用戶信息表
*
* @param user_id 用戶的 ID
* @param username 用戶的名稱
* @param name 用戶的名字
* @param age 用戶的年齡
* @param professional 用戶的職業
* @param city 用戶所在的城市
* @param sex 用戶的性別
*/
case class UserInfo(user_id: Long,
username: String,
name: String,
age: Int,
professional: String,
city: String,
sex: String)
/**
* 產品表
*
* @param product_id 商品的 ID
* @param product_name 商品的名稱
* @param extend_info 商品額外的信息
*/
case class ProductInfo(product_id: Long,
product_name: String,
extend_info: String)
PooledMySqlClientFactory.scala
package commons.pool
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import commons.conf.ConfigurationManager
import commons.constant.Constants
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
// 建立用於處理 MySQL 查詢結果的類的抽象接口
trait QueryCallback {
def process(rs: ResultSet)
}
/**
* MySQL 客戶端代理對象
*
* @param jdbcUrl MySQL URL
* @param jdbcUser MySQL 用戶
* @param jdbcPassword MySQL 密碼
* @param client 默認客戶端實現
*/
case class MySqlProxy(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None) {
// 獲取客戶端鏈接對象
private val mysqlClient = client getOrElse {
DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword)
}
/**
* 執行增刪改 SQL 語句
*
* @param sql
* @param params
* @return 影響的行數
*/
def executeUpdate(sql: String, params: Array[Any]): Int = {
var rtn = 0
var pstmt: PreparedStatement = null
try {
// 第一步:關閉自動提交
mysqlClient.setAutoCommit(false)
// 第二步:根據傳入的 sql 語句建立 prepareStatement
pstmt = mysqlClient.prepareStatement(sql)
// 第三步:爲 prepareStatement 中的每一個參數填寫數值
if (params != null && params.length > 0) {
for (i <- 0 until params.length) {
pstmt.setObject(i + 1, params(i))
}
}
// 第四步:執行增刪改操做
rtn = pstmt.executeUpdate()
// 第五步:手動提交
mysqlClient.commit()
} catch {
case e: Exception => e.printStackTrace
}
rtn
}
/**
* 執行查詢 SQL 語句
*
* @param sql
* @param params
*/
def executeQuery(sql: String, params: Array[Any], queryCallback: QueryCallback) {
var pstmt: PreparedStatement = null
var rs: ResultSet = null
try {
// 第一步:根據傳入的 sql 語句建立 prepareStatement
pstmt = mysqlClient.prepareStatement(sql)
// 第二步:爲 prepareStatement 中的每一個參數填寫數值
if (params != null && params.length > 0) {
for (i <- 0 until params.length) {
pstmt.setObject(i + 1, params(i))
}
}
// 第三步:執行查詢操做
rs = pstmt.executeQuery()
// 第四步:處理查詢後的結果
queryCallback.process(rs)
} catch {
case e: Exception => e.printStackTrace
}
}
/**
* 批量執行 SQL 語句
*
* @param sql
* @param paramsList
* @return 每條SQL語句影響的行數
*/
def executeBatch(sql: String, paramsList: Array[Array[Any]]): Array[Int] = {
var rtn: Array[Int] = null
var pstmt: PreparedStatement = null
try {
// 第一步:關閉自動提交
mysqlClient.setAutoCommit(false)
pstmt = mysqlClient.prepareStatement(sql)
// 第二步:爲 prepareStatement 中的每一個參數填寫數值
if (paramsList != null && paramsList.length > 0) {
for (params <- paramsList) {
for (i <- 0 until params.length) {
pstmt.setObject(i + 1, params(i))
}
pstmt.addBatch()
}
}
// 第三步:執行批量的 SQL 語句
rtn = pstmt.executeBatch()
// 第四步:手動提交
mysqlClient.commit()
} catch {
case e: Exception => e.printStackTrace
}
rtn
}
// 關閉 MySQL 客戶端
def shutdown(): Unit = mysqlClient.close()
}
/**
* 擴展知識:將 MySqlProxy 實例視爲對象,MySqlProxy 實例的建立使用對象池進行維護
*
* 建立自定義工廠類,繼承 BasePooledObjectFactory 工廠類,負責對象的建立、包裝和銷燬
*
* @param jdbcUrl
* @param jdbcUser
* @param jdbcPassword
* @param client
*/
class PooledMySqlClientFactory(jdbcUrl: String, jdbcUser: String, jdbcPassword: String, client: Option[Connection] = None)
extends BasePooledObjectFactory[MySqlProxy] with Serializable {
// 用於池來建立對象
override def create(): MySqlProxy = MySqlProxy(jdbcUrl, jdbcUser, jdbcPassword, client)
// 用於池來包裝對象
override def wrap(obj: MySqlProxy): PooledObject[MySqlProxy] = new DefaultPooledObject(obj)
// 用於池來銷燬對象
override def destroyObject(p: PooledObject[MySqlProxy]): Unit = {
p.getObject.shutdown()
super.destroyObject(p)
}
}
/**
* 建立 MySQL 池工具類
*/
object CreateMySqlPool {
// 加載 JDBC 驅動,只須要一次
Class.forName("com.mysql.jdbc.Driver")
// 在 org.apache.commons.pool2.impl 中預設了三個能夠直接使用的對象池:GenericObjectPool、GenericKeyedObjectPool 和 SoftReferenceObjectPool
// 建立 genericObjectPool 爲 GenericObjectPool
// GenericObjectPool 的特色是能夠設置對象池中的對象特徵,包括 LIFO 方式、最大空閒數、最小空閒數、是否有效性檢查等等
private var genericObjectPool: GenericObjectPool[MySqlProxy] = null
// 伴生對象經過 apply 完成對象的建立
def apply(): GenericObjectPool[MySqlProxy] = {
// 單例模式
if (this.genericObjectPool == null) {
this.synchronized {
// 獲取 MySQL 配置參數
val jdbcUrl = ConfigurationManager.config.getString(Constants.JDBC_URL)
val jdbcUser = ConfigurationManager.config.getString(Constants.JDBC_USER)
val jdbcPassword = ConfigurationManager.config.getString(Constants.JDBC_PASSWORD)
val size = ConfigurationManager.config.getInt(Constants.JDBC_DATASOURCE_SIZE)
val pooledFactory = new PooledMySqlClientFactory(jdbcUrl, jdbcUser, jdbcPassword)
val poolConfig = {
// 建立標準對象池配置類的實例
val c = new GenericObjectPoolConfig
// 設置配置對象參數
// 設置最大對象數
c.setMaxTotal(size)
// 設置最大空閒對象數
c.setMaxIdle(size)
c
}
// 對象池的建立須要工廠類和配置類
// 返回一個 GenericObjectPool 對象池
this.genericObjectPool = new GenericObjectPool[MySqlProxy](pooledFactory, poolConfig)
}
}
genericObjectPool
}
}
Utils.scala
package commons.utils
import java.util.Date
import net.sf.json.JSONObject
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.mutable
/**
* 日期時間工具類
* 使用 joda 實現,若是使用 Java 提供的 Date 會存在線程安全問題
*/
object DateUtils {
val DATE_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd")
val TIME_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val DATE_KEY_FORMAT = DateTimeFormat.forPattern("yyyyMMdd")
val DATE_TIME_FORMAT = DateTimeFormat.forPattern("yyyyMMddHHmm")
/**
* 判斷一個時間是否在另外一個時間以前
*
* @param time1 第一個時間
* @param time2 第二個時間
* @return 判斷結果
*/
def before(time1: String, time2: String): Boolean = {
if (TIME_FORMAT.parseDateTime(time1).isBefore(TIME_FORMAT.parseDateTime(time2))) {
return true
}
false
}
/**
* 判斷一個時間是否在另外一個時間以後
*
* @param time1 第一個時間
* @param time2 第二個時間
* @return 判斷結果
*/
def after(time1: String, time2: String): Boolean = {
if (TIME_FORMAT.parseDateTime(time1).isAfter(TIME_FORMAT.parseDateTime(time2))) {
return true
}
false
}
/**
* 計算時間差值(單位爲秒)
*
* @param time1 時間1
* @param time2 時間2
* @return 差值
*/
def minus(time1: String, time2: String): Int = {
return (TIME_FORMAT.parseDateTime(time1).getMillis - TIME_FORMAT.parseDateTime(time2).getMillis) / 1000 toInt
}
/**
* 獲取年月日和小時
*
* @param datetime 時間(yyyy-MM-dd HH:mm:ss)
* @return 結果(yyyy-MM-dd_HH)
*/
def getDateHour(datetime: String): String = {
val date = datetime.split(" ")(0)
val hourMinuteSecond = datetime.split(" ")(1)
val hour = hourMinuteSecond.split(":")(0)
date + "_" + hour
}
/**
* 獲取當天日期(yyyy-MM-dd)
*
* @return 當天日期
*/
def getTodayDate(): String = {
DateTime.now().toString(DATE_FORMAT)
}
/**
* 獲取昨天的日期(yyyy-MM-dd)
*
* @return 昨天的日期
*/
def getYesterdayDate(): String = {
DateTime.now().minusDays(1).toString(DATE_FORMAT)
}
/**
* 格式化日期(yyyy-MM-dd)
*
* @param date Date對象
* @return 格式化後的日期
*/
def formatDate(date: Date): String = {
new DateTime(date).toString(DATE_FORMAT)
}
/**
* 格式化時間(yyyy-MM-dd HH:mm:ss)
*
* @param date Date對象
* @return 格式化後的時間
*/
def formatTime(date: Date): String = {
new DateTime(date).toString(TIME_FORMAT)
}
/**
* 解析時間字符串
*
* @param time 時間字符串
* @return Date
*/
def parseTime(time: String): Date = {
TIME_FORMAT.parseDateTime(time).toDate
}
def main(args: Array[String]): Unit = {
print(DateUtils.parseTime("2017-10-31 20:27:53")) // Tue Oct 31 20:27:53 CST 2017
}
/**
* 格式化日期 key
* yyyyMMdd
*
* @param date
* @return
*/
def formatDateKey(date: Date): String = {
new DateTime(date).toString(DATE_KEY_FORMAT)
}
/**
* 解析日期 key
*
* @return
*/
def parseDateKey(datekey: String): Date = {
DATE_KEY_FORMAT.parseDateTime(datekey).toDate
}
/**
* 格式化時間,保留到分鐘級別
* yyyyMMddHHmm
*
* @param date
* @return
*/
def formatTimeMinute(date: Date): String = {
new DateTime(date).toString(DATE_TIME_FORMAT)
}
}
/**
* 數字格式化工具類
*/
object NumberUtils {
/**
* 格式化小數
*
* @param scale 四捨五入的位數
* @return 格式化小數
*/
def formatDouble(num: Double, scale: Int): Double = {
val bd = BigDecimal(num)
bd.setScale(scale, BigDecimal.RoundingMode.HALF_UP).doubleValue()
}
}
/**
* 參數工具類
*/
object ParamUtils {
/**
* 從 JSON 對象中提取參數
*
* @param jsonObject JSON對象
* @return 參數
*/
def getParam(jsonObject: JSONObject, field: String): String = {
jsonObject.getString(field)
}
}
/**
* 字符串工具類
*
*/
object StringUtils {
/**
* 判斷字符串是否爲空
*
* @param str 字符串
* @return 是否爲空
*/
def isEmpty(str: String): Boolean = {
str == null || "".equals(str)
}
/**
* 判斷字符串是否不爲空
*
* @param str 字符串
* @return 是否不爲空
*/
def isNotEmpty(str: String): Boolean = {
str != null && !"".equals(str)
}
/**
* 截斷字符串兩側的逗號
*
* @param str 字符串
* @return 字符串
*/
def trimComma(str: String): String = {
var result = ""
if (str.startsWith(",")) {
result = str.substring(1)
}
if (str.endsWith(",")) {
result = str.substring(0, str.length() - 1)
}
result
}
/**
* 補全兩位數字
*
* @param str
* @return
*/
def fulfuill(str: String): String = {
if (str.length() == 2) {
str
} else {
"0" + str
}
}
/**
* 從拼接的字符串中提取字段
*
* @param str 字符串
* @param delimiter 分隔符
* @param field 字段
* @return 字段值
*/
def getFieldFromConcatString(str: String, delimiter: String, field: String): String = {
try {
val fields = str.split(delimiter);
for (concatField <- fields) {
if (concatField.split("=").length == 2) {
val fieldName = concatField.split("=")(0)
val fieldValue = concatField.split("=")(1)
if (fieldName.equals(field)) {
return fieldValue
}
}
}
} catch {
case e: Exception => e.printStackTrace()
}
null
}
/**
* 從拼接的字符串中給字段設置值
*
* @param str 字符串
* @param delimiter 分隔符
* @param field 字段名
* @param newFieldValue 新的field值
* @return 字段值
*/
def setFieldInConcatString(str: String, delimiter: String, field: String, newFieldValue: String): String = {
val fieldsMap = new mutable.HashMap[String, String]()
for (fileds <- str.split(delimiter)) {
val arra = fileds.split("=")
if (arra(0).compareTo(field) == 0)
fieldsMap += (field -> newFieldValue)
else
fieldsMap += (arra(0) -> arra(1))
}
fieldsMap.map(item => item._1 + "=" + item._2).mkString(delimiter)
}
}
/**
* 校驗工具類
*/
object ValidUtils {
/**
* 校驗數據中的指定字段,是否在指定範圍內(範圍區間)
*
* @param data 數據
* @param dataField 數據字段
* @param parameter 參數
* @param startParamField 起始參數字段
* @param endParamField 結束參數字段
* @return 校驗結果
*/
def between(data: String, dataField: String, parameter: String, startParamField: String, endParamField: String): Boolean = {
val startParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", startParamField)
val endParamFieldStr = StringUtils.getFieldFromConcatString(parameter, "\\|", endParamField)
if (startParamFieldStr == null || endParamFieldStr == null) {
return true
}
val startParamFieldValue = startParamFieldStr.toInt
val endParamFieldValue = endParamFieldStr.toInt
val dataFieldStr = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
if (dataFieldStr != null) {
val dataFieldValue = dataFieldStr.toInt
if (dataFieldValue >= startParamFieldValue && dataFieldValue <= endParamFieldValue) {
return true
} else {
return false
}
}
false
}
/**
* 校驗數據中的指定字段,是否有值與參數字段的值相同(多選一)
*
* @param data 數據
* @param dataField 數據字段
* @param parameter 參數
* @param paramField 參數字段
* @return 校驗結果
*/
def in(data: String, dataField: String, parameter: String, paramField: String): Boolean = {
val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
if (paramFieldValue == null) {
return true
}
val paramFieldValueSplited = paramFieldValue.split(",")
val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
if (dataFieldValue != null && dataFieldValue != "-1") {
val dataFieldValueSplited = dataFieldValue.split(",")
for (singleDataFieldValue <- dataFieldValueSplited) {
for (singleParamFieldValue <- paramFieldValueSplited) {
if (singleDataFieldValue.compareTo(singleParamFieldValue) == 0) {
return true
}
}
}
}
false
}
/**
* 校驗數據中的指定字段,是否在指定範圍內(二選一)
*
* @param data 數據
* @param dataField 數據字段
* @param parameter 參數
* @param paramField 參數字段
* @return 校驗結果
*/
def equal(data: String, dataField: String, parameter: String, paramField: String): Boolean = {
val paramFieldValue = StringUtils.getFieldFromConcatString(parameter, "\\|", paramField)
if (paramFieldValue == null) {
return true
}
val dataFieldValue = StringUtils.getFieldFromConcatString(data, "\\|", dataField)
if (dataFieldValue != null) {
if (dataFieldValue.compareTo(paramFieldValue) == 0) {
return true
}
}
false
}
}
commerce.properties
# jbdc 配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://localhost:3306/commerce?useUnicode=true&characterEncoding=utf8
jdbc.user=root
jdbc.password=root
# 篩選條件的配置
# 可使用的屬性以下:
# startDate: 格式: yyyy-MM-DD [必選]
# endDate: 格式: yyyy-MM-DD [必選]
# startAge: 範圍: 0 - 59
# endAge: 範圍: 0 - 59
# professionals: 範圍:professionals[0 - 99]
# cities: 0 - 99 ((0,"北京","華北"),(1,"上海","華東"),(2,"南京","華東"),(3,"廣州","華南"),(4,"三亞","華南"),(5,"武漢","華中"),(6,"長沙","華中"),(7,"西安","西北"),(8,"成都","西南"),(9,"哈爾濱","東北"),...)
# sex: 範圍: 0 - 1
# keywords: 範圍: ("火鍋", "蛋糕", "重慶辣子雞", "重慶小面", "呷哺呷哺", "新辣道魚火鍋", "國貿大廈", "太古商場", "日本料理", "溫泉")
# categoryIds: 0 - 99,以逗號分隔
# targetPageFlow: 0 - 99, 以逗號分隔
task.params.json={\
startDate:"2019-06-01", \
endDate:"2019-06-30", \
startAge: 20, \
endAge: 50, \
professionals: "", \
cities: "", \
sex:"", \
keywords:"", \
categoryIds:"", \
targetPageFlow:"1,2,3,4,5,6,7"}
# Kafka 配置
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.topics=AdRealTimeLog1
log4j.properties
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
新建一個模塊 maven 工程 analyse 做爲子 maven 工程,刪除掉 src 目錄,引入依賴 pom.xml,添加對 scala 框架的支持。注意
:在該子模塊中有不少子模塊。即具體需求實現的模塊。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>commerce</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>analyse</artifactId>
</project>
analyse 模塊是需求的具體實現模塊, 咱們將會在第 5 章中進行詳細解析。
需求一:要統計出符合篩選條件的 session 中,訪問時長在 1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m 以上各個範圍內的 session 佔比;訪問步長在 1~三、4~六、7~九、10~30、30~60、60 以上各個範圍內的 session 佔比,並將結果保存到 MySQL 數據庫中。
在計算以前須要根據查詢條件篩選 session,查詢條件好比搜索過某些關鍵詞的用戶、訪問時間在某個時間段內的用戶、年齡在某個範圍內的用戶、職業在某個範圍內的用戶、所在某個城市的用戶,發起的 session。找到對應的這些用戶的 session,並進行統計,之因此須要有篩選主要是可讓使用者,對感興趣的和關係的用戶羣體,進行後續各類複雜業務邏輯的統計和分析,那麼拿到的結果數據,就是隻是針對特殊用戶羣體的分析結果;而不是對全部用戶進行分析的泛泛的分析結果。好比說,如今某個企業高層,就是想看到用戶羣體中,28~35 歲的老師職業的羣體,對應的一些統計和分析的結果數據,從而輔助高管進行公司戰略上的決策制定。
session 訪問時長,也就是說一個 session 對應的開始的 action 到結束的 action 之間的時間範圍;還有,就是訪問步長,指的是,一個 session 執行期間內,依次點擊過多少個頁面,好比說,一次 session 維持了 1 分鐘,那麼訪問時長就是 1m,而後在這 1 分鐘內,點擊了 10 個頁面,那麼 session 的訪問步長,就是 10。
好比說,符合第一步篩選出來的 session 的數量大概是有 1000 萬個。那麼裏面,咱們要計算出,訪問時長在 1s~3s 內的 session 的數量,併除以符合條件的總 session 數量(好比 1000 萬),好比是 100 萬/1000 萬,那麼 1s~3s 內的 session 佔比就是 10%。依次類推,這裏說的統計,就是這個意思。
這個功能可讓人從全局的角度看到,符合某些條件的用戶羣體使用咱們的產品的一些習慣。好比大多數人,究竟是會在產品中停留多長時間,大多數人,會在一次使用產品的過程當中,訪問多少個頁面。那麼對於使用者來講, 有一個全局和清晰的認識。
一、UserVisitAction 樣例類
/**
* 用戶訪問動做表
*
* @param date 用戶點擊行爲的日期
* @param user_id 用戶的 ID
* @param session_id Session 的 ID
* @param page_id 某個頁面的 ID
* @param action_time 點擊行爲的時間點
* @param search_keyword 用戶搜索的關鍵詞
* @param click_category_id 某一個商品品類的 ID
* @param click_product_id 某一個商品的 ID
* @param order_category_ids 一次訂單中全部品類的 ID 集合
* @param order_product_ids 一次訂單中全部商品的 ID 集合
* @param pay_category_ids 一次支付中全部品類的 ID 集合
* @param pay_product_ids 一次支付中全部商品的 ID 集合
* @param city_id 城市 ID
*/
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long
)
二、UserInfo 樣例類
/**
* 用戶信息表
*
* @param user_id 用戶的 ID
* @param username 用戶的名稱
* @param name 用戶的名字
* @param age 用戶的年齡
* @param professional 用戶的職業
* @param city 用戶所在的城市
* @param sex 用戶的性別
*/
case class UserInfo(user_id: Long,
username: String,
name: String,
age: Int,
professional: String,
city: String,
sex: String
)
爲何聯立用戶表?
用戶表中記錄了用戶詳細的我的信息,包括年齡、職業、城市、性別等,在實際的業務場景中,咱們可能會在一段時間關注某一個羣體的用戶的行爲,好比在某一段時間關注北京的白領們的購物行爲
,那麼咱們就能夠經過聯立用戶表,讓咱們的統計數據中具備用戶屬性
,而後根據用戶屬性對統計信息進行過濾,將不屬於咱們所關注的用戶羣體的用戶所產生的行爲數據過濾掉,這樣就能夠實現對指定人羣的精準分析。
MySQL 寫入數據格式
session_aggr_stat
-- ----------------------------
-- Table structure for `session_aggr_stat`
-- ----------------------------
DROP TABLE IF EXISTS `session_aggr_stat`;
CREATE TABLE `session_aggr_stat` (
`taskid` varchar(255) DEFAULT NULL,
`session_count` int(11) DEFAULT NULL,
`visit_length_1s_3s_ratio` double DEFAULT NULL,
`visit_length_4s_6s_ratio` double DEFAULT NULL,
`visit_length_7s_9s_ratio` double DEFAULT NULL,
`visit_length_10s_30s_ratio` double DEFAULT NULL,
`visit_length_30s_60s_ratio` double DEFAULT NULL,
`visit_length_1m_3m_ratio` double DEFAULT NULL,
`visit_length_3m_10m_ratio` double DEFAULT NULL,
`visit_length_10m_30m_ratio` double DEFAULT NULL,
`visit_length_30m_ratio` double DEFAULT NULL,
`step_length_1_3_ratio` double DEFAULT NULL,
`step_length_4_6_ratio` double DEFAULT NULL,
`step_length_7_9_ratio` double DEFAULT NULL,
`step_length_10_30_ratio` double DEFAULT NULL,
`step_length_30_60_ratio` double DEFAULT NULL,
`step_length_60_ratio` double DEFAULT NULL,
KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
在模塊 analyse 新建一個模塊 session,引入 pom 文件,修改 src 目錄名稱爲 scala,同時添加 scala 框架的支持。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>analyse</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>session</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- scala-maven-plugin 插件用於在任意的 maven 項目中對 scala 代碼進行編譯/測試/運行/文檔化 -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.session.UserVisitSessionAnalyze</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
代碼實現示例以下:
SessionStat.scala
package com.atguigu.session
import java.util.{Date, UUID}
import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable
object SessionStat {
def main(args: Array[String]): Unit = {
// 獲取過濾條件,【爲了方便,直接從配置文件中獲取,企業中會從一個調度平臺獲取】
val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
// 獲取過濾條件對應的 JsonObject 對象
val taskParam = JSONObject.fromObject(jsonStr)
// 建立全局惟一的主鍵,每次執行 main 函數都會生成一個獨一無二的 taskUUID,來區分不一樣任務,做爲寫入 MySQL 數據庫中那張表的主鍵
val taskUUID = UUID.randomUUID().toString
// 建立 sparkConf
val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
// 建立 sparkSession(包含 sparkContext)
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// 獲取原始的動做表數據(帶有過濾條件)
// actionRDD: RDD[UserVisitAction]
val actionRDD = getOriActionRDD(sparkSession, taskParam)
// 將用戶行爲信息轉換爲 K-V 結構,sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
// session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一個 sessionId 的數據聚合到一塊兒,獲得斧子形數據
// 將數據進行內存緩存
session2GroupActionRDD.cache()
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
// 建立自定義累加器對象
val sessionStatisticAccumulator = new SessionStatisticAccumulator
// 在 sparkSession 中註冊自定義累加器,這樣後面就能夠用了
sparkSession.sparkContext.register(sessionStatisticAccumulator)
// 根據過濾條件對 sessionId2FullAggrInfoRDD 進行過濾操做,即過濾掉不符合條件的數據,並根據自定義累加器 統計不一樣範圍的 訪問時長 和 訪問步長 的 session 個數 以及 總的 session 個數
val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
// 必須引入任意一個 action 的算子,才能啓動
seeionId2FilterRDD.foreach(println(_))
// 計算各個 session 的佔比
getSessionRatio(sparkSession,taskUUID, sessionStatisticAccumulator.value)
}
def getSessionRatio(sparkSession: SparkSession, taskUUID: String, value: mutable.HashMap[String, Int]): Unit = {
val session_count = value.getOrElse(Constants.SESSION_COUNT, 1).toDouble
// 先獲取各個值
val visit_length_1s_3s = value.getOrElse(Constants.TIME_PERIOD_1s_3s, 0)
val visit_length_4s_6s = value.getOrElse(Constants.TIME_PERIOD_4s_6s, 0)
val visit_length_7s_9s = value.getOrElse(Constants.TIME_PERIOD_7s_9s, 0)
val visit_length_10s_30s = value.getOrElse(Constants.TIME_PERIOD_10s_30s, 0)
val visit_length_30s_60s = value.getOrElse(Constants.TIME_PERIOD_30s_60s, 0)
val visit_length_1m_3m = value.getOrElse(Constants.TIME_PERIOD_1m_3m, 0)
val visit_length_3m_10m = value.getOrElse(Constants.TIME_PERIOD_3m_10m, 0)
val visit_length_10m_30m = value.getOrElse(Constants.TIME_PERIOD_10m_30m, 0)
val visit_length_30m = value.getOrElse(Constants.TIME_PERIOD_30m, 0)
val step_length_1_3 = value.getOrElse(Constants.STEP_PERIOD_1_3, 0)
val step_length_4_6 = value.getOrElse(Constants.STEP_PERIOD_4_6, 0)
val step_length_7_9 = value.getOrElse(Constants.STEP_PERIOD_7_9, 0)
val step_length_10_30 = value.getOrElse(Constants.STEP_PERIOD_10_30, 0)
val step_length_30_60 = value.getOrElse(Constants.STEP_PERIOD_30_60, 0)
val step_length_60 = value.getOrElse(Constants.STEP_PERIOD_60, 0)
// 計算比例
val visit_length_1s_3s_ratio = NumberUtils.formatDouble(visit_length_1s_3s / session_count, 2)
val visit_length_4s_6s_ratio = NumberUtils.formatDouble(visit_length_4s_6s / session_count, 2)
val visit_length_7s_9s_ratio = NumberUtils.formatDouble(visit_length_7s_9s / session_count, 2)
val visit_length_10s_30s_ratio = NumberUtils.formatDouble(visit_length_10s_30s / session_count, 2)
val visit_length_30s_60s_ratio = NumberUtils.formatDouble(visit_length_30s_60s / session_count, 2)
val visit_length_1m_3m_ratio = NumberUtils.formatDouble(visit_length_1m_3m / session_count, 2)
val visit_length_3m_10m_ratio = NumberUtils.formatDouble(visit_length_3m_10m / session_count, 2)
val visit_length_10m_30m_ratio = NumberUtils.formatDouble(visit_length_10m_30m / session_count, 2)
val visit_length_30m_ratio = NumberUtils.formatDouble(visit_length_30m / session_count, 2)
val step_length_1_3_ratio = NumberUtils.formatDouble(step_length_1_3 / session_count, 2)
val step_length_4_6_ratio = NumberUtils.formatDouble(step_length_4_6 / session_count, 2)
val step_length_7_9_ratio = NumberUtils.formatDouble(step_length_7_9 / session_count, 2)
val step_length_10_30_ratio = NumberUtils.formatDouble(step_length_10_30 / session_count, 2)
val step_length_30_60_ratio = NumberUtils.formatDouble(step_length_30_60 / session_count, 2)
val step_length_60_ratio = NumberUtils.formatDouble(step_length_60 / session_count, 2)
// 封裝數據
val stat = SessionAggrStat(taskUUID, session_count.toInt,
visit_length_1s_3s_ratio, visit_length_4s_6s_ratio, visit_length_7s_9s_ratio,
visit_length_10s_30s_ratio, visit_length_30s_60s_ratio, visit_length_1m_3m_ratio,
visit_length_3m_10m_ratio, visit_length_10m_30m_ratio, visit_length_30m_ratio,
step_length_1_3_ratio, step_length_4_6_ratio, step_length_7_9_ratio,
step_length_10_30_ratio, step_length_30_60_ratio, step_length_60_ratio)
// 樣例類實例 -> 數組 -> RDD
val sessionRatioRDD = sparkSession.sparkContext.makeRDD(Array(stat))
// 寫入 MySQL 數據庫中
import sparkSession.implicits._
sessionRatioRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.option("dbtable", "session_aggr_stat")
.mode(SaveMode.Append) // 表存在就追加,表不存在就新建
.save()
}
def calculateVisitLength(visitLength: Long, sessionStatisticAccumulator: SessionStatisticAccumulator) = {
if (visitLength >= 1 && visitLength <= 3) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1s_3s)
} else if (visitLength >= 4 && visitLength <= 6) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_4s_6s)
} else if (visitLength >= 7 && visitLength <= 9) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_7s_9s)
} else if (visitLength >= 10 && visitLength <= 30) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10s_30s)
} else if (visitLength > 30 && visitLength <= 60) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30s_60s)
} else if (visitLength > 60 && visitLength <= 180) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_1m_3m)
} else if (visitLength > 180 && visitLength <= 600) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_3m_10m)
} else if (visitLength > 600 && visitLength <= 1800) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_10m_30m)
} else if (visitLength > 1800) {
sessionStatisticAccumulator.add(Constants.TIME_PERIOD_30m)
}
}
def calculateStepLength(stepLength: Long, sessionStatisticAccumulator: SessionStatisticAccumulator) = {
if (stepLength >= 1 && stepLength <= 3) {
sessionStatisticAccumulator.add(Constants.STEP_PERIOD_1_3)
} else if (stepLength >= 4 && stepLength <= 6) {
sessionStatisticAccumulator.add(Constants.STEP_PERIOD_4_6)
} else if (stepLength >= 7 && stepLength <= 9) {
sessionStatisticAccumulator.add(Constants.STEP_PERIOD_7_9)
} else if (stepLength >= 10 && stepLength <= 30) {
sessionStatisticAccumulator.add(Constants.STEP_PERIOD_10_30)
} else if (stepLength > 30 && stepLength <= 60) {
sessionStatisticAccumulator.add(Constants.STEP_PERIOD_30_60)
} else if (stepLength > 60) {
sessionStatisticAccumulator.add(Constants.STEP_PERIOD_60)
}
}
def getSessionFilterRDD(taskParam: JSONObject,
sessionId2FullAggrInfoRDD: RDD[(String, String)],
sessionStatisticAccumulator: SessionStatisticAccumulator) = {
// 先獲取所用到的過濾條件:
val startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE)
val endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE)
val professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS)
val cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES)
val sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX)
val keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS)
val categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS)
// 拼裝過濾條件的字符串:
var filterInfo = (if (startAge != null) Constants.PARAM_START_AGE + "=" + startAge + "|" else "") +
(if (endAge != null) Constants.PARAM_END_AGE + "=" + endAge + "|" else "") +
(if (professionals != null) Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" else "") +
(if (cities != null) Constants.PARAM_CITIES + "=" + cities + "|" else "") +
(if (sex != null) Constants.PARAM_SEX + "=" + sex + "|" else "") +
(if (keywords != null) Constants.PARAM_KEYWORDS + "=" + keywords + "|" else "") +
(if (categoryIds != null) Constants.PARAM_CATEGORY_IDS + "=" + categoryIds else "")
// 去除過濾條件字符串末尾的 "|"
if (filterInfo.endsWith("\\|"))
filterInfo = filterInfo.substring(0, filterInfo.length - 1)
// 進行過濾操做(過濾自帶遍歷功能)
sessionId2FullAggrInfoRDD.filter {
case (sessionId, fullAggrInfo) =>
var success = true
// 若是 age 不在過濾條件範圍以內,則當前 sessionId 對應的 fullAggrInfo 數據被過濾掉
if (!ValidUtils.between(fullAggrInfo, Constants.FIELD_AGE, filterInfo, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) { // 範圍用 between
success = false
} else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_PROFESSIONAL, filterInfo, Constants.PARAM_PROFESSIONALS)) {
success = false
} else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_CITY, filterInfo, Constants.PARAM_CITIES)) {
success = false
} else if (!ValidUtils.equal(fullAggrInfo, Constants.FIELD_SEX, filterInfo, Constants.PARAM_SEX)) { // 二選一用 equal
success = false
} else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_SEARCH_KEYWORDS, filterInfo, Constants.PARAM_KEYWORDS)) { // 多選一用 in
success = false
} else if (!ValidUtils.in(fullAggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, filterInfo, Constants.PARAM_CATEGORY_IDS)) {
success = false
}
// 自定義累加器,統計不一樣範圍的 訪問時長 和 訪問步長 的個數 以及 總的 session 個數
if (success) {
sessionStatisticAccumulator.add(Constants.SESSION_COUNT) // 總的 session 個數
// 獲取當前 sessionId 對應的 訪問時長 和 訪問步長
val visitLength = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH).toLong
val stepLength = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_STEP_LENGTH).toLong
// 統計不一樣範圍的 訪問時長 和 訪問步長 的個數
calculateVisitLength(visitLength, sessionStatisticAccumulator)
calculateStepLength(stepLength, sessionStatisticAccumulator)
}
success
}
}
def getSessionFullAggrInfo(sparkSession: SparkSession,
session2GroupActionRDD: RDD[(String, Iterable[UserVisitAction])]) = {
// userId2PartAggrInfoRDD: RDD[(userId, partAggrInfo)]
val userId2PartAggrInfoRDD = session2GroupActionRDD.map {
// 使用模式匹配:當結果是 KV 對的時候儘可能使用 case 模式匹配,這樣更清楚,更簡潔直觀
case (sessionId, iterableAction) =>
var userId = -1L
var startTime: Date = null
var endTime: Date = null
var stepLength = 0 // 有多少個 action
val searchKeywords = new StringBuffer("") // 搜索行爲
val clickCategories = new StringBuffer("") // 點擊行爲
for (action <- iterableAction) {
if (userId == -1) {
userId = action.user_id
}
val actionTime = DateUtils.parseTime(action.action_time) // action_time = "2019-05-30 18:17:11" 是字符串類型
if (startTime == null || startTime.after(actionTime)) { // startTime 在 actionTime 的後面 正常區間:[startTime, actionTime, endTime]
startTime = actionTime
}
if (endTime == null || endTime.before(actionTime)) { // endTime 在 actionTime 的前面
endTime = actionTime
}
val searchKeyword = action.search_keyword
if (StringUtils.isNotEmpty(searchKeyword) && !searchKeywords.toString.contains(searchKeyword)) {
searchKeywords.append(searchKeyword + ",")
}
val clickCategoryId = action.click_category_id
if (clickCategoryId != -1 && !clickCategories.toString.contains(clickCategoryId)) {
clickCategories.append(clickCategoryId + ",")
}
stepLength += 1
}
// searchKeywords.toString.substring(0, searchKeywords.toString.length - 1) // 等價於下面
val searchKw = StringUtils.trimComma(searchKeywords.toString) // 去除最後一個逗號
val clickCg = StringUtils.trimComma(clickCategories.toString) // 去除最後一個逗號
val visitLength = (endTime.getTime - startTime.getTime) / 1000
// 拼裝聚合數據的字符串:
// (31,sessionid=7291cc307f96432f8da9d926fd7d88e5|searchKeywords=洗面奶,小龍蝦,機器學習,蘋果,華爲手機|clickCategoryIds=11,93,36,66,
// 60|visitLength=3461|stepLength=43|startTime=2019-05-30 14:01:01)
val partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKw + "|" +
Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCg + "|" +
Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime) // 格式化時間爲字符串類型
(userId, partAggrInfo)
}
// user_visit_action 表聯立 user_info 表,讓咱們的統計數據中具備用戶屬性
val sql = "select * from user_info"
import sparkSession.implicits._
// userId2InfoRDD: RDD[(userId, UserInfo)]
val userId2InfoRDD = sparkSession.sql(sql).as[UserInfo].rdd.map(item => (item.user_id, item))
val sessionId2FullAggrInfoRDD = userId2PartAggrInfoRDD.join(userId2InfoRDD).map {
case (userId, (partAggrInfo, userInfo)) =>
val age = userInfo.age
val professional = userInfo.professional
val sex = userInfo.sex
val city = userInfo.city
// 拼裝最終的聚合數據字符串:
val fullAggrInfo = partAggrInfo + "|" +
Constants.FIELD_AGE + "=" + age + "|" +
Constants.FIELD_PROFESSIONAL + "=" + professional + "|" +
Constants.FIELD_SEX + "=" + sex + "|" +
Constants.FIELD_CITY + "=" + city
val seesionId = StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
(seesionId, fullAggrInfo)
}
sessionId2FullAggrInfoRDD
}
def getOriActionRDD(sparkSession: SparkSession, taskParam: JSONObject) = {
// 先獲取所用到的過濾條件:開始日期 和 結束日期
val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
// 把全部的時間範圍在 startDate 和 endDate 之間的數據查詢出來
val sql = "select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'"
// 在對 DataFrame 和 Dataset 進行許多操做都須要這個包進行支持
import sparkSession.implicits._
sparkSession.sql(sql).as[UserVisitAction].rdd // DataFrame(Row類型) -> DataSet(樣例類類型) -> rdd(樣例類)
}
}
自定義累加器 SessionStatisticAccumulator 代碼以下:
package com.atguigu.session
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
/**
* 自定義累加器
*/
class SessionStatisticAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Int]]() {
// 自定義累加器:要求要在類的裏面維護一個 mutable.HashMap 結構
val countMap = new mutable.HashMap[String, Int]()
// 判斷累加器是否爲空
override def isZero: Boolean = {
this.countMap.isEmpty
}
// 複製一個如出一轍的累加器
override def copy(): AccumulatorV2[String, mutable.HashMap[String, Int]] = {
val acc = new SessionStatisticAccumulator
acc.countMap ++= this.countMap // 將兩個 Map 拼接在一塊兒
acc
}
// 重置累加器
override def reset(): Unit = {
this.countMap.clear()
}
// 向累加器中添加 KV 對(K 存在,V 累加1,K 不存在,從新建立)
override def add(k: String): Unit = {
if (!this.countMap.contains(k)) {
this.countMap += (k -> 0)
}
this.countMap.update(k, this.countMap(k) + 1)
}
// 兩個累加器進行合併(先判斷兩個累加器是不是同一類型的,再將兩個 Map 進行合併(是個小難點))
override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Int]]): Unit = {
other match {
// (1 : 100).foldLeft(0) 等價於 (0 : (1 to 100))(_+_) 又等價於 { case (int1, int2) => int1 + int2 }
// acc.countMap.foldLeft(this.countMap) 等價於 this.countMap : acc.countMap 又等價於 this.countMap 和 acc.countMap 的每個 KV 作操做
case acc: SessionStatisticAccumulator => acc.countMap.foldLeft(this.countMap) {
case (map, (k, v)) => map += (k -> (map.getOrElse(k, 0) + v))
}
}
}
override def value: mutable.HashMap[String, Int] = {
this.countMap
}
}
數據模型代碼以下:
package com.atguigu.session
//***************** 輸出表 *********************
/**
* Session 聚合統計表
*
* @param taskid 當前計算批次的 ID
* @param session_count 全部 Session 的總和
* @param visit_length_1s_3s_ratio 1-3s Session 訪問時長佔比
* @param visit_length_4s_6s_ratio 4-6s Session 訪問時長佔比
* @param visit_length_7s_9s_ratio 7-9s Session 訪問時長佔比
* @param visit_length_10s_30s_ratio 10-30s Session 訪問時長佔比
* @param visit_length_30s_60s_ratio 30-60s Session 訪問時長佔比
* @param visit_length_1m_3m_ratio 1-3m Session 訪問時長佔比
* @param visit_length_3m_10m_ratio 3-10m Session 訪問時長佔比
* @param visit_length_10m_30m_ratio 10-30m Session 訪問時長佔比
* @param visit_length_30m_ratio 30m Session 訪問時長佔比
* @param step_length_1_3_ratio 1-3 步長佔比
* @param step_length_4_6_ratio 4-6 步長佔比
* @param step_length_7_9_ratio 7-9 步長佔比
* @param step_length_10_30_ratio 10-30 步長佔比
* @param step_length_30_60_ratio 30-60 步長佔比
* @param step_length_60_ratio 大於 60 步長佔比
*/
case class SessionAggrStat(taskid: String,
session_count: Long,
visit_length_1s_3s_ratio: Double,
visit_length_4s_6s_ratio: Double,
visit_length_7s_9s_ratio: Double,
visit_length_10s_30s_ratio: Double,
visit_length_30s_60s_ratio: Double,
visit_length_1m_3m_ratio: Double,
visit_length_3m_10m_ratio: Double,
visit_length_10m_30m_ratio: Double,
visit_length_30m_ratio: Double,
step_length_1_3_ratio: Double,
step_length_4_6_ratio: Double,
step_length_7_9_ratio: Double,
step_length_10_30_ratio: Double,
step_length_30_60_ratio: Double,
step_length_60_ratio: Double)
/**
* Session 隨機抽取表
*
* @param taskid 當前計算批次的 ID
* @param sessionid 抽取的 Session 的 ID
* @param startTime Session 的開始時間
* @param searchKeywords Session 的查詢字段
* @param clickCategoryIds Session 點擊的類別 id 集合
*/
case class SessionRandomExtract(taskid: String,
sessionid: String,
startTime: String,
searchKeywords: String,
clickCategoryIds: String)
/**
* Session 隨機抽取詳細表
*
* @param taskid 當前計算批次的 ID
* @param userid 用戶的 ID
* @param sessionid Session的 ID
* @param pageid 某個頁面的 ID
* @param actionTime 點擊行爲的時間點
* @param searchKeyword 用戶搜索的關鍵詞
* @param clickCategoryId 某一個商品品類的 ID
* @param clickProductId 某一個商品的 ID
* @param orderCategoryIds 一次訂單中全部品類的 ID 集合
* @param orderProductIds 一次訂單中全部商品的 ID 集合
* @param payCategoryIds 一次支付中全部品類的 ID 集合
* @param payProductIds 一次支付中全部商品的 ID 集合
**/
case class SessionDetail(taskid: String,
userid: Long,
sessionid: String,
pageid: Long,
actionTime: String,
searchKeyword: String,
clickCategoryId: Long,
clickProductId: Long,
orderCategoryIds: String,
orderProductIds: String,
payCategoryIds: String,
payProductIds: String)
/**
* 品類 Top10 表
*
* @param taskid
* @param categoryid
* @param clickCount
* @param orderCount
* @param payCount
*/
case class Top10Category(taskid: String,
categoryid: Long,
clickCount: Long,
orderCount: Long,
payCount: Long)
/**
* Top10 Session
*
* @param taskid
* @param categoryid
* @param sessionid
* @param clickCount
*/
case class Top10Session(taskid: String,
categoryid: Long,
sessionid: String,
clickCount: Long)
在符合條件的 session 中,按照時間比例隨機抽取 1000 個 session。
這個按照時間比例是什麼意思呢?隨機抽取自己是很簡單的,可是按照時間比例,就很複雜了。好比說,這一天總共有 1000 萬的 session。那麼我如今總共要從這 1000 萬 session 中,隨機抽取出來 1000 個 session。可是這個隨機不是那麼簡單的。須要作到以下幾點要求:首先,若是這一天的 12:00~13:00 的 session 數量是 100萬,那麼這個小時的 session 佔比就是 1/10,那麼這個小時中的 100 萬的 session,咱們就要抽取 1/10 * 1000 = 100 個。即從這個小時的 100 萬 session 中,隨機抽取出 100 個 session。以此類推,其餘小時的抽取也是這樣作。
這個功能的做用是說,可讓使用者,可以對於符合條件的 session,按照時間比例均勻的隨機採樣出 1000 個 session,而後觀察每一個 session 具體的點擊流/行爲, 好比先進入了首頁、而後點擊了食品品類、而後點擊了雨潤火腿腸商品、而後搜索了火腿腸罐頭的關鍵詞、接着對王中王火腿腸下了訂單、最後對訂單作了支付。
之因此要作到按時間比例隨機採用抽取,就是要作到,觀察樣本的公平性。
抽取完畢以後,須要將 Session 的相關信息和詳細信息保存到 MySQL 數據庫中。
本需求的數據源來自於需求一中獲取的 Session 聚合數據(fullAggrInfo)。
SessionRandomExtract 樣例類
/**
* Session 隨機抽取表
*
* @param taskid 當前計算批次的 ID
* @param sessionid 抽取的 Session 的 ID
* @param startTime Session 的開始時間
* @param searchKeywords Session 的查詢字段
* @param clickCategoryIds Session 點擊的類別 id 集合
*/
case class SessionRandomExtract(taskid: String,
sessionid: String,
startTime: String,
searchKeywords: String,
clickCategoryIds: String)
MySQL 寫入數據格式
session_detail
-- ----------------------------
-- Table structure for `session_detail`
-- ----------------------------
DROP TABLE IF EXISTS `session_detail`;
CREATE TABLE `session_detail` (
`taskid` varchar(255) DEFAULT NULL,
`userid` int(11) DEFAULT NULL,
`sessionid` varchar(255) DEFAULT NULL,
`pageid` int(11) DEFAULT NULL,
`actionTime` varchar(255) DEFAULT NULL,
`searchKeyword` varchar(255) DEFAULT NULL,
`clickCategoryId` int(11) DEFAULT NULL,
`clickProductId` int(11) DEFAULT NULL,
`orderCategoryIds` varchar(255) DEFAULT NULL,
`orderProductIds` varchar(255) DEFAULT NULL,
`payCategoryIds` varchar(255) DEFAULT NULL,
`payProductIds` varchar(255) DEFAULT NULL, KEY `idx_task_id` (`taskid`),
KEY `idx_session_id` (`sessionid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
session_random_extract
-- ----------------------------
-- Table structure for `session_random_extract`
-- ----------------------------
DROP TABLE IF EXISTS `session_random_extract`;
CREATE TABLE `session_random_extract` (
`taskid` varchar(255) DEFAULT NULL,
`sessionid` varchar(255) DEFAULT NULL,
`startTime` varchar(50) DEFAULT NULL,
`searchKeywords` varchar(255) DEFAULT NULL,
`clickCategoryIds` varchar(255) DEFAULT NULL,
KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
package com.atguigu.session
import java.util.{Date, Random, UUID}
import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object SessionStat {
def main(args: Array[String]): Unit = {
// 獲取過濾條件,【爲了方便,直接從配置文件中獲取,企業中會從一個調度平臺獲取】
val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
// 獲取過濾條件對應的 JsonObject 對象
val taskParam = JSONObject.fromObject(jsonStr)
// 建立全局惟一的主鍵,每次執行 main 函數都會生成一個獨一無二的 taskUUID,來區分不一樣任務,做爲寫入 MySQL 數據庫中那張表的主鍵
val taskUUID = UUID.randomUUID().toString
// 建立 sparkConf
val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
// 建立 sparkSession(包含 sparkContext)
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// ******************** 需求一:Session 各範圍訪問步長、訪問時長佔比統計 ********************
// 獲取原始的動做表數據(帶有過濾條件)
// actionRDD: RDD[UserVisitAction]
val actionRDD = getOriActionRDD(sparkSession, taskParam)
// 將用戶行爲信息轉換爲 K-V 結構,sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
// session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一個 sessionId 的數據聚合到一塊兒,獲得斧子形數據
// 將數據進行內存緩存
session2GroupActionRDD.cache()
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
// 建立自定義累加器對象
val sessionStatisticAccumulator = new SessionStatisticAccumulator
// 在 sparkSession 中註冊自定義累加器,這樣後面就能夠用了
sparkSession.sparkContext.register(sessionStatisticAccumulator)
// 根據過濾條件對 sessionId2FullAggrInfoRDD 進行過濾操做,即過濾掉不符合條件的數據,並根據自定義累加器 統計不一樣範圍的 訪問時長 和 訪問步長 的 session 個數 以及 總的 session 個數
val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
// 必須引入任意一個 action 的算子,才能啓動
seeionId2FilterRDD.foreach(println(_))
// 計算各個 session 的佔比
getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)
// ******************** 需求二:Session 隨機抽取 ********************
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到這裏一個 sessionId 對應一條數據,也就是一個 fullAggrInfo
sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)
}
// ******************** 需求二:Session 隨機抽取 ********************
/**
* Session 隨機抽取
*
* @param sparkSession
* @param taskUUID
* @param seeionId2FilterRDD
*/
def sessionRandomExtract(sparkSession: SparkSession, taskUUID: String, seeionId2FilterRDD: RDD[(String, String)]): Unit = {
// 因爲是按照 時間 爲 key 進行聚合,因此先將 seeionId2FilterRDD 的 key 轉化爲 時間
// dateHour2FullAggrInfoRDD: RDD[(dateHour, fullAggrInfo)]
val dateHour2FullAggrInfoRDD = seeionId2FilterRDD.map {
case (sessionId, fullAggrInfo) =>
// 先從 fullAggrInfo 中提取出來 startTime
val startTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
// 獲得的 startTime = "2019-05-30 18:17:11" 是字符串類型,須要轉換成咱們須要的格式:yyyy-MM-dd_HH
val dateHour = DateUtils.getDateHour(startTime)
(dateHour, fullAggrInfo)
}
// hourCountMap: Map[(dateHour, count)],示例:(yyyy-MM-dd_HH, 20)
val hourCountMap = dateHour2FullAggrInfoRDD.countByKey()
// dateHourCountMap: Map[data, Map[(hour, count)]],示例:(yyyy-MM-dd, (HH, 20))
val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()
for ((dateHour, count) <- hourCountMap) {
val date = dateHour.split("_")(0) // yyyy-MM-dd_HH
val hour = dateHour.split("_")(1) // HH
dateHourCountMap.get(date) match { // Map[(hour, count)
case None =>
dateHourCountMap(date) = new mutable.HashMap[String, Long]() // 先建立 1 個空的 HashMap
dateHourCountMap(date) += (hour -> count) // 再給 HashMap 賦值
case Some(map) =>
dateHourCountMap(date) += (hour -> count) // 直接給 HashMap 賦值
}
}
// 解決問題一:
// 一共有多少天:dateHourCountMap.size
// 一天抽取多少條:1000 / dateHourCountMap.size
val extractPerDay = 1000 / dateHourCountMap.size
// 解決問題二:
// 一共有多少個:session:dateHourCountMap(date).values.sum
// 一個小時有多少個:session:dateHourCountMap(date)(hour)
val dateHourExtractIndexListMap = new mutable.HashMap[String, mutable.HashMap[String, ListBuffer[Int]]]()
// dateHourCountMap: Map[data, Map[(hour, count)]],示例:(yyyy-MM-dd, (HH, 20))
// hourCountMap: Map[(hour, count)],示例:(HH, 20) ,注意:這裏面的 hourCountMap 含義發生變化了,要跟上面的最開始的 hourCountMap 區別開來
for ((date, hourCountMap) <- dateHourCountMap) {
// 一天共有多少個 session
val dataCount = hourCountMap.values.sum
dateHourExtractIndexListMap.get(date) match {
case None =>
dateHourExtractIndexListMap(date) = new mutable.HashMap[String, mutable.ListBuffer[Int]]()
generateRandomIndexList(extractPerDay, dataCount, hourCountMap, dateHourExtractIndexListMap(date))
case Some(map) =>
generateRandomIndexList(extractPerDay, dataCount, hourCountMap, dateHourExtractIndexListMap(date))
}
}
// 到此爲止,咱們得到了每一個小時要抽取的 session 的 index
// 以後在算子中使用 dateHourExtractIndexListMap 這個 Map,因爲這個 Map 可能會很大,因此涉及到 廣播大變量 的問題
// 廣播大變量,提高任務 task 的性能
val dateHourExtractIndexListMapBroadcastVar = sparkSession.sparkContext.broadcast(dateHourExtractIndexListMap)
// dateHour2FullAggrInfoRDD: RDD[(dateHour, fullAggrInfo)]
// dateHour2GroupRDD: RDD[(dateHour, Iterable[fullAggrInfo])]
val dateHour2GroupRDD = dateHour2FullAggrInfoRDD.groupByKey()
// extractSessionRDD: RDD[SessionRandomExtract]
val extractSessionRDD = dateHour2GroupRDD.flatMap {
case (dateHour, iterableFullAggrInfo) =>
val date = dateHour.split("_")(0)
val hour = dateHour.split("_")(1)
val extractIndexList = dateHourExtractIndexListMapBroadcastVar.value.get(date).get(hour)
// 建立一個容器存儲抽取的 session
val extractSessionArrayBuffer = new ArrayBuffer[SessionRandomExtract]()
var index = 0
for (fullAggrInfo <- iterableFullAggrInfo) {
if (extractIndexList.contains(index)) {
// 提取數據,封裝成所須要的樣例類,並追加進 ArrayBuffer 中
val sessionId = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_SESSION_ID)
val startTime = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_START_TIME)
val searchKeywords = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)
val clickCategoryIds = StringUtils.getFieldFromConcatString(fullAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)
val sessionRandomExtract = SessionRandomExtract(taskUUID, sessionId, startTime, searchKeywords, clickCategoryIds)
extractSessionArrayBuffer += sessionRandomExtract
}
index += 1
}
extractSessionArrayBuffer
}
// 將抽取後的數據保存到 MySQL
import sparkSession.implicits._
extractSessionRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "session_random_extract")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
}
/**
* 根據每一個小時應該抽取的數量,來產生隨機值
*
* @param extractPerDay 一天抽取的 seesion 個數
* @param dataCount 當天全部的 seesion 總數
* @param hourCountMap 每一個小時的session總數
* @param hourListMap 主要用來存放生成的隨機值
*/
def generateRandomIndexList(extractPerDay: Long,
dataCount: Long,
hourCountMap: mutable.HashMap[String, Long],
hourListMap: mutable.HashMap[String, mutable.ListBuffer[Int]]): Unit = {
// 先遍歷 hourCountMap,hourCountMap: Map[(hour, count)],示例:(HH, 20) ,注意:這裏面的 hourCountMap 含義發生變化了,要跟上面的最開始的 hourCountMap 區別開來
for ((hour, count) <- hourCountMap) {
// 計算一個小時抽取多少個 session
var hourExtractCount = ((count / dataCount.toDouble) * extractPerDay).toInt
// 避免一個小時要抽取的數量超過這個小時的總數
if (hourExtractCount > count) {
hourExtractCount = count.toInt
}
val random = new Random()
hourListMap.get(hour) match {
case None =>
hourListMap(hour) = new mutable.ListBuffer[Int] // 沒有 List,須要新建一個 List
for (i <- 0 until hourExtractCount) {
var index = random.nextInt(count.toInt) // 生成 index
while (hourListMap(hour).contains(index)) { // 若是 index 已存在
index = random.nextInt(count.toInt) // 則從新生成 index
}
// 將生成的 index 放入到 hourListMap 中
hourListMap(hour).append(index)
}
case Some(list) =>
for (i <- 0 until hourExtractCount) {
var index = random.nextInt(count.toInt) // 生成 index
while (hourListMap(hour).contains(index)) { // 若是 index 已存在
index = random.nextInt(count.toInt) // 則從新生成 index
}
// 將生成的 index 放入到 hourListMap 中
hourListMap(hour).append(index)
}
}
}
}
// ******************** 需求一:Session 各範圍訪問步長、訪問時長佔比統計 ********************
// ......
}
在符合條件的 session 中,獲取點擊、下單和支付數量排名前 10 的品類。
數據中的每一個 session 可能都會對一些品類的商品進行點擊、下單和支付等等行爲,那麼如今就須要獲取這些 session 點擊、下單和支付數量排名前 10 的最熱門的品類。也就是說,要計算出全部這些 session 對各個品類的點擊、下單和支付的次數, 而後按照這三個屬性進行排序,獲取前 10 個品類。
這個功能很重要,可讓咱們明白,符合條件的用戶,他最感興趣的商品是什麼種類。這個可讓公司裏的人,清晰地瞭解到不一樣層次、不一樣類型的用戶的心理和喜愛。
計算完成以後,將數據保存到 MySQL 數據庫中。
sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
/**
* 品類 Top10 表
*
* @param taskid
* @param categoryid
* @param clickCount
* @param orderCount
* @param payCount
*/
case class Top10Category(taskid: String,
categoryid: Long,
clickCount: Long,
orderCount: Long,
payCount: Long)
略
-- ----------------------------
-- Table structure for `top10_category`
-- ----------------------------
DROP TABLE IF EXISTS `top10_category`;
CREATE TABLE `top10_category` (
`taskid` varchar(255) DEFAULT NULL,
`categoryid` int(11) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL,
`orderCount` int(11) DEFAULT NULL,
`payCount` int(11) DEFAULT NULL,
KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
package com.atguigu.session
import java.util.{Date, Random, UUID}
import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object SessionStat {
def main(args: Array[String]): Unit = {
// 獲取過濾條件,【爲了方便,直接從配置文件中獲取,企業中會從一個調度平臺獲取】
val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
// 獲取過濾條件對應的 JsonObject 對象
val taskParam = JSONObject.fromObject(jsonStr)
// 建立全局惟一的主鍵,每次執行 main 函數都會生成一個獨一無二的 taskUUID,來區分不一樣任務,做爲寫入 MySQL 數據庫中那張表的主鍵
val taskUUID = UUID.randomUUID().toString
// 建立 sparkConf
val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
// 建立 sparkSession(包含 sparkContext)
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// ******************** 需求一:Session 各範圍訪問步長、訪問時長佔比統計 ********************
// 獲取原始的動做表數據(帶有過濾條件)
// actionRDD: RDD[UserVisitAction]
val actionRDD = getOriActionRDD(sparkSession, taskParam)
// 將用戶行爲信息轉換爲 K-V 結構
// sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
// session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一個 sessionId 的數據聚合到一塊兒,獲得斧子形數據
// 將數據進行內存緩存
session2GroupActionRDD.cache()
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
// 建立自定義累加器對象
val sessionStatisticAccumulator = new SessionStatisticAccumulator
// 在 sparkSession 中註冊自定義累加器,這樣後面就能夠用了
sparkSession.sparkContext.register(sessionStatisticAccumulator)
// 根據過濾條件對 sessionId2FullAggrInfoRDD 進行過濾操做,即過濾掉不符合條件的數據,並根據自定義累加器 統計不一樣範圍的 訪問時長 和 訪問步長 的 session 個數 以及 總的 session 個數
// seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
// 必須引入任意一個 action 的算子,才能啓動
seeionId2FilterRDD.foreach(println(_))
// 計算各個 session 的佔比
getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)
// ******************** 需求二:Session 隨機抽取 ********************
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到這裏一個 sessionId 對應一條數據,也就是一個 fullAggrInfo
sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)
// ******************** 需求三:Top10 熱門品類統計 ********************
// sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
// seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
// join 默認是內鏈接,即不符合條件的不顯示(即被過濾掉)
// 獲取全部符合過濾條件的原始的 UserVisitAction 數據
val seeionId2ActionFilterRDD = sessionId2ActionRDD.join(seeionId2FilterRDD).map {
case (sessionId, (userVisitAction, fullAggrInfo)) =>
(sessionId, userVisitAction)
}
val top10CategoryArray = top10PopularCategories(sparkSession, taskUUID, seeionId2ActionFilterRDD)
}
/**
* Top10 熱門品類統計
*
* @param sparkSession
* @param taskUUID
* @param seeionId2ActionFilterRDD 全部符合過濾條件的原始的 UserVisitAction 數據
*/
def top10PopularCategories(sparkSession: SparkSession, taskUUID: String, seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
// 第一步:獲取全部發生過點擊、下單、付款的 categoryId,注意:其中被點擊的 categoryId 只有一個,被下單和被付款的 categoryId 有多個,categoryId 之間使用逗號隔開的
var cid2CidRDD = seeionId2ActionFilterRDD.flatMap {
case (sessionId, userVisitAction) =>
val categoryIdBuffer = new ArrayBuffer[(Long, Long)]()
// 提取出數據填充 ArrayBuffer
if (userVisitAction.click_category_id != -1) { // 點擊行爲
categoryIdBuffer += ((userVisitAction.click_category_id, userVisitAction.click_category_id)) // 只有第一個 key 有用,第二個 value 任何值均可以,可是不能夠沒有
} else if (userVisitAction.order_category_ids != null) { // 下單行爲
for (order_category_id <- userVisitAction.order_category_ids.split(",")) {
categoryIdBuffer += ((order_category_id.toLong, order_category_id.toLong))
}
} else if (userVisitAction.pay_category_ids != null) { // 付款行爲
for (pay_category_id <- userVisitAction.pay_category_ids.split(",")) {
categoryIdBuffer += ((pay_category_id.toLong, pay_category_id.toLong))
}
}
categoryIdBuffer
}
// 第二步:進行去重操做
cid2CidRDD = cid2CidRDD.distinct()
// 第三步:統計各品類 被點擊的次數、被下單的次數、被付款的次數
val cid2ClickCountRDD = getClickCount(seeionId2ActionFilterRDD)
val cid2OrderCountRDD = getOrderCount(seeionId2ActionFilterRDD)
val cid2PayCountRDD = getPayCount(seeionId2ActionFilterRDD)
// 第四步:獲取各個 categoryId 的點擊次數、下單次數、付款次數,並進行拼裝
// cid2FullCountRDD: RDD[(cid, aggrCountInfo)]
// (81,categoryId=81|clickCount=68|orderCount=64|payCount=72)
val cid2FullCountRDD = getFullCount(cid2CidRDD, cid2ClickCountRDD, cid2OrderCountRDD, cid2PayCountRDD)
// 第五步:根據點擊次數、下單次數、付款次數依次排序,會用到 【二次排序】,實現自定義的二次排序的 key
// 第六步:封裝 SortKey
val sortKey2FullCountRDD = cid2FullCountRDD.map {
case (cid, fullCountInfo) =>
val clickCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CLICK_COUNT).toLong
val orderCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_ORDER_COUNT).toLong
val payCount = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_PAY_COUNT).toLong
val sortKey = SortKey(clickCount, orderCount, payCount)
(sortKey, fullCountInfo)
}
// 第七步:降序排序,取出 top10 熱門品類
val top10CategoryArray = sortKey2FullCountRDD.sortByKey(false).take(10)
// 第八步:將 Array 結構轉化爲 RDD,封裝 Top10Category
val top10CategoryRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map {
case (sortKey, fullCountInfo) =>
val categoryid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
val clickCount = sortKey.clickCount
val orderCount = sortKey.orderCount
val payCount = sortKey.payCount
Top10Category(taskUUID, categoryid, clickCount, orderCount, payCount)
}
// 第九步:寫入 MySQL 數據庫
import sparkSession.implicits._
top10CategoryRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("dbtable", "top10_category")
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.mode(SaveMode.Append)
.save()
top10CategoryArray
}
/**
*
* @param cid2CidRDD
* @param cid2ClickCountRDD
* @param cid2OrderCountRDD
* @param cid2PayCountRDD
* @return
*/
def getFullCount(cid2CidRDD: RDD[(Long, Long)],
cid2ClickCountRDD: RDD[(Long, Long)],
cid2OrderCountRDD: RDD[(Long, Long)],
cid2PayCountRDD: RDD[(Long, Long)]) = {
// 左外鏈接:不符合添加顯示爲空(null)
// 4.1 全部品類id 和 被點擊的品類 作左外鏈接
val cid2ClickInfoRDD = cid2CidRDD.leftOuterJoin(cid2ClickCountRDD).map {
case (cid, (categoryId, option)) =>
val clickCount = if (option.isDefined) option.get else 0
val aggrCountInfo = Constants.FIELD_CATEGORY_ID + "=" + cid + "|" + Constants.FIELD_CLICK_COUNT + "=" + clickCount
(cid, aggrCountInfo)
}
// 4.2 4.1 的結果 和 被下單的品類 作左外鏈接
val cid2OrderInfoRDD = cid2ClickInfoRDD.leftOuterJoin(cid2OrderCountRDD).map {
case (cid, (clickInfo, option)) =>
val orderCount = if (option.isDefined) option.get else 0
val aggrCountInfo = clickInfo + "|" + Constants.FIELD_ORDER_COUNT + "=" + orderCount
(cid, aggrCountInfo)
}
// 4.3 4.2 的結果 和 被付款的品類 作左外鏈接
val cid2PayInfoRDD = cid2OrderInfoRDD.leftOuterJoin(cid2PayCountRDD).map {
case (cid, (orderInfo, option)) =>
val payCount = if (option.isDefined) option.get else 0
val aggrCountInfo = orderInfo + "|" + Constants.FIELD_PAY_COUNT + "=" + payCount
(cid, aggrCountInfo)
}
cid2PayInfoRDD
}
/**
* 統計各品類被點擊的次數
*
* @param seeionId2ActionFilterRDD
*/
def getClickCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
// 方式一:把發生過點擊的 action 過濾出來
val clickActionFilterRDD = seeionId2ActionFilterRDD.filter {
case (sessionId, userVisitAction) =>
userVisitAction.click_category_id != 1L
}
// 方式二:把發生點擊的 action 過濾出來,兩者等價
// val clickActionFilterRDD2 = seeionId2ActionFilterRDD.filter(item => item._2.click_category_id != -1L)
// 獲取每種類別的點擊次數
val clickNumRDD = clickActionFilterRDD.map {
case (sessionId, userVisitAction) =>
(userVisitAction.click_category_id, 1L)
}
// 計算各個品類的點擊次數
clickNumRDD.reduceByKey(_ + _)
}
/**
* 統計各品類被下單的次數
*
* @param seeionId2ActionFilterRDD
*/
def getOrderCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
// 把發生過下單的 action 過濾出來
val orderActionFilterRDD = seeionId2ActionFilterRDD.filter {
case (sessionId, userVisitAction) =>
userVisitAction.order_category_ids != null
}
// 獲取每種類別的下單次數
val orderNumRDD = orderActionFilterRDD.flatMap {
case (sessionId, userVisitAction) =>
userVisitAction.order_category_ids.split(",").map(item => (item.toLong, 1L))
}
// 計算各個品類的下單次數
orderNumRDD.reduceByKey(_ + _)
}
/**
* 統計各品類被付款的次數
*
* @param seeionId2ActionFilterRDD
*/
def getPayCount(seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)]) = {
// 把發生過付款的 action 過濾出來
val payActionFilterRDD = seeionId2ActionFilterRDD.filter {
case (sessionId, userVisitAction) =>
userVisitAction.pay_category_ids != null
}
// 獲取每種類別的支付次數
val payNumRDD = payActionFilterRDD.flatMap {
case (sessionId, userVisitAction) =>
userVisitAction.pay_category_ids.split(",").map(item => (item.toLong, 1L))
}
// 計算各個品類的支付次數
payNumRDD.reduceByKey(_ + _)
}
// ******************** 需求二:Session 隨機抽取 ********************
// ******************** 需求一:Session 各範圍訪問步長、訪問時長佔比統計 ********************
}
略
對於排名前 10 的品類,分別獲取其點擊次數排名前 10 的 session。
這個就是說,對於 top10 的品類,每個都要獲取對它點擊次數排名前 10 的 session。
這個功能,可讓咱們看到,對某個用戶羣體最感興趣的品類,各個品類最感興趣最典型的用戶的 session 的行爲。
計算完成以後,將數據保存到 MySQL 數據庫中。
seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
top10CategoryArray: Array[(sortKey, fullCountInfo)]
/**
* Top10 Session
*
* @param taskid
* @param categoryid
* @param sessionid
* @param clickCount
*/
case class Top10Session(taskid: String,
categoryid: Long,
sessionid: String,
clickCount: Long)
/**
* Session 隨機抽取詳細表
*
* @param taskid 當前計算批次的 ID
* @param userid 用戶的 ID
* @param sessionid Session 的 ID
* @param pageid 某個頁面的 ID
* @param actionTime 點擊行爲的時間點
* @param searchKeyword 用戶搜索的關鍵詞
* @param clickCategoryId 某一個商品品類的 ID
* @param clickProductId 某一個商品的 ID
* @param orderCategoryIds 一次訂單中全部品類的 ID 集合
* @param orderProductIds 一次訂單中全部商品的 ID 集合
* @param payCategoryIds 一次支付中全部品類的 ID 集合
* @param payProductIds 一次支付中全部商品的 ID 集合
**/
case class SessionDetail(taskid: String,
userid: Long,
sessionid: String,
pageid: Long,
actionTime: String,
searchKeyword: String,
clickCategoryId: Long,
clickProductId: Long,
orderCategoryIds: String,
orderProductIds: String,
payCategoryIds: String,
payProductIds: String)
略
-- ----------------------------
-- Table structure for `top10_session`
-- ----------------------------
DROP TABLE IF EXISTS `top10_session`;
CREATE TABLE `top10_session` (
`taskid` varchar(255) DEFAULT NULL,
`categoryid` int(11) DEFAULT NULL,
`sessionid` varchar(255) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL, KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
package com.atguigu.session
import java.util.{Date, Random, UUID}
import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.{UserInfo, UserVisitAction}
import commons.utils._
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
object SessionStat {
def main(args: Array[String]): Unit = {
// 獲取過濾條件,【爲了方便,直接從配置文件中獲取,企業中會從一個調度平臺獲取】
val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
// 獲取過濾條件對應的 JsonObject 對象
val taskParam = JSONObject.fromObject(jsonStr)
// 建立全局惟一的主鍵,每次執行 main 函數都會生成一個獨一無二的 taskUUID,來區分不一樣任務,做爲寫入 MySQL 數據庫中那張表的主鍵
val taskUUID = UUID.randomUUID().toString
// 建立 sparkConf
val sparkConf = new SparkConf().setAppName("session").setMaster("local[*]")
// 建立 sparkSession(包含 sparkContext)
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// ******************** 需求一:Session 各範圍訪問步長、訪問時長佔比統計 ********************
// 獲取原始的動做表數據(帶有過濾條件)
// actionRDD: RDD[UserVisitAction]
val actionRDD = getOriActionRDD(sparkSession, taskParam)
// 將用戶行爲信息轉換爲 K-V 結構
// sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
// session2GroupActionRDD: RDD[(sessionId, Iterable[UserVisitAction])]
val session2GroupActionRDD = sessionId2ActionRDD.groupByKey() // 把同一個 sessionId 的數據聚合到一塊兒,獲得斧子形數據
// 將數據進行內存緩存
session2GroupActionRDD.cache()
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)]
val sessionId2FullAggrInfoRDD = getSessionFullAggrInfo(sparkSession, session2GroupActionRDD)
// 建立自定義累加器對象
val sessionStatisticAccumulator = new SessionStatisticAccumulator
// 在 sparkSession 中註冊自定義累加器,這樣後面就能夠用了
sparkSession.sparkContext.register(sessionStatisticAccumulator)
// 根據過濾條件對 sessionId2FullAggrInfoRDD 進行過濾操做,即過濾掉不符合條件的數據,並根據自定義累加器 統計不一樣範圍的 訪問時長 和 訪問步長 的 session 個數 以及 總的 session 個數
// seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
val seeionId2FilterRDD = getSessionFilterRDD(taskParam, sessionId2FullAggrInfoRDD, sessionStatisticAccumulator)
// 必須引入任意一個 action 的算子,才能啓動
seeionId2FilterRDD.foreach(println(_))
// 計算各個 session 的佔比
getSessionRatio(sparkSession, taskUUID, sessionStatisticAccumulator.value)
// ******************** 需求二:Session 隨機抽取 ********************
// sessionId2FullAggrInfoRDD: RDD[(sessionId, fullAggrInfo)],注意:到這裏一個 sessionId 對應一條數據,也就是一個 fullAggrInfo
sessionRandomExtract(sparkSession, taskUUID, seeionId2FilterRDD)
// ******************** 需求三:Top10 熱門品類統計 ********************
// sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
// seeionId2FilterRDD: RDD[(sessionId, fullAggrInfo)]
// join 默認是內鏈接,即不符合條件的不顯示(即被過濾掉)
// 獲取全部符合過濾條件的原始的 UserVisitAction 數據
// seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
val seeionId2ActionFilterRDD = sessionId2ActionRDD.join(seeionId2FilterRDD).map {
case (sessionId, (userVisitAction, fullAggrInfo)) =>
(sessionId, userVisitAction)
}
val top10CategoryArray = top10PopularCategories(sparkSession, taskUUID, seeionId2ActionFilterRDD)
// ******************** 需求四:Top10 熱門品類的 Top10 活躍 Session 統計 ********************
// seeionId2ActionFilterRDD: RDD[(sessionId, UserVisitAction)]
// top10CategoryArray: Array[(sortKey, fullCountInfo)]
top10ActiveSession(sparkSession, taskUUID, seeionId2ActionFilterRDD, top10CategoryArray)
}
// ******************** 需求四:Top10 熱門品類的 Top10 活躍 Session 統計 ********************
/**
* Top10 熱門品類的 Top10 活躍 Session 統計
*
* @param sparkSession
* @param taskUUID
* @param seeionId2ActionFilterRDD
* @param top10CategoryArray
*/
def top10ActiveSession(sparkSession: SparkSession,
taskUUID: String,
seeionId2ActionFilterRDD: RDD[(String, UserVisitAction)],
top10CategoryArray: Array[(SortKey, String)]): Unit = {
// 第一步:獲取全部點擊過 Top10 熱門品類的 UserVisitAction
// 第一種方法:Join 方法,該方式須要引發 Shuffle,比較麻煩
/*
// 將 top10CategoryArray 轉化爲 RDD,而後將其 key sortKey 轉化爲 cid
val cid2FullCountInfoRDD = sparkSession.sparkContext.makeRDD(top10CategoryArray).map {
case (sortKey, fullCountInfo) =>
// 取出 categoryId
val cid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
// 返回所需的 RDD
(cid, fullCountInfo)
}
// 將 seeionId2ActionFilterRDD 的 key sessionId 轉化爲 cid,對其使用 map 操做便可
val cid2ActionRDD = seeionId2ActionFilterRDD.map {
case (sessionId, userVisitAction) =>
val cid = userVisitAction.click_category_id
(cid, userVisitAction)
}
// joinn 操做(即內鏈接):兩邊都有的才留下,不然過濾掉
cid2FullCountInfoRDD.join(cid2ActionRDD).map {
case (cid, (fullCountInfo, userVisitAction)) =>
val sid = userVisitAction.session_id
(sid, userVisitAction)
}*/
// 第二種方法:使用 filter
// cidArray: Array[Long] 包含了 Top10 熱門品類的 id
val cidArray = top10CategoryArray.map {
case (sortKey, fullCountInfo) =>
val cid = StringUtils.getFieldFromConcatString(fullCountInfo, "\\|", Constants.FIELD_CATEGORY_ID).toLong
cid
}
// 全部符合過濾條件的,而且點擊過 Top10 熱門品類的 UserVisitAction
val seeionId2ActionRDD = seeionId2ActionFilterRDD.filter {
case (sessionId, userVisitAction) =>
cidArray.contains(userVisitAction.click_category_id)
}
// 第二步:先對 全部符合過濾條件的,而且點擊過 Top10 熱門品類的 UserVisitAction 按照 sessionId 進行聚合
val seeionId2GroupRDD = seeionId2ActionRDD.groupByKey()
// 第三步:統計 每個 sessionId 對於點擊過的每個品類的點擊次數
// cid2SessionCountRDD: RDD[(cid, sessionN=sessionCount)]
val cid2SessionCountRDD = seeionId2GroupRDD.flatMap {
case (sessionId, iterableUserVisitAction) =>
// 建立 Map,用於保存當前每個 sessionId 對於點擊過的每個品類的點擊次數
val categoryCountMap = new mutable.HashMap[Long, Long]()
for (userVisitAction <- iterableUserVisitAction) {
val cid = userVisitAction.click_category_id
if (!categoryCountMap.contains(cid))
categoryCountMap += (cid -> 0)
categoryCountMap.update(cid, categoryCountMap(cid) + 1)
}
// 該 Map 記錄了一個 session 對於它全部點擊過的品類的點擊次數
// categoryCountMap
for ((cid, sessionCount) <- categoryCountMap)
yield (cid, sessionId + "=" + sessionCount)
}
// 第四步:對 cid2SessionCountRDD 進行聚合
// cid2GroupRDD: RDD[(cid, Iterable[sessionN=sessionCount]))]
// cid2GroupRDD 的每一條數據都是一個 cid 和它對應的全部點擊過它的 sessionId 對它的點擊次數
val cid2GroupRDD = cid2SessionCountRDD.groupByKey()
// 第五步:取出 top10SessionRDD: RDD[Top10Session]
val top10SessionRDD = cid2GroupRDD.flatMap {
case (cid, iterablesSessionCount) =>
val sortList = iterablesSessionCount.toList.sortWith((item1, item2) => { // true: item1 放在前面
item1.split("=")(1).toLong > item2.split("=")(1).toLong // item1: sessionCount 字符串類型 sessionIdN=count
}).take(10)
// 封裝數據,準備寫入 MySQL 數據庫
val top10Session = sortList.map {
case item => {
val categoryid = cid
val sessionid = item.split("=")(0)
val clickCount = item.split("=")(1).toLong
Top10Session(taskUUID, categoryid, sessionid, clickCount)
}
}
top10Session
}
// 寫入 MySQL 數據庫
import sparkSession.implicits._
top10SessionRDD.toDF().write
.format("jdbc")
.option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
.option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
.option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
.option("dbtable", "top10_session")
.mode(SaveMode.Append)
.save()
}
// ******************** 需求三:Top10 熱門品類統計 ********************
// ******************** 需求二:Session 隨機抽取 ********************
// ******************** 需求一:Session 各範圍訪問步長、訪問時長佔比統計 ********************
}
計算頁面單跳轉化率 什麼是頁面單跳轉換率 好比一個用戶在一次 Session 過程當中訪問的頁面路徑 3,5,7,9,10,21,那麼頁面 3 跳到頁面 5 叫一次單跳,7-9 也叫一次單跳,那麼單跳轉化率就是要統計頁面點擊的機率,好比: 計算 3-5 的單跳轉化率,先獲取符合條件的 Session 對於頁面 3 的訪問次數(PV)爲 A,而後獲取符合條件的 Session 中訪問了頁面 3 又緊接着訪問了頁面 5 的次數爲 B,那麼 B/A 就是 3-5 的頁面單跳轉化率,咱們記爲 C;那麼頁面 5-7 的轉化率怎麼求呢?先須要求出符合條件的 Session 中訪問頁面 5 又緊接着訪問了頁面 7 的次數爲 D,那麼 D/B 即爲 5-7 的單跳轉化率。
產品經理,能夠根據這個指標,去嘗試分析整個網站、產品各個頁面的表現怎麼樣,是否是須要去優化產品的佈局;吸引用戶最終能夠進入最後的支付頁面。
數據分析師,能夠此數據作更深一步的計算和分析。
企業管理層, 能夠看到整個公司的網站,各個頁面的之間的跳轉的表現如何,能夠適當調整公司的經營戰略或策略。
在如下模塊中,須要根據查詢對象中設置的 Session 過濾條件,先將對應的 Session 過濾出來,而後根據查詢對象中設置的頁面路徑,計算頁面單跳轉化率,好比查詢的頁面路徑爲:三、五、七、8,那麼就要計算 3-五、5-七、7-8 的頁面單跳轉化率。須要注意的一點是,頁面的訪問是有前後的。
動做表
/**
* 用戶訪問動做表
*
* @param date 用戶點擊行爲的日期
* @param user_id 用戶的 ID
* @param session_id Session 的 ID
* @param page_id 某個頁面的 ID
* @param action_time 點擊行爲的時間點
* @param search_keyword 用戶搜索的關鍵詞
* @param click_category_id 某一個商品品類的 ID
* @param click_product_id 某一個商品的 ID
* @param order_category_ids 一次訂單中全部品類的 ID 集合
* @param order_product_ids 一次訂單中全部商品的 ID 集合
* @param pay_category_ids 一次支付中全部品類的 ID 集合
* @param pay_product_ids 一次支付中全部商品的 ID 集合
* @param city_id 城市 ID
*/
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long)
舉例
如何作
略
-- ----------------------------
-- Table structure for `page_split_convert_rate`
-- ----------------------------
DROP TABLE IF EXISTS `page_split_convert_rate`;
CREATE TABLE `page_split_convert_rate` (
`taskid` varchar(255) DEFAULT NULL,
`convertRate` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
在 analyse 中新建子模塊 page,配置 pom.xml 文件,添加 scala 框架的支持
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>analyse</artifactId>
<groupId>com.atguigu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>page</artifactId>
<dependencies>
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- Spark 的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- scala-maven-plugin 插件用於在任意的 maven 項目中對 scala 代碼進行編譯/測試/運行/文檔化 -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.page.PageOneStepConvertRate</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
示例代碼:
package com.atguigu.page
import java.util.UUID
import commons.conf.ConfigurationManager
import commons.constant.Constants
import commons.model.UserVisitAction
import commons.utils.{DateUtils, ParamUtils}
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable
object PageConvertStat {
def main(args: Array[String]): Unit = {
// 獲取過濾條件,【爲了方便,直接從配置文件中獲取,企業中會從一個調度平臺獲取】
val jsonStr = ConfigurationManager.config.getString(Constants.TASK_PARAMS)
// 獲取過濾條件對應的 JsonObject 對象
val taskParam = JSONObject.fromObject(jsonStr)
// 建立全局惟一的主鍵,每次執行 main 函數都會生成一個獨一無二的 taskUUID,來區分不一樣任務,做爲寫入 MySQL 數據庫中那張表的主鍵
val taskUUID = UUID.randomUUID().toString
// 建立 sparkConf
val sparkConf = new SparkConf().setAppName("pageConvert").setMaster("local[*]")
// 建立 sparkSession(包含 sparkContext)
val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
// ******************** 需求五:頁面單跳轉化率統計 ********************
// 獲取原始的動做表數據(帶有過濾條件)
// actionRDD: RDD[UserVisitAction]
val actionRDD = getOriActionRDD(sparkSession, taskParam)
// 將用戶行爲信息轉換爲 K-V 結構
// sessionId2ActionRDD: RDD[(sessionId, UserVisitAction)]
val sessionId2ActionRDD = actionRDD.map(item => (item.session_id, item))
// 將數據進行內存緩存
sessionId2ActionRDD.persist(StorageLevel.MEMORY_ONLY)
// 目標頁面切片:將頁面流路徑轉換爲頁面切片
// targetPageFlowStr:"1,2,3,4,5,6,7"
val targetPageFlowStr = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
// targetPageFlowArray: Array[Long][1,2,3,4,5,6,7]
val targetPageFlowArray = targetPageFlowStr.split(",")
// targetPageFlowArray.slice(0, targetPageFlowArray.length - 1): [1,2,3,4,5,6]
// targetPageFlowArray.tail: [2,3,4,5,6,7]
// targetPageFlowArray.slice(0, targetPageFlowArray.length - 1).zip(targetPageFlowArray.tail): [(1,2),(2,3),(3,4),(4,5),(5,6),(6,7)]
val targetPageSplit = targetPageFlowArray.slice(0, targetPageFlowArray.length - 1).zip(targetPageFlowArray.tail).map {
case (page1, page2) =>
(page1 + "_" + page2)
}
// 獲取實際頁面切片
// 對 <sessionId,訪問行爲> RDD,作一次 groupByKey 操做,生成頁面切片
val session2GroupActionRDD = sessionId2ActionRDD.groupByKey()
// realPageSplitNumRDD: RDD[(String, 1L)]
val realPageSplitNumRDD = session2GroupActionRDD.flatMap {
case (sessionId, iterableUserVisitAction) =>
// item1: UserVisitAction
// item2: UserVisitAction
// sortList: List[UserVisitAction] // 排好序的 UserVisitAction
val sortList = iterableUserVisitAction.toList.sortWith((item1, item2) => {
DateUtils.parseTime(item1.action_time).getTime < DateUtils.parseTime(item2.action_time).getTime
})
// 獲取 page 信息
// pageList: List[Long]
val pageList = sortList.map {
case userVisitAction =>
userVisitAction.page_id
}
// pageList.slice(0, pageList.length - 1): List[1,2,3,...,N-1]
// pageList.tail: List[2,3,4,...,N]
// pageList.slice(0, pageList.length - 1).zip(pageList.tail): List[(1,2),(2,3),(3,4),...,(N-1,N)]
val realPageSplit = pageList.slice(0, pageList.length - 1).zip(pageList.tail).map {
case (page1, page2) =>
(page1 + "_" + page2)
}
// 過濾:留下存在於 targetPageSplit 中的頁面切片
val realPageSplitFilter = realPageSplit.filter {
case realPageSplit =>
targetPageSplit.contains(realPageSplit)
}
realPageSplitFilter.map {
case realPageSplitFilter =>
(realPageSplitFilter, 1L)
}
}
// 聚合
// realPageSplitCountMap; Map[(page1_page2, count)]
val realPageSplitCountMap = realPageSplitNumRDD.countByKey()
realPageSplitCountMap.foreach(println(_))
val startPage = targetPageFlowArray(0).toLong
val startPageCount = sessionId2ActionRDD.filter {
case (sessionId, userVisitAction) =>
userVisitAction.page_id == startPage.toLong
}.count()
println("哈啊哈"+ startPageCount)
// 獲得最後的統計結果
getPageConvertRate(sparkSession, taskUUID, targetPageSplit, startPageCount, realPageSplitCountMap)
}
// ******************** 需求五:頁面單跳轉化率統計 ********************
/**
* 計算頁面切片轉化率
*
* @param sparkSession
* @param taskUUID
* @param targetPageSplit
* @param startPageCount
* @param realPageSplitCountMap
*/
def getPageConvertRate(sparkSession: SparkSession,
taskUUID: String,
targetPageSplit: Array[String],
startPageCount: Long,
realPageSplitCountMap: collection.Map[String, Long]): Unit = {
val pageSplitRatioMap = new mutable.HashMap[String, Double]()
var lastPageCount = startPageCount.toDouble
// 1_2,2_3,3_4,...
for (pageSplit <- targetPageSplit) {
// 第一次循環:currentPageSplitCount: page1_page2 lastPageCount: page1
// 第二次循環:currentPageSplitCount: page2_page3 lastPageCount: page1_page2
val currentPageSplitCount = realPageSplitCountMap.get(pageSplit).get.toDouble
val rate = currentPageSplitCount / lastPageCount
pageSplitRatioMap.put(pageSplit, rate)
lastPageCount = currentPageSplitCount
}
val convertRateStr = pageSplitRatioMap.map {
case (pageSplit, rate) =>
pageSplit + "=" + rate }.mkString(