Most of what I have posted before were study notes organizing individual knowledge points. This article strings most of those points together into a complete project: simulating the generation of user log data, running ETL, and writing SQL analysis queries to produce the final APP-layer data. I have built this project before, so this time I make a few improvements on top of it, switching the big data stack from Hive alone to Hive + Spark to speed up computation. Now, let's get started!
I am still using IntelliJ IDEA as the IDE, and the project module is named "music164".
The dependencies in the project's pom file are listed below. Since part of the project is written in Scala, remember to add Scala framework support at the start:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.oldboy</groupId> <artifactId>music164</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.4.3</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>com.maxmind.db</groupId> <artifactId>maxmind-db</artifactId> <version>1.1.0</version> </dependency> </dependencies> </project>
The first step of the project is to generate a simulated set of user logs. A quick note first: in the internet era, data is everywhere. Roughly classified, everyday data is produced by desktop clients, mobile apps, web pages, and so on; every tap a user makes on a phone screen ultimately becomes a record sent to a server, where the data is collected, processed, analyzed, and used for prediction. That is where massive data comes from. In this project, all log data we handle is in JSON format (JavaScript Object Notation). Below, we first explain how this data is produced.
The AppBaseLog class:
package com.oldboy.music164.common;

import java.io.Serializable;

/**
 * Base class for all log types.
 */
public abstract class AppBaseLog implements Serializable {
    public static final String LOGTYPE_ERROR = "error";
    public static final String LOGTYPE_EVENT = "event";
    public static final String LOGTYPE_PAGE = "page";
    public static final String LOGTYPE_USAGE = "usage";
    public static final String LOGTYPE_STARTUP = "startup";

    private String logType;      // log type
    private Long createdAtMs;    // log creation time
    private String deviceId;     // unique device identifier
    private String appVersion;   // app version
    private String appChannel;   // distribution channel, fixed in the manifest at install time (app store etc.)
    private String appPlatform;  // platform
    private String osType;       // operating system
    private String deviceStyle;  // device model

    public String getLogType() { return logType; }
    public void setLogType(String logType) { this.logType = logType; }
    public Long getCreatedAtMs() { return createdAtMs; }
    public void setCreatedAtMs(Long createdAtMs) { this.createdAtMs = createdAtMs; }
    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public String getAppVersion() { return appVersion; }
    public void setAppVersion(String appVersion) { this.appVersion = appVersion; }
    public String getAppChannel() { return appChannel; }
    public void setAppChannel(String appChannel) { this.appChannel = appChannel; }
    public String getAppPlatform() { return appPlatform; }
    public void setAppPlatform(String appPlatform) { this.appPlatform = appPlatform; }
    public String getOsType() { return osType; }
    public void setOsType(String osType) { this.osType = osType; }
    public String getDeviceStyle() { return deviceStyle; }
    public void setDeviceStyle(String deviceStyle) { this.deviceStyle = deviceStyle; }
}
The AppErrorLog class:
package com.oldboy.music164.common;

/**
 * Error log: captures the errors users hit while using the app,
 * so the product can be adjusted accordingly.
 */
public class AppErrorLog extends AppBaseLog {
    private String errorBrief;   // error summary
    private String errorDetail;  // error detail

    public AppErrorLog() {
        setLogType(LOGTYPE_ERROR);
    }

    public String getErrorBrief() { return errorBrief; }
    public void setErrorBrief(String errorBrief) { this.errorBrief = errorBrief; }
    public String getErrorDetail() { return errorDetail; }
    public void setErrorDetail(String errorDetail) { this.errorDetail = errorDetail; }
}
The AppEventLog class:
package com.oldboy.music164.common;

/**
 * Event log: user actions reported by the app.
 */
public class AppEventLog extends AppBaseLog {
    private String eventId;   // event id: a user action on a track, e.g. share, favorite, play, listen through, skip, unfavorite, blacklist
    private String musicID;   // track name
    private String playTime;  // when playback started
    private String duration;  // play duration; under 30s counts as a skip
    private String mark;      // score: share 4, favorite 3, play 2, listened through 1, skip -1, unfavorite -3, blacklist -5

    public AppEventLog() {
        setLogType(LOGTYPE_EVENT);
    }

    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    public String getMusicID() { return musicID; }
    public void setMusicID(String musicID) { this.musicID = musicID; }
    public String getPlayTime() { return playTime; }
    public void setPlayTime(String playTime) { this.playTime = playTime; }
    public String getDuration() { return duration; }
    public void setDuration(String duration) { this.duration = duration; }
    public String getMark() { return mark; }
    public void setMark(String mark) { this.mark = mark; }
}
The AppPageLog class:
package com.oldboy.music164.common;

/**
 * Page log: page-visit information reported by the app.
 */
public class AppPageLog extends AppBaseLog {
    /*
     * Number of page views within one launch (all page logs of one launch should be
     * reported in a single batch, i.e. the last reported page record has an empty nextPage)
     */
    private int pageViewCntInSession = 0;
    private String pageId;            // page id
    private String visitIndex;        // visit order; 0 is the first page
    private String nextPage;          // next page visited; empty means the app was exited from this page
    private String stayDurationSecs;  // time spent on the current page

    public AppPageLog() {
        setLogType(LOGTYPE_PAGE);
    }

    public int getPageViewCntInSession() { return pageViewCntInSession; }
    public void setPageViewCntInSession(int pageViewCntInSession) { this.pageViewCntInSession = pageViewCntInSession; }
    public String getPageId() { return pageId; }
    public void setPageId(String pageId) { this.pageId = pageId; }
    public String getNextPage() { return nextPage; }
    public void setNextPage(String nextPage) { this.nextPage = nextPage; }
    public String getVisitIndex() { return visitIndex; }
    public void setVisitIndex(String visitIndex) { this.visitIndex = visitIndex; }
    public String getStayDurationSecs() { return stayDurationSecs; }
    public void setStayDurationSecs(String stayDurationSecs) { this.stayDurationSecs = stayDurationSecs; }
}
The AppStartupLog class:
package com.oldboy.music164.common;

/**
 * Startup log.
 */
public class AppStartupLog extends AppBaseLog {
    private String country;     // country; not reported by the client, filled in server-side via GeoLite
    private String province;    // province; not reported by the client, filled in server-side
    private String ipAddress;   // ip address
    private String network;     // network
    private String carrier;     // carrier
    private String brand;       // brand
    private String screenSize;  // screen resolution

    public AppStartupLog() {
        setLogType(LOGTYPE_STARTUP);
    }

    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public String getProvince() { return province; }
    public void setProvince(String province) { this.province = province; }
    public String getIpAddress() { return ipAddress; }
    public void setIpAddress(String ipAddress) { this.ipAddress = ipAddress; }
    public String getNetwork() { return network; }
    public void setNetwork(String network) { this.network = network; }
    public String getCarrier() { return carrier; }
    public void setCarrier(String carrier) { this.carrier = carrier; }
    public String getBrand() { return brand; }
    public void setBrand(String brand) { this.brand = brand; }
    public String getScreenSize() { return screenSize; }
    public void setScreenSize(String screenSize) { this.screenSize = screenSize; }
}
The AppUsageLog class:
package com.oldboy.music164.common;

/**
 * Usage log: per-session usage statistics reported by the app.
 */
public class AppUsageLog extends AppBaseLog {
    private String singleUseDurationSecs;  // duration of one session in seconds (time the app stayed in the foreground during one launch)
    private String singleUploadTraffic;    // upload traffic during one session
    private String singleDownloadTraffic;  // download traffic during one session

    public AppUsageLog() {
        setLogType(LOGTYPE_USAGE);
    }

    public String getSingleUseDurationSecs() { return singleUseDurationSecs; }
    public void setSingleUseDurationSecs(String singleUseDurationSecs) { this.singleUseDurationSecs = singleUseDurationSecs; }
    public String getSingleUploadTraffic() { return singleUploadTraffic; }
    public void setSingleUploadTraffic(String singleUploadTraffic) { this.singleUploadTraffic = singleUploadTraffic; }
    public String getSingleDownloadTraffic() { return singleDownloadTraffic; }
    public void setSingleDownloadTraffic(String singleDownloadTraffic) { this.singleDownloadTraffic = singleDownloadTraffic; }
}
The AppLogAggEntity class:
A quick note: this class is effectively an aggregate. It gathers every log type into one object, holding both the common base fields and each of the App log types as a list.
package com.oldboy.music164.common;

import java.util.List;

/**
 * App log aggregate; the phone-side program reports its logs in this shape.
 */
public class AppLogAggEntity {
    private String deviceId;     // unique device identifier
    private String appVersion;   // version
    private String appChannel;   // distribution channel, fixed in the manifest at install time (app store etc.)
    private String appPlatform;  // platform
    private String osType;       // operating system
    private String deviceStyle;  // device model

    private List<AppStartupLog> appStartupLogs;  // startup logs
    private List<AppPageLog> appPageLogs;        // page-visit logs
    private List<AppEventLog> appEventLogs;      // event logs
    private List<AppUsageLog> appUsageLogs;      // usage logs
    private List<AppErrorLog> appErrorLogs;      // error logs

    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public String getAppVersion() { return appVersion; }
    public void setAppVersion(String appVersion) { this.appVersion = appVersion; }
    public String getAppChannel() { return appChannel; }
    public void setAppChannel(String appChannel) { this.appChannel = appChannel; }
    public String getAppPlatform() { return appPlatform; }
    public void setAppPlatform(String appPlatform) { this.appPlatform = appPlatform; }
    public String getOsType() { return osType; }
    public void setOsType(String osType) { this.osType = osType; }
    public String getDeviceStyle() { return deviceStyle; }
    public void setDeviceStyle(String deviceStyle) { this.deviceStyle = deviceStyle; }
    public List<AppStartupLog> getAppStartupLogs() { return appStartupLogs; }
    public void setAppStartupLogs(List<AppStartupLog> appStartupLogs) { this.appStartupLogs = appStartupLogs; }
    public List<AppPageLog> getAppPageLogs() { return appPageLogs; }
    public void setAppPageLogs(List<AppPageLog> appPageLogs) { this.appPageLogs = appPageLogs; }
    public List<AppEventLog> getAppEventLogs() { return appEventLogs; }
    public void setAppEventLogs(List<AppEventLog> appEventLogs) { this.appEventLogs = appEventLogs; }
    public List<AppUsageLog> getAppUsageLogs() { return appUsageLogs; }
    public void setAppUsageLogs(List<AppUsageLog> appUsageLogs) { this.appUsageLogs = appUsageLogs; }
    public List<AppErrorLog> getAppErrorLogs() { return appErrorLogs; }
    public void setAppErrorLogs(List<AppErrorLog> appErrorLogs) { this.appErrorLogs = appErrorLogs; }
}
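To make the reported data concrete, here is a hand-written, abbreviated example of one such aggregate as fastjson would serialize it (all values are invented, and a real package carries several entries per array):

{
    "deviceId": "Device000001",
    "appVersion": "1.0.2",
    "appChannel": "appstore",
    "appPlatform": "android",
    "osType": "8.1",
    "deviceStyle": "xiaomi mix2",
    "appStartupLogs": [
        {"logType": "startup", "createdAtMs": 1543629060000, "network": "wifi", "carrier": "ChinaMobile", "brand": "xiaomi", "screenSize": "1920x1080"}
    ],
    "appEventLogs": [
        {"logType": "event", "createdAtMs": 1543629060000, "eventId": "play", "musicID": "chengdu", "playTime": "1543629060000", "duration": "04:52", "mark": "2"}
    ],
    "appPageLogs": [],
    "appUsageLogs": [],
    "appErrorLogs": []
}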
Since several parts of the project interact with a relational database, a custom JDBC connection pool class helps reuse database connections and avoid "too many connections" errors. The class relies on the following techniques:
1. The Java lazy-initialization singleton pattern (with double-checked locking)
2. A LinkedList as the underlying store of pooled connections
3. Thread sleep while waiting for a free connection
4. An anonymous inner class implementing a Java interface, i.e. the callback mechanism
The JDBCPool class:
package com.oldboy.music164.jdbc;

import com.oldboy.music164.constant.Constants;
import com.oldboy.music164.util.PropUtil;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedList;

public class JDBCPool {
    private static JDBCPool instance = null;

    // thread-safe lazy singleton (double-checked locking)
    public static JDBCPool getInstance() {
        if (instance == null) {
            synchronized (JDBCPool.class) {
                if (instance == null) {
                    instance = new JDBCPool();
                }
            }
        }
        return instance;
    }

    // the connection pool itself
    private LinkedList<Connection> dataSource = new LinkedList<Connection>();

    private JDBCPool() {
        int datasourceSize = PropUtil.getIntValue(Constants.DS_SIZE);
        String driver = PropUtil.getValue(Constants.JDBC_DRIVER);
        String url = PropUtil.getValue(Constants.JDBC_URL);
        String username = PropUtil.getValue(Constants.JDBC_USER);
        String password = PropUtil.getValue(Constants.JDBC_PASS);
        for (int i = 0; i < datasourceSize; i++) {
            try {
                Class.forName(driver);
                Connection conn = DriverManager.getConnection(url, username, password);
                dataSource.push(conn);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public synchronized Connection getConnection() {
        // sleep until a connection has been returned to the pool
        while (dataSource.size() == 0) {
            try {
                Thread.sleep(10);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return dataSource.poll();
    }

    public void executeQuery(String sql, Object[] params, QueryCallback callback) {
        Connection conn = null;
        PreparedStatement ppst = null;
        ResultSet rs = null;
        try {
            conn = getConnection();
            ppst = conn.prepareStatement(sql);
            for (int i = 0; i < params.length; i++) {
                ppst.setObject(i + 1, params[i]);
            }
            rs = ppst.executeQuery();
            callback.process(rs);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                dataSource.push(conn);  // return the connection to the pool
            }
        }
    }

    public interface QueryCallback {
        void process(ResultSet rs) throws Exception;
    }
}
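A quick sketch of how the callback style is used (the query here is a placeholder I made up; MusicUtil below does the same thing for real against the project's music tables):

import com.oldboy.music164.jdbc.JDBCPool;
import java.sql.ResultSet;

public class PoolDemo {
    public static void main(String[] args) {
        // placeholder query; mname/mtime follow the column naming used by the music tables in this project
        JDBCPool.getInstance().executeQuery(
                "select mname from music_mix where mtime > ?",
                new Object[]{"03:00"},
                new JDBCPool.QueryCallback() {
                    @Override
                    public void process(ResultSet rs) throws Exception {
                        while (rs.next()) {
                            System.out.println(rs.getString("mname"));
                        }
                    }
                });
    }
}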
The Constants class: a constants class holding keys such as the JDBC driver, table names, and the initial pool size. If part of the configuration changes during the project, you only edit the config file instead of hunting through code, which makes the project easier to maintain.
package com.oldboy.music164.constant;

/*
 * Defines the constants used for database access;
 * the values behind these keys are read from the configuration file.
 */
public class Constants {
    public static final String JDBC_DRIVER = "jdbc.driver";
    public static final String JDBC_URL = "jdbc.url";
    public static final String JDBC_USER = "jdbc.username";
    public static final String JDBC_PASS = "jdbc.password";
    public static final String DS_SIZE = "datasource.poolsize";
    public static final String MUSIC_TABLENAME = "music.tablename";
}
The DictUtil class: a data dictionary file is placed in the resources folder in advance; when generating data, field values are drawn at random from this dictionary. Its format is shown below:
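The original screenshot of the dictionary did not survive; judging from how DictUtil parses the file and how MusicUtil.MARK_MAPPING names the events, dictionary.dat plausibly looks like the sketch below. Section headers in brackets are lower-cased field names, and the eventid section must list the four positive actions before the four negative ones, because randomValue_positive/randomValue_negative index into it positionally. All values here are illustrative:

[appversion]
1.0.0
1.0.1
1.0.2
[appchannel]
appstore
huaweiStore
[appplatform]
android
ios
[ostype]
8.0
8.1
9.0
[devicestyle]
iphone 8
xiaomi mix2
honor 10
[eventid]
share
favourite
play
listen
skip
nofavourite
black
null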
The code of DictUtil:
package com.oldboy.music164.util;

/*
 * Reads the data dictionary file and returns a random value from it
 * whenever a randomly generated field value is needed.
 */
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class DictUtil {
    // the dictionary: section name -> list of candidate values
    private static Map<String, ArrayList<String>> map = new HashMap<String, ArrayList<String>>();

    // load dictionary.dat into the map exactly once, hence a static initializer
    static {
        try {
            // standard way to load a file from the resources folder
            InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("dictionary.dat");
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            // read the data into the map
            String line = null;
            ArrayList<String> list = null;
            while ((line = br.readLine()) != null) {
                if (line.startsWith("[")) {
                    list = new ArrayList<String>();
                    map.put(line.substring(1, line.length() - 1), list);
                } else {
                    list.add(line);
                }
            }
            is.close();
            br.close();
        } catch (Exception e) {
        }
    }

    // return a random value from the list behind the given key
    public static String getRandomValue(String key) {
        Random r = new Random();
        ArrayList<String> list = map.get(key);
        // the key may be absent from the dictionary, hence the try-catch
        try {
            return list.get(r.nextInt(list.size()));
        } catch (Exception e) {
            return null;
        }
    }

    // used when generating music event logs: pick a "liked" event id (indexes 0-3)
    public static String randomValue_positive() {
        Random r = new Random();
        ArrayList<String> values = map.get("eventid");
        if (values == null) {
            return null;
        }
        return values.get(r.nextInt(values.size() - 4));
    }

    // pick a "disliked" event id (indexes 4-7)
    public static String randomValue_negative() {
        Random r = new Random();
        ArrayList<String> values = map.get("eventid");
        if (values == null) {
            return null;
        }
        return values.get(r.nextInt(values.size() - 4) + 4);
    }
}
The GenLogTimeUtil class: data generation makes heavy use of timestamps, so this class centralizes the timestamp-generation logic.
package com.oldboy.music164.util;

/*
 * Generates random timestamps.
 */
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;

public class GenLogTimeUtil {
    // generate a random timestamp within the given day:
    // on weekdays, bias towards the 13:00-14:00 hour;
    // on weekends, bias towards the 10:00-11:00 hour
    public static long genTime(String date) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date d = sdf.parse(date);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(d);
            // which day of the week this date falls on
            int i = calendar.get(Calendar.DAY_OF_WEEK);
            Random r = new Random();
            if (i == 7 || i == 1) {
                return genWeekendTime(date, r.nextInt(3));
            }
            return genWeekdayTime(date, r.nextInt(3));
        } catch (Exception e) {
        }
        return 0;
    }

    // time-of-day part of a weekend timestamp
    private static long genWeekendTime(String date, int i) {
        Random r = new Random();
        String hour;
        String minute;
        String newDate;
        switch (i) {
            case 0:
                hour = intFormat(r.nextInt(24));
                minute = intFormat(r.nextInt(59));
                newDate = date + " " + hour + ":" + minute;
                return parseTime(newDate);
            case 1:
                hour = "10";  // the biased hour
                minute = intFormat(r.nextInt(59));
                newDate = date + " " + hour + ":" + minute;
                return parseTime(newDate);
            case 2:
                hour = intFormat(r.nextInt(24));
                minute = intFormat(r.nextInt(59));
                newDate = date + " " + hour + ":" + minute;
                return parseTime(newDate);
        }
        return 0;
    }

    // time-of-day part of a weekday timestamp
    private static long genWeekdayTime(String date, int i) {
        Random r = new Random();
        String hour;
        String minute;
        String newDate;
        switch (i) {
            case 0:
                hour = intFormat(r.nextInt(24));
                minute = intFormat(r.nextInt(59));
                newDate = date + " " + hour + ":" + minute;
                return parseTime(newDate);
            case 1:
                hour = "13";  // the biased hour
                minute = intFormat(r.nextInt(59));
                newDate = date + " " + hour + ":" + minute;
                return parseTime(newDate);
            case 2:
                hour = intFormat(r.nextInt(24));
                minute = intFormat(r.nextInt(59));
                newDate = date + " " + hour + ":" + minute;
                return parseTime(newDate);
        }
        return 0;
    }

    // parse a "yyyy-MM-dd HH:mm" string into a timestamp
    private static long parseTime(String newDate) {
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
            Date date = sdf.parse(newDate);
            return date.getTime();
        } catch (Exception e) {
        }
        return 0;
    }

    // format a number as two digits, e.g. 5 -> "05"
    private static String intFormat(int i) {
        DecimalFormat df = new DecimalFormat("00");
        return df.format(i);
    }
}
The MusicTableUtil class:
package com.oldboy.music164.util;

/*
 * Maps a numeric music type onto the corresponding music table.
 */
public class MusicTableUtil {
    public static String parseTable(int type) {
        switch (type) {
            case 1: return "music_mix";     // pop
            case 2: return "music_folk";    // folk
            case 3: return "music_custom";  // Chinese ancient style
            case 4: return "music_old";     // oldies
            case 5: return "music_rock1";   // Western rock
            case 6: return "music_rock2";   // Mandarin rock
            case 7: return "music_comic";   // anime
            case 8: return "music_yueyu";   // Cantonese
            case 9: return "music_light";   // light music
            default:
                try {
                    throw new Exception("type must be between 1 and 9");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
        }
        return null;
    }
}
The MusicUtil class:
package com.oldboy.music164.util;

/*
 * Connects to MySQL and caches the music info of every table in a map.
 */
import com.oldboy.music164.constant.Constants;
import com.oldboy.music164.jdbc.JDBCPool;

import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MusicUtil {
    public static final Map<String, List<Map<String, String>>> MUSIC_MAP_LIST =
            new HashMap<String, List<Map<String, String>>>();

    // at initialization, load all music info into the map once
    static {
        JDBCPool pool = JDBCPool.getInstance();
        Object[] objs = {};
        String tablenames = PropUtil.getValue(Constants.MUSIC_TABLENAME);
        String[] tablenameArr = tablenames.split(",");
        for (String tablename : tablenameArr) {
            final List<Map<String, String>> list = new ArrayList<Map<String, String>>();
            pool.executeQuery("select mname,mtime from " + tablename, objs, new JDBCPool.QueryCallback() {
                @Override
                public void process(ResultSet rs) throws Exception {
                    while (rs.next()) {
                        Map<String, String> map = new HashMap<String, String>();
                        map.put("mname", rs.getString("mname"));
                        map.put("mtime", rs.getString("mtime"));
                        list.add(map);
                    }
                }
            });
            MUSIC_MAP_LIST.put(tablename, list);
        }
    }

    // event id -> score
    public static final Map<String, String> MARK_MAPPING = new HashMap<String, String>();

    static {
        MARK_MAPPING.put("share", "4");
        MARK_MAPPING.put("favourite", "3");
        MARK_MAPPING.put("play", "2");
        MARK_MAPPING.put("listen", "1");
        MARK_MAPPING.put("skip", "-1");
        MARK_MAPPING.put("black", "-5");
        MARK_MAPPING.put("nofavourite", "-3");
        MARK_MAPPING.put("null", "0");
    }
}
The ParseIPUtil class: note that this class resolves IP addresses, so it needs the external com.maxmind.db package.
package com.oldboy.music164.util;

/*
 * Resolves a given IP address to its country and province.
 */
import com.fasterxml.jackson.databind.JsonNode;
import com.maxmind.db.Reader;

import java.io.InputStream;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

public class ParseIPUtil {
    private static Reader reader;
    private static Map<String, String> map = new HashMap<String, String>();

    static {
        try {
            InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("GeoLite2-City.mmdb");
            reader = new Reader(is);
        } catch (Exception e) {
        }
    }

    private static String processIp(String ip) {
        try {
            // "country" holds the country, "subdivisions" holds the province
            JsonNode jsonNode = reader.get(InetAddress.getByName(ip));
            String country = jsonNode.get("country").get("names").get("zh-CN").asText();
            String province = jsonNode.get("subdivisions").get(0).get("names").get("zh-CN").asText();
            map.put(ip, country + "," + province);
        } catch (Exception e) {
            map.put(ip, "unknown,unknown");
        }
        return map.get(ip);
    }

    public static String getCountry(String ip) {
        return processIp(ip).split(",")[0];
    }

    public static String getProvince(String ip) {
        return processIp(ip).split(",")[1];
    }
}
The PropUtil class:
package com.oldboy.music164.util;

/*
 * Reads the database connection settings from the configuration file.
 */
import java.io.InputStream;
import java.util.Properties;

public class PropUtil {
    private static Properties prop;

    static {
        try {
            prop = new Properties();
            InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("music.properties");
            prop.load(is);
        } catch (Exception e) {
        }
    }

    public static String getValue(String key) {
        try {
            return prop.getProperty(key);
        } catch (Exception e) {
            return null;
        }
    }

    public static Integer getIntValue(String key) {
        try {
            return Integer.parseInt(prop.getProperty(key));
        } catch (Exception e) {
            return 0;
        }
    }
}
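The resource file music.properties itself is not reproduced in the original. Based on the keys declared in Constants, the MySQL host used by the Sqoop commands later in the article (192.168.153.201, database big14, root/root), and the table names from MusicTableUtil, a plausible version would look like this (values illustrative):

jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://192.168.153.201:3306/big14
jdbc.username=root
jdbc.password=root
datasource.poolsize=10
music.tablename=music_mix,music_folk,music_custom,music_old,music_rock1,music_rock2,music_comic,music_yueyu,music_light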
Note: the project needs to generate a large amount of data, which means assigning values to a large number of fields. Java reflection is used for this: every field of each App log class is obtained reflectively and then assigned a value.
The GenLogUtil class:
package com.oldboy.music164.genlog;

/*
 * Generates the individual log objects that go into the log aggregate.
 */
import com.oldboy.music164.common.AppBaseLog;
import com.oldboy.music164.common.AppEventLog;
import com.oldboy.music164.util.DictUtil;
import com.oldboy.music164.util.GenLogTimeUtil;
import com.oldboy.music164.util.MusicTableUtil;
import com.oldboy.music164.util.MusicUtil;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;

public class GenLogUtil {
    static Random r = new Random();
    public static int type;
    public static String date;

    public GenLogUtil(String date) {
        this.date = date;
    }

    public GenLogUtil(int type, String date) {
        this.type = type;
        this.date = date;
    }

    public static <T> T genLog(Class<T> clazz) throws Exception {
        T t1 = clazz.newInstance();
        // first fill in the fields that have entries in the data dictionary
        if (t1 instanceof AppBaseLog) {
            Field[] fields = clazz.getDeclaredFields();
            for (Field field : fields) {
                // only String fields are assigned from the dictionary
                if (field.getType() == String.class) {
                    field.setAccessible(true);
                    field.set(t1, DictUtil.getRandomValue(field.getName().toLowerCase()));
                }
            }
            ((AppBaseLog) t1).setCreatedAtMs(GenLogTimeUtil.genTime(date));
        }
        if (t1 instanceof AppEventLog) {
            AppEventLog eventLog = (AppEventLog) t1;
            // 0 -> generate a "liked" music event, 1 -> generate a "disliked" one
            switch (r.nextInt(2)) {
                case 0:
                    genPositive(eventLog);
                    break;
                case 1:
                    genNegative(eventLog);
                    break;
            }
        }
        return t1;
    }

    // fill in the fields of a "liked" music event
    private static void genPositive(AppEventLog eventLog) {
        String table = MusicTableUtil.parseTable(type);
        String positive = DictUtil.randomValue_positive();
        int i = r.nextInt(MusicUtil.MUSIC_MAP_LIST.get(table).size());
        Map<String, String> music_time = MusicUtil.MUSIC_MAP_LIST.get(table).get(i);
        // track name
        eventLog.setMusicID(music_time.get("mname"));
        // play time and duration
        if (positive.equals("play") || positive.equals("listen")) {
            eventLog.setDuration(music_time.get("mtime"));
            eventLog.setPlayTime(eventLog.getCreatedAtMs() + "");
        }
        // event id and score
        eventLog.setEventId(positive);
        eventLog.setMark(MusicUtil.MARK_MAPPING.get(positive));
    }

    // fill in the fields of a "disliked" music event
    private static void genNegative(AppEventLog eventLog) {
        String table = MusicTableUtil.parseTable(type);
        // negative holds a disliked event id
        String negative = DictUtil.randomValue_negative();
        int i = r.nextInt(MusicUtil.MUSIC_MAP_LIST.get(table).size());
        Map<String, String> music_time = MusicUtil.MUSIC_MAP_LIST.get(table).get(i);
        // track name
        eventLog.setMusicID(music_time.get("mname"));
        // play time and duration
        if (negative.equals("skip")) {
            eventLog.setDuration("00:20");
            eventLog.setPlayTime(eventLog.getCreatedAtMs() + "");
        }
        // event id and score
        eventLog.setEventId(negative);
        eventLog.setMark(MusicUtil.MARK_MAPPING.get(negative));
    }
}
The GenLogAgg class:
package com.oldboy.music164.genlog;

/*
 * Builds a full log aggregate (used for testing the generator).
 */
import com.alibaba.fastjson.JSON;
import com.oldboy.music164.common.*;
import com.oldboy.music164.util.DictUtil;

import java.lang.reflect.Field;
import java.util.*;

public class GenLogAgg {
    public static String genLogAgg(int type, String deviceId, String date) {
        try {
            Class clazz = AppLogAggEntity.class;
            Object t1 = clazz.newInstance();
            Field[] fields = clazz.getDeclaredFields();
            for (Field field : fields) {
                field.setAccessible(true);
                field.set(t1, DictUtil.getRandomValue(field.getName().toLowerCase()));
            }
            ((AppLogAggEntity) t1).setAppErrorLogs(genLogList(AppErrorLog.class, date, type));
            ((AppLogAggEntity) t1).setAppEventLogs(genLogList(AppEventLog.class, date, type));
            ((AppLogAggEntity) t1).setAppPageLogs(genLogList(AppPageLog.class, date, type));
            ((AppLogAggEntity) t1).setAppStartupLogs(genLogList(AppStartupLog.class, date, type));
            ((AppLogAggEntity) t1).setAppUsageLogs(genLogList(AppUsageLog.class, date, type));
            ((AppLogAggEntity) t1).setDeviceId(deviceId);
            return JSON.toJSONString(t1, true);
        } catch (Exception e) {
        }
        return null;
    }

    public static <T> List<T> genLogList(Class<T> clazz, String date, int type) throws Exception {
        List<T> list = new ArrayList<T>();
        Random r = new Random();
        if (clazz.equals(AppStartupLog.class)) {
            for (int i = 0; i < 2; i++) {
                list.add(new GenLogUtil(date).genLog(clazz));
            }
        } else if (clazz.equals(AppEventLog.class)) {
            for (int i = 0; i < r.nextInt(10); i++) {
                list.add(new GenLogUtil(type, date).genLog(clazz));
            }
        } else {
            for (int i = 0; i < 3; i++) {
                list.add(new GenLogUtil(date).genLog(clazz));
            }
        }
        return list;
    }
}
The DataSender class: this class uses the functionality wrapped up above to actually generate the data. It covers the whole month of December 2018, with 100 users and 100 log packages per user per day, for a total of 300,000 log packages.
package com.oldboy.music164.phone;

import com.oldboy.music164.genlog.GenLogAgg;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.text.DecimalFormat;

/**
 * Main class simulating log generation by the music app's phone client.
 */
public class DataSender {
    public static void main(String[] args) throws Exception {
        DecimalFormat df = new DecimalFormat("00");
        // generate logs for December 1st through 30th, 2018
        for (int i = 1; i <= 30; i++) {
            genUser(100, "2018-12-" + df.format(i), 100);
        }
    }

    /**
     * Generate the logs of one day.
     *
     * @param userNum total number of users
     * @param date    the day
     * @param logNum  number of log packages per user (a package is the smallest unit uploaded to the server)
     */
    public static void genUser(int userNum, final String date, final int logNum) {
        for (int i = 0; i < userNum; i++) {
            DecimalFormat df = new DecimalFormat("000000");
            final String deviceID = "Device" + df.format(i);
            // table mapping, e.g. 1 => music_mix; see MusicTableUtil
            final int type = (i % 9) + 1;
            genData(deviceID, type, date, logNum);
        }
    }

    /**
     * Generate the given number of log packages for one user,
     * biased towards the music type that user likes.
     *
     * @param deviceID user id / device id
     * @param type     the music type the user likes
     * @param date     the day
     * @param num      number of log packages for this user
     */
    public static void genData(String deviceID, int type, String date, int num) {
        for (int i = 0; i < num; i++) {
            String logAgg = GenLogAgg.genLogAgg(type, deviceID, date);
            doSend(logAgg);
        }
    }

    private static void doSend(String json) {
        try {
            String strUrl = "http://s201:80";
            URL url = new URL(strUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            // POST with a request body
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("client_time", System.currentTimeMillis() + "");
            OutputStream os = conn.getOutputStream();
            os.write(json.getBytes());
            os.flush();
            os.close();
            System.out.println(conn.getResponseCode());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
This project uses five hosts in total, s201 through s205. For data ingestion, nginx provides a reverse proxy: s201 acts as the reverse proxy server and forwards the incoming data to the three VMs s202 through s204.
In fact the software actually used here is OpenResty, which is essentially nginx bundled with a collection of plugins. Since OpenResty is written in C, it must be compiled during installation; afterwards the software lives under /usr/local/openresty.
The configuration on s201 is below. Note that this file only needs to declare the upstream servers: the parts that matter are the upstream block and the proxy_pass directive, and the rest can stay at its defaults. s201's nginx.conf:
worker_processes 4;
events {
    worker_connections 10240;
}

http {
    include       mime.types;
    default_type  application/octet-stream;

    sendfile        on;
    keepalive_timeout  65;
    underscores_in_headers on;

    upstream nginx_server {
        server s202:80 max_fails=2 fail_timeout=2 weight=2;
        server s203:80 max_fails=2 fail_timeout=2 weight=2;
        server s204:80 max_fails=2 fail_timeout=2 weight=2;
    }

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;
        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
            proxy_pass http://nginx_server;
        }

        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}
    }
}
The configuration on s202-s204, the actual receivers of the data, is as follows:
#user  nobody;
worker_processes  4;

events {
    worker_connections 10240;
}

http {
    include       mime.types;
    default_type  application/octet-stream;
    underscores_in_headers on;

    log_format main escape=json '$msec#$remote_addr#$http_client_time#$status#$request_body';

    access_log logs/access.log main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;
        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
            error_page 405 =200 $uri;
            lua_need_request_body on;
            content_by_lua 'local s = ngx.var.request_body';
        }

        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}
Note: at this point the nginx reverse-proxy ingestion setup is complete, and all data lands in /usr/local/openresty/nginx/logs/access.log on s202-s204. Next, we use Flume as the log collection tool to ship the data from every server up to HDFS in one place.
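For reference, one collected line then looks roughly like this (values invented, JSON body abbreviated). The '#'-separated layout comes from the log_format directive above and is exactly what the ETL code later splits on:

1543629060.123#192.168.153.101#1543629060321#200#{"deviceId":"Device000001","appVersion":"1.0.2","appEventLogs":[{"eventId":"play","musicID":"chengdu","mark":"2","createdAtMs":1543629060000}]}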
A note on the multi-hop design: uploading the data with Flume is not hard in itself. My first idea was to write one configuration file per host, each shipping its data to an HDFS folder named after that host. The problem is that what we ultimately analyze is the full data set, so everything has to be gathered together anyway; with that layout, every newly added host would require manually re-aggregating the data, which raises maintenance costs considerably. So I switched to Flume's multi-hop technique: one machine acts as an intermediate collection tier, all the other machines send their data to it, and it performs the final unified upload, which avoids most of the maintenance problems. For the hop-to-hop transport this project uses Avro, a data serialization system that substantially reduces payload size and transfer time.
Flume configuration for the sender group on s202-s204, sender.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/openresty/nginx/logs/access.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.153.205
a1.sinks.k1.port = 4444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Flume configuration for the aggregation hop on s205, collector.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4444
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y/%m/%d/
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Roll the file at this size (128 MB, one HDFS block)
a1.sinks.k1.hdfs.rollSize = 134217728
# Roll automatically if the file has been inactive for more than 60s
a1.sinks.k1.hdfs.idleTimeout = 60
# File type: plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Disable interval-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
# Disable event-count-based rolling
a1.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
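To run the pipeline, the aggregation agent has to be up before the senders, otherwise the avro sinks have nothing to connect to. The launch commands would look roughly like this (the config file paths depend on where you keep them):

# on s205, start the collector first
flume-ng agent --conf $FLUME_HOME/conf --conf-file collector.conf --name a1

# then, on each of s202-s204, start a sender
flume-ng agent --conf $FLUME_HOME/conf --conf-file sender.conf --name a1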
At this point, all the raw logs produced by the simulated users are sitting on HDFS under /flume/events/, organized into year/month/day subfolders. We now enter the second stage of the project, ETL: turning the raw user data into the dimension tables of the data warehouse, that is, going from the ODS layer to the DW layer. One special note: in a real production environment the Hive tables should be partitioned; for ease of demonstration, this project loads a full month of data directly, so partitioned tables are not used.
The ODS-layer table creation and data load statements:
create table ODS_MUSIC_APP(line string);
load data inpath '/flume/events/2020/04/02' into table ODS_MUSIC_APP;
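As a sketch of the partitioned alternative mentioned above (not what this project actually runs, and the table name ODS_MUSIC_APP_P is hypothetical), the ODS table could be partitioned by day, with each day's Flume output loaded into its own partition:

-- partitioned variant: one partition per day of Flume output
create table ODS_MUSIC_APP_P(line string) partitioned by (dt string);
load data inpath '/flume/events/2020/04/02' into table ODS_MUSIC_APP_P partition (dt = '2020-04-02');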
The DW-layer DDL is below. It stores the data column-wise in the optimized Parquet file format. Beyond that, the DW tables follow two design principles:
1. To avoid a pile of needless joins later on, common fields such as device_id, client_ip, and log_create_time appear in every table
2. All column types are unified as string, which spares later development from type-conversion headaches
-- create dw_log_music_error
create table DW_LOG_MUSIC_ERROR(
  DEVICE_ID string, DEVICE_MODEL string, CLIENT_IP string, CLIENT_TIME string,
  SERVER_TIME string, LOG_CREATE_TIME string, ERROR_BRIEF string, ERROR_DETAIL string,
  APP_VERSION string, APP_STORE string, APP_PLATFORM string, APP_OSTYPE string)
row format delimited fields terminated by '\t' lines terminated by '\n'
stored as parquet;

-- create dw_log_music_event
create table DW_LOG_MUSIC_EVENT(
  DEVICE_ID string, DEVICE_MODEL string, CLIENT_IP string, CLIENT_TIME string,
  SERVER_TIME string, LOG_CREATE_TIME string, EVENT_TYPE string, EVENT_MARK string,
  EVENT_MUSIC string, EVENT_PLAYTIME string, EVENT_DURATION string,
  APP_VERSION string, APP_STORE string, APP_PLATFORM string, APP_OSTYPE string)
row format delimited fields terminated by '\t' lines terminated by '\n'
stored as parquet;

-- create dw_log_music_page
create table DW_LOG_MUSIC_PAGE(
  DEVICE_ID string, DEVICE_MODEL string, CLIENT_IP string, CLIENT_TIME string,
  SERVER_TIME string, LOG_CREATE_TIME string, PAGE_ID string, PAGE_NEXT string,
  PAGE_VIEW_CNT string, PAGE_DURATION string, PAGE_VISIT_INDEX string,
  APP_VERSION string, APP_STORE string, APP_PLATFORM string, APP_OSTYPE string)
row format delimited fields terminated by '\t' lines terminated by '\n'
stored as parquet;

-- create dw_log_music_startup
create table DW_LOG_MUSIC_STARTUP(
  DEVICE_ID string, DEVICE_BRAND string, DEVICE_MODEL string, DEVICE_SCREENSIZE string,
  DEVICE_CARRIER string, CLIENT_IP string, CLIENT_COUNTRY string, CLIENT_PROVINCE string,
  CLIENT_TIME string, SERVER_TIME string, LOG_CREATE_TIME string, CLIENT_NETWORK string,
  APP_VERSION string, APP_STORE string, APP_PLATFORM string, APP_OSTYPE string)
row format delimited fields terminated by '\t' lines terminated by '\n'
stored as parquet;

-- create dw_log_music_usage
create table DW_LOG_MUSIC_USAGE(
  DEVICE_ID string, DEVICE_MODEL string, CLIENT_IP string, CLIENT_TIME string,
  SERVER_TIME string, LOG_CREATE_TIME string, APP_VERSION string, APP_STORE string,
  APP_PLATFORM string, APP_OSTYPE string, ONCE_USE_DURATION string,
  ONCE_UPLOAD_TRAFFIC string, ONCE_DOWNLOAD_TRAFFIC string)
row format delimited fields terminated by '\t' lines terminated by '\n'
stored as parquet;
Both the source table and the target tables now exist, so all that remains is the ETL itself. Note that the process involves parsing JSON strings, so Alibaba's fastjson package must be imported; remember to also drop the jar into Spark's jars directory (lib in older Spark versions) so the executors can load this third-party package.
ETL design notes: the first time I built this project I used Hive alone, and frequent Java GC led to OOM errors and slow jobs. This version upgrades to a Hive + Spark architecture, which is both more stable and faster. The approach is to read the HDFS files directly with Spark into an RDD, turn the RDD into a DataFrame via Scala's implicit conversions, and complete the whole job with Spark SQL. For parsing the logs with fastjson, the log structure is divided into three levels: level 0 for the fields carried by the nginx log line itself (msec, remote_addr, http_client_time), level 1 for the top-level fields of the JSON body, and level 2 for the fields nested inside each per-type log array. These level mappings are recorded in the MySQL database table_shadow, created as follows:
/*
 Navicat Premium Data Transfer

 Source Server         : big13
 Source Server Type    : MySQL
 Source Server Version : 50724
 Source Host           : localhost:3306
 Source Schema         : table_shadow

 Target Server Type    : MySQL
 Target Server Version : 50724
 File Encoding         : 65001

 Date: 15/01/2019 18:10:52
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- create the database
create database table_shadow;

-- ----------------------------
-- Table structure for music_log_error
-- ----------------------------
DROP TABLE IF EXISTS `music_log_error`;
CREATE TABLE `music_log_error` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `table_key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `type` int(10) NOT NULL COMMENT 'position of the key within the log line, e.g. client_time#{appErrorLogs:{errorBrief:xxx}}#',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 13 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of music_log_error
-- ----------------------------
INSERT INTO `music_log_error` VALUES (1, 'appErrorLogs', 'deviceId', 1);
INSERT INTO `music_log_error` VALUES (2, 'appErrorLogs', 'deviceStyle', 1);
INSERT INTO `music_log_error` VALUES (3, 'appErrorLogs', 'remote_addr', 0);
INSERT INTO `music_log_error` VALUES (4, 'appErrorLogs', 'http_client_time', 0);
INSERT INTO `music_log_error` VALUES (5, 'appErrorLogs', 'msec', 0);
INSERT INTO `music_log_error` VALUES (6, 'appErrorLogs', 'createdAtMs', 2);
INSERT INTO `music_log_error` VALUES (7, 'appErrorLogs', 'errorBrief', 2);
INSERT INTO `music_log_error` VALUES (8, 'appErrorLogs', 'errorDetail', 2);
INSERT INTO `music_log_error` VALUES (9, 'appErrorLogs', 'appVersion', 1);
INSERT INTO `music_log_error` VALUES (10, 'appErrorLogs', 'appChannel', 1);
INSERT INTO `music_log_error` VALUES (11, 'appErrorLogs', 'appPlatform', 1);
INSERT INTO `music_log_error` VALUES (12, 'appErrorLogs', 'osType', 1);

-- ----------------------------
-- Table structure for music_log_event
-- ----------------------------
DROP TABLE IF EXISTS `music_log_event`;
CREATE TABLE `music_log_event` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `table_key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `type` int(10) NOT NULL COMMENT 'position of the key within the log line, e.g. client_time#{appErrorLogs:{errorBrief:xxx}}#',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 16 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of music_log_event
-- ----------------------------
INSERT INTO `music_log_event` VALUES (1, 'appEventLogs', 'deviceId', 1);
INSERT INTO `music_log_event` VALUES (2, 'appEventLogs', 'deviceStyle', 1);
INSERT INTO `music_log_event` VALUES (3, 'appEventLogs', 'remote_addr', 0);
INSERT INTO `music_log_event` VALUES (4, 'appEventLogs', 'http_client_time', 0);
INSERT INTO `music_log_event` VALUES (5, 'appEventLogs', 'msec', 0);
INSERT INTO `music_log_event` VALUES (6, 'appEventLogs', 'createdAtMs', 2);
INSERT INTO `music_log_event` VALUES (7, 'appEventLogs', 'eventId', 2);
INSERT INTO `music_log_event` VALUES (8, 'appEventLogs', 'mark', 2);
INSERT INTO `music_log_event` VALUES (9, 'appEventLogs', 'musicID', 2);
INSERT INTO `music_log_event` VALUES (10, 'appEventLogs', 'playTime', 2);
INSERT INTO `music_log_event` VALUES (11, 'appEventLogs', 'duration', 2);
INSERT INTO `music_log_event` VALUES (12, 'appEventLogs', 'appVersion', 1);
INSERT INTO `music_log_event` VALUES (13, 'appEventLogs', 'appChannel', 1);
INSERT INTO `music_log_event` VALUES (14, 'appEventLogs', 'appPlatform', 1);
INSERT INTO `music_log_event` VALUES (15, 'appEventLogs', 'osType', 1);

-- ----------------------------
-- Table structure for music_log_page
-- ----------------------------
DROP TABLE IF EXISTS `music_log_page`;
CREATE TABLE `music_log_page` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `table_key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `type` int(10) NOT NULL COMMENT 'position of the key within the log line, e.g. client_time#{appErrorLogs:{errorBrief:xxx}}#',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 16 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of music_log_page
-- ----------------------------
INSERT INTO `music_log_page` VALUES (1, 'appPageLogs', 'deviceId', 1);
INSERT INTO `music_log_page` VALUES (2, 'appPageLogs', 'deviceStyle', 1);
INSERT INTO `music_log_page` VALUES (3, 'appPageLogs', 'remote_addr', 0);
INSERT INTO `music_log_page` VALUES (4, 'appPageLogs', 'http_client_time', 0);
INSERT INTO `music_log_page` VALUES (5, 'appPageLogs', 'msec', 0);
INSERT INTO `music_log_page` VALUES (6, 'appPageLogs', 'createdAtMs', 2);
INSERT INTO `music_log_page` VALUES (7, 'appPageLogs', 'pageId', 2);
INSERT INTO `music_log_page` VALUES (8, 'appPageLogs', 'nextPage', 2);
INSERT INTO `music_log_page` VALUES (9, 'appPageLogs', 'pageViewCntInSession', 1);
INSERT INTO `music_log_page` VALUES (10, 'appPageLogs', 'stayDurationSecs', 1);
INSERT INTO `music_log_page` VALUES (11, 'appPageLogs', 'visitIndex', 1);
INSERT INTO `music_log_page` VALUES (12, 'appPageLogs', 'appVersion', 1);
INSERT INTO `music_log_page` VALUES (13, 'appPageLogs', 'appChannel', 1);
INSERT INTO `music_log_page` VALUES (14, 'appPageLogs', 'appPlatform', 1);
INSERT INTO `music_log_page` VALUES (15, 'appPageLogs', 'osType', 1);

-- ----------------------------
-- Table structure for music_log_startup
-- ----------------------------
DROP TABLE IF EXISTS `music_log_startup`;
CREATE TABLE `music_log_startup` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `table_key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `type` int(10) NOT NULL COMMENT 'position of the key within the log line, e.g. client_time#{appErrorLogs:{errorBrief:xxx}}#',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 18 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of music_log_startup
-- ----------------------------
INSERT INTO `music_log_startup` VALUES (1, 'appStartupLogs', 'deviceId', 1);
INSERT INTO `music_log_startup` VALUES (2, 'appStartupLogs', 'brand', 2);
INSERT INTO `music_log_startup` VALUES (3, 'appStartupLogs', 'deviceStyle', 1);
INSERT INTO `music_log_startup` VALUES (4, 'appStartupLogs', 'screenSize', 2);
INSERT INTO `music_log_startup` VALUES (5, 'appStartupLogs', 'carrier', 2);
INSERT INTO `music_log_startup` VALUES (6, 'appStartupLogs', 'remote_addr', 0);
INSERT INTO `music_log_startup` VALUES (7, 'appStartupLogs', 'country', 1);
INSERT INTO `music_log_startup` VALUES (8, 'appStartupLogs', 'province', 1);
INSERT INTO `music_log_startup` VALUES (9, 'appStartupLogs', 'http_client_time', 0);
INSERT INTO `music_log_startup` VALUES (10, 'appStartupLogs', 'msec', 0);
INSERT INTO `music_log_startup` VALUES (11, 'appStartupLogs', 'createdAtMs', 2);
INSERT INTO `music_log_startup` VALUES (12, 'appStartupLogs', 'network', 2);
INSERT INTO `music_log_startup` VALUES (13, 'appStartupLogs', 'appVersion', 1);
INSERT INTO `music_log_startup` VALUES (14, 'appStartupLogs', 'appstore', 1);
INSERT INTO `music_log_startup` VALUES (15, 'appStartupLogs', 'appPlatform', 1);
INSERT INTO `music_log_startup` VALUES (16, 'appStartupLogs', 'osType', 1);

-- ----------------------------
-- Table structure for music_log_usage
-- ----------------------------
DROP TABLE IF EXISTS `music_log_usage`;
CREATE TABLE `music_log_usage` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `table_key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `type` int(10) NOT NULL COMMENT 'position of the key within the log line, e.g. client_time#{appUsageLogs:{errorBrief:xxx}}#',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of music_log_usage
-- ----------------------------
INSERT INTO `music_log_usage` VALUES (1, 'appUsageLogs', 'deviceId', 1);
INSERT INTO `music_log_usage` VALUES (2, 'appUsageLogs', 'deviceStyle', 1);
INSERT INTO `music_log_usage` VALUES (3, 'appUsageLogs', 'remote_addr', 0);
INSERT INTO `music_log_usage` VALUES (4, 'appUsageLogs', 'http_client_time', 0);
INSERT INTO `music_log_usage` VALUES (5, 'appUsageLogs', 'msec', 0);
INSERT INTO `music_log_usage` VALUES (6, 'appUsageLogs', 'createdAtMs', 2);
INSERT INTO `music_log_usage` VALUES (7, 'appUsageLogs', 'singleDownloadTraffic', 2);
INSERT INTO `music_log_usage` VALUES (8, 'appUsageLogs', 'singleUploadTraffic', 2);
INSERT INTO `music_log_usage` VALUES (9, 'appUsageLogs', 'singleUseDurationSecs', 2);
INSERT INTO `music_log_usage` VALUES (10, 'appUsageLogs', 'appVersion', 1);
INSERT INTO `music_log_usage` VALUES (11, 'appUsageLogs', 'appChannel', 1);
INSERT INTO `music_log_usage` VALUES (12, 'appUsageLogs', 'appPlatform', 1);
INSERT INTO `music_log_usage` VALUES (13, 'appUsageLogs', 'osType', 1);

SET FOREIGN_KEY_CHECKS = 1;
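Tying the mapping tables back to the sample access.log line shown earlier, the three levels carve up a collected line as follows:

level 0: msec, remote_addr, http_client_time   -> positions 0, 1 and 2 of the '#'-split line
level 1: deviceId, deviceStyle, appVersion...  -> top-level fields of the JSON body
level 2: createdAtMs, eventId, errorBrief...   -> fields of the first element of the per-type array, e.g. appEventLogs[0]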
The final ETL Scala code is as follows:
package com.oldboy.music164.odsdw

/*
 * ETL: parse the JSON logs and move the data from the ODS layer into the DW layer.
 */
import java.sql.DriverManager

import com.alibaba.fastjson.{JSON, JSONObject}
import com.oldboy.music164.util.ParseIPUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

object GenDW {
  def main(args: Array[String]): Unit = {
    // load every file under /flume/events/2020/04/02 on HDFS into the Hive tables
    val conf = new SparkConf()
    conf.setAppName("spark_dw")
    conf.setMaster("spark://s201:7077")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val rdd1 = spark.sparkContext.textFile("hdfs:///flume/events/2020/04/02")
    val tablelist = Array[String]("music_log_error", "music_log_event", "music_log_page",
      "music_log_startup", "music_log_usage")
    spark.sql("use music164")
    import spark.implicits._
    for (table <- tablelist) {
      if (table.equals("music_log_error")) {
        val df = rdd1.map(e => parseErrorLog(table, e)).toDF()
        df.createOrReplaceTempView("v1")
        spark.sql("insert into dw_log_music_error select * from v1")
      }
      if (table.equals("music_log_event")) {
        val df = rdd1.map(e => parseEventLog(table, e)).toDF()
        df.createOrReplaceTempView("v1")
        spark.sql("insert into dw_log_music_event select * from v1")
      }
      if (table.equals("music_log_page")) {
        val df = rdd1.map(e => parsePageLog(table, e)).toDF()
        df.createOrReplaceTempView("v1")
        spark.sql("insert into dw_log_music_page select * from v1")
      }
      if (table.equals("music_log_startup")) {
        val df = rdd1.map(e => parseStartupLog(table, e)).toDF()
        df.createOrReplaceTempView("v1")
        spark.sql("insert into dw_log_music_startup select * from v1")
      }
      if (table.equals("music_log_usage")) {
        val df = rdd1.map(e => parseUsageLog(table, e)).toDF()
        df.createOrReplaceTempView("v1")
        spark.sql("insert into dw_log_music_usage select * from v1")
      }
    }
  }

  def parseJson(tableName: String, line: String): ListBuffer[String] = {
    val buf = new ListBuffer[String]
    val res = line.replaceAll("\\\\n", "").replaceAll("\\\\t", "").replaceAll("\\\\", "")
    val jsonString = res.split("#")(4)
    val jo = JSON.parseObject(jsonString)
    // startup logs get dedicated handling
    if (tableName.equals("music_log_startup")) {
      val JArray = jo.getJSONArray("appStartupLogs")
      val jo1: JSONObject = JArray.get(0).asInstanceOf[JSONObject]
      buf.append(jo.getString("deviceId"))
      buf.append(jo1.getString("brand"))
      buf.append(jo.getString("deviceStyle"))
      buf.append(jo1.getString("screenSize"))
      buf.append(jo1.getString("carrier"))
      buf.append(res.split("#")(1))
      buf.append(ParseIPUtil.getCountry(res.split("#")(1)))
      buf.append(ParseIPUtil.getProvince(res.split("#")(1)))
      buf.append(res.split("#")(2))
      buf.append(res.split("#")(0))
      buf.append(jo1.getString("createdAtMs"))
      buf.append(jo1.getString("network"))
      buf.append(jo.getString("appVersion"))
      buf.append("null")
      buf.append(jo.getString("appPlatform"))
      buf.append(jo.getString("osType"))
    } else {
      // look up the key/level mapping in table_shadow
      // (note: this opens one MySQL connection per log line, which is costly)
      val url = "jdbc:mysql://s201:3306/table_shadow"
      val username = "root"
      val password = "root"
      val conn = DriverManager.getConnection(url, username, password)
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("select * from " + tableName)
      while (rs.next()) {
        val table_key = rs.getString("table_key")
        val field_name = rs.getString("key")
        val field_type = rs.getInt("type")
        // field_type tells which level of the line this key lives at
        if (field_type == 0) {
          if (field_name == "msec") {
            buf.append(res.split("#")(0))
          }
          if (field_name == "remote_addr") {
            buf.append(res.split("#")(1))
          }
          if (field_name == "http_client_time") {
            buf.append(res.split("#")(2))
          }
        }
        if (field_type == 1) {
          if (jo.getString(field_name) == null) {
            buf.append("null")
          } else {
            buf.append(jo.getString(field_name))
          }
        }
        if (field_type == 2) {
          val JArray = jo.getJSONArray(table_key)
          if (JArray != null && JArray.size() > 0) {
            // only proceed when the array has elements; by default take the first one,
            // and fill in "null" when a key is missing
            val jo1: JSONObject = JArray.get(0).asInstanceOf[JSONObject]
            if (jo1.getString(field_name) == null) {
              buf.append("null")
            } else {
              buf.append(jo1.getString(field_name))
            }
          }
        }
      }
      conn.close()
    }
    buf
  }

  // Scala tuples have a fixed arity, so each table needs its own conversion function
  def parseErrorLog(tableName: String, line: String): Tuple12[String, String, String, String,
      String, String, String, String, String, String, String, String] = {
    val buf = parseJson(tableName, line)
    if (buf.size == 12) {
      Tuple12(buf(0), buf(1), buf(2), buf(3), buf(4), buf(5), buf(6), buf(7), buf(8), buf(9), buf(10), buf(11))
    } else {
      Tuple12("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null")
    }
  }

  def parseEventLog(tableName: String, line: String): Tuple15[String, String, String, String,
      String, String, String, String, String, String, String, String, String, String, String] = {
    val buf = parseJson(tableName, line)
    if (buf.size == 15) {
      Tuple15(buf(0), buf(1), buf(2), buf(3), buf(4), buf(5), buf(6), buf(7), buf(8), buf(9), buf(10),
        buf(11), buf(12), buf(13), buf(14))
    } else {
      Tuple15("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null",
        "null", "null", "null", "null")
    }
  }

  def parsePageLog(tableName: String, line: String): Tuple15[String, String, String, String,
      String, String, String, String, String, String, String, String, String, String, String] = {
    val buf = parseJson(tableName, line)
    if (buf.size == 15) {
      Tuple15(buf(0), buf(1), buf(2), buf(3), buf(4), buf(5), buf(6), buf(7), buf(8), buf(9), buf(10),
        buf(11), buf(12), buf(13), buf(14))
    } else {
      Tuple15("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null",
        "null", "null", "null", "null")
    }
  }

  def parseStartupLog(tableName: String, line: String): Tuple16[String, String, String, String,
      String, String, String, String, String, String, String, String, String, String, String, String] = {
    val buf = parseJson(tableName, line)
    if (buf.size == 16) {
      Tuple16(buf(0), buf(1), buf(2), buf(3), buf(4), buf(5), buf(6), buf(7), buf(8), buf(9), buf(10),
        buf(11), buf(12), buf(13), buf(14), buf(15))
    } else {
      Tuple16("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null",
        "null", "null", "null", "null", "null")
    }
  }

  def parseUsageLog(tableName: String, line: String): Tuple13[String, String, String, String,
      String, String, String, String, String, String, String, String, String] = {
    val buf = parseJson(tableName, line)
    if (buf.size == 13) {
      Tuple13(buf(0), buf(1), buf(2), buf(3), buf(4), buf(5), buf(6), buf(7), buf(8), buf(9), buf(10),
        buf(11), buf(12))
    } else {
      Tuple13("null", "null", "null", "null", "null", "null", "null", "null", "null", "null", "null",
        "null", "null")
    }
  }
}
In addition, the two tables originally kept in MySQL, user and music, need to be moved into Hive with Sqoop:
sqoop import --connect jdbc:mysql://192.168.153.201:3306/big14 --username root --password root --table user --hive-import --create-hive-table --hive-table users --hive-database music164 --delete-target-dir --fields-terminated-by '\t' --lines-terminated-by '\n' -m 1
sqoop import --connect jdbc:mysql://192.168.153.201:3306/big14 --username root --password root --table music --hive-import --create-hive-table --hive-table music --hive-database music164 --delete-target-dir --fields-terminated-by '\t' --lines-terminated-by '\n' -m 1
Because the table name user collides with a reserved keyword, the following setting avoids the resulting error. Even so, it is better not to use user as a table name at all; renaming it to "users" is recommended:
set hive.support.sql11.reserved.keywords = false ;
With that, the ETL into every DW-layer table is complete!
Finally, package the project in IDEA and submit it to the cluster with spark-submit; the submission script is:
spark-submit --master spark://s201:7077 --class com.oldboy.music164.odsdw.GenDW /home/centos/music164.jar
Business requirements:

1. Activity level:
The metric is an activity index.
For each user: play count + favorite count x 2 + average daily play time = activity index.
The index values are bucketed into 10 tiers, giving scores from 0 to 100.
Improved calculation of average daily play time: the straightforward way is to total a user's play time over the month and divide by the number of days. The problem is that a user who plays heavily on a few days and barely at all on the rest can come out with the same average as a user who listens the same amount every day. I therefore refined the metric by treating the volatility of daily play time, its standard deviation, as a penalty: the mean is divided by the standard deviation, which is fairer to users who listen steadily every day.
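In SQL terms the refined metric is simply the mean of the per-day play sums divided by their population standard deviation; this is the shape the subquery takes inside the full APP_ACTIVE statement in GenApp.scala below (daily_play here is a stand-in name for the per-device, per-day aggregation):

select device_id,
       avg(play_day_sum) / stddev_pop(play_day_sum) as daily_avg
from daily_play
group by device_id;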
2. Music style leaderboard:
The top ten favorite music styles per user
3. Singer leaderboard
The top ten favorite singers per user
4. Song leaderboard
The top ten favorite songs per user
5. Weekday play-hour preference
The top ten favorite play hours per user on weekdays (Monday through Friday)
6. Weekend play-hour preference
The top ten favorite play hours per user on weekends
7. Play-language percentile
For each user and play language, the percentage of other users that user beats
8. Payment level
The percentage of other users each paying user beats
The final implementation is shown below; a Spark UDF is registered to derive weekday versus weekend from a timestamp:
GenApp.scala
package com.oldboy.music164.odsdw

import java.util.{Calendar, Date}

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object GenApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("spark_dw")
    conf.setMaster("spark://s201:7077")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // register a UDF that classifies a timestamp as weekday or weekend
    spark.udf.register("dayofweek_type", (time: Long) => {
      val d = new Date(time)
      val calendar = Calendar.getInstance()
      calendar.setTime(d)
      val i = calendar.get(Calendar.DAY_OF_WEEK)
      if (i == 1 || i == 7) {
        "weekend"
      } else {
        "weekday"
      }
    })

    spark.sql("use music164")

    // 1. activity level
    spark.sql("create table if not exists APP_ACTIVE(DEVICE_ID string,ACTIVE_LEVEL int ) stored as parquet")
    spark.sql("insert overwrite table APP_ACTIVE select device_id, if(activity = 0, 0, ntile(10)over(order by activity) * 10) as active_level from (select device_id, (play_count + fav_count * 2 + daily_avg) as activity from (select play_sum.device_id, play_count, fav_count, daily_avg from (select device_id, count(*) as play_count from dw_log_music_event where event_mark = '1' or event_mark = '2' group by device_id) play_sum full outer join (select device_id, count(*) as fav_count from dw_log_music_event where event_mark = '3' group by device_id) fav_sum on play_sum.device_id = fav_sum.device_id full outer join (select device_id, (play_avg / play_stddev) as daily_avg from (select device_id, avg(play_day_sum) as play_avg, stddev_pop(play_day_sum) as play_stddev from (select device_id, day, sum(play_time) as play_day_sum from (select device_id, from_unixtime(cast(substr(log_create_time, 1, 10) as bigint), 'dd') as day, if(event_duration = 'null', 0, cast(split(event_duration,\":\")[0] as double) + cast(split(event_duration,\":\")[1] as double) / 60) as play_time from dw_log_music_event where device_id <> 'null' and from_unixtime(cast(substr(log_create_time, 1, 10) as bigint), 'yyyy') = 2018 and from_unixtime(cast(substr(log_create_time, 1, 10) as bigint), 'MM') = 12) a group by device_id, day) b group by device_id) c) d on play_sum.device_id = d.device_id) e) f")

    // 2. music style leaderboard
    spark.sql("create table if not exists APP_MUSIC_TYPE(DEVICE_ID string,MUSIC_TYPE string,MUSIC_TYPE_COUNT int)stored as parquet")
    spark.sql("insert into APP_MUSIC_TYPE select device_id, style, count from (select device_id, style, count, row_number()over(partition by device_id order by count desc) as rank from (select device_id, style, count from (select device_id, style, count(style) as count from (select b.device_id, b.event_music, a.style from (select mname, style from music lateral view explode(split(mstyle, '\\\\|')) xxx as style) a, dw_log_music_event b where a.mname = b.event_music and b.event_mark > 0) c group by device_id, style) d order by device_id, count desc) e) f where rank < 11")

    // 3. singer leaderboard
    spark.sql("create table if not exists APP_FAVOURITE_SINGER(DEVICE_ID string,MUSIC_SINGER string,SINGER_RANK string) stored as parquet")
    spark.sql("insert overwrite table APP_FAVOURITE_SINGER select device_id,msinger,SINGER_RANK from (select device_id,msinger,cnt,row_number() over(partition by device_id order by cnt desc) SINGER_RANK from (select a.device_id, b.msinger,count(*) cnt from dw_log_music_event a join music b on a.event_music = b.mname and event_mark in ('4','3','2','1') group by a.device_id, b.msinger) c) d where SINGER_RANK < 11")

    // 4. song leaderboard
    spark.sql("create table if not exists APP_FAVOURITE_SONG(DEVICE_ID string,MUSIC_NAME string,MUSIC_RANK string) stored as parquet")
    spark.sql("insert overwrite table APP_FAVOURITE_SONG select DEVICE_ID,event_music,music_rank from (select DEVICE_ID, event_music, row_number()over(partition by DEVICE_ID order by count desc) as music_rank from (select DEVICE_ID, event_music, count(event_music) as count from (select DEVICE_ID, event_music from dw_log_music_event where event_mark in ('4','3','2','1') and event_music <> 'null') a group by DEVICE_ID,event_music) b) c where music_rank <11")

    // 5. weekday play-hour preference
    spark.sql("create table if not exists APP_MUSIC_PLAY_WORKTIME(DEVICE_ID string,TIME string,TIME_RANK int)stored as parquet")
    spark.sql("insert overwrite table APP_MUSIC_PLAY_WORKTIME select device_id, day_hour, count from (select device_id, day_hour, count, row_number()over(partition by device_id order by count desc) as rank from (select device_id, day_hour, count(*) as count from (select device_id, dayofweek_type(event_playtime) as day_type, from_unixtime(cast(substr(event_playtime, 1, 10) as bigint), 'HH') as day_hour from dw_log_music_event where event_playtime <> 'null') a where day_type = 'weekday' group by device_id, day_hour) b) c where rank < 11")

    // 6. weekend play-hour preference
    spark.sql("create table if not exists APP_MUSIC_PLAY_WEEKEND(DEVICE_ID string,TIME string,TIME_RANK int)stored as parquet")
    spark.sql("insert overwrite table APP_MUSIC_PLAY_WEEKEND select device_id, day_hour, count from (select device_id, day_hour, count, row_number()over(partition by device_id order by count desc) as rank from (select device_id, day_hour, count(*) as count from (select device_id, dayofweek_type(event_playtime) as day_type, from_unixtime(cast(substr(event_playtime, 1, 10) as bigint), 'HH') as day_hour from dw_log_music_event where event_playtime <> 'null') a where day_type = 'weekend' group by device_id, day_hour) b) c where rank < 11")

    // 7. play-language percentile
    spark.sql("create table if not exists APP_MUSIC_LANGUAGE_PERCENTAGE(DEVICE_ID string, MUSIC_LANGUAGE string, COUNT int, PERCENTAGE double) stored as parquet")
    spark.sql("insert overwrite table APP_MUSIC_LANGUAGE_PERCENTAGE select device_id, MUSIC_LANGUAGE, COUNT, cume_dist()over(partition by MUSIC_LANGUAGE order by COUNT) as cum from (select device_id , MUSIC_LANGUAGE,count(*) as count from (select device_id,mlanguage as MUSIC_LANGUAGE from music join dw_log_music_event where event_music = mname and event_mark in ('4','3','2','1')) a group by device_id, MUSIC_LANGUAGE) b")

    // 8. payment level
    spark.sql("create table if not exists APP_MUSIC_PAY(DEVICE_ID string, PAYMENT string) stored as parquet")
    spark.sql("insert overwrite table APP_MUSIC_PAY select device_id, cume_dist()over(order by cnt)*100 from (select a.device_id, count(*) cnt from dw_log_music_event a join music b on a.event_music = b.mname and event_mark in ('4','3','2','1') and misfree=1 group by a.device_id ) c")
  }
}
The script that submits this application to the Spark cluster:
spark-submit --master spark://s201:7077 --class com.oldboy.music164.odsdw.GenApp /home/centos/music164.jar
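Once the job finishes, the result tables can be spot-checked from spark-shell; for example (a quick sanity query, not part of the project code):

spark.sql("use music164")
spark.sql("select device_id, active_level from app_active order by active_level desc limit 10").show()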
This yields the final set of Hive tables: the five DW_LOG_MUSIC_* detail tables plus the eight APP_* result tables.
At this point, the big data development work is complete! What remains is to coordinate with the front-end developers on a data visualization plan, which is beyond the scope of this article.
Technology can be transformative, but it is not a silver bullet; putting the right technology in the right place is what brings out its strengths. The same goes for projects: knowing where a project is strong and where it is weak lets you adapt it to the circumstances and genuinely help a business discover and solve its problems. Below, then, are the strengths and weaknesses of this project:
項目優點
架構優點
Flume中:
一、基於HDFS塊大小,設定Flume上傳單個文件大小
二、使用容災措施,避免丟失數據
三、使用躍點,統一對數據進行上傳,避免多用戶寫入
Hive + Spark中:
一、清晰數據結構
二、減小重複開發
三、統一數據接口
四、複雜問題簡單化
五、Spark運行速度快
成本優點
開源框架下降成本
產品優點
一、將用戶信息標籤化
二、用於千人千面、個性化推薦、精準營銷.....用戶畫像優點
項目劣勢:
一、未能真正使用分區表,不徹底是真實生產環境
二、雖然集羣提交過程一切正常,Spark在IDEA運行時卻時常出現初始任務資源分配不足的問題,配置文件中各項參數的設置有待提高
三、Flume使用memory channel時會有內存溢出的風險,更好的方法是使用更爲穩定的file channel或是一步到位,使用kafka channel