---------------------------------------------------------------------------------------------------------------html
由於整個項目工程師很是龐大的,一方面因爲整個開發流量很是繁瑣,不可能經過一篇文章就能夠說得清楚的,另外一方面因爲保密性,因此這裏只分享其中的部份內容。首先會從總體架構等提及,經過本文的學習主要是進一步瞭解海量數據挖掘的框架流程,對數據採集流程、內容識別、知識庫的創建以及行爲軌跡加強有初步的瞭解,學會簡單的url清洗以及可以開發出簡單的分類MapReducer程序。固然,若是部份內容看不懂也不要緊,畢竟這須要經驗的積累,不要太急於求成,能夠先查看個人其餘文章!java
對於運營商來講,使用海量數據挖掘對客戶移動互聯網行爲進行採集,分析,發現用戶關注的內容,爲開展營銷提供號碼支持。固然,也不侷限於這些功能。例如一個用戶在用手機看小說,那麼確定有一個url的網址啦,用戶全部訪問的網址,ip,時間戳,上下行流量,基站,網絡模式,手機型號等一大串信息都會被記錄下來並在運營商的的雲端進行存儲,這個數據量是很是很是大的。這個時候咱們能夠經過抓取到用戶訪問過的url和總流量,而後經過爬蟲去分析用戶瀏覽的這個網頁是怎麼樣的一個網頁,經過內容識別機制來找出這個網頁的內容,例如一個用戶在看的是新聞url,而後咱們經過爬蟲發現其訪問的是新浪新聞,而後內容識別發現其常常訪問關於娛樂新聞,明星八卦等,(基於流量和這類新聞的瀏覽次數來判讀是否常常訪問),那麼這個時候咱們就能夠給其推送一些娛樂周邊新聞等,只要用戶點擊去瀏覽了那麼確定會產生流量啊,那麼運營商的營銷的目的不就達成了麼,哈哈哈!
mysql
該項目投產以後的收效爲:web
既然前面說到了要抓取url的網址,那麼咱們如何來得到這些用戶的數據呢!固然,普通人確定是得不到運營商的數據啦,由於這些數據都是保密的,那麼我要說的是運營商如何獲得用戶數據呢!sql
項目所處理的數據
硬件設備(網關,基站等)
其餘系統(運營系統等)
業務日誌:
HTTP日誌/WAP日誌/MMS日誌/CONN日誌/DNS日誌
數據庫
從移動運營商的核心網關中把須要的數據發送到ftp服務器上,而後咱們這邊就會提供ftp的客戶端去採集ftp服務器的數據,而後處理以後過來進行分析。apache
把url通過爬蟲而後到識別系統,分析出網站名,主題,類別,(做者)等
將分類體系導入到數據庫中,url json封裝的內容信息。
大量的日誌不斷的產生,而後經過行爲軌跡加強,經過一個mapreduce,
若是這個數據匹配到了,則將原始行+內容分析結果信息(從知識庫來的數據)導出到加強日誌。若是匹配不到的數據就輸出到一個待爬清單中。
識別系統:天然語言處理SVM(實時識別),人工識別(人工一條條的去識別),模板識別(一個網頁的內容的位置通常不會變,用xpath來定位到咱們所須要查找的節點)
相信學過xml的應該都會使用xpath了,若是不會的話,能夠查閱我這篇文章:http://blog.csdn.net/sdksdk0/article/details/51555090
json
在這個項目中庸xpath來作這個模板匹配:例如ubuntu
對於一個網頁的html頁面來講,咱們能夠這樣來匹配其標題,例如咱們打開搜狐的html,咱們能夠看到他的這個網頁標題是<title></title>的,因此咱們對於這類網站就能夠用xpath來定位這個title在哪裏,而後去獲取這title節點中內容瀏覽器
<site> souhu.com <site> <property> <name>movie_name</name> <xpath>/path/.../</xpath> </property> </site> </site>固然了,使用xpath和xml去作這種模板匹配有必定的侷限性,適用於一個結構很是清晰的網頁,例如視頻、小說、音樂等,對於那種奇奇怪怪的網友就不適用了。
因此總的來講,對於內容識別要採用多種方式去作,不要侷限於一種,不一樣類型的網站最好有不一樣的解決方案。
咱們使用天然語言處理來進行分析的時候還有問題就是,一個網頁的內容太多了,svm分析有時候不能徹底的識別到咱們想要的內容,就像一條新聞,原本這個新聞的主標題纔是中國網頁的主要內容,而使用天然語言處理系統的話它可能會把新聞下面的廣告讀成了這個網頁的主要內容了,因此這樣的話就會有偏差了。固然咯,天然語言分析仍是頗有用,那麼爲了更精確的識別照顧好網頁的內容怎麼辦呢。好吧,那固然是最傳統的人工讀取了,由可愛的實習生們把這些網頁一條條的瀏覽,而後記錄這個網頁的主要內容!(好吧,不要驚訝,移動就是這麼幹的)。而後讀取大概10萬個網頁,這樣的話就造成了一個規則庫,這個就比天然語言處理和模板匹配的結果更加精確了。
兩個知識庫,一個規則庫(人工分析的),還有一個實例庫(自動分析系統)。
先把url進行規則分析,若是有則輸出,沒有沒有則放到實例庫,若是實例庫也沒有了,就放到待爬清單中。
先拿1T的樣本數據,而後網址按流量彙總排序出來,總流量的前80%,總條數10萬條。
由於只要挑選出來就能夠了,不須要實時在運行的,因此只要一個job就能夠了。
這裏咱們主要拿到一個url和總流量來進行分析和處理,其餘更爲複雜的狀況這裏就不分享了哦。
咱們首先能夠在eclipse總新建一個jav工程,導入各類hadoop/lib下面的jar包,或者直接新建一個mapRedecer工程也能夠。
新建一個bean類:記得要繼承一個Comparable接口。
package cn.tf.kpi; public class FlowBean implements Comparable<FlowBean>{ private String url; private long upflow; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public long getUpflow() { return upflow; } public void setUpflow(long upflow) { this.upflow = upflow; } public FlowBean(String url, long upflow) { super(); this.url = url; this.upflow = upflow; } public FlowBean() { super(); } @Override public int compareTo(FlowBean o) { return (int) (o.getUpflow() - this.upflow) ; } @Override public String toString() { return "FlowBean [url=" + url + ", upflow=" + upflow + "]"; } }而後寫主方法:
其實這裏和我以前寫的那個用戶流量分析系統有不少相似的地方。
package cn.tf.kpi; import java.io.IOException; import java.util.TreeSet; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import cn.tf.kpi.TopURL.TopURLMapper.TopURlReducer; public class TopURL { public static class TopURLMapper extends Mapper<LongWritable, Text, Text, LongWritable> { private Text k = new Text(); private LongWritable v = new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, "\t"); try { String url = fields[26]; long upFlow = Long.parseLong(fields[30]); k.set(url); v.set(upFlow); context.write(k, v); } catch (Exception e) { e.printStackTrace(); } } public static class TopURlReducer extends Reducer<Text, LongWritable, Text, LongWritable> { private Text k = new Text(); private LongWritable v = new LongWritable(); TreeSet<FlowBean> urls = new TreeSet<FlowBean>(); //全局流量和 long globalFlowSum =0; @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable v : values) { count += v.get(); } globalFlowSum +=count; FlowBean bean = new FlowBean(key.toString(), count); urls.add(bean); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { long tempSum=0; for(FlowBean bean:urls){ //取前80%的 if(tempSum/globalFlowSum<0.8){ k.set(bean.getUrl()); v.set(bean.getUpflow()); context.write(k,v); tempSum+=bean.getUpflow(); }else{ return; } } } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(); job.setJarByClass(TopURL.class); // 指定本job使用的mapper類 job.setMapperClass(TopURLMapper.class); // 指定本job使用的reducer類 job.setReducerClass(TopURlReducer.class); // 指定mapper輸出的kv的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 指定reducer輸出的kv數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 指定本job要處理的文件所在的路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定本job輸出的結果文件放在哪一個路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 將本job向hadoop集羣提交執行 boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
如今啓動hadoop集羣服務,
把採集到的數據上傳到hdfs的/topflow/data目錄下,下載地址在文末貼出。(源數據下載地址:http://download.csdn.net/detail/sdksdk0/9551559)。把這個log.1文件上傳你的hdfs目錄中,這個log.1主要是幾十萬條採集到的用戶流量的數據。
bin/hadoop fs -mkdir -p /topflow/data bin/hadoop fs -put ../lx/log.1 /topflow/data
把jar包放到hadoop中執行:
bin/hadoop jar ../lx/top.jar cn.tf.kpi.TopURL /topflow/data /topflow/output
內容以下:就是一個url網址+總流量大小,而且按照降序排列,因此咱們只要拿到這個數據源的總流量最大的前80%的用戶數據就能夠了,而後就能夠進行下一步的操做了。編譯好的這個文件會存放在你的hdfs目錄中,去查看一下就能夠了。
而後你須要先準備一個mysql數據庫,用於存放清洗好的數據,也就是咱們把剛纔獲得的數據,是存放在hdfs的/topflow/output目錄下的那個文件,把這個文件存到mysql數據庫中,做爲一個知識庫。
在mysql數據中新建一個test數據庫,而後建一個名字叫rule的表:
create database test; use test; create table rule( url varchar(1024), info varchar(20) default 'complited' )
而後須要導入數據,這裏我直接使用sqoop來把hdfs中的數據導入到mysql中去,先去下載一個sqoop,
sqoop是一個用來在hadoop體系和關係型數據庫之間進行數據互導的工具
----實質就是將導入導出命令轉換成mapreduce程序來實現。sqoop安裝:安裝在一臺節點上就能夠了。
首先下載好sqoop以後解壓進入conf目錄下,配置hadoop的路徑,而後把mysql的驅動jar包複製一份放到sqoop的lib目錄中。
在sqoop的conf目錄下,修改sqoop-env.sh
export HADOOP_COMMON_HOME=/home/admin1/hadoop/hadoop-2.5.2 export HADOOP_MAPRED_HOME=/home/admin1/hadoop/hadoop-2.5.2
bin/sqoop export --connect jdbc:mysql://ubuntu2:3306/test --username hive --password a \ --table rule \ --export-dir /topflow/output \ --columns url \ --input-fields-terminated-by '\t'
select *from rule;查看這個表裏面的內容。
到這裏,咱們這個url知識庫就創建好了,接下來就能夠愉快的進行後續操做了。
用戶行爲加強就是把原始的數據源來和這個知識庫作匹配,若是有則把原來的數據+分析後的數據做爲一個加強的模塊,若是沒有在這個知識庫中匹配到,則放到待爬數據中。接下來演示的就是把原始數據(也就是前面提到的log.1)的數據與咱們剛纔存放到mysql中的六千多條數據進行匹配,如有,則輸出到加強日至,若無則存放到待爬數據。
這個部分的內容實際上是比較簡單的,就是流程比較複雜一點。接下來繼續在那個eclipse中新建3個類。
先把mysql的驅動包導入咱們新建的這個工程,並build path一下。
這裏寫一個鏈接數據庫的淚,由於咱們要和數據庫中的知識庫作對比嘛,這是一個基礎的jdbc鏈接類,這種很簡單的我相信大街上隨便一我的都會寫的哈。這裏就再也不重複囉嗦啦!
package cn.tf.kpi; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; import java.util.HashMap; public class DBLoader { public static void dbLoader(HashMap<String, String> ruleMap) { Connection conn = null; Statement st = null; ResultSet res = null; try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://ubuntu2:3306/test", "hive", "a"); st = conn.createStatement(); res = st.executeQuery("select url,info from rule"); while (res.next()) { ruleMap.put(res.getString(1), res.getString(2)); } } catch (Exception e) { e.printStackTrace(); } finally { try{ if(res!=null){ res.close(); } if(st!=null){ st.close(); } if(conn!=null){ conn.close(); } }catch(Exception e){ e.printStackTrace(); } } } public static void main(String[] args) { DBLoader db = new DBLoader(); HashMap<String, String> map = new HashMap<String,String>(); db.dbLoader(map); System.out.println(map.size()); } }
package cn.tf.kpi; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataOutputStream tocrawlOut = fs.create(new Path("hdfs://ubuntu2:9000/topflow/tocrawl/url.list")); FSDataOutputStream enhancedOut = fs.create(new Path("hdfs://ubuntu2:9000/topflow/enhanced/enhanced.log")); return new LogEnhanceRecordWriter(tocrawlOut,enhancedOut); } public static class LogEnhanceRecordWriter extends RecordWriter<Text, NullWritable>{ private FSDataOutputStream tocrawlOut; private FSDataOutputStream enhancedOut; public LogEnhanceRecordWriter(FSDataOutputStream tocrawlOut, FSDataOutputStream enhancedOut) { this.tocrawlOut = tocrawlOut; this.enhancedOut = enhancedOut; } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { if(key.toString().contains("tocrawl")){ //寫入待爬清單目錄(hdfs://) tocrawlOut.write(key.toString().getBytes()); }else{ //寫入加強日誌目錄(hdfs://) enhancedOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(enhancedOut); IOUtils.closeStream(tocrawlOut); } } }
package cn.tf.kpi; import java.io.IOException; import java.util.HashMap; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class LogEnhance { public static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> { private HashMap<String, String> contentMap = new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 加載整個內容識別知識庫到內存中 DBLoader.dbLoader(contentMap); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line, "\t"); try { // 抽取url字段,去匹配規則庫獲取url頁面的內容識別信息 String url = fields[26]; String contentResult = contentMap.get(url); String result = null; // 若是知識庫中沒有這條url的內容識別信息,就只輸出url字段到待爬清單 if (StringUtils.isBlank(contentResult)) { result = url + "\t" + "tocrawl" + "\n"; } else { // 在原始日誌內容後面追加內容識別結果信息做爲加強以後的輸出 result = line + "\t" + contentResult +"\n"; } context.write(new Text(result), NullWritable.get()); } catch (Exception e) { context.getCounter("malformed", "line").increment(1); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(); job.setJarByClass(LogEnhance.class); // 指定本job使用的mapper類 job.setMapperClass(LogEnhanceMapper.class); // 指定mapper輸出的kv的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 指定reducer輸出的kv數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 指定採用自定義的outputformat job.setOutputFormatClass(LogEnhanceOutputFormat.class); // 指定本job要處理的文件所在的路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); // 雖然自定義outputformat中已有輸出目錄,可是這裏仍是要設置一個目錄用來輸出_SUCCESS文件 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 將本job向hadoop集羣提交執行 boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }寫完以後發現這三個類內容好簡單,有木有!
繼續把這個工程達成jar包,命名爲top1.jar,而後放到hadoop下面去運行:
bin/hadoop jar top.jar cn.tf.kpi.LogEnhance /topflow/data /topflow/output1
也能夠到瀏覽器中查看:
到這裏,就在於分享完畢了,按照我分享的這個流量一步步作,我估計要一兩天的時間才能夠徹底作完這個部分!其實總的來講就是過程很是繁瑣,不過也正符合咱們的實際工做,要知道,咱們實際工做中,遠超乎這些繁瑣流程,中間還會報各類奇奇怪怪的錯,有很是多的細節要處理,若是沒有耐心,估計都會瘋掉,哈哈!
歡迎關注,歡迎在評論區留言!