項目使用軟件:Myeclipse10.0,JDK1.7,Hadoop2.6,MySQL5.6。EasyUI1.3.6。jQuery2.0,Spring4.1.3。javascript
Hibernate4.3.1,struts2.3.1。Tomcat7 。Maven3.2.1。java
項目下載地址:https://github.com/fansy1990/friend_find ,項目部署參考:http://blog.csdn.net/fansy1990/article/details/46481409 。node
用戶填寫本身的信息後就可以在本系統內找到和本身志同道合的朋友。本系統使用的是在http://stackoverflow.com/站點上的用戶數據。Stack Overflow是一個程序設計領域的問答站點,隸屬Stack Exchange Network。站點贊成注冊用戶提出或回答問題。還贊成對已有問題或答案加分、扣分或進行改動,條件是用戶達到必定的「聲望值」。「聲望值」就是用戶進行站點交互時能獲取的分數。當聲望值達到某個程度時,用戶的權限就會添加,比方聲望值超過50點就可以評論答案。當用戶的聲望值達到某個階段時,站點還會給用戶頒發貢獻徽章。以此來激勵用戶對站點作出貢獻。該項目創建在如下的假設基礎上,假設用戶對於一個領域問題的「態度」就可以反映出該用戶的價值取向,並依據此價值取向來對用戶進行聚類分組。mysql
這裏的態度可以使用幾個指標屬性來評判,在本系統中原始數據(即用戶信息數據)包含的屬性有多個,從中挑選出最能符合用戶觀點的屬性。做爲該用戶的「態度」進行分析。這裏挑選的屬性是:reputation,upVotes。downVotes,views。即便用這4個屬性來對用戶進行聚類。同一時候,這裏使用MR實現的Clustering by fast search and find of density peaks聚類算法,這裏的實現和http://blog.csdn.net/fansy1990/article/details/46364697這裏的實現原始是不一樣的。linux
使用上面的假設後,對原始數據進行分析(這裏是所有導入到數據庫後發現的),發現EmailHash是有反覆記錄的。因此這裏需要對數據進行預處理--去重。git
public interface ObjectInterface { /** * 不用每個表都創建一個方法,這裏依據表名本身主動裝配 * @param map * @return */ public Object setObjectByMap(Map<String,Object> map); }
/** * 更新或者插入表 * 不用每個表都創建一個方法,這裏依據表名本身主動裝配 * @param tableName * @param json * @return */ public boolean updateOrSave(String tableName,String json){ try{ // 依據表名得到實體類。並賦值 Object o = Utils.getEntity(Utils.getEntityPackages(tableName),json); baseDao.saveOrUpdate(o); log.info("保存表{}。",new Object[]{tableName}); }catch(Exception e){ e.printStackTrace(); return false; } return true; }
/** * 依據類名得到實體類 * @param tableName * @param json * @return * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException * @throws IOException * @throws JsonMappingException * @throws JsonParseException */ @SuppressWarnings("unchecked") public static Object getEntity(String tableName, String json) throws ClassNotFoundException, InstantiationException, IllegalAccessException, JsonParseException, JsonMappingException, IOException { Class<?> cl = Class.forName(tableName); ObjectInterface o = (ObjectInterface)cl.newInstance(); Map<String,Object> map = new HashMap<String,Object>(); ObjectMapper mapper = new ObjectMapper(); try { //convert JSON string to Map map = mapper.readValue(json, Map.class); return o.setObjectByMap(map); } catch (Exception e) { e.printStackTrace(); } return null; }github
上傳直接使用FileSystem的靜態方法下載,例如如下代碼():web
fs.copyFromLocalFile(src, dst);
// =====uploadId,數據上傳button綁定 click方法 $('#uploadId').bind('click', function(){ var input_i=$('#localFileId').val(); // 彈出進度框 popupProgressbar('數據上傳','數據上傳中...',1000); // ajax 異步提交任務 callByAJax('cloud/cloud_upload.action',{input:input_i}); });當中調用aJax使用一個封裝的方法。之後都可以調用,例如如下:
// 調用ajax異步提交 // 任務返回成功。則提示成功。不然提示失敗的信息 function callByAJax(url,data_){ $.ajax({ url : url, data: data_, async:true, dataType:"json", context : document.body, success : function(data) { // $.messager.progress('close'); closeProgressbar(); console.info("data.flag:"+data.flag); var retMsg; if("true"==data.flag){ retMsg='操做成功。'; }else{ retMsg='操做失敗。失敗緣由:'+data.msg; } $.messager.show({ title : '提示', msg : retMsg }); if("true"==data.flag&&"true"==data.monitor){// 加入監控頁面 // 使用單獨Tab的方式 layout_center_addTabFun({ title : 'MR算法監控', closable : true, // iconCls : node.iconCls, href : 'cluster/monitor_one.jsp' }); } } }); }後臺返回的是json數據。並且這裏爲了和雲平臺監控任務兼容(考慮通用性),這裏還加入了一個打開監控的代碼。
/** * 去重任務提交 */ public void deduplicate(){ Map<String ,Object> map = new HashMap<String,Object>(); try{ HUtils.setJobStartTime(System.currentTimeMillis()-10000); HUtils.JOBNUM=1; new Thread(new Deduplicate(input,output)).start(); map.put("flag", "true"); map.put("monitor", "true"); } catch (Exception e) { e.printStackTrace(); map.put("flag", "false"); map.put("monitor", "false"); map.put("msg", e.getMessage()); } Utils.write2PrintWriter(JSON.toJSONString(map)); }首先設置所有任務的起始時間,這裏往前推遲了10s。是爲了防止時間相差太大(也可以設置2s左右。假設tomcat所在機器和集羣機器時間同樣則不用設置);接着設置任務的總個數;最後啓動多線程執行MR任務。
<script type="text/javascript"> // 本身主動定時刷新 1s var monitor_cf_interval= setInterval("monitor_one_refresh()",3000); </script>
function monitor_one_refresh(){ $.ajax({ // ajax提交 url : 'cloud/cloud_monitorone.action', dataType : "json", success : function(data) { if (data.finished == 'error') {// 獲取信息錯誤 。返回數據設置爲0。不然正常返回 clearInterval(monitor_cf_interval); setJobInfoValues(data); console.info("monitor,finished:"+data.finished); $.messager.show({ title : '提示', msg : '任務執行失敗。' }); } else if(data.finished == 'true'){ // 所有任務執行成功則中止timer console.info('monitor,data.finished='+data.finished); setJobInfoValues(data); clearInterval(monitor_cf_interval); $.messager.show({ title : '提示', msg : '所有任務成功執行完畢!' }); }else{ // 設置提示,並更改頁面數據,多行顯示job任務信息 setJobInfoValues(data); } } }); }ajax
/** * 單個任務監控 * @throws IOException */ public void monitorone() throws IOException{ Map<String ,Object> jsonMap = new HashMap<String,Object>(); List<CurrentJobInfo> currJobList =null; try{ currJobList= HUtils.getJobs(); // jsonMap.put("rows", currJobList);// 放入數據 jsonMap.put("jobnums", HUtils.JOBNUM); // 任務完畢的標識是獲取的任務個數必須等於jobNum,同一時候最後一個job完畢 // true 所有任務完畢 // false 任務正在執行 // error 某一個任務執行失敗。則再也不監控 if(currJobList.size()>=HUtils.JOBNUM){// 假設返回的list有JOBNUM個。那麼纔可能完畢任務 if("success".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){ jsonMap.put("finished", "true"); // 執行完畢。初始化時間點 HUtils.setJobStartTime(System.currentTimeMillis()); }else if("running".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){ jsonMap.put("finished", "false"); }else{// fail 或者kill則設置爲error jsonMap.put("finished", "error"); HUtils.setJobStartTime(System.currentTimeMillis()); } }else if(currJobList.size()>0){ if("fail".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))|| "kill".equals(HUtils.hasFinished(currJobList.get(currJobList.size()-1)))){ jsonMap.put("finished", "error"); HUtils.setJobStartTime(System.currentTimeMillis()); }else{ jsonMap.put("finished", "false"); } } if(currJobList.size()==0){ jsonMap.put("finished", "false"); // return ; }else{ if(jsonMap.get("finished").equals("error")){ CurrentJobInfo cj =currJobList.get(currJobList.size()-1); cj.setRunState("Error!"); jsonMap.put("rows", cj); }else{ jsonMap.put("rows", currJobList.get(currJobList.size()-1)); } } jsonMap.put("currjob", currJobList.size()); }catch(Exception e){ e.printStackTrace(); jsonMap.put("finished", "error"); HUtils.setJobStartTime(System.currentTimeMillis()); } System.out.println(new java.util.Date()+":"+JSON.toJSONString(jsonMap)); Utils.write2PrintWriter(JSON.toJSONString(jsonMap));// 使用JSON傳輸數據 return ; }
/** * 依據時間來推斷,而後得到Job的狀態。以此來進行監控 Job的啓動時間和使用system.currentTimeMillis得到的時間是一致的。 * 不存在時區不一樣的問題; * * @return * @throws IOException */ public static List<CurrentJobInfo> getJobs() throws IOException { JobStatus[] jss = getJobClient().getAllJobs(); List<CurrentJobInfo> jsList = new ArrayList<CurrentJobInfo>(); jsList.clear(); for (JobStatus js : jss) { if (js.getStartTime() > jobStartTime) { jsList.add(new CurrentJobInfo(getJobClient().getJob( js.getJobID()), js.getStartTime(), js.getRunState())); } } Collections.sort(jsList); return jsList; }當有多個任務時,使用此監控也是可以的,僅僅用設置HUtils.JOBNUM的值就能夠。
fs.copyToLocalFile(false, file.getPath(), new Path(dst, "hdfs_" + (i++) + HUtils.DOWNLOAD_EXTENSION), true);4.數據入庫
/** * 批量插入xmlPath數據 * @param xmlPath * @return */ public Map<String,Object> insertUserData(String xmlPath){ Map<String,Object> map = new HashMap<String,Object>(); try{ baseDao.executeHql("delete UserData"); // if(!Utils.changeDat2Xml(xmlPath)){ // map.put("flag", "false"); // map.put("msg", "HDFS文件轉爲xml失敗"); // return map; // } // List<String[]> strings= Utils.parseXmlFolder2StrArr(xmlPath); // ---解析不使用xml解析,直接使用定製解析就能夠 //--- List<String[]>strings = Utils.parseDatFolder2StrArr(xmlPath); List<Object> uds = new ArrayList<Object>(); for(String[] s:strings){ uds.add(new UserData(s)); } int ret =baseDao.saveBatch(uds); log.info("用戶表批量插入了{}條記錄!",ret); }catch(Exception e){ e.printStackTrace(); map.put("flag", "false"); map.put("msg", e.getMessage()); return map; } map.put("flag", "true"); return map; }
public Integer saveBatch(List<Object> lists) { Session session = this.getCurrentSession(); // org.hibernate.Transaction tx = session.beginTransaction(); int i=0; try{ for ( Object l:lists) { i++; session.save(l); if( i % 50 == 0 ) { // Same as the JDBC batch size //flush a batch of inserts and release memory: session.flush(); session.clear(); if(i%1000==0){ System.out.println(new java.util.Date()+":已經預插入了"+i+"條記錄..."); } } }}catch(Exception e){ e.printStackTrace(); } // tx.commit(); // session.close(); Utils.simpleLog("插入數據數爲:"+i); return i; }
private static boolean db2hdfs(List<Object> list, Path path) throws IOException { boolean flag =false; int recordNum=0; SequenceFile.Writer writer = null; Configuration conf = getConf(); try { Option optPath = SequenceFile.Writer.file(path); Option optKey = SequenceFile.Writer .keyClass(IntWritable.class); Option optVal = SequenceFile.Writer.valueClass(DoubleArrIntWritable.class); writer = SequenceFile.createWriter(conf, optPath, optKey, optVal); DoubleArrIntWritable dVal = new DoubleArrIntWritable(); IntWritable dKey = new IntWritable(); for (Object user : list) { if(!checkUser(user)){ continue; // 不符合規則 } dVal.setValue(getDoubleArr(user),-1); dKey.set(getIntVal(user)); writer.append(dKey, dVal);// 用戶id,<type,用戶的有效向量 >// 後面執行分類的時候需要統一格式,因此這裏需要反過來 recordNum++; } } catch (IOException e) { Utils.simpleLog("db2HDFS失敗,+hdfs file:"+path.toString()); e.printStackTrace(); flag =false; throw e; } finally { IOUtils.closeStream(writer); } flag=true; Utils.simpleLog("db2HDFS 完畢,hdfs file:"+path.toString()+",records:"+recordNum); return flag; }
public void map(IntWritable key,DoubleArrIntWritable value,Context cxt)throws InterruptedException,IOException{ cxt.getCounter(FilterCounter.MAP_COUNTER).increment(1L); if(cxt.getCounter(FilterCounter.MAP_COUNTER).getValue()%3000==0){ log.info("Map處理了{}條記錄...",cxt.getCounter(FilterCounter.MAP_COUNTER).getValue()); log.info("Map生成了{}條記錄...",cxt.getCounter(FilterCounter.MAP_OUT_COUNTER).getValue()); } Configuration conf = cxt.getConfiguration(); SequenceFile.Reader reader = null; FileStatus[] fss=input.getFileSystem(conf).listStatus(input); for(FileStatus f:fss){ if(!f.toString().contains("part")){ continue; // 排除其它文件 } try { reader = new SequenceFile.Reader(conf, Reader.file(f.getPath()), Reader.bufferSize(4096), Reader.start(0)); IntWritable dKey = (IntWritable) ReflectionUtils.newInstance( reader.getKeyClass(), conf); DoubleArrIntWritable dVal = (DoubleArrIntWritable) ReflectionUtils.newInstance( reader.getValueClass(), conf); while (reader.next(dKey, dVal)) {// 循環讀取文件 // 當前IntWritable需要小於給定的dKey if(key.get()<dKey.get()){ cxt.getCounter(FilterCounter.MAP_OUT_COUNTER).increment(1L); double dis= HUtils.getDistance(value.getDoubleArr(), dVal.getDoubleArr()); newKey.set(dis); newValue.setValue(key.get(), dKey.get()); cxt.write(newKey, newValue); } } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(reader); } } }Reducer的reduce函數直接輸出:
public void reduce(DoubleWritable key,Iterable<IntPairWritable> values,Context cxt)throws InterruptedException,IOException{ for(IntPairWritable v:values){ cxt.getCounter(FilterCounter.REDUCE_COUNTER).increment(1); cxt.write(key, v); } }
/** * 依據給定的閾值百分比返回閾值 * * @param percent * 通常爲1~2% * @return */ public static double findInitDC(double percent, String path,long iNPUT_RECORDS2) { Path input = null; if (path == null) { input = new Path(HUtils.getHDFSPath(HUtils.FILTER_CALDISTANCE + "/part-r-00000")); } else { input = new Path(HUtils.getHDFSPath(path + "/part-r-00000")); } Configuration conf = HUtils.getConf(); SequenceFile.Reader reader = null; long counter = 0; long percent_ = (long) (percent * iNPUT_RECORDS2); try { reader = new SequenceFile.Reader(conf, Reader.file(input), Reader.bufferSize(4096), Reader.start(0)); DoubleWritable dkey = (DoubleWritable) ReflectionUtils.newInstance( reader.getKeyClass(), conf); Writable dvalue = (Writable) ReflectionUtils.newInstance( reader.getValueClass(), conf); while (reader.next(dkey, dvalue)) {// 循環讀取文件 counter++; if(counter%1000==0){ Utils.simpleLog("讀取了"+counter+"條記錄。。。"); } if (counter >= percent_) { HUtils.DELTA_DC = dkey.get();// 賦予最佳DC閾值 break; } } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(reader); } return HUtils.DELTA_DC; } 算法
/** * Find the local density of every point vector * * 輸入爲 <key,value>--> <distance,<id_i,id_j>> * <距離,<向量i編號,向量j編號>> * * Mapper: * 輸出向量i編號,1 * 向量j編號,1 * Reducer: * 輸出 * 向量i編號,局部密度 * 有些向量是沒有局部密度的。當某個向量距離其它點的距離所有都大於給定閾值dc時就會發生 * @author fansy * @date 2015-7-3 */Mapper的邏輯例如如下:
/** * 輸入爲<距離d_ij,<向量i編號,向量j編號>> * 依據距離dc閾值推斷距離d_ij是否小於dc,符合要求則 * 輸出 * 向量i編號。1 * 向量j編號,1 * @author fansy * @date 2015-7-3 */map函數:
public void map(DoubleWritable key,IntPairWritable value,Context cxt)throws InterruptedException,IOException{ double distance= key.get(); if(method.equals("gaussian")){ one.set(Math.pow(Math.E, -(distance/dc)*(distance/dc))); } if(distance<dc){ vectorId.set(value.getFirst()); cxt.write(vectorId, one); vectorId.set(value.getSecond()); cxt.write(vectorId, one); } }這裏的密度有兩種計算方式。依據前臺傳入的參數選擇不一樣的算法就能夠,這裏默認使用的cut-off。即局部密度有一個點則局部密度加1。
public void reduce(IntWritable key, Iterable<DoubleWritable> values,Context cxt) throws IOException,InterruptedException{ double sum =0; for(DoubleWritable v:values){ sum+=v.get(); } sumAll.set(sum);// cxt.write(key, sumAll); Utils.simpleLog("vectorI:"+key.get()+",density:"+sumAll); }2)最小距離MR
/** * find delta distance of every point * 尋找大於自身密度的最小其它向量的距離 * mapper輸入: * 輸入爲<距離d_ij,<向量i編號,向量j編號>> * 把LocalDensityJob的輸出 * i,density_i * 放入一個map中,用於在mapper中進行推斷兩個局部密度的大小以決定是否輸出 * mapper輸出: * i,<density_i,min_distance_j> * IntWritable,DoublePairWritable * reducer 輸出: * <density_i*min_distancd_j> <density_i,min_distance_j,i> * DoubleWritable, IntDoublePairWritable * @author fansy * @date 2015-7-3 */這裏reducer輸出爲每個點(即每個用戶)局部密度和最小距離的乘積,一種方式尋找聚類中心個數的方法就是把這個乘積從大到小排序,並把這些點畫折線圖,看其斜率變化最大的點,取前面點的個數即爲聚類中心個數。
/** * */ package com.fz.fastcluster.keytype; /** * 本身定義DoubleWritable * 改動其排序方式, * 從大到小排列 * @author fansy * @date 2015-7-3 */ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * Writable for Double values. */ @InterfaceAudience.Public @InterfaceStability.Stable public class CustomDoubleWritable implements WritableComparable<CustomDoubleWritable> { private double value = 0.0; public CustomDoubleWritable() { } public CustomDoubleWritable(double value) { set(value); } @Override public void readFields(DataInput in) throws IOException { value = in.readDouble(); } @Override public void write(DataOutput out) throws IOException { out.writeDouble(value); } public void set(double value) { this.value = value; } public double get() { return value; } /** * Returns true iff <code>o</code> is a DoubleWritable with the same value. */ @Override public boolean equals(Object o) { if (!(o instanceof CustomDoubleWritable)) { return false; } CustomDoubleWritable other = (CustomDoubleWritable)o; return this.value == other.value; } @Override public int hashCode() { return (int)Double.doubleToLongBits(value); } @Override public int compareTo(CustomDoubleWritable o) {// 改動這裏就能夠 return (value < o.value ?1 : (value == o.value ? 0 : -1)); } @Override public String toString() { return Double.toString(value); } /** A Comparator optimized for DoubleWritable. */ public static class Comparator extends WritableComparator { public Comparator() { super(CustomDoubleWritable.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { double thisValue = readDouble(b1, s1); double thatValue = readDouble(b2, s2); return (thisValue < thatValue ?
1 : (thisValue == thatValue ?
0 : -1)); } } static { // register this comparator WritableComparator.define(CustomDoubleWritable.class, new Comparator()); } }
寫入HDFS是爲了做爲分類的中心點,寫入本地是爲了後面查看的方便。
/** * 依據給定的閾值尋找聚類中心向量,並寫入hdfs * 非MR任務,不需要監控。注意返回值 */ public void center2hdfs(){ // localfile:method // 1. 讀取SortJob的輸出,獲取前面k條記錄中的大於局部密度和最小距離閾值的id; // 2. 依據id,找到每個id相應的記錄; // 3. 把記錄轉爲double[] 。 // 4. 把向量寫入hdfs // 5. 把向量寫入本地文件裏,方便後面的查看 Map<String,Object> retMap=new HashMap<String,Object>(); Map<Object,Object> firstK =null; List<Integer> ids= null; List<UserData> users=null; try{ firstK=HUtils.readSeq(input==null?HUtils.SORTOUTPUT+"/part-r-00000":input, 100);// 這裏默認使用 前100條記錄 ids=HUtils.getCentIds(firstK,numReducerDensity,numReducerDistance); // 2 users = dBService.getTableData("UserData",ids); Utils.simpleLog("聚類中心向量有"+users.size()+"個!"); // 3,4,5 HUtils.writecenter2hdfs(users,method,output); }catch(Exception e){ e.printStackTrace(); retMap.put("flag", "false"); retMap.put("msg", e.getMessage()); Utils.write2PrintWriter(JSON.toJSONString(retMap)); return ; } retMap.put("flag", "true"); Utils.write2PrintWriter(JSON.toJSONString(retMap)); return ; }寫入HDFS和本地的聚類中心例如如下:
public void map(IntWritable key,DoubleArrIntWritable value,Context cxt){ double[] inputI= value.getDoubleArr(); // hdfs Configuration conf = cxt.getConfiguration(); FileSystem fs = null; Path path = null; SequenceFile.Reader reader = null; try { fs = FileSystem.get(conf); // read all before center files String parentFolder =null; double smallDistance = Double.MAX_VALUE; int smallDistanceType=-1; double distance; // if iter_i !=0,then start i with 1,else start with 0 for(int i=start;i<iter_i;i++){// all files are clustered points parentFolder=HUtils.CENTERPATH+"/iter_"+i+"/clustered"; RemoteIterator<LocatedFileStatus> files=fs.listFiles(new Path(parentFolder), false); while(files.hasNext()){ path = files.next().getPath(); if(!path.toString().contains("part")){ continue; // return } reader = new SequenceFile.Reader(conf, Reader.file(path), Reader.bufferSize(4096), Reader.start(0)); IntWritable dkey = (IntWritable) ReflectionUtils.newInstance( reader.getKeyClass(), conf); DoubleArrIntWritable dvalue = (DoubleArrIntWritable) ReflectionUtils.newInstance( reader.getValueClass(), conf); while (reader.next(dkey, dvalue)) {// read file literally distance = HUtils.getDistance(inputI, dvalue.getDoubleArr()); if(distance>dc){// not count the farest point continue; } // 這裏僅僅要找到離的近期的點並且其distance<=dc 就能夠。把這個點的type賦值給當前值就能夠 if(distance<smallDistance){ smallDistance=distance; smallDistanceType=dvalue.getIdentifier(); } }// while }// while }// for vectorI.set(key.get());// 用戶id typeDoubleArr.setValue(inputI,smallDistanceType); if(smallDistanceType!=-1){ log.info("clustered-->vectorI:{},typeDoubleArr:{}",new Object[]{vectorI,typeDoubleArr.toString()}); cxt.getCounter(ClusterCounter.CLUSTERED).increment(1); out.write("clustered", vectorI, typeDoubleArr,"clustered/part"); }else{ log.info("unclustered---->vectorI:{},typeDoubleArr:{}",new Object[]{vectorI,typeDoubleArr.toString()}); cxt.getCounter(ClusterCounter.UNCLUSTERED).increment(1); out.write("unclustered", vectorI, typeDoubleArr,"unclustered/part"); } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(reader); } }
public void run() { input=input==null?HUtils.FILTER_PREPAREVECTORS:input; // 刪除iter_i(i>0)的所有文件 try { HUtils.clearCenter((output==null?HUtils.CENTERPATH:output)); } catch (FileNotFoundException e2) { e2.printStackTrace(); } catch (IOException e2) { e2.printStackTrace(); } output=output==null?HUtils.CENTERPATHPREFIX:output+"/iter_"; // 加一個操做。把/user/root/preparevectors裏面的數據複製到/user/root/_center/iter_0/unclustered裏面 HUtils.copy(input,output+"0/unclustered"); try { Thread.sleep(200);// 暫停200ms } catch (InterruptedException e1) { e1.printStackTrace(); } // 求解dc的閾值。這裏的dc不用傳入進來就能夠,即delta的值 // 閾值問題可以在討論,這裏臨時使用傳進來的閾值就能夠 // double dc =dcs[0]; // 讀取聚類中心文件 Map<Object,Object> vectorsMap= HUtils.readSeq(output+"0/clustered/part-m-00000", Integer.parseInt(k)); double[][] vectors = HUtils.getCenterVector(vectorsMap); double[] distances= Utils.getDistances(vectors); // 這裏不使用傳入進來的閾值 int iter_i=0; int ret=0; double tmpDelta=0; int kInt = Integer.parseInt(k); try { do{ if(iter_i>=distances.length){ // delta= String.valueOf(distances[distances.length-1]/2); // 這裏使用什麼方式尚未想好。。。 // 使用如下的方式 tmpDelta=Double.parseDouble(delta); while(kInt-->0){// 超過k次後就再也不增大 tmpDelta*=2;// 每次翻倍 } delta=String.valueOf(tmpDelta); }else{ delta=String.valueOf(distances[iter_i]/2); } log.info("this is the {} iteration,with dc:{}",new Object[]{iter_i,delta}); String[] ar={ HUtils.getHDFSPath(output)+iter_i+"/unclustered", HUtils.getHDFSPath(output)+(iter_i+1),//output //HUtils.getHDFSPath(HUtils.CENTERPATHPREFIX)+iter_i+"/clustered/part-m-00000",//center file k, delta, String.valueOf((iter_i+1)) }; try{ ret = ToolRunner.run(HUtils.getConf(), new ClusterDataJob(), ar); if(ret!=0){ log.info("ClusterDataJob failed, with iteration {}",new Object[]{iter_i}); break; } }catch(Exception e){ e.printStackTrace(); } iter_i++; HUtils.JOBNUM++;// 每次循環後加1 }while(shouldRunNextIter()); } catch (IllegalArgumentException e) { e.printStackTrace(); } if(ret==0){ log.info("All cluster Job finished with iteration {}",new Object[]{iter_i}); } }
public void runCluster2(){ Map<String ,Object> map = new HashMap<String,Object>(); try { //提交一個Hadoop MR任務的基本流程 // 1. 設置提交時間閾值,並設置這組job的個數 //使用當前時間就能夠,當前時間往前10s。以防server和雲平臺時間相差 HUtils.setJobStartTime(System.currentTimeMillis()-10000);// // 因爲不知道循環多少次完畢,因此這裏設置爲2,每次循環都遞增1 // 當所有循環完畢的時候,就該值減去2就能夠中止監控部分的循環 HUtils.JOBNUM=2; // 2. 使用Thread的方式啓動一組MR任務 new Thread(new RunCluster2(input, output,delta, record)).start(); // 3. 啓動成功後。直接返回到監控,同一時候監控定時向後臺獲取數據,並在前臺展現; map.put("flag", "true"); map.put("monitor", "true"); } catch (Exception e) { e.printStackTrace(); map.put("flag", "false"); map.put("monitor", "false"); map.put("msg", e.getMessage()); } Utils.write2PrintWriter(JSON.toJSONString(map)); }在MR任務循環結束後,又一次設置JOBNUM的值就能夠控制監控的循環中止:
/** * 是否應該繼續下次循環 * 直接使用分類記錄數和未分類記錄數來推斷 * @throws IOException * @throws IllegalArgumentException */ private boolean shouldRunNextIter() { if(HUtils.UNCLUSTERED==0||HUtils.CLUSTERED==0){ HUtils.JOBNUM-=2;// 不用監控 則減去2; return false; } return true; }執行分類頁面:
/** * 把分類的數據解析到list裏面 * @param path * @return */ private static Collection<? extends UserGroup> resolve(Path path) { // TODO Auto-generated method stub List<UserGroup> list = new ArrayList<UserGroup>(); Configuration conf = HUtils.getConf(); SequenceFile.Reader reader = null; int i=0; try { reader = new SequenceFile.Reader(conf, Reader.file(path), Reader.bufferSize(4096), Reader.start(0)); IntWritable dkey = (IntWritable) ReflectionUtils .newInstance(reader.getKeyClass(), conf); DoubleArrIntWritable dvalue = (DoubleArrIntWritable) ReflectionUtils .newInstance(reader.getValueClass(), conf); while (reader.next(dkey, dvalue)) {// 循環讀取文件 // 使用這個進行克隆 list.add(new UserGroup(i++,dkey.get(),dvalue.getIdentifier())); } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeStream(reader); } Utils.simpleLog("讀取"+list.size()+"條記錄。文件:"+path.toString()); return list; }
依據給定的ID來查詢該用戶的分組,假設有分組,那麼就查詢出該分組內的用戶,展現到前臺: