海量數據挖掘之中移動流量運營系統

---------------------------------------------------------------------------------------------------------------
[ 版權申明:本文系做者原創,轉載請註明出處]
文章出處: http://blog.csdn.net/sdksdk0/article/details/51691862
做者:朱培   ID:sdksdk0

---------------------------------------------------------------------------------------------------------------html

由於整個項目工程師很是龐大的,一方面因爲整個開發流量很是繁瑣,不可能經過一篇文章就能夠說得清楚的,另外一方面因爲保密性,因此這裏只分享其中的部份內容。首先會從總體架構等提及,經過本文的學習主要是進一步瞭解海量數據挖掘的框架流程,對數據採集流程、內容識別、知識庫的創建以及行爲軌跡加強有初步的瞭解,學會簡單的url清洗以及可以開發出簡單的分類MapReducer程序。固然,若是部份內容看不懂也不要緊,畢竟這須要經驗的積累,不要太急於求成,能夠先查看個人其餘文章!java


1、項目背景介紹


1.1 項目背景

對於運營商來講,使用海量數據挖掘對客戶移動互聯網行爲進行採集,分析,發現用戶關注的內容,爲開展營銷提供號碼支持。固然,也不侷限於這些功能。例如一個用戶在用手機看小說,那麼確定有一個url的網址啦,用戶全部訪問的網址,ip,時間戳,上下行流量,基站,網絡模式,手機型號等一大串信息都會被記錄下來並在運營商的的雲端進行存儲,這個數據量是很是很是大的。這個時候咱們能夠經過抓取到用戶訪問過的url和總流量,而後經過爬蟲去分析用戶瀏覽的這個網頁是怎麼樣的一個網頁,經過內容識別機制來找出這個網頁的內容,例如一個用戶在看的是新聞url,而後咱們經過爬蟲發現其訪問的是新浪新聞,而後內容識別發現其常常訪問關於娛樂新聞,明星八卦等,(基於流量和這類新聞的瀏覽次數來判讀是否常常訪問),那麼這個時候咱們就能夠給其推送一些娛樂周邊新聞等,只要用戶點擊去瀏覽了那麼確定會產生流量啊,那麼運營商的營銷的目的不就達成了麼,哈哈哈!
mysql

該項目投產以後的收效爲:web

l用戶響應率有三倍提高
本次營銷相關激勵措施和前期開展的WAP PUSH營銷相同;
對比以往的羣發響應率1%-3%,本次羣發響應率效果明顯,達到5.80%-10.21%,有近乎3倍的提高效果。
l訪問用戶活躍度高
用戶後有繼續點擊其餘內容的行爲,最高佔比達到91.4%
產生二次點擊行爲的用戶數的佔比高,反映貼合用戶需求的內容對用戶的吸引力,用戶粘性越高。


1.2 項目概況


在每一個省單獨一套系統,分爲3個集羣:數據採集集羣(6-10個節點),行爲軌跡加強處理集羣(20-25節點),ETL、統計分析集羣(35節點)

節點就是:PC服務器,放到機櫃中,配置(4顆12核cpu,64G或128G內存,磁盤1T*8/12)

數據量,天天2T左右(10億行以上),時間維度,地域維度。最長分析3個月的。天天增量不斷增加。
項目組成員:
  研發團隊:數據採集(3-4人),行爲軌跡加強(10人左右),ETL(20人左右,須要寫web程序)
  實施團隊:部署環境,2-3個實施人員帶上20多個開發人員
  運維團隊:
  銷售團隊等

1.3 系統架構圖




2、數據採集系統架構

既然前面說到了要抓取url的網址,那麼咱們如何來得到這些用戶的數據呢!固然,普通人確定是得不到運營商的數據啦,由於這些數據都是保密的,那麼我要說的是運營商如何獲得用戶數據呢!sql

項目所處理的數據
硬件設備(網關,基站等)
其餘系統(運營系統等)

業務日誌:
HTTP日誌/WAP日誌/MMS日誌/CONN日誌/DNS日誌
數據庫



從移動運營商的核心網關中把須要的數據發送到ftp服務器上,而後咱們這邊就會提供ftp的客戶端去採集ftp服務器的數據,而後處理以後過來進行分析。apache




3、內容識別模塊

把url通過爬蟲而後到識別系統,分析出網站名,主題,類別,(做者)等
將分類體系導入到數據庫中,url json封裝的內容信息。
大量的日誌不斷的產生,而後經過行爲軌跡加強,經過一個mapreduce,
若是這個數據匹配到了,則將原始行+內容分析結果信息(從知識庫來的數據)導出到加強日誌。若是匹配不到的數據就輸出到一個待爬清單中。


識別系統:天然語言處理SVM(實時識別),人工識別(人工一條條的去識別),模板識別(一個網頁的內容的位置通常不會變,用xpath來定位到咱們所須要查找的節點)

相信學過xml的應該都會使用xpath了,若是不會的話,能夠查閱我這篇文章:http://blog.csdn.net/sdksdk0/article/details/51555090


json

在這個項目中庸xpath來作這個模板匹配:例如ubuntu

對於一個網頁的html頁面來講,咱們能夠這樣來匹配其標題,例如咱們打開搜狐的html,咱們能夠看到他的這個網頁標題是<title></title>的,因此咱們對於這類網站就能夠用xpath來定位這個title在哪裏,而後去獲取這title節點中內容瀏覽器


<site>
souhu.com
<site>
	<property>
		<name>movie_name</name>
		<xpath>/path/.../</xpath>
	</property>

</site>
</site>
固然了,使用xpath和xml去作這種模板匹配有必定的侷限性,適用於一個結構很是清晰的網頁,例如視頻、小說、音樂等,對於那種奇奇怪怪的網友就不適用了。

因此總的來講,對於內容識別要採用多種方式去作,不要侷限於一種,不一樣類型的網站最好有不一樣的解決方案。

咱們使用天然語言處理來進行分析的時候還有問題就是,一個網頁的內容太多了,svm分析有時候不能徹底的識別到咱們想要的內容,就像一條新聞,原本這個新聞的主標題纔是中國網頁的主要內容,而使用天然語言處理系統的話它可能會把新聞下面的廣告讀成了這個網頁的主要內容了,因此這樣的話就會有偏差了。固然咯,天然語言分析仍是頗有用,那麼爲了更精確的識別照顧好網頁的內容怎麼辦呢。好吧,那固然是最傳統的人工讀取了,由可愛的實習生們把這些網頁一條條的瀏覽,而後記錄這個網頁的主要內容!(好吧,不要驚訝微笑,移動就是這麼幹的)。而後讀取大概10萬個網頁,這樣的話就造成了一個規則庫,這個就比天然語言處理和模板匹配的結果更加精確了。


4、知識庫url挑選

兩個知識庫,一個規則庫(人工分析的),還有一個實例庫(自動分析系統)。
先把url進行規則分析,若是有則輸出,沒有沒有則放到實例庫,若是實例庫也沒有了,就放到待爬清單中。
先拿1T的樣本數據,而後網址按流量彙總排序出來,總流量的前80%,總條數10萬條。
由於只要挑選出來就能夠了,不須要實時在運行的,因此只要一個job就能夠了。


這裏咱們主要拿到一個url和總流量來進行分析和處理,其餘更爲複雜的狀況這裏就不分享了哦。

咱們首先能夠在eclipse總新建一個jav工程,導入各類hadoop/lib下面的jar包,或者直接新建一個mapRedecer工程也能夠。

新建一個bean類:記得要繼承一個Comparable接口。

package cn.tf.kpi;

public class FlowBean implements Comparable<FlowBean>{

	private String url;
	private long upflow;
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public long getUpflow() {
		return upflow;
	}
	public void setUpflow(long upflow) {
		this.upflow = upflow;
	}
	public FlowBean(String url, long upflow) {
		super();
		this.url = url;
		this.upflow = upflow;
	}
	public FlowBean() {
		super();
	}
	@Override
	public int compareTo(FlowBean o) {
		return (int) (o.getUpflow() - this.upflow) ;
	}
	@Override
	public String toString() {
		return "FlowBean [url=" + url + ", upflow=" + upflow + "]";
	}	
}
而後寫主方法:

其實這裏和我以前寫的那個用戶流量分析系統有不少相似的地方。

package cn.tf.kpi;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.tf.kpi.TopURL.TopURLMapper.TopURlReducer;

public class TopURL {

	public static class TopURLMapper extends
			Mapper<LongWritable, Text, Text, LongWritable> {

		private Text k = new Text();
		private LongWritable v = new LongWritable();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();
			String[] fields = StringUtils.split(line, "\t");
			try {
				String url = fields[26];

				long upFlow = Long.parseLong(fields[30]);

				k.set(url);
				v.set(upFlow);

				context.write(k, v);
			} catch (Exception e) {
				e.printStackTrace();
			}

		}

		public static class TopURlReducer extends
				Reducer<Text, LongWritable, Text, LongWritable> {

			private Text k = new Text();
			private LongWritable v = new LongWritable();

			TreeSet<FlowBean> urls = new TreeSet<FlowBean>();

			//全局流量和
			long globalFlowSum  =0;
			
			
			@Override
			protected void reduce(Text key, Iterable<LongWritable> values,
					Context context) throws IOException, InterruptedException {

				long count = 0;
				for (LongWritable v : values) {
					count += v.get();
				}
				
				globalFlowSum +=count;
				FlowBean bean = new FlowBean(key.toString(), count);
				urls.add(bean);
			}
			
			@Override
			protected void cleanup(Context context) throws IOException, InterruptedException {
					
				long tempSum=0;
				for(FlowBean bean:urls){
					//取前80%的
					if(tempSum/globalFlowSum<0.8){
					k.set(bean.getUrl());
					v.set(bean.getUpflow());
					context.write(k,v);
					
					tempSum+=bean.getUpflow();
					}else{
						return;
					}
				}
			}
		}
	}

	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance();

		job.setJarByClass(TopURL.class);

		// 指定本job使用的mapper類
		job.setMapperClass(TopURLMapper.class);
		// 指定本job使用的reducer類
		job.setReducerClass(TopURlReducer.class);

		// 指定mapper輸出的kv的數據類型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		// 指定reducer輸出的kv數據類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// 指定本job要處理的文件所在的路徑
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// 指定本job輸出的結果文件放在哪一個路徑
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 將本job向hadoop集羣提交執行
		boolean res = job.waitForCompletion(true);

		System.exit(res ? 0 : 1);

	}

}

而後把這個工程打成一個jar包,命名爲top.jar。存放到你指定的一個位置就能夠了,我存放的位置是在/home/admin1/hadoop/lx/top.jar

如今啓動hadoop集羣服務,
把採集到的數據上傳到hdfs的/topflow/data目錄下,下載地址在文末貼出。(源數據下載地址:http://download.csdn.net/detail/sdksdk0/9551559)。把這個log.1文件上傳你的hdfs目錄中,這個log.1主要是幾十萬條採集到的用戶流量的數據。

bin/hadoop fs -mkdir -p /topflow/data
bin/hadoop fs -put ../lx/log.1  /topflow/data

把jar包放到hadoop中執行:

bin/hadoop jar ../lx/top.jar  cn.tf.kpi.TopURL  /topflow/data   /topflow/output  

執行以後會見過系統的處理就能夠選出url了。

內容以下:就是一個url網址+總流量大小,而且按照降序排列,因此咱們只要拿到這個數據源的總流量最大的前80%的用戶數據就能夠了,而後就能夠進行下一步的操做了。編譯好的這個文件會存放在你的hdfs目錄中,去查看一下就能夠了。



而後你須要先準備一個mysql數據庫,用於存放清洗好的數據,也就是咱們把剛纔獲得的數據,是存放在hdfs的/topflow/output目錄下的那個文件,把這個文件存到mysql數據庫中,做爲一個知識庫。

在mysql數據中新建一個test數據庫,而後建一個名字叫rule的表:

create database test;
use test;
create table rule(
  url  varchar(1024),
  info varchar(20) default 'complited'
)

而後須要導入數據,這裏我直接使用sqoop來把hdfs中的數據導入到mysql中去,先去下載一個sqoop,

sqoop是一個用來在hadoop體系和關係型數據庫之間進行數據互導的工具
----實質就是將導入導出命令轉換成mapreduce程序來實現。sqoop安裝:安裝在一臺節點上就能夠了。

首先下載好sqoop以後解壓進入conf目錄下,配置hadoop的路徑,而後把mysql的驅動jar包複製一份放到sqoop的lib目錄中。

在sqoop的conf目錄下,修改sqoop-env.sh

export HADOOP_COMMON_HOME=/home/admin1/hadoop/hadoop-2.5.2
export HADOOP_MAPRED_HOME=/home/admin1/hadoop/hadoop-2.5.2

而後啓動:進入sqoop的目錄下,運行下面這個程序

bin/sqoop export --connect jdbc:mysql://ubuntu2:3306/test --username hive --password  a \
--table rule \
--export-dir /topflow/output \
--columns url \
--input-fields-terminated-by '\t'

而後就會把數據所有存入到mysql中,通過清洗以後,這裏大概是6070條數據,在實際生產中的數據量可不只僅只有這麼一點點。

select *from rule;查看這個表裏面的內容。


到這裏,咱們這個url知識庫就創建好了,接下來就能夠愉快的進行後續操做了。


5、用戶行爲軌跡加強


用戶行爲加強就是把原始的數據源來和這個知識庫作匹配,若是有則把原來的數據+分析後的數據做爲一個加強的模塊,若是沒有在這個知識庫中匹配到,則放到待爬數據中。接下來演示的就是把原始數據(也就是前面提到的log.1)的數據與咱們剛纔存放到mysql中的六千多條數據進行匹配,如有,則輸出到加強日至,若無則存放到待爬數據。

這個部分的內容實際上是比較簡單的,就是流程比較複雜一點。接下來繼續在那個eclipse中新建3個類。

先把mysql的驅動包導入咱們新建的這個工程,並build path一下。

這裏寫一個鏈接數據庫的淚,由於咱們要和數據庫中的知識庫作對比嘛,這是一個基礎的jdbc鏈接類,這種很簡單的我相信大街上隨便一我的都會寫的哈大笑。這裏就再也不重複囉嗦啦!

package cn.tf.kpi;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.Statement;

import java.util.HashMap;



public class DBLoader {

	public static void dbLoader(HashMap<String, String> ruleMap) {

		Connection conn = null;

		Statement st = null;

		ResultSet res = null;

		try {

			Class.forName("com.mysql.jdbc.Driver");

			conn = DriverManager.getConnection("jdbc:mysql://ubuntu2:3306/test", "hive", "a");

			st = conn.createStatement();

			res = st.executeQuery("select url,info from rule");

			while (res.next()) {

				ruleMap.put(res.getString(1), res.getString(2));

			}

		} catch (Exception e) {

			e.printStackTrace();

		} finally {

			try{

				if(res!=null){

					res.close();

				}

				if(st!=null){

					st.close();

				}

				if(conn!=null){

					conn.close();

				}
			}catch(Exception e){

				e.printStackTrace();

			}

		}

	}

	public static void main(String[] args) {

		DBLoader db = new DBLoader();

		HashMap<String, String> map = new HashMap<String,String>();

		db.dbLoader(map);

		System.out.println(map.size());

	}

}

接下來就是一個加強日誌類:

package cn.tf.kpi;



import java.io.IOException;



import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable>{



	@Override

	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

		

		FileSystem	fs = FileSystem.get(context.getConfiguration());

		FSDataOutputStream tocrawlOut = fs.create(new Path("hdfs://ubuntu2:9000/topflow/tocrawl/url.list"));

		FSDataOutputStream enhancedOut = fs.create(new Path("hdfs://ubuntu2:9000/topflow/enhanced/enhanced.log"));

		

		

		return new LogEnhanceRecordWriter(tocrawlOut,enhancedOut);

	}


	public static class LogEnhanceRecordWriter extends RecordWriter<Text, NullWritable>{


		private FSDataOutputStream tocrawlOut;

		private FSDataOutputStream enhancedOut;

		

		public LogEnhanceRecordWriter(FSDataOutputStream tocrawlOut, FSDataOutputStream enhancedOut) {

			this.tocrawlOut = tocrawlOut;

			this.enhancedOut = enhancedOut;

		}



		@Override

		public void write(Text key, NullWritable value) throws IOException, InterruptedException {


			if(key.toString().contains("tocrawl")){

				//寫入待爬清單目錄(hdfs://)

				tocrawlOut.write(key.toString().getBytes());

			}else{

				//寫入加強日誌目錄(hdfs://)

				enhancedOut.write(key.toString().getBytes());		

			}
		}



		@Override

		public void close(TaskAttemptContext context) throws IOException, InterruptedException {

			IOUtils.closeStream(enhancedOut);

			IOUtils.closeStream(tocrawlOut);

			

		}	

		

	}

}

最後把最重要的一個類寫完就能夠了;這裏只要寫一個map就能夠了

package cn.tf.kpi;



import java.io.IOException;

import java.util.HashMap;



import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class LogEnhance {



	public static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		private HashMap<String, String> contentMap = new HashMap<String, String>();

		@Override

		protected void setup(Context context) throws IOException, InterruptedException {

			// 加載整個內容識別知識庫到內存中

			DBLoader.dbLoader(contentMap);

		}

		@Override

		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {



			String line = value.toString();

			String[] fields = StringUtils.split(line, "\t");



			try {

				// 抽取url字段,去匹配規則庫獲取url頁面的內容識別信息
				String url = fields[26];

				String contentResult = contentMap.get(url);


				String result = null;

				// 若是知識庫中沒有這條url的內容識別信息,就只輸出url字段到待爬清單

				if (StringUtils.isBlank(contentResult)) {

					result = url + "\t" + "tocrawl" + "\n";

				} else {

					// 在原始日誌內容後面追加內容識別結果信息做爲加強以後的輸出

					result = line + "\t" + contentResult +"\n";

				}


				context.write(new Text(result), NullWritable.get());

			} catch (Exception e) {

				context.getCounter("malformed", "line").increment(1);

			}

		}

	}



	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		Job job = Job.getInstance();



		job.setJarByClass(LogEnhance.class);



		// 指定本job使用的mapper類

		job.setMapperClass(LogEnhanceMapper.class);



		// 指定mapper輸出的kv的數據類型

		job.setMapOutputKeyClass(Text.class);

		job.setMapOutputValueClass(NullWritable.class);



		// 指定reducer輸出的kv數據類型

		job.setOutputKeyClass(Text.class);

		job.setOutputValueClass(NullWritable.class);



		// 指定採用自定義的outputformat

		job.setOutputFormatClass(LogEnhanceOutputFormat.class);



		// 指定本job要處理的文件所在的路徑

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 雖然自定義outputformat中已有輸出目錄,可是這裏仍是要設置一個目錄用來輸出_SUCCESS文件

		FileOutputFormat.setOutputPath(job, new Path(args[1]));



		// 將本job向hadoop集羣提交執行

		boolean res = job.waitForCompletion(true);



		System.exit(res ? 0 : 1);

	}



}
寫完以後發現這三個類內容好簡單,有木有!

繼續把這個工程達成jar包,命名爲top1.jar,而後放到hadoop下面去運行:


bin/hadoop jar top.jar  cn.tf.kpi.LogEnhance  /topflow/data  /topflow/output1


而後去查看一個數據就能夠了。


也能夠到瀏覽器中查看:


到這裏,就在於分享完畢了,按照我分享的這個流量一步步作,我估計要一兩天的時間才能夠徹底作完這個部分!其實總的來講就是過程很是繁瑣,不過也正符合咱們的實際工做,要知道,咱們實際工做中,遠超乎這些繁瑣流程,中間還會報各類奇奇怪怪的錯,有很是多的細節要處理,若是沒有耐心,估計都會瘋掉,哈哈!

歡迎關注,歡迎在評論區留言!



數據源下載地址:http://download.csdn.net/detail/sdksdk0/9551559

相關文章
相關標籤/搜索