Hadoop Web項目--Friend Find系統

項目使用軟件:Myeclipse10.0,JDK1.7,Hadoop2.6,MySQL5.6。EasyUI1.3.6。jQuery2.0,Spring4.1.3。javascript

Hibernate4.3.1,struts2.3.1。Tomcat7 。Maven3.2.1。java

項目下載地址:https://github.com/fansy1990/friend_find ,項目部署參考:http://blog.csdn.net/fansy1990/article/details/46481409 。node

Hadoop Web項目--Friend Find系統

1. 項目介紹

        Friend Find系統是一個尋找相似用戶的系統。

用戶填寫本身的信息後就可以在本系統內找到和本身志同道合的朋友。本系統使用的是在http://stackoverflow.com/站點上的用戶數據。Stack Overflow是一個程序設計領域的問答站點,隸屬Stack Exchange Network。站點贊成注冊用戶提出或回答問題。還贊成對已有問題或答案加分、扣分或進行改動,條件是用戶達到必定的「聲望值」。「聲望值」就是用戶進行站點交互時能獲取的分數。當聲望值達到某個程度時,用戶的權限就會添加,比方聲望值超過50點就可以評論答案。當用戶的聲望值達到某個階段時,站點還會給用戶頒發貢獻徽章。以此來激勵用戶對站點作出貢獻。該項目創建在如下的假設基礎上,假設用戶對於一個領域問題的「態度」就可以反映出該用戶的價值取向,並依據此價值取向來對用戶進行聚類分組。mysql

這裏的態度可以使用幾個指標屬性來評判,在本系統中原始數據(即用戶信息數據)包含的屬性有多個,從中挑選出最能符合用戶觀點的屬性。做爲該用戶的「態度」進行分析。這裏挑選的屬性是:reputation,upVotes。downVotes,views。即便用這4個屬性來對用戶進行聚類。同一時候,這裏使用MR實現的Clustering by fast search and find of density peaks聚類算法,這裏的實現和http://blog.csdn.net/fansy1990/article/details/46364697這裏的實現原始是不一樣的。linux

2. 項目執行

2.1 準備

1. 下載project。參考上面的鏈接 https://github.com/fansy1990/friend_find,並參考 http://blog.csdn.net/fansy1990/article/details/46481409把它部署上去;
 1) 注意依據數據庫的配置,在mysql數據庫中新建一個friend數據庫;
 2)直接執行部署project,就能夠在數據庫中本身主動創建相應的表,包含:hconstants、loginuser、userdata、usergroup,當中loginuser是用戶登陸表。會本身主動初始化(默認有兩個用戶admin/admin、test/test),hconstants是雲平臺參數數據表、userdata存儲原始用戶數據、usergroup存儲聚類分羣后每個用戶的組別。
2. 部署雲平臺Hadoop2.6(僞分佈式或者全然分佈式都可以,本項目測試使用僞分佈式),同一時候需要注意:設置雲平臺系統linux的時間和執行tomcat的機器的時間同樣,因爲在雲平臺任務監控的時候使用了時間做爲監控中止的信號(詳細可以參考後面)。

3. 使用MyEclipse的export功能把所有源代碼打包,而後把打包後的jar文件複製到hadoop集羣的$HADOOP_HOME/share/hadoop/mapreduce/文件夾如下。

2.2 執行

1. 初始化相應的表
初始化集羣配置表hconstants
訪問系統首頁:http://localhost/friend_find (這裏部署的tomcat默認使用80端口,同一時候web部署的名稱爲friend_find),就能夠看到如下的頁面(系統首頁):

點擊登陸,就能夠看到系統介紹。
點擊初始化表,依次選擇相應的表。就能夠完畢初始化

點擊Hadoop集羣配置表,查看數據:

這裏初始化使用的是lz的虛擬機的配置。因此需要改動爲本身的集羣配置,點擊某一行數據。在toolbar裏就能夠選擇改動或保存等。
2. 系統原始文件:
系統原始文件在project的:

3. 項目實現流程

項目實現的流程依照系統首頁左邊導航欄的順序從上到下執行。完畢數據挖掘的各個步驟。

3.1 數據探索

下載原始數據ask_ubuntu_users.xml 文件,打開,可以看到:

原始數據一共同擁有19550條記錄,去除第一、二、最後一行外其它都是用戶數據(第3行不是用戶數據,是該站點的描寫敘述);
用戶數據需要使用一個主鍵來惟一標示該用戶。這裏不是選擇Id。而是使用EmailHash(這裏假設每個EmailHash一樣的帳號其是同一我的)。

使用上面的假設後,對原始數據進行分析(這裏是所有導入到數據庫後發現的),發現EmailHash是有反覆記錄的。因此這裏需要對數據進行預處理--去重。git

3.2 數據預處理

1. 數據去重
數據去重採用雲平臺Hadoop進行處理,首先把ask_ubuntu_users.xml文件上傳到雲平臺,接着執行MR任務進行過濾。
2. 數據序列化
因爲計算用戶向量兩兩之間的距離的MR任務使用的是序列化的文件,因此這裏需要對數據進行序列化處理;

3.3 建模

建模即便用高速聚類算法來對原始數據進行聚類,主要包含如下幾個步驟:
1. 計算用戶向量兩兩之間的距離;
2. 依據距離求解每個用戶向量的局部密度。
3. 依據1.和2.的結果求解每個用戶向量的最小距離;
4. 依據2,3的結果畫出決策圖,並推斷聚類中心的局部密度和最小距離的閾值。
5. 依據局部密度和最小距離閾值來尋找聚類中心向量;
6. 依據聚類中心向量來進行分類;

3.4 推薦

建模後的結果即可以獲得聚類中心向量以及每個分羣的百分比,同一時候依據分類的結果來對用戶進行組內推薦。

項目流程圖例如如下:

4. 項目功能及實現原理

項目功能主要包含如下:

4.1 數據庫表維護

數據庫表維護主要包含:數據庫表初始化,即用戶登陸表和Hadoop集羣配置表的初始化。數據庫表增刪改查查看:即用戶登陸表、用戶數據表、Hadoop集羣配置表的增刪改查。

數據庫表增刪改查使用同一個DBService類來進行處理。(這裏的DAO使用的是通用的)假設針對每個表都創建一個DAO,那麼代碼就很是臃腫,因此這裏把這些數據庫表都是實現一個接口ObjectInterface,該接口使用一個Map來實例化各個對象。
public interface ObjectInterface {
	/**
	 * 不用每個表都創建一個方法,這裏依據表名本身主動裝配
	 * @param map
	 * @return
	 */
	public  Object setObjectByMap(Map<String,Object> map);
}

在進行保存的時候,直接使用前臺傳入的表名和json字符串進行更新就能夠
/**
	 * 更新或者插入表
	 * 不用每個表都創建一個方法,這裏依據表名本身主動裝配
	 * @param tableName
	 * @param json
	 * @return
	 */
	public boolean updateOrSave(String tableName,String json){
		try{
			// 依據表名得到實體類。並賦值
			Object o = Utils.getEntity(Utils.getEntityPackages(tableName),json);
			baseDao.saveOrUpdate(o);
			log.info("保存表{}。",new Object[]{tableName});
		}catch(Exception e){
			
			e.printStackTrace();
			return false;
		}
		return true;
	}
/**
	 * 依據類名得到實體類
	 * @param tableName
	 * @param json
	 * @return
	 * @throws ClassNotFoundException 
	 * @throws IllegalAccessException 
	 * @throws InstantiationException 
	 * @throws IOException 
	 * @throws JsonMappingException 
	 * @throws JsonParseException 
	 */
	@SuppressWarnings("unchecked")
	public static Object getEntity(String tableName, String json) throws ClassNotFoundException, InstantiationException, IllegalAccessException, JsonParseException, JsonMappingException, IOException {
		Class<?

> cl = Class.forName(tableName); ObjectInterface o = (ObjectInterface)cl.newInstance(); Map<String,Object> map = new HashMap<String,Object>(); ObjectMapper mapper = new ObjectMapper(); try { //convert JSON string to Map map = mapper.readValue(json, Map.class); return o.setObjectByMap(map); } catch (Exception e) { e.printStackTrace(); } return null; }github


4.2 數據預處理

數據預處理包含文件上傳、文件去重、文件下載、數據入庫、DB過濾到HDFS、距離計算、最佳DC。

1. 文件上傳
文件上傳便是把文件從本地上傳到HDFS,例如如下界面:

這裏上傳的便是ask_ubuntu_users.xml 所有數據文件。

上傳直接使用FileSystem的靜態方法下載,例如如下代碼():web

fs.copyFromLocalFile(src, dst);
上傳成功就能夠顯示操做成功,這裏使用aJax異步提交:
// =====uploadId,數據上傳button綁定 click方法
	$('#uploadId').bind('click', function(){
		var input_i=$('#localFileId').val();
		// 彈出進度框
		popupProgressbar('數據上傳','數據上傳中...',1000);
		// ajax 異步提交任務
		callByAJax('cloud/cloud_upload.action',{input:input_i});
	});
當中調用aJax使用一個封裝的方法。之後都可以調用,例如如下:
// 調用ajax異步提交
// 任務返回成功。則提示成功。不然提示失敗的信息
function callByAJax(url,data_){
	$.ajax({
		url : url,
		data: data_,
		async:true,
		dataType:"json",
		context : document.body,
		success : function(data) {
//			$.messager.progress('close');
			closeProgressbar();
			console.info("data.flag:"+data.flag);
			var retMsg;
			if("true"==data.flag){
				retMsg='操做成功。';
			}else{
				retMsg='操做失敗。失敗緣由:'+data.msg;
			}
			$.messager.show({
				title : '提示',
				msg : retMsg
			});
			
			if("true"==data.flag&&"true"==data.monitor){// 加入監控頁面
				// 使用單獨Tab的方式
				layout_center_addTabFun({
					title : 'MR算法監控',
					closable : true,
					// iconCls : node.iconCls,
					href : 'cluster/monitor_one.jsp'
				});
			}
			
		}
	});
}
後臺返回的是json數據。並且這裏爲了和雲平臺監控任務兼容(考慮通用性),這裏還加入了一個打開監控的代碼。
2. 文件去重
在導航欄選擇文件去重,就能夠看到如下的界面:

點擊去重就能夠提交任務到雲平臺,並且會打開MR的監控,例如如下圖:


在點擊」去重「按鈕時,會啓動一個後臺線程Thread:
/**
	 * 去重任務提交
	 */
	public void deduplicate(){
		Map<String ,Object> map = new HashMap<String,Object>();
		try{
			HUtils.setJobStartTime(System.currentTimeMillis()-10000);
			HUtils.JOBNUM=1;
			new Thread(new Deduplicate(input,output)).start();
			map.put("flag", "true");
			map.put("monitor", "true");
		} catch (Exception e) {
			e.printStackTrace();
			map.put("flag", "false");
			map.put("monitor", "false");
			map.put("msg", e.getMessage());
		}
		Utils.write2PrintWriter(JSON.toJSONString(map));
	}
首先設置所有任務的起始時間,這裏往前推遲了10s。是爲了防止時間相差太大(也可以設置2s左右。假設tomcat所在機器和集羣機器時間同樣則不用設置);接着設置任務的總個數;最後啓動多線程執行MR任務。
在任務監控界面,啓動一個定時器,會定時向後臺請求任務的監控信息,當任務所有完畢則會關閉該定時器。
<script type="text/javascript">
		// 本身主動定時刷新 1s
	 	var monitor_cf_interval= setInterval("monitor_one_refresh()",3000);
	</script>
function monitor_one_refresh(){
	$.ajax({ // ajax提交
		url : 'cloud/cloud_monitorone.action',
		dataType : "json",
		success : function(data) {
			if (data.finished == 'error') {// 獲取信息錯誤 。返回數據設置爲0。不然正常返回
				clearInterval(monitor_cf_interval);
				setJobInfoValues(data);
				console.info("monitor,finished:"+data.finished);
				$.messager.show({
					title : '提示',
					msg : '任務執行失敗。'
				});
			} else if(data.finished == 'true'){
				// 所有任務執行成功則中止timer
				console.info('monitor,data.finished='+data.finished);
				setJobInfoValues(data);
				clearInterval(monitor_cf_interval);
				$.messager.show({
					title : '提示',
					msg : '所有任務成功執行完畢!

' }); }else{ // 設置提示,並更改頁面數據,多行顯示job任務信息 setJobInfoValues(data); } } }); }ajax


後臺獲取任務的監控信息。使用如下的方式:
1)使用JobClient.getAllJobs()獲取所有任務的監控信息。
2)使用前面設置的所有任務的啓動時間來過濾每個任務;
3)對過濾後的任務依照啓動時間進行排序並返回;
4)依據返回任務信息的個數和設置的應該的個數來推斷是否中止監控;
/**
	 * 單個任務監控
	 * @throws IOException
	 */
	public void monitorone() throws IOException{
    	Map<String ,Object> jsonMap = new HashMap<String,Object>();
    	List<CurrentJobInfo> currJobList =null;
    	try{
    		currJobList= HUtils.getJobs();
//    		jsonMap.put("rows", currJobList);// 放入數據
    		jsonMap.put("jobnums", HUtils.JOBNUM);
    		// 任務完畢的標識是獲取的任務個數必須等於jobNum,同一時候最後一個job完畢
    		// true 所有任務完畢
    		// false 任務正在執行
    		// error 某一個任務執行失敗。則再也不監控
    		
    		if(currJobList.size()>=HUtils.JOBNUM){// 假設返回的list有JOBNUM個。那麼纔可能完畢任務
    			if("success".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){
    				jsonMap.put("finished", "true");
    				// 執行完畢。初始化時間點
    				HUtils.setJobStartTime(System.currentTimeMillis());
    			}else if("running".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){
    				jsonMap.put("finished", "false");
    			}else{// fail 或者kill則設置爲error
    				jsonMap.put("finished", "error");
    				HUtils.setJobStartTime(System.currentTimeMillis());
    			}
    		}else if(currJobList.size()>0){
    			if("fail".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))||
    					"kill".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){
    				jsonMap.put("finished", "error");
    				HUtils.setJobStartTime(System.currentTimeMillis());
    			}else{
    				jsonMap.put("finished", "false");
    			}
        	}	
    		if(currJobList.size()==0){
    			jsonMap.put("finished", "false");
//    			return ;
    		}else{
    			if(jsonMap.get("finished").equals("error")){
    				CurrentJobInfo cj =currJobList.get(currJobList.size()-1);
    				cj.setRunState("Error!");
    				jsonMap.put("rows", cj);
    			}else{
    				jsonMap.put("rows", currJobList.get(currJobList.size()-1));
    			}
    		}
    		jsonMap.put("currjob", currJobList.size());
    	}catch(Exception e){
    		e.printStackTrace();
    		jsonMap.put("finished", "error");
    		HUtils.setJobStartTime(System.currentTimeMillis());
    	}
    	System.out.println(new java.util.Date()+":"+JSON.toJSONString(jsonMap));
    	Utils.write2PrintWriter(JSON.toJSONString(jsonMap));// 使用JSON傳輸數據
    	return ;
    }

獲取所有任務,並過濾的代碼:
/**
	 * 依據時間來推斷,而後得到Job的狀態。以此來進行監控 Job的啓動時間和使用system.currentTimeMillis得到的時間是一致的。
	 * 不存在時區不一樣的問題;
	 * 
	 * @return
	 * @throws IOException
	 */
	public static List<CurrentJobInfo> getJobs() throws IOException {
		JobStatus[] jss = getJobClient().getAllJobs();
		List<CurrentJobInfo> jsList = new ArrayList<CurrentJobInfo>();
		jsList.clear();
		for (JobStatus js : jss) {
			if (js.getStartTime() > jobStartTime) {
				jsList.add(new CurrentJobInfo(getJobClient().getJob(
						js.getJobID()), js.getStartTime(), js.getRunState()));
			}
		}
		Collections.sort(jsList);
		return jsList;
	}
當有多個任務時,使用此監控也是可以的,僅僅用設置HUtils.JOBNUM的值就能夠。
3. 文件下載
文件下載便是把過濾後的文件下載到本地,(因爲過濾後的文件需要導入到數據庫Mysql。因此這裏提供下載功能)

文件下載使用FilsSystem.copyToLocalFile()靜態方法:
fs.copyToLocalFile(false, file.getPath(), new Path(dst,
							"hdfs_" + (i++) + HUtils.DOWNLOAD_EXTENSION), true);
4.數據入庫
數據入庫即文件從去重後的本地文件導入到MySql數據庫中:

這裏使用的是批量插入。同一時候這裏不使用xml的解析,而是直接使用字符串的解析,因爲在雲平臺過濾的時候,是去掉了第1,2,最後一行,因此xml文件是不完整的。不能使用xml解析,因此直接使用讀取文件,而後進行字符串的解析。

/**
	 * 批量插入xmlPath數據
	 * @param xmlPath
	 * @return
	 */
	public Map<String,Object> insertUserData(String xmlPath){
		Map<String,Object> map = new HashMap<String,Object>();
		try{
			baseDao.executeHql("delete UserData");
//			if(!Utils.changeDat2Xml(xmlPath)){
//				map.put("flag", "false");
//				map.put("msg", "HDFS文件轉爲xml失敗");
//				return map;
//			}
//			List<String[]> strings= Utils.parseXmlFolder2StrArr(xmlPath);
			// ---解析不使用xml解析,直接使用定製解析就能夠
			//---
			List<String[]>strings = Utils.parseDatFolder2StrArr(xmlPath);
			List<Object> uds = new ArrayList<Object>();
			for(String[] s:strings){
				uds.add(new UserData(s));
			}
			int ret =baseDao.saveBatch(uds);
			log.info("用戶表批量插入了{}條記錄!",ret);
		}catch(Exception e){
			e.printStackTrace();
			map.put("flag", "false");
			map.put("msg", e.getMessage());
			return map;
		}
		map.put("flag", "true");
		return map;
	}

public Integer saveBatch(List<Object> lists) {
		Session session = this.getCurrentSession();
//		org.hibernate.Transaction tx = session.beginTransaction();
		int i=0;
		try{
		for ( Object l:lists) {
			i++;
		    session.save(l);
			if( i % 50 == 0 ) { // Same as the JDBC batch size
		        //flush a batch of inserts and release memory:
		        session.flush();
		        session.clear();
		        if(i%1000==0){
		        	System.out.println(new java.util.Date()+":已經預插入了"+i+"條記錄...");
		        }
		    }
		}}catch(Exception e){
			e.printStackTrace();
		}
//		tx.commit();
//		session.close();
		Utils.simpleLog("插入數據數爲:"+i);
		return i;
	}

5. DB過濾到HDFS
MySQL的用戶數據過濾到HDFS,即便用如下的規則進行過濾:
規則 :reputation>15,upVotes>0,downVotes>0,views>0的用戶。
接着。上傳這些用戶,使用SequenceFile進行寫入,因爲如下的距離計算便是使用序列化文件做爲輸入的。因此這裏直接寫入序列化文件;
private static boolean db2hdfs(List<Object> list, Path path) throws IOException {
		boolean flag =false;
		int recordNum=0;
		SequenceFile.Writer writer = null;
		Configuration conf = getConf();
		try {
			Option optPath = SequenceFile.Writer.file(path);
			Option optKey = SequenceFile.Writer
					.keyClass(IntWritable.class);
			Option optVal = SequenceFile.Writer.valueClass(DoubleArrIntWritable.class);
			writer = SequenceFile.createWriter(conf, optPath, optKey, optVal);
			DoubleArrIntWritable dVal = new DoubleArrIntWritable();
			IntWritable dKey = new IntWritable();
			for (Object user : list) {
				if(!checkUser(user)){
					continue; // 不符合規則 
				}
				dVal.setValue(getDoubleArr(user),-1);
				dKey.set(getIntVal(user));
				writer.append(dKey, dVal);// 用戶id,<type,用戶的有效向量 >// 後面執行分類的時候需要統一格式,因此這裏需要反過來
				recordNum++;
			}
		} catch (IOException e) {
			Utils.simpleLog("db2HDFS失敗,+hdfs file:"+path.toString());
			e.printStackTrace();
			flag =false;
			throw e;
		} finally {
			IOUtils.closeStream(writer);
		}
		flag=true;
		Utils.simpleLog("db2HDFS 完畢,hdfs file:"+path.toString()+",records:"+recordNum);
		return flag;
	}
生成文件個數便是HDFS中文件的個數。
6. 距離計算
距離計算即計算每個用戶直接的距離。用法即便用兩次循環遍歷文件,只是這裏一共同擁有N*(N-1)/2個輸出,因爲針對外層用戶ID大於內層用戶ID的記錄,不進行輸出。這裏使用MR進行。

Mapper的map函數:輸出的key-value對是<DoubleWritable,<int,int>>--><距離,<用戶i的ID。用戶j的ID>>。且用戶i的ID<用戶j的ID;
public void map(IntWritable key,DoubleArrIntWritable  value,Context cxt)throws InterruptedException,IOException{
		cxt.getCounter(FilterCounter.MAP_COUNTER).increment(1L);
		if(cxt.getCounter(FilterCounter.MAP_COUNTER).getValue()%3000==0){
			log.info("Map處理了{}條記錄...",cxt.getCounter(FilterCounter.MAP_COUNTER).getValue());
			log.info("Map生成了{}條記錄...",cxt.getCounter(FilterCounter.MAP_OUT_COUNTER).getValue());
		}
		Configuration conf = cxt.getConfiguration();
		SequenceFile.Reader reader = null;
		FileStatus[] fss=input.getFileSystem(conf).listStatus(input);
		for(FileStatus f:fss){
			if(!f.toString().contains("part")){
				continue; // 排除其它文件
			}
			try {
				reader = new SequenceFile.Reader(conf, Reader.file(f.getPath()),
						Reader.bufferSize(4096), Reader.start(0));
				IntWritable dKey = (IntWritable) ReflectionUtils.newInstance(
						reader.getKeyClass(), conf);
				DoubleArrIntWritable dVal = (DoubleArrIntWritable) ReflectionUtils.newInstance(
						reader.getValueClass(), conf);
	
				while (reader.next(dKey, dVal)) {// 循環讀取文件
					// 當前IntWritable需要小於給定的dKey
					if(key.get()<dKey.get()){
						cxt.getCounter(FilterCounter.MAP_OUT_COUNTER).increment(1L);
						double dis= HUtils.getDistance(value.getDoubleArr(), dVal.getDoubleArr());
						newKey.set(dis);
						newValue.setValue(key.get(), dKey.get());
						cxt.write(newKey, newValue);
					}

				}
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				IOUtils.closeStream(reader);
			}
		}
	}
Reducer的reduce函數直接輸出:
public void reduce(DoubleWritable key,Iterable<IntPairWritable> values,Context cxt)throws InterruptedException,IOException{
		for(IntPairWritable v:values){
			cxt.getCounter(FilterCounter.REDUCE_COUNTER).increment(1);
			cxt.write(key, v);
		}
	}

6. 最佳DC
最佳DC是在」聚類算法「-->」執行聚類「時使用的參數,詳細可以參考Clustering by fast search and find of density peaks相關論文。

在尋找最佳DC時是把所有距離依照從大到小進行排序。而後順序遍歷這些距離,取前面的2%左右的數據。這裏排序因爲在」計算距離「MR任務時,已經利用其Map->reduce的排序性就能夠。其距離已經依照距離的大小從小到大排序了,因此僅僅需遍歷就能夠,這裏使用直接遍歷序列文件的方式。例如如下:
/**
	 * 依據給定的閾值百分比返回閾值
	 * 
	 * @param percent
	 *            通常爲1~2%
	 * @return
	 */
	public static double findInitDC(double percent, String path,long iNPUT_RECORDS2) {
		Path input = null;
		if (path == null) {
			input = new Path(HUtils.getHDFSPath(HUtils.FILTER_CALDISTANCE
					+ "/part-r-00000"));
		} else {
			input = new Path(HUtils.getHDFSPath(path + "/part-r-00000"));
		}
		Configuration conf = HUtils.getConf();
		SequenceFile.Reader reader = null;
		long counter = 0;
		long percent_ = (long) (percent * iNPUT_RECORDS2);
		try {
			reader = new SequenceFile.Reader(conf, Reader.file(input),
					Reader.bufferSize(4096), Reader.start(0));
			DoubleWritable dkey = (DoubleWritable) ReflectionUtils.newInstance(
					reader.getKeyClass(), conf);
			Writable dvalue = (Writable) ReflectionUtils.newInstance(
					reader.getValueClass(), conf);
			while (reader.next(dkey, dvalue)) {// 循環讀取文件
				counter++;
				if(counter%1000==0){
					Utils.simpleLog("讀取了"+counter+"條記錄。。

。"); } if (counter >= percent_) { HUtils.DELTA_DC = dkey.get();// 賦予最佳DC閾值 break; } } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(reader); } return HUtils.DELTA_DC; } 算法


這裏需要說明一下,通過試驗。發現使用距離閾值29.4時,聚類的決策圖中的聚類中心向量並不是十分明顯,因此在如下使用的閾值是100;

4.3 聚類算法

1. 執行聚類
執行聚類包含三個MR任務:局部密度MR、最小距離MR以及排序MR任務:


1)局部密度MR
局部密度計算使用的輸入文件便是前面計算的距離文件,其MR數據流例如如下:
/**
 * Find the local density of every point vector
 * 
 * 輸入爲 <key,value>--> <distance,<id_i,id_j>>
 *  <距離,<向量i編號,向量j編號>>
 *  
 *  Mapper:
 *  輸出向量i編號,1
 *      向量j編號,1
 *  Reducer:
 *  輸出
 *     向量i編號,局部密度
 *     有些向量是沒有局部密度的。當某個向量距離其它點的距離所有都大於給定閾值dc時就會發生
 * @author fansy
 * @date 2015-7-3
 */
Mapper的邏輯例如如下:
 
/**
 * 輸入爲<距離d_ij,<向量i編號,向量j編號>>
 * 依據距離dc閾值推斷距離d_ij是否小於dc,符合要求則
 * 輸出
 * 向量i編號。1
 * 向量j編號,1
 * @author fansy
 * @date 2015-7-3
 */
map函數:
public void map(DoubleWritable key,IntPairWritable value,Context cxt)throws InterruptedException,IOException{
		double distance= key.get();
		
		if(method.equals("gaussian")){
            one.set(Math.pow(Math.E, -(distance/dc)*(distance/dc)));
        }
		
		if(distance<dc){
			vectorId.set(value.getFirst());
			cxt.write(vectorId, one);
			vectorId.set(value.getSecond());
			cxt.write(vectorId, one);
		}
	}
這裏的密度有兩種計算方式。依據前臺傳入的參數選擇不一樣的算法就能夠,這裏默認使用的cut-off。即局部密度有一個點則局部密度加1。
reducer中的reduce邏輯即把一樣的點的局部密度所有加起來就能夠:
public void reduce(IntWritable key, Iterable<DoubleWritable> values,Context cxt)
	throws IOException,InterruptedException{
		double sum =0;
		for(DoubleWritable v:values){
			sum+=v.get();
		}
		sumAll.set(sum);// 
		cxt.write(key, sumAll);
		Utils.simpleLog("vectorI:"+key.get()+",density:"+sumAll);
	}
2)最小距離MR
最小距離MR邏輯例如如下:
/**
 * find delta distance of every point
 * 尋找大於自身密度的最小其它向量的距離
 * mapper輸入:
 * 輸入爲<距離d_ij,<向量i編號,向量j編號>>
 * 把LocalDensityJob的輸出
 * 		i,density_i
 * 放入一個map中,用於在mapper中進行推斷兩個局部密度的大小以決定是否輸出
 * mapper輸出:
 *      i,<density_i,min_distance_j>
 *      IntWritable,DoublePairWritable
 * reducer 輸出:
 * 		<density_i*min_distancd_j> <density_i,min_distance_j,i>
 * 		DoubleWritable,  IntDoublePairWritable
 * @author fansy
 * @date 2015-7-3
 */
這裏reducer輸出爲每個點(即每個用戶)局部密度和最小距離的乘積,一種方式尋找聚類中心個數的方法就是把這個乘積從大到小排序,並把這些點畫折線圖,看其斜率變化最大的點,取前面點的個數即爲聚類中心個數。
3)排序MR
排序MR即把2)的局部密度和最小距離的乘積進行排序,這裏可以利用map-reduce的排序性,本身定義一個Writable,而後讓其依照值的大小從大到小排序。
/**
 * 
 */
package com.fz.fastcluster.keytype;

/**
 * 本身定義DoubleWritable
 * 改動其排序方式,
 * 從大到小排列
 * @author fansy
 * @date 2015-7-3
 */

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* Writable for Double values.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CustomDoubleWritable implements WritableComparable<CustomDoubleWritable> {

 private double value = 0.0;
 
 public CustomDoubleWritable() {
   
 }
 
 public CustomDoubleWritable(double value) {
   set(value);
 }
 
 @Override
 public void readFields(DataInput in) throws IOException {
   value = in.readDouble();
 }

 @Override
 public void write(DataOutput out) throws IOException {
   out.writeDouble(value);
 }
 
 public void set(double value) { this.value = value; }
 
 public double get() { return value; }

 /**
  * Returns true iff <code>o</code> is a DoubleWritable with the same value.
  */
 @Override
 public boolean equals(Object o) {
   if (!(o instanceof CustomDoubleWritable)) {
     return false;
   }
   CustomDoubleWritable other = (CustomDoubleWritable)o;
   return this.value == other.value;
 }
 
 @Override
 public int hashCode() {
   return (int)Double.doubleToLongBits(value);
 }
 
 @Override
 public int compareTo(CustomDoubleWritable o) {// 改動這裏就能夠
   return (value < o.value ?

1 : (value == o.value ? 0 : -1)); } @Override public String toString() { return Double.toString(value); } /** A Comparator optimized for DoubleWritable. */ public static class Comparator extends WritableComparator { public Comparator() { super(CustomDoubleWritable.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { double thisValue = readDouble(b1, s1); double thatValue = readDouble(b2, s2); return (thisValue < thatValue ?

1 : (thisValue == thatValue ?

0 : -1)); } } static { // register this comparator WritableComparator.define(CustomDoubleWritable.class, new Comparator()); } }


2. 畫決策圖
畫決策圖,直接解析雲平臺的排序MR的輸出,而後取前面的500條記錄(前面500條記錄包含的局部密度和最小距離的乘積的最大的500個,後面的點更不可能成爲聚類中心點,因此這裏僅僅取500個,同一時候需要注意,假設前面設置排序MR的reducer個數大於一個,那麼其輸出爲多個文件,則這裏是取每個文件的前面500個向量)


依次點擊繪圖,展現決策圖,就能夠看到畫出的決策圖:

聚類中心應該是取右上角位置的點。因此這裏選擇去點密度大於50,點距離大於50的點。這裏有3個。加上沒有畫出來的局部密度最大的點。一共同擁有4個聚類中心向量。

3. 尋找聚類中心
尋找聚類中心就是依據前面決策圖獲得的點密度和點距離閾值來過濾排序MR的輸出,獲得符合要求的用戶ID,這些用戶ID便是聚類中心向量的ID。接着,依據這些ID在數據庫中找到每個用戶ID相應的有效向量(reputation。upVotes,downVotes。views)寫入HDFS和本地文件。

寫入HDFS是爲了做爲分類的中心點,寫入本地是爲了後面查看的方便。


代碼例如如下:
/**
	 * 依據給定的閾值尋找聚類中心向量,並寫入hdfs
	 * 非MR任務,不需要監控。注意返回值
	 */
	public void center2hdfs(){
		// localfile:method
		// 1. 讀取SortJob的輸出,獲取前面k條記錄中的大於局部密度和最小距離閾值的id;
		// 2. 依據id,找到每個id相應的記錄;
		// 3. 把記錄轉爲double[] 。
		// 4. 把向量寫入hdfs
		// 5. 把向量寫入本地文件裏,方便後面的查看
		Map<String,Object> retMap=new HashMap<String,Object>();
		
		Map<Object,Object> firstK =null;
		List<Integer> ids= null;
		List<UserData> users=null;
		try{
		firstK=HUtils.readSeq(input==null?HUtils.SORTOUTPUT+"/part-r-00000":input,
				100);// 這裏默認使用	前100條記錄
		ids=HUtils.getCentIds(firstK,numReducerDensity,numReducerDistance);
		// 2
		users = dBService.getTableData("UserData",ids);
		Utils.simpleLog("聚類中心向量有"+users.size()+"個!");
		// 3,4,5
		HUtils.writecenter2hdfs(users,method,output);	
		}catch(Exception e){
			e.printStackTrace();
			retMap.put("flag", "false");
			retMap.put("msg", e.getMessage());
			Utils.write2PrintWriter(JSON.toJSONString(retMap));
			return ;
		}
		retMap.put("flag", "true");
		Utils.write2PrintWriter(JSON.toJSONString(retMap));
		return ;
	}
寫入HDFS和本地的聚類中心例如如下:


4. 執行分類
4.1 執行分類的思路爲:
1)聚類中心向量已經寫入到_center/iter_0/clustered/part-m-00000中;接着,拷貝原始用戶向量(即「DB過濾到HDFS」的輸出)到_center/iter_0/unclustered/
2)執行第一次分類。使用Mapper就能夠,mapper邏輯爲讀取_center/iter_0/unclustered/裏面的所有文件的每一行。針對每一行A,讀取_center/iter_0/clustered/裏面所有的數據,循環推斷這些向量和A的距離,找到和A的距離最小的距離(同一時候這個距離需要知足大於給定的閾值),並記錄這個距離相應向量的類型type,那麼就可以輸出向量A和類型type,那向量A就已經被分類了,分類後的數據寫入到_center/iter_1/clustered裏面;假設沒有找到最小距離(即所有的距離都大於給定的閾值),那麼向量A就是沒有被分類的,那麼把數據寫入到_center/iter_1/unclustered裏面。
3)在2)中的mapper中需要記錄分類數據和未分類數據的記錄數。這樣在MR任務執行完畢後。就能夠依據這兩個數值來推斷是否需要進行下次循環。假設這兩個數值都是零,那麼就退出循環。不然進行下一步。
4)在第i次循環(i>=2)時,使用_center/iter_(i-1)/unclustered裏面的數據做爲輸入,針對這個輸入的每一行向量A,遍歷_center/iter_1/clustered ~ _center/iter_(i-1)/clustered,使用2)中的方式對A進行分類。假設完畢分類。那麼就把數據寫入到_center/iter_i/clustered,不然寫入到_center/iter_i/unclustered裏面。
5)依據第i次MR任務記錄的Clustered和Unclustered的值來推斷是否進行下次循環。不用則退出循環。不然繼續循環進入4);
map函數代碼:
public void map(IntWritable key,DoubleArrIntWritable  value,Context cxt){
		double[] inputI= value.getDoubleArr();
		
		// hdfs
		Configuration conf = cxt.getConfiguration();
		FileSystem fs = null;
		Path path = null;
		
		SequenceFile.Reader reader = null;
		try {
			fs = FileSystem.get(conf);
			// read all before center files 
			String parentFolder =null;
			double smallDistance = Double.MAX_VALUE;
			int smallDistanceType=-1;
			double distance;
			
			// if iter_i !=0,then start i with 1,else start with 0
			for(int i=start;i<iter_i;i++){// all files are clustered points
				
				parentFolder=HUtils.CENTERPATH+"/iter_"+i+"/clustered";
				RemoteIterator<LocatedFileStatus> files=fs.listFiles(new Path(parentFolder), false);
				
				while(files.hasNext()){
					path = files.next().getPath();
					if(!path.toString().contains("part")){
						continue; // return 
					}
					reader = new SequenceFile.Reader(conf, Reader.file(path),
							Reader.bufferSize(4096), Reader.start(0));
					IntWritable dkey = (IntWritable) ReflectionUtils.newInstance(
							reader.getKeyClass(), conf);
					DoubleArrIntWritable dvalue = (DoubleArrIntWritable) ReflectionUtils.newInstance(
							reader.getValueClass(), conf);
					while (reader.next(dkey, dvalue)) {// read file literally
						distance = HUtils.getDistance(inputI, dvalue.getDoubleArr());
					
						if(distance>dc){// not count the farest point
							continue;
						}
						// 這裏僅僅要找到離的近期的點並且其distance<=dc 就能夠。把這個點的type賦值給當前值就能夠
						if(distance<smallDistance){
							smallDistance=distance;
							smallDistanceType=dvalue.getIdentifier();
						}
						
					}// while
				}// while
			}// for
					
			vectorI.set(key.get());// 用戶id
			typeDoubleArr.setValue(inputI,smallDistanceType);
			
			if(smallDistanceType!=-1){
				log.info("clustered-->vectorI:{},typeDoubleArr:{}",new Object[]{vectorI,typeDoubleArr.toString()});
				cxt.getCounter(ClusterCounter.CLUSTERED).increment(1);
				out.write("clustered", vectorI, typeDoubleArr,"clustered/part");	
			}else{
				log.info("unclustered---->vectorI:{},typeDoubleArr:{}",new Object[]{vectorI,typeDoubleArr.toString()});
				cxt.getCounter(ClusterCounter.UNCLUSTERED).increment(1);
				out.write("unclustered", vectorI, typeDoubleArr,"unclustered/part");
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(reader);
		}

	}


4.2 執行分類的閾值設置
每次循環執行分類時,閾值都是變化的,這裏採取的方式是:
1. 計算聚類中心向量兩兩之間的距離。並依照距離排序,從小到大,每次循環取出距離的一半當作閾值,一直取到最後一個距離;
2. 當進行到K*(K-1)/2個距離時。即最後一個距離(K個聚類中心向量)後。下次循環的閾值設置爲當前閾值翻倍。即乘以2;並計數,當再循環k次後,此閾值將再也不變化。
3. 這樣設置可以下降誤判。同一時候控制循環的次數;
public void run() {
		input=input==null?HUtils.FILTER_PREPAREVECTORS:input;
		
		// 刪除iter_i(i>0)的所有文件
		try {
			HUtils.clearCenter((output==null?

HUtils.CENTERPATH:output)); } catch (FileNotFoundException e2) { e2.printStackTrace(); } catch (IOException e2) { e2.printStackTrace(); } output=output==null?HUtils.CENTERPATHPREFIX:output+"/iter_"; // 加一個操做。把/user/root/preparevectors裏面的數據複製到/user/root/_center/iter_0/unclustered裏面 HUtils.copy(input,output+"0/unclustered"); try { Thread.sleep(200);// 暫停200ms } catch (InterruptedException e1) { e1.printStackTrace(); } // 求解dc的閾值。這裏的dc不用傳入進來就能夠,即delta的值 // 閾值問題可以在討論,這裏臨時使用傳進來的閾值就能夠 // double dc =dcs[0]; // 讀取聚類中心文件 Map<Object,Object> vectorsMap= HUtils.readSeq(output+"0/clustered/part-m-00000", Integer.parseInt(k)); double[][] vectors = HUtils.getCenterVector(vectorsMap); double[] distances= Utils.getDistances(vectors); // 這裏不使用傳入進來的閾值 int iter_i=0; int ret=0; double tmpDelta=0; int kInt = Integer.parseInt(k); try { do{ if(iter_i>=distances.length){ // delta= String.valueOf(distances[distances.length-1]/2); // 這裏使用什麼方式尚未想好。。。 // 使用如下的方式 tmpDelta=Double.parseDouble(delta); while(kInt-->0){// 超過k次後就再也不增大 tmpDelta*=2;// 每次翻倍 } delta=String.valueOf(tmpDelta); }else{ delta=String.valueOf(distances[iter_i]/2); } log.info("this is the {} iteration,with dc:{}",new Object[]{iter_i,delta}); String[] ar={ HUtils.getHDFSPath(output)+iter_i+"/unclustered", HUtils.getHDFSPath(output)+(iter_i+1),//output //HUtils.getHDFSPath(HUtils.CENTERPATHPREFIX)+iter_i+"/clustered/part-m-00000",//center file k, delta, String.valueOf((iter_i+1)) }; try{ ret = ToolRunner.run(HUtils.getConf(), new ClusterDataJob(), ar); if(ret!=0){ log.info("ClusterDataJob failed, with iteration {}",new Object[]{iter_i}); break; } }catch(Exception e){ e.printStackTrace(); } iter_i++; HUtils.JOBNUM++;// 每次循環後加1 }while(shouldRunNextIter()); } catch (IllegalArgumentException e) { e.printStackTrace(); } if(ret==0){ log.info("All cluster Job finished with iteration {}",new Object[]{iter_i}); } }



4.3 執行分類的監控思路
執行監控仍是使用以前的代碼,但是這裏的MR任務個數一開始並不能直接肯定,那就不能控制監控循環結束的時間。因此這裏需要進行改動。這裏在MR任務循環完畢以後,設置JOBNUM的值來控制監控任務的結束。並且一開始設置JOBNUM爲2。這樣在一開始的MR執行結束後就會進行下一次監控循環(這裏有個假設就是監控不會僅僅有一次),並且在MR任務每次結束後JOBNUM的值需要遞增1:
public void runCluster2(){
		Map<String ,Object> map = new HashMap<String,Object>();
		try {
			//提交一個Hadoop MR任務的基本流程
			// 1. 設置提交時間閾值,並設置這組job的個數
			//使用當前時間就能夠,當前時間往前10s。以防server和雲平臺時間相差
			HUtils.setJobStartTime(System.currentTimeMillis()-10000);// 
			// 因爲不知道循環多少次完畢,因此這裏設置爲2,每次循環都遞增1
			// 當所有循環完畢的時候,就該值減去2就能夠中止監控部分的循環
			HUtils.JOBNUM=2;
			
			// 2. 使用Thread的方式啓動一組MR任務
			new Thread(new RunCluster2(input, output,delta, record)).start();
			// 3. 啓動成功後。直接返回到監控,同一時候監控定時向後臺獲取數據,並在前臺展現;
			
			map.put("flag", "true");
			map.put("monitor", "true");
		} catch (Exception e) {
			e.printStackTrace();
			map.put("flag", "false");
			map.put("monitor", "false");
			map.put("msg", e.getMessage());
		}
		Utils.write2PrintWriter(JSON.toJSONString(map));
	}
在MR任務循環結束後,又一次設置JOBNUM的值就能夠控制監控的循環中止:
/**
	 * 是否應該繼續下次循環
	 * 直接使用分類記錄數和未分類記錄數來推斷
	 * @throws IOException 
	 * @throws IllegalArgumentException 
	 */
	private boolean shouldRunNextIter()  {
		
		if(HUtils.UNCLUSTERED==0||HUtils.CLUSTERED==0){
			HUtils.JOBNUM-=2;// 不用監控 則減去2;
			return false;
		}
		return true;
		
	}
執行分類頁面:
這裏距離閾值是沒實用的,後臺直接使用上面的算法獲得;循環完畢監控界面的終於閾值,例如如下所看到的:


4.4 聚類中心及推薦

1. 組別入庫
組別入庫。便是把_center/iter_i/clustered裏面的數據解析導入數據庫中。導入數據庫仍是使用上面的批插入操做:
/**
	 * 把分類的數據解析到list裏面
	 * @param path
	 * @return
	 */
	private static Collection<? extends UserGroup> resolve(Path path) {
		// TODO Auto-generated method stub
		List<UserGroup> list = new ArrayList<UserGroup>();
		Configuration conf = HUtils.getConf();
		SequenceFile.Reader reader = null;
		int i=0;
		try {
			reader = new SequenceFile.Reader(conf, Reader.file(path),
					Reader.bufferSize(4096), Reader.start(0));
			IntWritable dkey =  (IntWritable) ReflectionUtils
					.newInstance(reader.getKeyClass(), conf);
			DoubleArrIntWritable dvalue =  (DoubleArrIntWritable) ReflectionUtils
					.newInstance(reader.getValueClass(), conf);

			while (reader.next(dkey, dvalue)) {// 循環讀取文件
				// 使用這個進行克隆
				list.add(new UserGroup(i++,dkey.get(),dvalue.getIdentifier()));
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(reader);
		}
		Utils.simpleLog("讀取"+list.size()+"條記錄。文件:"+path.toString());
		return list;
	}

2. 聚類中心及佔比
聚類中心及佔比直接使用數據庫中的數據進行統計的,即統計1.中的分類數據每個類別的總記錄數,而後再進行計算。聚類中心即直接讀取以前寫入本地的聚類中心向量文件就能夠。

3. 用戶查詢及推薦
用戶查詢及推薦即便用用戶組內的用戶來進行推薦。

依據給定的ID來查詢該用戶的分組,假設有分組,那麼就查詢出該分組內的用戶,展現到前臺:



5. 總結

1. 原始數據通過去重、過濾後,僅剩下541條記錄。即對541條記錄進行聚類,不算大數據處理。
2. 上面的Hadoop實現的聚類算法,可以使用大數據來測試下,看下效果;
3. 聚類算法在計算兩兩之間的距離時隨着文件的增大,其耗時增加很是快O(N^2);
4. 使用組內的用戶來對用戶直接進行推薦的方式有待商榷,可以考慮在組內使用其它方式來過濾用戶(比方依據地理位置等信息,該信息在原始數據中是有的),再次推薦;
5. 本項目僅供學習參考,重心在技術和處理方式以及實現方式上;


分享,成長,快樂

腳踏實地,專一

轉載請註明blog地址:http://blog.csdn.net/fansy1990

相關文章
相關標籤/搜索