項目的一些經驗準則,最後進行代碼實現。java
接下來須要進行session聚合統計,統計出訪問時長和訪問步長,各個區間的session數量佔總session數量的比例。sql
若是不進行重構,直接來實現,思路以下:
1. sessionRDD,映射成<sessioinid,Row>
的格式;
2. 按session聚合,計算出每一個session的訪問時長和訪問步長,生成一個新的RDD;
3. 遍歷新生成的RDD,將每一個session的訪問時長和訪問步長,去更新自定義Accumulator中對應的值;
4. 使用自定義Accumulator中的統計值,去計算各個區間的比例;
5. 將最後計算出來的結果,寫入MySQL對應的表中。shell
普通實現思路的問題:
1. 爲何還要用actionRDD去映射?其實以前在session聚合的時候已經作過映射了,屢次一舉;
2. 是否是必定要爲了session的聚合這個功能單獨去遍歷一遍session?其實不必,已經有session數據,以前過濾session的時候,其實至關因而在遍歷session了,那麼這裏就不必再過濾一遍了。apache
在傳統的J2EE或者.NET或者PHP,軟件/系統/網站開發中,架構和可維護性,可擴展性的重要程度遠遠高於性能,大量的分佈式的架構,設計模式,代碼的劃分,類的劃分(高併發網站除外)在大數據項目中,好比MapReduce,Hive,Spark,Storm中,性能的重要程度遠遠大於一次額代碼的規範和設計模式,代碼的劃分,類的劃分;大數據最重要的是性能。json
主要就是由於大數據以及大數據項目的特色決定了大數據的程度和項目的速度都比較慢,若是不優先考慮性能的話,會致使一個大數據處理程序運行時間長達數小時,甚至數十個小時,因此,推薦大數據項目,在開發和代碼的架構中,優先考慮性能,其次考慮功能代碼的劃分、解耦合。設計模式
咱們若是採用第一種實現方案,那麼其實就是代碼劃分(解耦合、可維護)優先,設計優先若是採用第二種方案,那麼其實就是性能優先。api
重構後的UserVisitSessionAnalyzeSpark.java
代碼以下:數組
package com.erik.sparkproject.spark; import java.text.ParseException; import java.util.Date; import java.util.Iterator; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.hive.HiveContext; import com.alibaba.fastjson.JSONObject; import com.erik.sparkproject.conf.ConfigurationManager; import com.erik.sparkproject.constant.Constants; import com.erik.sparkproject.dao.ITaskDAO; import com.erik.sparkproject.domain.Task; import com.erik.sparkproject.impl.DAOFactory; import com.erik.sparkproject.test.MockData; import com.erik.sparkproject.util.*; import scala.Tuple2; /** * 用戶訪問session分析spark做業 * * 接收用戶建立的分析任務,用戶可能指定的條件以下: * 1.時間範圍:起始日期-結束日期 * 2.性別:男或女 * 3.年齡範圍 * 4.職業:多選 * 5.城市:多選 * 6.搜索詞:多個搜索詞,只要某個session中的任何一個 * action搜索過指定的關鍵詞,那麼session就符合條件 * 7.點擊品類:多個品類,只要某個session中的任何一個 * action點擊過某個品類,那麼session就符合條件 * * 咱們的Spark做業如何接受用戶建立的任務呢? * J2EE平臺在接收用戶建立任務的請求以後,會將任務信息插入MySQL的task表中, * 任務參數以JSON格式封裝在task_param字段中 * 接着J2EE平臺執行咱們的spark-submit shell腳本,並將taskid做爲參數傳遞給spark-submit shell腳本 * spark-submit shell腳本,在執行時,是能夠接收參數的,而且會將接收的參數傳遞給spark做業的main函數 * 參數就封裝在main函數獲得args數組中 * * 這是spark本事提供的特性 * * * @author Erik * */ public class UserVisitSessionAnalyzeSpark { public static void main(String[] args) { args = new String[]{"2"}; //構建spark上下文 //首先在Constants.java中設置spark做業相關的常量 //String SPARK_APP_NAME = "UserVisitSessionAnalyzeSpark"; //保存Constants.java配置 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME) .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = getSQLContext(sc.sc()); //生成模擬測試數據 mockData(sc, sqlContext); //建立須要使用的DAO組件 ITaskDAO taskDAO = DAOFactory.getTaskDAO(); //那麼就首先得查詢出來指定的任務,並獲取任務的查詢參數 long taskid = ParamUtils.getTaskIdFromArgs(args); Task task = taskDAO.findById(taskid); JSONObject taskParam = JSONObject.parseObject(task.getTaskParam()); //若是要進行session粒度的數據聚合, //首先要從user_visit_action表中,查詢出來指定日期範圍內的數據 JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam); //聚合 //首先,能夠將行爲數據按照session_id進行groupByKey分組 //此時的數據粒度就是session粒度了,而後能夠將session粒度的數據與用戶信息數據驚醒join //而後就能夠獲取到session粒度的數據,同時數據裏面還包含了session對應的user信息 //到這裏爲止,獲取的數據是<sessionid,(sessionid,searchKeywords, //clickCategoryIds,age,professional,city,sex)> JavaPairRDD<String, String> sessionid2AggrInfoRDD = aggregateBySession(sqlContext, actionRDD); //接着,就要針對session粒度的聚合數據,按照使用者指定的篩選參數進行數據過濾 //至關於咱們本身編寫的算子,是要訪問外面的任務參數對象的 //匿名內部類(算子函數),訪問外部對象,是要給外部對象使用final修飾的 JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSession(sessionid2AggrInfoRDD, taskParam); /** * session聚合統計(統計出訪問時長和訪問步長,各個區間的session數量佔總session數量的比例) */ //關閉spark上下文 sc.close(); } /** * 獲取SQLContext * 若是在本地測試環境的話,那麼久生成SQLC哦那text對象 *若是在生產環境運行的話,那麼就生成HiveContext對象 * @param sc SparkContext * @return SQLContext */ private static SQLContext getSQLContext(SparkContext sc) { //在my.properties中配置 //spark.local=true(打包以前改成flase) //在ConfigurationManager.java中添加 //public static Boolean getBoolean(String key) { // String value = getProperty(key); // try { // return Boolean.valueOf(value); // } catch (Exception e) { // e.printStackTrace(); // } // return false; //} //在Contants.java中添加 //String SPARK_LOCAL = "spark.local"; boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if(local) { return new SQLContext(sc); }else { return new HiveContext(sc); } } /** * 生成模擬數據 * 只有是本地模式,纔會生成模擬數據 * @param sc * @param sqlContext */ private static void mockData(JavaSparkContext sc, SQLContext sqlContext) { boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if(local) { MockData.mock(sc, sqlContext); } } /** * 獲取指定日期範圍內的用戶訪問行爲數據 * @param sqlContext SQLContext * @param taskParam 任務參數 * @return 行爲數據RDD */ private static JavaRDD<Row> getActionRDDByDateRange( SQLContext sqlContext, JSONObject taskParam) { //先在Constants.java中添加任務相關的常量 //String PARAM_START_DATE = "startDate"; //String PARAM_END_DATE = "endDate"; String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE); String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE); String sql = "select * " + "from user_visit_action" + "where date>='" + startDate + "'" + "and date<='" + endDate + "'"; DataFrame actionDF = sqlContext.sql(sql); return actionDF.javaRDD(); } /** * 對行爲數據按sesssion粒度進行聚合 * @param actionRDD 行爲數據RDD * @return session粒度聚合數據 */ private static JavaPairRDD<String, String> aggregateBySession( SQLContext sqlContext, JavaRDD<Row> actionRDD) { //如今actionRDD中的元素是Row,一個Row就是一行用戶訪問行爲記錄,好比一次點擊或者搜索 //如今須要將這個Row映射成<sessionid,Row>的格式 JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair( /** * PairFunction * 第一個參數,至關因而函數的輸入 * 第二個參數和第三個參數,至關因而函數的輸出(Tuple),分別是Tuple第一個和第二個值 */ new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; public Tuple2<String, Row> call(Row row) throws Exception { //按照MockData.java中字段順序獲取 //此時須要拿到session_id,序號是2 return new Tuple2<String, Row>(row.getString(2), row); } }); //對行爲數據按照session粒度進行分組 JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey(); //對每個session分組進行聚合,將session中全部的搜索詞和點擊品類都聚合起來 //到此爲止,獲取的數據格式以下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)> JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair( new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() { private static final long serialVersionUID = 1L; public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple) throws Exception { String sessionid = tuple._1; Iterator<Row> iterator = tuple._2.iterator(); StringBuffer searchKeywordsBuffer = new StringBuffer(""); StringBuffer clickCategoryIdsBuffer = new StringBuffer(""); Long userid = null; //session的起始和結束時間 Date startTime = null; Date endTime = null; //session的訪問步長 int stepLength = 0; //遍歷session全部的訪問行爲 while(iterator.hasNext()) { //提取每一個 訪問行爲的搜索詞字段和點擊品類字段 Row row = iterator.next(); if(userid == null) { userid = row.getLong(1); } String searchKeyword = row.getString(5); Long clickCategoryId = row.getLong(6); //實際上這裏要對數聽說明一下 //並非每一行訪問行爲都有searchKeyword和clickCategoryId兩個字段的 //其實,只有搜索行爲是有searchKeyword字段的 //只有點擊品類的行爲是有clickCaregoryId字段的 //因此,任何一行行爲數據,都不可能兩個字段都有,因此數據是可能出現null值的 //因此是否將搜索詞點擊品類id拼接到字符串中去 //首先要知足不能是null值 //其次,以前的字符串中尚未搜索詞或者點擊品類id if(StringUtils.isNotEmpty(searchKeyword)) { if(!searchKeywordsBuffer.toString().contains(searchKeyword)) { searchKeywordsBuffer.append(searchKeyword + ","); } } if(clickCategoryId != null) { if(!clickCategoryIdsBuffer.toString().contains( String.valueOf(clickCategoryId))) { clickCategoryIdsBuffer.append(clickCategoryId + ","); } } //計算session開始和結束時間 Date actionTime = DateUtils.parseTime(row.getString(4)); if(startTime == null) { startTime = actionTime; } if(endTime == null) { endTime = actionTime; } if(actionTime.before(startTime)) { startTime = actionTime; } if(actionTime.after(endTime)) { endTime = actionTime; } //計算session訪問步長 stepLength ++; } //計算session開始和結束時間 //如今DateUtils.java中添加方法 //public static Date parseTime(String time) { // try { // return TIME_FORMAT.parse(time); // } catch (ParseException e) { // e.printStackTrace(); // } // return null; //} //StringUtils引入的包是import com.erik.sparkproject.util.trimComma; String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString()); String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString()); //計算session訪問時長(秒) long visitLength = (endTime.getTime() - startTime.getTime()) / 1000; //返回的數據便是<sessionid, partAggrInfo> //可是,這一步聚合後,其實還須要將每一行數據,根對應的用戶信息進行聚合 //問題來了,若是是跟用戶信息進行聚合的話,那麼key就不該該是sessionid,而應該是userid //纔可以跟<userid, Row>格式的用戶信息進行聚合 //若是咱們這裏直接返回<sessionid, partAggrInfo>,還得再作一次mapToPair算子 //將RDD映射成<userid,partAggrInfo>的格式,那麼就畫蛇添足 //因此,咱們這裏其實能夠直接返回數據格式就是<userid,partAggrInfo> //而後在直接將返回的Tuple的key設置成sessionid //最後的數據格式,仍是<sessionid,fullAggrInfo> //聚合數據,用什麼樣的格式進行拼接? //咱們這裏統必定義,使用key=value|key=vale //在Constants.java中定義spark做業相關的常量 //String FIELD_SESSION_ID = "sessionid"; //String FIELD_SEARCH_KEYWORDS = "searchKeywords"; //String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds"; //String FIELD_VISIT_LENGTH = "visitLength"; //String FIELD_STEP_LENGTH = "stepLength"; String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|" + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|" + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" + Constants.FIELD_STEP_LENGTH + "=" + stepLength; return new Tuple2<Long, String>(userid, partAggrInfo); } }); //查詢全部用戶數據 String sql = "select * from user_info"; JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD(); JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair( new PairFunction<Row, Long, Row>(){ private static final long serialVersionUID = 1L; public Tuple2<Long, Row> call(Row row) throws Exception { return new Tuple2<Long, Row>(row.getLong(0), row); } }); //將session粒度聚合數據,與用戶信息進行join JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD = userid2PartAggrInfoRDD.join(userid2InfoRDD); //對join起來的數據進行拼接,而且返回<sessionid,fullAggrInfo>格式的數據 JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair( new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() { private static final long serialVersionUID = 1L; public Tuple2<String, String> call( Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception { String partAggrInfo = tuple._2._1; Row userInfoRow = tuple._2._2; String sessionid = StringUtils.getFieldFromConcatString( partAggrInfo, "\\|", Constants.FIELD_SESSION_ID); int age = userInfoRow.getInt(3); String professional = userInfoRow.getString(4); String city = userInfoRow.getString(5); String sex = userInfoRow.getString(6); //在Constants.java中添加如下常量 //String FIELD_AGE = "age"; //String FIELD_PROFESSIONAL = "professional"; //String FIELD_CITY = "city"; //String FIELD_SEX = "sex"; String fullAggrInfo = partAggrInfo + "|" + Constants.FIELD_AGE + "=" + age + "|" + Constants.FIELD_PROFESSIONAL + "=" + professional + "|" + Constants.FIELD_CITY + "=" + city + "|" + Constants.FIELD_SEX + "=" + sex ; return new Tuple2<String, String>(sessionid, fullAggrInfo); } }); return sessionid2FullAggrInfoRDD; } /** * 過濾session數據 * @param sessionid2AggrInfoRDD * @return */ private static JavaPairRDD<String, String> filterSession( JavaPairRDD<String, String> sessionid2AggrInfoRDD, final JSONObject taskParam) { //爲了使用後面的ValieUtils,因此,首先將全部的篩選參數拼接成一個鏈接串 String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE); String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE); String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS); String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES); String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX); String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS); String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS); String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "") + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "") + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "") + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "") + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "") + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "") + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : ""); if (_parameter.endsWith("\\|")) { _parameter = _parameter.substring(0, _parameter.length() - 1); } final String parameter = _parameter; //根據篩選參數進行過濾 JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter( new Function<Tuple2<String, String>, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2<String, String> tuple) throws Exception { //首先,從tuple中,獲取聚合數據 String aggrInfo = tuple._2; //接着,依次按照篩選條件進行過濾 //按照年齡範圍進行過濾(startAge、endAge) //先在Constants.java中添加常量 //String PARAM_START_AGE = "startAge"; //String PARAM_END_AGE = "endage"; //String PARAM_PROFESSIONALS = "professionals"; //String PARAM_CITIES = "cities"; //String PARAM_SEX = "sex"; //String PARAM_KEYWORDS = "keywords"; //String PARAM_CATEGORY_IDS = "categoryIds"; if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) { return false; } //按照職業範圍進行過濾(professionals) if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, parameter, Constants.PARAM_PROFESSIONALS)) { return false; } //按照城市範圍進行過濾(cities) if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, parameter, Constants.PARAM_CATEGORY_IDS)) { return false; } //按照性別過濾 if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, parameter, Constants.PARAM_SEX)) { return false; } //按照搜索詞過濾 if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, parameter, Constants.PARAM_KEYWORDS)) { return false; } //按照點擊品類id進行搜索 if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, parameter, Constants.PARAM_CATEGORY_IDS)) { return false; } return true; } }); return null; } }