項目開發使用到的軟件有:Myeclipse2014,JDK1.8。Hadoop2.6,MySQL5.6。EasyUI1.3.6,jQuery2.0,Spring4.1.3,Hibernate4.3.1。Struts2.3.1。Maven3.2.1,Mahout0.10。
項目下載地址:https://github.com/fansy1990/mahout1.0,項目部署參考:http://blog.csdn.net/fansy1990/article/details/46481409 。javascript
此項目是在Hadoop Web項目–Friend Find系統 基礎之上整理Mahout0.10版本號中MR程序的調用測試而成,重點演示怎樣調用Mahout0.10的MR算法、怎樣把MR算法嵌入到Web項目中,附帶數據生成及數據查看、MR 任務監控等功能。
Mahout0.10的MR算法主要參考如下的文件:
此文檔裏面含有了覆蓋常常使用工具類、聚類算法、分類算法、推薦算法等的MR調用mahout命令以及其相應的實現類。
此篇博客接下來將依照如下的內容進行編寫:html
mapreduce.app-submission.cross-platform=true
fs.defaultFS=hdfs://node101:8020
mapreduce.framework.name=yarn
yarn.resourcemanager.address=node101:8032
yarn.resourcemanager.scheduler.address=node101:8030
mapreduce.jobhistory.address=node101:10020java
項目組織架構:
node
頁面採用html+jQuery+easyUI開發,整個頁面使用easyUI的layout標籤,左邊導航欄使用easyUI的tree標籤,其數據使用json格式存儲在src\main\webapp\tree_data.json文件裏。mysql
針對某個頁面,其json配置例如如下:jquery
{
「id」:152,
「text」:」fkmeans+」,
「attributes」:{
「folder」:」0」,
「url」:」clustering/fuzzykmeans.jsp」
}
}linux
這樣在點擊左邊導航欄fkmeans+導航時。就能夠在右邊彈出clustering/fuzzykmeans.jsp頁面。其js代碼例如如下:git
$('#navid').tree({
onClick: function(node){
// alert(node.text+","+node.url); // alert node text property when clicked
console.info("click:"+node.text);
if(node.attributes.folder=='1'){
return ;
}
console.info("open url:"+node.attributes.url)
var url;
if (node.attributes.url) {
url = node.attributes.url;
} else {
url = '404.jsp';
}
console.info("open "+url);
layout_center_addTabFun({
title : node.text,
closable : true,
iconCls : node.iconCls,
href : url
});
}
});
當中 layout_center_addTabFun函數例如如下:github
function layout_center_addTabFun(opts) {
var t = $(‘#layout_center_tabs’);
if (t.tabs(‘exists’, opts.title)) {
t.tabs(‘select’, opts.title);
} else {
t.tabs(‘add’, opts);
}
console.info(「打開頁面:」+opts.title);
}web
這個函數主要是推斷右邊窗體是否有名字爲給定title的頁面,假設沒有,則打開這個頁面。
js所有代碼例如如下:
當中basic.js是首頁的js文件,包含一些公共的js函數等;hconstants.js主要是針對Hadoop配置表進行的操做。jquery*.js相應的兩個文件爲jQuery的必須文件;mr*.js相應則是MR不一樣類別算法相應的js處理文件;preprocess.js爲數據構造、數據查看的js處理;
請求提交主要包含:MR算法任務提交。非MR算法任務提交。其它請求提交。這裏都採用統一的提交邏輯,例如如下:
這裏所有頁面提交都直接使用easyUI的< a > 標籤,同一時候在js裏面綁定其提交的點擊觸發函數。
在函數裏面需要首先獲取頁面參考(假設是MR監控任務。則需要先推斷是否已經有監控頁面。需要提示關閉當前監控頁面),接着彈出框提示正在執行,最後統一提交到公共函數callByAJax中。這裏列舉三種提交的典型js代碼:
1. 提交MR任務個數固定的MR任務
//evaluateFactorization---
$('#evaluateFactorization_submit').bind('click', function(){
// 檢查是否有「MR監控頁面」,假設有,則退出,並提示關閉
if(exitsMRmonitor()){
return ;
}
var input=$('#evaluateFactorization_input').val();
var output=$('#evaluateFactorization_output').val();
var userFeatures=$('#evaluateFactorization_userFeatures').val();
var itemFeatures=$('#evaluateFactorization_itemFeatures').val();
// 彈出進度框
popupProgressbar('推薦MR','evaluateFactorization任務提交中...',1000);
// ajax 異步提交任務
callByAJax('cloud/cloud_submitJob.action',{algorithm:"EvaluateFactorizationRunnable",jobnums:'1', arg1:input,arg2:output,arg3:userFeatures,arg4:itemFeatures});
});
// ------evaluateFactorization
2 提交MR個數不固定的MR任務
// kmeans---
$('#kmeans_submit').bind('click', function(){
// 檢查是否有「MR監控頁面」。假設有。則退出。並提示關閉
if(exitsMRmonitor()){
return ;
}
var input=$('#kmeans_input').val();//
var output=$('#kmeans_output').val();//
var clusters=$('#kmeans_clusters').val();//
var k=$('#kmeans_k').val();
var convergenceDelta=$('#kmeans_convergenceDelta').val();
var maxIter=$('#kmeans_maxIter').val();
var clustering=$('#kmeans_clustering').combobox("getValue");
var distanceMeasure=$('#kmeans_distanceMeasure').combobox("getValue");
var jobnums_=parseInt(k); // 一共的MR個數
if("true"==clustering){
jobnums_=jobnums_+1;
}
jobnums_=jobnums_+"";
// 彈出進度框
popupProgressbar('聚類MR','kmeans任務提交中...',1000);
// ajax 異步提交任務
callByAJax('cloud/cloud_submitIterMR.action',{algorithm:"KMeansDriverRunnable",jobnums:jobnums_,
arg1:input,arg2:output,arg3:clusters,arg4:k,
arg5:convergenceDelta,arg6:maxIter,arg7:clustering,arg8:distanceMeasure});
});
// ------kmeans
這裏把不定MR個數的任務和定MR個數的任務區分開來了,事實上是可以不用區分的。因爲在返回結果都是一個Map,依據map結果來進行操做的。只是需要在不一樣的實現中設置標誌位(詳細參考如下的實現分析)
3 提交非MR任務
$('#upload_submit').bind('click', function(){
var input=$('#upload_input').val();
var output=$('#upload_output').val();
// 彈出進度框
popupProgressbar('數據上傳','數據上傳中...',1000);
// ajax 異步提交任務
callByAJax('cloud/cloud_submitJobNotMR.action',{algorithm:'Upload',
arg1:input,arg2:output});
});
這裏要注意MR任務和非MR任務是需要區分的,因爲非MR任務使用的是同步模式(這裏同步模式不是指aJax的同步。而是指實現方式),即用戶點擊後。會一直彈出正在處理的提示,而後等後臺處理完畢。返回結果纔會關閉彈窗,同一時候把結果直接展現在原網頁。但是MR的任務會啓動多線程,當多線程成功啓動後。直接關閉提示框,同一時候打開MR任務監控頁面。開啓頁面定時刷新任務,向後臺獲取任務執行狀況信息。
callByAJax函數例如如下:
// 調用ajax異步提交
// 任務返回成功。則提示成功。不然提示失敗的信息
function callByAJax(url,data_){
$.ajax({
url : url,
data: data_,
async:true,
dataType:"json",
context : document.body,
success : function(data) {
closeProgressbar();
console.info("close the progressbar,flag:"+data.flag);
var retMsg;
if("true"==data.flag){
retMsg='操做成功!';
if(typeof data.return_show !="undefined"){// 讀取文件
var return_id = "#"+data.return_show+"";
// var obj=document.getElementById(data.return_show);
$(return_id).html(data.return_txt);
console.info('defined:'+data.return_show);
}
}else{
retMsg='操做失敗!'; if(typeof data.return_show !="undefined"){// 讀取文件 var return_id = "#"+data.return_show+""; $(return_id).html(data.msg); } } $.messager.show({ title : '提示', msg : retMsg }); if("true"==data.flag&&"true"==data.monitor){// 加入監控頁面 // 使用單獨Tab的方式 layout_center_addTabFun({ title : 'MR算法監控', closable : true, href : 'monitor/monitor.jsp' }); } } }); }
所有的MR任務提交到Action後,都會啓動一個線程來專門執行MR任務。這樣就可以直接返回前臺頁面。提示任務已經成功提交。
Action中相應的代碼例如如下:
/** * 提交變jobnum的任務,暫未加入 * */
public void submitIterMR(){
Map<String ,Object> map = new HashMap<String,Object>();
try {
//提交一個Hadoop MR任務的基本流程
// 1. 設置提交時間閾值,並設置這組job的個數
//使用當前時間就能夠,當前時間往前10s,以防server和雲平臺時間相差
HUtils.setJobStartTime(System.currentTimeMillis()-10000);//
// 因爲不知道循環多少次完畢。因此這裏設置爲最大值,
// 當所有MR完畢的時候,在監控代碼處又一次設置JOBNUM;
HUtils.setALLJOBSFINISHED(false);
HUtils.JOBNUM=Integer.parseInt(jobnums);
// 2. 使用Thread的方式啓動一組MR任務
// 2.1 生成Runnable接口
RunnableWithArgs runJob = (RunnableWithArgs) Utils.getClassByName(
Utils.THREADPACKAGES+algorithm);
// 2.2 設置參數
runJob.setArgs(new String[]{arg1,arg2,arg3,arg4,arg5,arg6,arg7,arg8,arg9,arg10,arg11});
// 2.3 啓動Thread
new Thread(runJob).start();
// 3. 啓動成功後。直接返回到監控,同一時候監控定時向後臺獲取數據,並在前臺展現;
map.put("flag", "true");
map.put("monitor", "true");
} catch (Exception e) {
e.printStackTrace();
map.put("flag", "false");
map.put("monitor", "false");
map.put("msg", "任務啓動失敗!"); } Utils.write2PrintWriter(JSON.toJSONString(map)); }
這裏採用統一接口把所有的提交都整合到一個函數中。算法參數採用匿名的方式,不管前臺傳送了多少個,都用所有的參數來接收。
而後使用Java反射來生成實際執行任務的類。並啓動多線程。
最後返回的map數據依據需要需要設置監控的flag爲true(和callByAJax函數中的標識相應)。
所有MR任務都必須實現如下的接口:
/** * 帶有參數的Runnable接口 * @author fansy * @date 2015-8-4 */
public interface RunnableWithArgs extends Runnable {
public abstract void setArgs(String[] args);
}
該接口有兩點需要注意,其一。它繼承了Runnable接口。其二,它本身定義了一個setArgs函數;
如下來看一個實現,以kmeans算法的調用爲例:
/** * @author fansy * @date 2015-8-4 */
public class KMeansDriverRunnable implements RunnableWithArgs {
private String input;
private String output;
private String clusters;
private String k;
private String convergenceDelta;
private String maxIter;
private String clustering;
private String distanceMeasure;
@Override
public void run() {
String[] args=null;
if("true".equals(clustering)){
args=new String[17];
args[16]="-cl";
}else{
args= new String[16];
}
args[0]="-i";
args[1]=input;
args[2]="-o";
args[3]=output;
args[4]="-c";
args[5]=clusters;
args[6]="-k";
args[7]=k;
args[8]="-cd";
args[9]=convergenceDelta;
args[10]="-x";
args[11]=maxIter;
args[12]="-dm";
args[13]=distanceMeasure;
args[14]="--tempDir";
args[15]="temp";
Utils.printStringArr(args);
try {
HUtils.delete(output);
HUtils.delete("temp");
HUtils.delete(clusters);
int ret = ToolRunner.run(HUtils.getConf() ,new KMeansDriver() , args);
if(ret==0){// 所有任務執行完畢
HUtils.setALLJOBSFINISHED(true);
}
} catch (Exception e) {
e.printStackTrace();
// 任務中,報錯,需要在任務監控界面體現出來
HUtils.setRUNNINGJOBERROR(true);
Utils.simpleLog("KMeansDriver任務錯誤!");
}
}
@Override
public void setArgs(String[] args) {
this.input=args[0];
this.output=args[1];
this.clusters=args[2];
this.k=args[3];
this.convergenceDelta=args[4];
this.maxIter=args[5];
this.clustering=args[6];
this.distanceMeasure=args[7];
}
}
首先,這裏需要實現setArgs函數,這個函數就是把匿名的算法參數所有實名化(實際上,這裏可以不用這一步操做的,但是爲了代碼的可讀性。仍是建議這樣作)。
接着,在run函數中,依據傳進來的算法參數構造MR算法需要使用的算法參數。而後直接提交MR任務就能夠。
這裏需要注意:
1. 當任務執行出錯時需要設置標誌位。方便在任務監控時,前臺向後臺獲取任務狀態信息時,提示錯誤;
2. 固定個數的MR任務和非固定個數的MR任務的不一樣點是當非固定個數的MR提早執行完畢(比方kmeans算法假設設置了循環次數爲10,那麼假如當循環次數達到了8次時。其閾值知足條件。退出了循環)。那麼就要實時更改MR任務的次數(非固定個數MR任務最開始設置任務所有個數是依照最大值來設置的),並設置相關標識,即不用再進行監控。
與MR實現類似,非MR實現的Action函數例如如下:
/** * 提交非MR的任務 * 算法詳細參數意思對比jsp頁面理解,每個實體類會把arg1~arg11 轉換爲實際的意思 * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException */
public void submitJobNotMR() throws InstantiationException, IllegalAccessException, ClassNotFoundException{
Map<String ,Object> map = new HashMap<String,Object>();
INotMRJob runJob = (INotMRJob) Utils.getClassByName(
Utils.THREADNOTPACKAGES+algorithm);
// 2.2 設置參數
runJob.setArgs(new String[]{arg1,arg2,arg3,arg4,arg5,arg6,arg7,arg8,arg9,arg10,arg11});
map= runJob.runJob();
Utils.write2PrintWriter(JSON.toJSONString(map));
return ;
}
所有非MR任務都要實現INotMRJob接口,該接口定義例如如下:
/** * 提交非MR任務的基類 * @author fansy * @date 2015年8月5日 */
public interface INotMRJob {
public void setArgs(String[] args);
public Map<String,Object> runJob();
}
兩個函數分別相應RunnableWithArgs的兩個函數。
一個讀取HDFS文件的詳細實現例如如下:
/** * 讀取HDFS txt文件 * @author fansy * @date 2015年8月5日 */
public class ReadTxt implements INotMRJob {
private String input;
private String lines;
@Override
public void setArgs(String[] args) {
this.input=args[0];
this.lines=args[1];
}
@Override
public Map<String, Object> runJob() {
Map<String ,Object> map = new HashMap<String,Object>();
String txt =null;
map.put("return_show", "readtxt_return");
try{
txt = HUtils.readTxt(input, lines, "<br>");
txt ="文件的內容是:<br>"+txt;
map.put("flag", "true");
map.put("return_txt", txt);
}catch(Exception e){
e.printStackTrace();
map.put("flag", "false");
map.put("monitor", "false");
map.put("msg", input+"讀取失敗!");
}
return map;
}
}
非MR任務結果返回直接在原網頁展現,在callByAJax中推斷相應的標誌位假設不爲空。那麼就是需要展現在原網頁的,原網頁中必須有相應的組件來顯示,比方如下的網頁代碼:
<div id="upload_return" style="padding-left: 30px;font-size: 20px;padding-top:10px;"></div>
MR的任務則會開啓監控。在監控頁面展示任務的執行狀況。
二次開發實際就是在此版本號的基礎上加入本身的功能而已。
一共包含如下幾個步驟:
1. 編寫測試函數
比方要加入一個fuzzykmeans的算法,那麼就在src/test/java裏面編寫測試函數,例如如下:
編寫測試函數的主要目的是,研究算法的參數以及輸入數據的格式等。
2. 加入json導航欄數據
在tree_data.json中加入相應的算法,例如如下:
3. 編寫頁面
參考1.中的所有算法需要參數來編寫jsp頁面,例如如下圖:
4. 編寫頁面處理js
依據jsp頁面中的button,來編寫button的觸發事件,例如如下:
5. 實現請求提交接口實現
編寫請求提交接口的實現分爲兩種,假設是MR任務則實現RunnableWithArgs接口,假設是非MR任務則實現INotMRJob接口就能夠。例如如下圖所看到的:
6. 執行項目並測試
打開瀏覽器。訪問剛開發的功能,點擊頁面中的button進行測試,例如如下:
在這裏可以進行集羣參數的配置。主要是鏈接Hadoop集羣的參數;
2. 數據構造和查看
文件上傳界面例如如下:
文件上傳主要包含兩個功能,其一就是把本地文件上傳到HDFS文件;其二就是針對各個算法的數據初始化,這裏的初始化基本都是把本地文件(這些文件在src/main/resources/data中已經存在)上傳到HDFS指定文件夾,這裏關於文件夾構造可以參考Upload.java文件:
/**
* 數據上傳
* 統一命名:
上傳本地文件:WEB-INF/classes/data//.
上傳HDFS文件:/user/root///input.
* @author fansy
* @date 2015年8月5日
*/
其它的基本是數據查看之類的。最後一個分類數據生成,是針對輸入數據需要是序列化的數據。因此這裏直接生成序列化數據在HDFS指定的文件夾就能夠。
3. 相關Mahout算法
相關MR算法中,頁面都有默認的參數,比方:
這裏的輸入數據路徑是依據前面Upload裏面生成的路徑是一致的,有些MR算法需要先執行其它MR算法,而後才幹執行,這時其輸入路徑就是上一個MR算法相應的輸出了。
分享,成長。快樂
腳踏實地,專一
轉載請註明blog地址:http://blog.csdn.net/fansy1990