[TOC]java
大數據處理目前比較流行的是兩種方法,一種是離線處理,一種是在線處理,基本處理架構以下:android
在互聯網應用中,不論是哪種處理方式,其基本的數據來源都是日誌數據,例如對於web應用來講,則多是用戶的訪問日誌、用戶的點擊日誌等。ios
若是對於數據的分析結果在時間上有比較嚴格的要求,則能夠採用在線處理的方式來對數據進行分析,如使用Spark、Storm等進行處理。比較貼切的一個例子是天貓雙十一的成交額,在其展板上,咱們看到交易額是實時動態進行更新的,對於這種狀況,則須要採用在線處理。git
固然,若是隻是但願獲得數據的分析結果,對處理的時間要求不嚴格,就能夠採用離線處理的方式,好比咱們能夠先將日誌數據採集到HDFS中,以後再進一步使用MapReduce、Hive等來對數據進行分析,這也是可行的。github
本文主要分享對某個電商網站產生的用戶訪問日誌(access.log)進行離線處理與分析的過程,基於MapReduce的處理方式,最後會統計出某一天不一樣省份訪問該網站的uv與pv。web
在咱們的場景中,Web應用的部署是以下的架構:redis
即比較典型的Nginx負載均衡+KeepAlive高可用集羣架構
,在每臺Web服務器上,都會產生用戶的訪問日誌,業務需求方給出的日誌格式以下:apache
1001 211.167.248.22 eecf0780-2578-4d77-a8d6-e2225e8b9169 40604 1 GET /top HTTP/1.0 408 null null 1523188122767 1003 222.68.207.11 eecf0780-2578-4d77-a8d6-e2225e8b9169 20202 1 GET /tologin HTTP/1.1 504 null Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070309 Firefox/2.0.0.3 1523188123267 1001 61.53.137.50 c3966af9-8a43-4bda-b58c-c11525ca367b 0 1 GET /update/pass HTTP/1.0 302 null null 1523188123768 1000 221.195.40.145 1aa3b538-2f55-4cd7-9f46-6364fdd1e487 0 0 GET /user/add HTTP/1.1 200 null Mozilla/4.0 (compatible; MSIE 7.0; Windows NT5.2) 1523188124269 1000 121.11.87.171 8b0ea90a-77a5-4034-99ed-403c800263dd 20202 1 GET /top HTTP/1.0 408 null Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523188120263
其每一個字段的說明以下:瀏覽器
appid ip mid userid login_type request status http_referer user_agent time 其中: appid包括:web:1000,android:1001,ios:1002,ipad:1003 mid:惟一的id此id第一次會種在瀏覽器的cookie裏。若是存在則再也不種。做爲瀏覽器惟一標示。移動端或者pad直接取機器碼。 login_type:登陸狀態,0未登陸、1:登陸用戶 request:相似於此種 "GET /userList HTTP/1.1" status:請求的狀態主要有:200 ok、404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等 http_referer:請求該url的上一個url地址。 user_agent:瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36" time:時間的long格式:1451451433818。
根據給定的時間範圍內的日誌數據,如今業務方有以下需求:服務器
統計出每一個省每日訪問的PV、UV。
數據採集工做由運維人員來完成,對於用戶訪問日誌的採集,使用的是Flume,而且會將採集的數據保存到HDFS中,其架構以下:
能夠看到,不一樣的Web Server上都會部署一個Agent用於該Server上日誌數據的採集,以後,不一樣Web Server的Flume Agent採集的日誌數據會下沉到另一個被稱爲Flume Consolidation Agent
(聚合Agent)的Flume Agent上,該Flume Agent的數據落地方式爲輸出到HDFS。
在咱們的HDFS中,能夠查看到其採集的日誌:
後面咱們的工做正是要基於Flume採集到HDFS中的數據作離線處理與分析。
剛剛採集到HDFS中的原生數據,咱們也稱爲不規整數據,即目前來講,該數據的格式還沒法知足咱們對數據處理的基本要求,須要對其進行預處理,轉化爲咱們後面工做所須要的較爲規整的數據,因此這裏的數據清洗,其實指的就是對數據進行基本的預處理,以方便咱們後面的統計分析,因此這一步並非必須的,須要根據不一樣的業務需求來進行取捨,只是在咱們的場景中須要對數據進行必定的處理。
原來的日誌數據格式是以下的:
appid ip mid userid login_type request status http_referer user_agent time 其中: appid包括:web:1000,android:1001,ios:1002,ipad:1003 mid:惟一的id此id第一次會種在瀏覽器的cookie裏。若是存在則再也不種。做爲瀏覽器惟一標示。移動端或者pad直接取機器碼。 login_type:登陸狀態,0未登陸、1:登陸用戶 request:相似於此種 "GET /userList HTTP/1.1" status:請求的狀態主要有:200 ok、404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等 http_referer:請求該url的上一個url地址。 user_agent:瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36" time:時間的long格式:1451451433818。
可是若是須要按照省份來統計uv、pv,其所包含的信息還不夠,咱們須要對這些數據作必定的預處理,好比須要,對於其中包含的IP信息,咱們須要將其對應的IP信息解析出來;爲了方便咱們的其它統計,咱們也能夠將其request信息解析爲method
、 request_url
、 http_version
等,
因此按照上面的分析,咱們但願預處理以後的日誌數據包含以下的數據字段:
appid; ip; //經過ip來衍生出來的字段 province和city province; city; mid; userId; loginType; request; //經過request 衍生出來的字段 method request_url http_version method; requestUrl; httpVersion; status; httpReferer; userAgent; //經過userAgent衍生出來的字段,即用戶的瀏覽器信息 browser; time;
即在原來的基礎上,咱們增長了其它新的字段,如province
、city
等。
咱們採用MapReduce來對數據進行預處理,預處理以後的結果,咱們也是保存到HDFS
中,即採用以下的架構:
數據清洗的過程主要是編寫MapReduce
程序,而MapReduce
程序的編寫又分爲寫Mapper
、Reducer
、Job
三個基本的過程。可是在咱們這個案例中,要達到數據清洗的目的,實際上只須要Mapper
就能夠了,並不須要Reducer
,緣由很簡單,咱們只是預處理數據,在Mapper
中就已經能夠對數據進行處理了,其輸出的數據並不須要進一步通過Redcuer
來進行彙總處理。
因此下面就直接編寫Mapper
和Job
的程序代碼。
package cn.xpleaf.dataClean.mr.mapper; import cn.xpleaf.dataClean.mr.writable.AccessLogWritable; import cn.xpleaf.dataClean.utils.JedisUtil; import cn.xpleaf.dataClean.utils.UserAgent; import cn.xpleaf.dataClean.utils.UserAgentUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; /** * access日誌清洗的主要mapper實現類 * 原始數據結構: * appid ip mid userid login_tpe request status http_referer user_agent time ---> 10列內容 * 清洗以後的結果: * appid ip province city mid userid login_type request method request_url http_version status http_referer user_agent browser yyyy-MM-dd HH:mm:ss */ public class AccessLogCleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> { private Logger logger; private String[] fields; private String appid; //數據來源 web:1000,android:1001,ios:1002,ipad:1003 private String ip; //經過ip來衍生出來的字段 province和city private String province; private String city; private String mid; //mid:惟一的id此id第一次會種在瀏覽器的cookie裏。若是存在則再也不種。做爲瀏覽器惟一標示。移動端或者pad直接取機器碼。 private String userId; //用戶id private String loginType; //登陸狀態,0未登陸、1:登陸用戶 private String request; //相似於此種 "GET userList HTTP/1.1" //經過request 衍生出來的字段 method request_url http_version private String method; private String requestUrl; private String httpVersion; private String status; //請求的狀態主要有:200 ok、/404 not found、408 Request Timeout、500 Internal Server Error、504 Gateway Timeout等 private String httpReferer; //請求該url的上一個url地址。 private String userAgent; //瀏覽器的信息,例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.106 Safari/537.36" //經過userAgent來獲取對應的瀏覽器 private String browser; //private long time; //action對應的時間戳 private String time;//action對應的格式化時間yyyy-MM-dd HH:mm:ss private DateFormat df; private Jedis jedis; @Override protected void setup(Context context) throws IOException, InterruptedException { logger = Logger.getLogger(AccessLogCleanMapper.class); df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); jedis = JedisUtil.getJedis(); } /** * appid ip mid userid login_tpe request status http_referer user_agent time ---> 10列內容 * || * || * appid ip province city mid userid login_type request method request_url http_version status http_referer user_agent browser yyyy-MM-dd HH:mm:ss */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { fields = value.toString().split("\t"); if (fields == null || fields.length != 10) { // 有異常數據 return; } // 由於全部的字段沒有進行特殊操做,只是文本的輸出,因此沒有必要設置特定類型,所有設置爲字符串便可, // 這樣在作下面的操做時就能夠省去類型的轉換,可是若是對數據的合法性有嚴格的驗證的話,則要保持類型的一致 appid = fields[0]; ip = fields[1]; // 解析IP if (ip != null) { String ipInfo = jedis.hget("ip_info", ip); province = ipInfo.split("\t")[0]; city = ipInfo.split("\t")[1]; } mid = fields[2]; userId = fields[3]; loginType = fields[4]; request = fields[5]; method = request.split(" ")[0]; requestUrl = request.split(" ")[1]; httpVersion = request.split(" ")[2]; status = fields[6]; httpReferer = fields[7]; userAgent = fields[8]; if (userAgent != null) { UserAgent uAgent = UserAgentUtil.getUserAgent(userAgent); if (uAgent != null) { browser = uAgent.getBrowserType(); } } try { // 轉換有可能出現異常 time = df.format(new Date(Long.parseLong(fields[9]))); } catch (NumberFormatException e) { logger.error(e.getMessage()); } AccessLogWritable access = new AccessLogWritable(appid, ip, province, city, mid, userId, loginType, request, method, requestUrl, httpVersion, status, httpReferer, this.userAgent, browser, time); context.write(NullWritable.get(), new Text(access.toString())); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { // 資源釋放 logger = null; df = null; JedisUtil.returnJedis(jedis); } }
package cn.xpleaf.dataClean.mr.job; import cn.xpleaf.dataClean.mr.mapper.AccessLogCleanMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * 清洗用戶access日誌信息 * 主要的驅動程序 * 主要用做組織mapper和reducer的運行 * * 輸入參數: * hdfs://ns1/input/data-clean/access/2018/04/08 hdfs://ns1/output/data-clean/access * 即inputPath和outputPath * 目前outputPath統一到hdfs://ns1/output/data-clean/access * 而inputPath則不肯定,由於咱們的日誌採集是按天來生成一個目錄的 * 因此上面的inputPath只是清洗2018-04-08這一天的 */ public class AccessLogCleanJob { public static void main(String[] args) throws Exception { if(args == null || args.length < 2) { System.err.println("Parameter Errors! Usage <inputPath...> <outputPath>"); System.exit(-1); } Path outputPath = new Path(args[args.length - 1]); Configuration conf = new Configuration(); String jobName = AccessLogCleanJob.class.getSimpleName(); Job job = Job.getInstance(conf, jobName); job.setJarByClass(AccessLogCleanJob.class); // 設置mr的輸入參數 for( int i = 0; i < args.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(args[i])); } job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(AccessLogCleanMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); // 設置mr的輸出參數 outputPath.getFileSystem(conf).delete(outputPath, true); // 避免job在運行的時候出現輸出目錄已經存在的異常 FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(0); // map only操做,沒有reducer job.waitForCompletion(true); } }
將上面的mr程序打包後上傳到咱們的Hadoop環境中,這裏,對2018-04-08
這一天產生的日誌數據進行清洗,執行以下命令:
yarn jar data-extract-clean-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar\ cn.xpleaf.dataClean.mr.job.AccessLogCleanJob \ hdfs://ns1/input/data-clean/access/2018/04/08 \ hdfs://ns1/output/data-clean/access
觀察其執行結果:
...... 18/04/08 20:54:21 INFO mapreduce.Job: Running job: job_1523133033819_0009 18/04/08 20:54:28 INFO mapreduce.Job: Job job_1523133033819_0009 running in uber mode : false 18/04/08 20:54:28 INFO mapreduce.Job: map 0% reduce 0% 18/04/08 20:54:35 INFO mapreduce.Job: map 50% reduce 0% 18/04/08 20:54:40 INFO mapreduce.Job: map 76% reduce 0% 18/04/08 20:54:43 INFO mapreduce.Job: map 92% reduce 0% 18/04/08 20:54:45 INFO mapreduce.Job: map 100% reduce 0% 18/04/08 20:54:46 INFO mapreduce.Job: Job job_1523133033819_0009 completed successfully 18/04/08 20:54:46 INFO mapreduce.Job: Counters: 31 ......
能夠看到MapReduce
Job
執行成功!
上面的MapReduce
程序執行成功後,能夠看到在HDFS中生成的數據輸出目錄:
咱們能夠下載其中一個結果數據文件,並用Notepadd++
打開查看其數據信息:
通過數據清洗以後,就獲得了咱們作數據的分析統計所須要的比較規整的數據,下面就能夠進行數據的統計分析了,即按照業務需求,統計出某一天中每一個省份的PV和UV。
咱們依然是須要編寫MapReduce
程序,而且將數據保存到HDFS中,其架構跟前面的數據清洗是同樣的:
如今咱們已經獲得了規整的數據,關於在於如何編寫咱們的MapReduce
程序。
由於要統計的是每一個省對應的pv和uv,pv就是點擊量,uv是獨立訪客量,須要將省相同的數據拉取到一塊兒,拉取到一塊的這些數據每一條記錄就表明了一次點擊(pv + 1),這裏面有同一個用戶產生的數據(經過mid來惟一地標識是同一個瀏覽器,用mid進行去重,獲得的就是uv)。
而拉取數據,可使用Mapper
來完成,對數據的統計(pv、uv的計算)則能夠經過Reducer
來完成,即Mapper
的各個參數能夠爲以下:
Mapper<LongWritable, Text, Text(Province), Text(mid)>
而Reducer
的各個參數能夠爲以下:
Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)>
根據前面的分析,來編寫咱們的MapReduce程序
。
package cn.xpleaf.dataClean.mr.mapper; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Mapper<LongWritable, Text, Text(Province), Text(mid)> * Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)> */ public class ProvincePVAndUVMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); if(fields == null || fields.length != 16) { return; } String province = fields[2]; String mid = fields[4]; context.write(new Text(province), new Text(mid)); } }
package cn.xpleaf.dataClean.mr.reducer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.HashSet; import java.util.Set; /** * 統計該標準化數據,產生結果 * 省 pv uv * 這裏面有同一個用戶產生的數|據(經過mid來惟一地標識是同一個瀏覽器,用mid進行去重,獲得的就是uv) * Mapper<LongWritable, Text, Text(Province), Text(mid)> * Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)> */ public class ProvincePVAndUVReducer extends Reducer<Text, Text, Text, Text> { private Set<String> uvSet = new HashSet<>(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { long pv = 0; uvSet.clear(); for(Text mid : values) { pv++; uvSet.add(mid.toString()); } long uv = uvSet.size(); String pvAndUv = pv + "\t" + uv; context.write(key, new Text(pvAndUv)); } }
package cn.xpleaf.dataClean.mr.job; import cn.xpleaf.dataClean.mr.mapper.ProvincePVAndUVMapper; import cn.xpleaf.dataClean.mr.reducer.ProvincePVAndUVReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * 統計每一個省的pv和uv值 * 輸入:通過clean以後的access日誌 * appid ip province city mid userid login_type request method request_url http_version status http_referer user_agent browser yyyy-MM-dd HH:mm:ss * 統計該標準化數據,產生結果 * 省 pv uv * * 分析:由於要統計的是每一個省對應的pv和uv * pv就是點擊量,uv是獨立訪客量 * 須要將省相同的數據拉取到一塊兒,拉取到一塊的這些數據每一條記錄就表明了一次點擊(pv + 1) * 這裏面有同一個用戶產生的數據(經過mid來惟一地標識是同一個瀏覽器,用mid進行去重,獲得的就是uv) * Mapper<LongWritable, Text, Text(Province), Text(mid)> * Reducer<Text(Province), Text(mid), Text(Province), Text(pv + uv)> * * 輸入參數: * hdfs://ns1/output/data-clean/access hdfs://ns1/output/pv-uv */ public class ProvincePVAndUVJob { public static void main(String[] args) throws Exception { if (args == null || args.length < 2) { System.err.println("Parameter Errors! Usage <inputPath...> <outputPath>"); System.exit(-1); } Path outputPath = new Path(args[args.length - 1]); Configuration conf = new Configuration(); String jobName = ProvincePVAndUVJob.class.getSimpleName(); Job job = Job.getInstance(conf, jobName); job.setJarByClass(ProvincePVAndUVJob.class); // 設置mr的輸入參數 for (int i = 0; i < args.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(args[i])); } job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(ProvincePVAndUVMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 設置mr的輸出參數 outputPath.getFileSystem(conf).delete(outputPath, true); // 避免job在運行的時候出現輸出目錄已經存在的異常 FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); job.setReducerClass(ProvincePVAndUVReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(1); job.waitForCompletion(true); } }
將上面的mr程序打包後上傳到咱們的Hadoop環境中,這裏,對前面預處理以後的數據進行統計分析,執行以下命令:
yarn jar data-extract-clean-analysis-1.0-SNAPSHOT-jar-with-dependencies.jar \ cn.xpleaf.dataClean.mr.job.ProvincePVAndUVJob \ hdfs://ns1/output/data-clean/access \ hdfs://ns1/output/pv-uv
觀察其執行結果:
...... 18/04/08 22:22:42 INFO mapreduce.Job: Running job: job_1523133033819_0010 18/04/08 22:22:49 INFO mapreduce.Job: Job job_1523133033819_0010 running in uber mode : false 18/04/08 22:22:49 INFO mapreduce.Job: map 0% reduce 0% 18/04/08 22:22:55 INFO mapreduce.Job: map 50% reduce 0% 18/04/08 22:22:57 INFO mapreduce.Job: map 100% reduce 0% 18/04/08 22:23:03 INFO mapreduce.Job: map 100% reduce 100% 18/04/08 22:23:03 INFO mapreduce.Job: Job job_1523133033819_0010 completed successfully 18/04/08 22:23:03 INFO mapreduce.Job: Counters: 49 ......
能夠看到MapReduce
Job
執行成功!
上面的MapReduce
程序執行成功後,能夠看到在HDFS中生成的數據輸出目錄:
咱們能夠下載其結果數據文件,並用Notepadd++
打開查看其數據信息:
至此,就完成了一個完整的數據採集、清洗、處理的完整離線數據分析案例。
相關的代碼我已經上傳到GitHub,有興趣能夠參考一下:
https://github.com/xpleaf/data-extract-clean-analysis