介於上一篇的java實現網絡爬蟲基礎之上,這一篇的思想是將網絡收集的數據保存到HDFS和數據庫(Mysql)中;而後用MR對HDFS的數據進行索引處理,處理成倒排索引;搜索時先用HDFS創建好的索引來搜索對應的數據ID,根據ID從數據庫中提取數據,呈現到網頁上。javascript
這是一個完整的集合網絡爬蟲、數據庫、HDFS、MapReduce、DAO設計模式、JSP/Servlet的項目,完成了數據收集、數據分析、數據索引並分頁呈現。html
完整的代碼呈現,但願認真仔細閱讀。java
------>node
目錄:mysql
一、搜索引擎闡述git
二、數據庫表創建web
三、使用DAO設計模式進行數據庫操做正則表達式
【Ⅰ】數據庫鏈接類DataBaseConnectionsql
【Ⅱ】表單元素的封裝類News數據庫
【Ⅲ】編寫DAO接口INewsDAO
【Ⅳ】DAO接口的實現類NewsDAOImp類
【Ⅴ】工廠類DAOFactory類
四、網絡爬蟲實現★★ 【參考博客《java實現網絡爬蟲》和《Heritrix實現網絡爬蟲》】
五、MR(MapReduce)對HDFS數據進行索引處理★★
六、實現搜索引擎
【Ⅰ】建立web項目,編寫測試用例,測試是否能夠讀取HDFS的數據內容
【Ⅱ】 編寫index首頁
【Ⅲ】處理HDFS查詢的操做
【Ⅳ】servlet類搜索結果向頁面傳遞
【Ⅴ】結果呈現,實現分頁
七、總結
------>
一、搜索引擎闡述
搜索引擎的執行流程:
1) 經過爬蟲來將數據下載到本地
2) 將數據提取出來保存到HDFS和數據庫中(MySQL)
3) 經過MR來對HDFS的數據進行索引處理,處理成爲倒排索引
4) 搜索時先使用HDFS創建好的索引來搜索對應的數據ID,再根據ID來從MySQL數據庫中提取數據的具體信息。
5) 能夠完成分頁等操做。
倒排索引對應的就是正排索引,正排索引指的就是MySQL數據庫中id的索引。
而倒排索引的目的是能夠根據關鍵字查詢出該關鍵字對應的數據id。
這裏就須要用到MySQL數據庫,以及經過Java EE版的Eclipse來完成網站的開發。
爲了開發起來更方便,咱們這裏使用MyEclipse來完成。
二、數據庫表創建
先安裝好MySQL數據庫。
安裝時,注意編碼選擇gbk。
經過控制檯的mysql -u用戶名 –p密碼 便可登陸mysql數據庫。
以後使用show databases能夠看到全部的數據庫。
使用create database 能夠創建一個新的庫。
使用 use 庫名 ,能夠切換到另外一個庫。
使用show tables能夠看到一個庫下的全部表。
以後就能夠經過普通的sql語句來創建表和進行數據的操做了。
在進行數據庫操做時,企業開發中一定要使用DAO(Data Access Object)設計模式
組成以下:
1) DataBaseConnection:創建數據庫鏈接
2) VO:與表對應的數據對象
3) DAO接口:規範操做方法
4) DAOImpl:針對DAO接口進行方法實現。
5) Factory:用來創建DAO接口對象。
首先根據需求,將數據庫表創建出來,這裏只需創建一個簡單的news新聞表,用於存儲網絡上爬取得數據。
1 CREATE TABLE news ( 2 id int primary key , 3 title varchar(200) not null, 4 description text , 5 url varchar(200) 6 );
三、使用DAO設計模式進行數據庫操做
根據上述的DAO設計模式,咱們須要編寫相關操做類,來完成數據庫的操做。
【Ⅰ】 數據庫鏈接類DataBaseConnection,須要導入jar包:
1 package org.liky.sina.dbc; 2 3 import java.sql.Connection; 4 import java.sql.DriverManager; 5 import java.sql.SQLException; 6 7 /** 8 * 鏈接數據庫mysql的sina_news 9 * @author k04 10 * 11 */ 12 public class DataBaseConnection {
//此處能夠試着兩種表達加載類的方法 13 // private static final String DBORIVER="org.git.mm.mysql.Driver"; 14 private static final String DBORIVER="com.mysql.jdbc.Driver"; 15 private static final String DBURL="jdbc:mysql://localhost:3306/sina_news"; 16 private static final String DBUSER="root"; 17 private static final String DBPASSWORD="admin"; 18 19 private Connection conn; 20 /** 21 * 建立數據庫鏈接 22 * @return 23 */ 24 public Connection getConnection(){ 25 try { 26 if(conn==null||conn.isClosed()){ 27 //創建一個新的鏈接 28 Class.forName(DBORIVER); 29 conn=DriverManager.getConnection(DBURL, DBUSER, DBPASSWORD); 30 //System.out.println("success to connect!"); 31 } 32 }catch (ClassNotFoundException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } catch (SQLException e) { 36 // TODO Auto-generated catch block 37 e.printStackTrace(); 38 } 39 return conn; 40 } 41 /* 42 * 關閉鏈接 43 */ 44 public void close(){ 45 if(conn!=null){ 46 try{ 47 conn.close(); 48 }catch(SQLException e){ 49 e.printStackTrace(); 50 } 51 } 52 } 53 54 }
【Ⅱ】表單元素的封裝類News,根據建表時設定的四種元素id,title,description,url,將網絡爬取得內容完整的導入數據庫中,此處能夠用shift+Alt+S在Eclipse快捷建立封裝類:
1 package org.liky.sina.vo; 2 /** 3 * news封裝類 4 * @author k04 5 * 6 */ 7 public class News { 8 private Integer id; 9 private String title; 10 private String description; 11 private String url; 12 13 14 public News() { 15 } 16 public News(Integer id,String title,String description,String url) { 17 this.id=id; 18 this.title=title; 19 this.description=description; 20 this.url=url; 21 } 22 23 24 public Integer getId() { 25 return id; 26 } 27 public void setId(Integer id) { 28 this.id = id; 29 } 30 public String getTitle() { 31 return title; 32 } 33 public void setTitle(String title) { 34 this.title = title; 35 } 36 public String getDescription() { 37 return description; 38 } 39 public void setDescription(String description) { 40 this.description = description; 41 } 42 public String getUrl() { 43 return url; 44 } 45 public void setUrl(String url) { 46 this.url = url; 47 } 48 }
【Ⅲ】編寫DAO接口INewsDAO,存放數據庫操做類的方法名:
1 package org.liky.sina.dao; 2 /** 3 * 接口,呈現三個方法 4 */ 5 import java.util.List; 6 import org.liky.sina.vo.News; 7 8 public interface INewsDAO { 9 /** 10 * 添加數據 11 * @param news 要添加的對象 12 * @throws Exception 13 */ 14 public void doCreate(News news)throws Exception; 15 /** 16 * 根據主鍵id查詢數據 17 * 18 */ 19 public News findById(int id)throws Exception; 20 /** 21 * 根據一組id查詢全部結果 22 * @param ids 全部要查詢的id 23 * @return 查詢到的數據 24 * 由於索引是根據熱詞查到一堆的id 25 */ 26 public List<News> findByIds(int[] ids)throws Exception; 27 28 }
【Ⅳ】DAO接口的實現類NewsDAOImp類:
1 package org.liky.sina.dao.impl; 2 /** 3 * 繼承INewsDAO接口 4 * 實現三個方法,插入數據,查找指定id數據,查找一組id數據 5 */ 6 import java.sql.PreparedStatement; 7 import java.sql.ResultSet; 8 import java.util.ArrayList; 9 import java.util.List; 10 11 import org.liky.sina.dao.INewsDAO; 12 import org.liky.sina.dbc.DataBaseConnection; 13 import org.liky.sina.vo.News; 14 15 public class NewsDAOImpl implements INewsDAO { 16 //聲明一個數據庫鏈接類對象 17 private DataBaseConnection dbc; 18 19 20 //構造器,參數爲數據庫鏈接類對象 21 public NewsDAOImpl(DataBaseConnection dbc) { 22 this.dbc=dbc; 23 24 } 25 26 @Override 27 public void doCreate(News news) throws Exception { 28 // TODO Auto-generated method stub 29 String sql="INSERT INTO news (id,title,description,url) VALUES (?,?,?,?)"; 30 PreparedStatement pst=dbc.getConnection().prepareStatement(sql); 31 //設置參數 32 pst.setInt(1, news.getId()); 33 pst.setString(2, news.getTitle()); 34 pst.setString(3, news.getDescription()); 35 pst.setString(4, news.getUrl()); 36 37 pst.executeUpdate(); 38 System.out.println("create success."); 39 } 40 41 @Override 42 public News findById(int id) throws Exception { 43 // TODO Auto-generated method stub 44 String sql="SELECT id,title,description,url FROM news WHERE id = ?"; 45 PreparedStatement pst=dbc.getConnection().prepareStatement(sql); 46 pst.setInt(1, id); 47 ResultSet rs=pst.executeQuery(); 48 News news=null; 49 //將符合id的數據遍歷寫入news並返回 50 if(rs.next()){ 51 news=new News(); 52 news.setId(rs.getInt(1)); 53 news.setTitle(rs.getString(2)); 54 news.setDescription(rs.getString(3)); 55 news.setUrl(rs.getString(4)); 56 } 57 //System.out.println("find success."); 58 return news; 59 } 60 61 @Override 62 public List<News> findByIds(int[] ids) throws Exception { 63 // TODO Auto-generated method stub 64 StringBuilder sql=new StringBuilder("SELECT id,title,description,url FROM news WHERE id IN ("); 65 //將id寫入ids,並用逗號隔開 66 if(ids!=null&&ids.length>0){ 67 for(int id:ids){ 68 sql.append(id); 69 sql.append(","); 70 } 71 //截取最後一個逗號,並補上括號 72 String resultSQL=sql.substring(0, sql.length()-1)+")"; 73 74 PreparedStatement pst=dbc.getConnection().prepareStatement(resultSQL); 75 ResultSet rs=pst.executeQuery(); 76 //存取一組id到鏈表中 77 List<News> list=new ArrayList<>(); 78 while(rs.next()){ 79 News news=new News(); 80 news.setId(rs.getInt(1)); 81 news.setTitle(rs.getString(2)); 82 news.setDescription(rs.getString(3)); 83 news.setUrl(rs.getString(4)); 84 list.add(news); 85 } 86 } 87 //System.out.println("find success."); 88 return null; 89 } 90 91 }
【Ⅴ】工廠類DAOFactory類,此類寫入了數據庫鏈接類參數,返回DAO實現類對象:
java中,咱們一般有如下幾種建立對象的方式:
(1) 使用new關鍵字直接建立對象;
(2) 經過反射機制建立對象;
(3) 經過clone()方法建立對象;
(4) 經過工廠類建立對象。
1 package org.liky.sina.factory; 2 /** 3 * 工廠類 4 * 輸入一個鏈接數據庫對象的參數,返回數據庫表操做的類 5 */ 6 import org.liky.sina.dao.INewsDAO; 7 import org.liky.sina.dao.impl.NewsDAOImpl; 8 import org.liky.sina.dbc.DataBaseConnection; 9 10 public class DAOFactory { 11 public static INewsDAO getINewsDAOInstance(DataBaseConnection dbc){ 12 return new NewsDAOImpl(dbc); 13 } 14 }
四、網絡爬蟲實現
如今編寫整個項目的重點,編寫URLDemo類,在爬蟲中進行數據庫的操做以及HDFS的寫入:
a' 關於此類,在網頁解析時用了簡單的Jsoup,並無如《java網絡爬蟲》用正則表達式,因此須要導入jsoup的jar包 ;
b' 關於HDFS在eclipse的配置以及本機的鏈接,我後續博客會闡述,也能夠網絡查詢方法;
c' 這個類也是執行類,我收集的是新浪新聞網的數據,爬取深度爲5,設置線程數5,而且篩選了只有連接含有「sian.news.com.cn」的。
d' 網絡爬蟲我講了兩種方法:(1)java代碼實現網絡爬蟲
(2)Heritrix工具實現網絡爬蟲
此處我仍是選擇了直接寫代碼實現,自由度高也方便讀寫存取。
1 package org.liky.sina.craw; 2 3 import java.util.ArrayList; 4 import java.util.HashMap; 5 import java.util.HashSet; 6 import java.util.List; 7 import java.util.Map; 8 import java.util.Set; 9 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.fs.FSDataOutputStream; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 import org.jsoup.Jsoup; 15 import org.jsoup.nodes.Document; 16 import org.jsoup.nodes.Element; 17 import org.jsoup.select.Elements; 18 import org.liky.sina.dao.INewsDAO; 19 import org.liky.sina.dbc.DataBaseConnection; 20 import org.liky.sina.factory.DAOFactory; 21 import org.liky.sina.vo.News; 22 23 /** 24 * 爬蟲開始進行數據庫操做以及HDFS寫入 25 * 26 * @author k04 27 * 28 */ 29 public class URLDemo { 30 // 該對象的構造方法會默認加載hadoop中的兩個配置文件,hdfs-site.xml和core-site.xml 31 // 這兩個文件包含訪問hdfs所需的參數值 32 private static Configuration conf = new Configuration(); 33 34 private static int id = 1; 35 36 private static FileSystem fs; 37 38 private static Path path; 39 40 // 等待爬取的url 41 private static List<String> allWaitUrl = new ArrayList<>(); 42 // 已經爬取的url 43 private static Set<String> allOverUrl = new HashSet<>(); 44 // 記錄全部url的深度,以便在addUrl方法內判斷 45 private static Map<String, Integer> allUrlDepth = new HashMap<>(); 46 // 爬取網頁的深度 47 private static int maxDepth = 5; 48 // 聲明object獨享幫助進行線程的等待操做 49 private static Object obj = new Object(); 50 // 設置總線程數 51 private static final int MAX_THREAD = 20; 52 // 記錄空閒的線程數 53 private static int count = 0; 54 55 // 聲明INewsDAO對象, 56 private static INewsDAO dao; 57 58 static { 59 dao = DAOFactory.getINewsDAOInstance(new DataBaseConnection()); 60 } 61 62 public static void main(String args[]) { 63 // 爬取的目標網址 64 String strUrl = "http://news.sina.com.cn/"; 65 66 // 爬取第一個輸入的url 67 addUrl(strUrl, 0); 68 // 創建多個線程 69 for (int i = 0; i < MAX_THREAD; i++) { 70 new URLDemo().new MyThread().start(); 71 } 72 73 // DataBaseConnection dc=new DataBaseConnection(); 74 // dc.getConnection(); 75 76 } 77 78 public static void parseUrl(String strUrl, int depth) { 79 // 先判斷當前url是否爬取過 80 // 判斷深度是否符合要求 81 if (!(allOverUrl.contains(strUrl) || depth > maxDepth)) { 82 System.out.println("當前執行的 " + Thread.currentThread().getName() 83 + " 爬蟲線程處理爬取: " + strUrl); 84 85 try { 86 // 用jsoup進行數據爬取 87 Document doc = Jsoup.connect(strUrl).get(); 88 // 經過doc接受返回的結果 89 // 提取有效的title和description 90 String title = doc.title(); 91 Element descE = doc.getElementsByAttributeValue("name", 92 "description").first(); 93 String desc = descE.attr("content"); 94 95 // System.out.println(title + " --> " + desc); 96 97 // 若是有效,則驚醒保存 98 if (title != null && desc != null && !title.trim().equals("") 99 && !desc.trim().equals("")) { 100 // 須要生成一個id,以便放入數據庫中,所以id也要加入到HDFS中,便於後續索引 101 News news = new News(); 102 news.setId(id++); 103 news.setTitle(title); 104 news.setDescription(desc); 105 news.setUrl(strUrl); 106 // 添加到數據庫語句 107 dao.doCreate(news); 108 // 向HDFS保存數據 109 path = new Path("hdfs://localhost:9000/sina_news_input/" 110 + System.currentTimeMillis() + ".txt"); 111 fs = path.getFileSystem(conf); 112 FSDataOutputStream os = fs.create(path); 113 // 進行內容輸出,此處須要用news.getId(),否則數據庫和HDFS的id會不相同,由於多線程的運行 114 os.writeUTF(news.getId() + "\r\n" + title + "\r\n" + desc); 115 os.close(); 116 117 // 解析全部超連接 118 Elements aEs = doc.getElementsByTag("a"); 119 // System.out.println(aEs); 120 if (aEs != null && aEs.size() > 0) { 121 for (Element aE : aEs) { 122 String href = aE.attr("href"); 123 System.out.println(href); 124 // 截取網址,並給出篩選條件!!! 125 if ((href.startsWith("http:") || href 126 .startsWith("https:")) 127 && href.contains("news.sina.com.cn")) { 128 // 調用addUrl()方法 129 addUrl(href, depth + 1); 130 } 131 } 132 } 133 134 } 135 136 } catch (Exception e) { 137 138 } 139 // 吧當前爬完的url放入到偶爾中 140 allOverUrl.add(strUrl); 141 System.out.println(strUrl + "爬去完成,已經爬取的內容量爲:" + allOverUrl.size() 142 + "剩餘爬取量爲:" + allWaitUrl.size()); 143 144 // 判斷是否集合中海油其餘的內容須要進行爬取,若是有,則進行線程的喚醒 145 if (allWaitUrl.size() > 0) { 146 synchronized (obj) { 147 obj.notify(); 148 } 149 } else { 150 System.out.println("爬取結束..."); 151 System.exit(0); 152 } 153 154 } 155 } 156 157 /** 158 * url加入到等待隊列中 並判斷是否已經放過,若沒有就放入allUrlDepth中 159 * 160 * @param href 161 * @param depth 162 */ 163 public static synchronized void addUrl(String href, int depth) { 164 // 將url放入隊列中 165 allWaitUrl.add(href); 166 // 判斷url是否已經存在 167 if (!allUrlDepth.containsKey(href)) { 168 allUrlDepth.put(href, depth + 1); 169 } 170 } 171 172 /** 173 * 獲取等待隊列下一個url,並從等待隊列中移除 174 * 175 * @return 176 */ 177 public static synchronized String getUrl() { 178 if (allWaitUrl.size() > 0) { 179 String nextUrl = allWaitUrl.get(0); 180 allWaitUrl.remove(0); 181 return nextUrl; 182 } 183 return null; 184 } 185 186 /** 187 * 用多線程進行url爬取 188 * 189 * @author k04 190 * 191 */ 192 public class MyThread extends Thread { 193 194 @Override 195 public void run() { 196 // 編寫一個死循環,以便線程能夠一直存在 197 while (true) { 198 // 199 200 String url = getUrl(); 201 if (url != null) { 202 // 調用該方法爬取url的數據 203 parseUrl(url, allUrlDepth.get(url)); 204 } else { 205 System.out.println("當前線程準備就緒,等待鏈接爬取:" + this.getName()); 206 // 線程+1 207 count++; 208 // 創建一個對象,幫助線程進入等待狀態wait() 209 synchronized (obj) { 210 try { 211 obj.wait(); 212 } catch (Exception e) { 213 e.printStackTrace(); 214 } 215 // 線程-1 216 count--; 217 } 218 } 219 } 220 } 221 222 } 223 224 }
如今執行上述類URLDemo,該類執行流程:
1‘ 讀取源連接,分配線程任務;
2’ 數據根據DAO設計模式寫入到數據庫中;
3‘ 數據上傳到HDFS的input文件夾中;
執行完能夠查看HDFS和數據庫的結果:
關於此處,仍是有些許瑕疵,雙方的數據量並不相等,主要是多線程爬取的時候,id的傳輸並不相同!
將重複的數據進行清洗,
當數據爬取完成後,咱們會發現有不少重複的數據,這能夠經過SQL語句來進行清洗的操做。
SQL中可使用group by關鍵字來完成分組函數的處理。
咱們能夠先經過該函數來測試一下。
測試後會發現真正有效數據量應該是4634條。
這就須要咱們經過創建一張新表,把全部不重複的數據加入到新表中。
mysql支持create table時經過select語句來查詢出一些結果做爲新表的結構和數據。
CREATE TABLE new_news SELECT id,title,description,url FROM news WHERE id IN (SELECT min(id) FROM news GROUP BY title)
五、MR(MapReduce)對HDFS數據進行索引處理
下面就能夠開始編寫MR程序。
Map格式,要求key爲關鍵字,value爲id。
因爲一個關鍵字可能會對應多個id,因此id之間咱們想使用,來分隔。因此key和value的類型都應該是String(Text)
1 package org.liky.sina.index; 2 /** 3 * 編寫MR程序。 4 Map格式,要求key爲關鍵字,value爲id。 5 因爲一個關鍵字可能會對應多個id,因此id之間咱們想使用,來分隔。因此key和value的類型都應該是String(Text) 6 */ 7 import java.io.IOException; 8 import java.util.*; 9 import java.util.regex.*; 10 import jeasy.analysis.MMAnalyzer; 11 import org.apache.hadoop.conf.*; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.*; 14 import org.apache.hadoop.mapred.*; 15 import org.apache.hadoop.util.*; 16 17 18 public class IndexCreater extends Configured implements Tool { 19 20 public static void main(String[] args) throws Exception { 21 int res = ToolRunner.run(new Configuration(), new IndexCreater(), 22 new String[] { "hdfs://localhost:9000/sina_news_input/", 23 "hdfs://localhost:9000/output_news_map/" }); 24 System.exit(res); 25 } 26 27 28 29 //Mapper接口中的後兩個泛型參數,第一個表示返回後Map的key的類型,第二個表示返回後的value類型 30 public static class MapClass extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text>{ 31 32 private static Pattern p; 33 34 static{ 35 System.out.println("開始Map操做....."); 36 p=Pattern.compile("\\d+"); 37 } 38 39 private int id; 40 private int line=1; 41 42 private static MMAnalyzer mm=new MMAnalyzer(); 43 44 //輸出的詞 45 private Text word=new Text(); 46 47 //map過程的核心方法 48 @Override 49 public void map(LongWritable key, Text value, 50 OutputCollector<Text, Text> output, Reporter reporter) 51 throws IOException { 52 if (line == 1) { 53 // 讀取的是第一行,咱們就須要將第一行的id保留下來 54 line++; 55 Matcher m = p.matcher(value.toString()); 56 if (m.find()) { 57 id = Integer.parseInt(m.group()); 58 } 59 } else { 60 String tempStr = value.toString(); 61 // 按空格將單詞拆分出來 62 // StringTokenizer itr = new StringTokenizer(line); 63 // 使用分詞器來進行詞組的拆分 64 String[] results = mm.segment(tempStr, "|").split("\\|"); 65 // 每一個單詞記錄出現了1次 66 for (String temp : results) { 67 word.set(temp.toLowerCase()); 68 output.collect(word, new Text(id + "")); 69 } 70 } 71 72 } 73 74 75 76 77 } 78 79 80 //對全部的結果進行規約,合併 81 //Reducer中也有泛型,前兩個表示Map過程輸出的結果類型,後兩個表示Reduce處理後輸出的類型 82 public static class Reduce extends MapReduceBase implements Reducer<Text,Text,Text,Text>{ 83 84 static{ 85 System.out.println("開始reduce操做....."); 86 } 87 @Override 88 public void reduce(Text key, Iterator<Text> values, 89 OutputCollector<Text, Text> output, Reporter repoter) 90 throws IOException { 91 //將全部key值相同的結果,求和 92 StringBuilder result=new StringBuilder(); 93 while(values.hasNext()){ 94 //存在一個key相同的,加入result 95 String temp=values.next().toString(); 96 if(!result.toString().contains(temp+",")){ 97 result.append(temp+","); 98 } 99 100 } 101 //將其規約 102 output.collect(key, new Text(result.substring(0, result.length()-1))); 103 //輸出key相同的id值 104 System.out.println(key+"---->"+result); 105 } 106 107 108 109 } 110 111 static int printUsage() { 112 System.out 113 .println("wordcount [-m <maps>] [-r <reduces>] <input> <output>"); 114 ToolRunner.printGenericCommandUsage(System.out); 115 return -1; 116 } 117 118 119 @Override 120 public int run(String[] args) throws Exception { 121 // TODO Auto-generated method stub 122 JobConf conf = new JobConf(getConf(), IndexCreater.class); 123 conf.setJobName("wordcount"); 124 125 // 輸出結果的Map的key值類型 126 conf.setOutputKeyClass(Text.class); 127 // 輸出結果的Map的value值類型 128 conf.setOutputValueClass(Text.class); 129 130 conf.setMapperClass(MapClass.class); 131 conf.setCombinerClass(Reduce.class); 132 conf.setReducerClass(Reduce.class); 133 134 List<String> other_args = new ArrayList<String>(); 135 for (int i = 0; i < args.length; ++i) { 136 try { 137 if ("-m".equals(args[i])) { 138 conf.setNumMapTasks(Integer.parseInt(args[++i])); 139 } else if ("-r".equals(args[i])) { 140 conf.setNumReduceTasks(Integer.parseInt(args[++i])); 141 } else { 142 other_args.add(args[i]); 143 } 144 } catch (NumberFormatException except) { 145 System.out.println("ERROR: Integer expected instead of " 146 + args[i]); 147 return printUsage(); 148 } catch (ArrayIndexOutOfBoundsException except) { 149 System.out.println("ERROR: Required parameter missing from " 150 + args[i - 1]); 151 return printUsage(); 152 } 153 } 154 // Make sure there are exactly 2 parameters left. 155 if (other_args.size() != 2) { 156 System.out.println("ERROR: Wrong number of parameters: " 157 + other_args.size() + " instead of 2."); 158 return printUsage(); 159 } 160 // 設置輸出結果按照什麼格式保存,以便後續使用。 161 conf.setOutputFormat(MapFileOutputFormat.class); 162 // 輸入文件的HDFS路徑 163 FileInputFormat.setInputPaths(conf, other_args.get(0)); 164 // 輸出結果的HDFS路徑 165 FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); 166 167 JobClient.runJob(conf); 168 169 return 0; 170 } 171 172 }
六、實現搜索引擎
【Ⅰ】建立web項目,編寫測試用例,測試是否能夠讀取HDFS的數據內容
在安裝好的MyEclipse開發工具中,開始編寫搜索引擎展現部分的內容。
這裏先使用普通的JSP + Servlet的模式來完成程序的編寫。
首先創建一個普通的Web項目。
以後,在裏面編寫一個測試用例,測試是否能夠讀取HDFS中的數據內容,注意須要先將hadoop/lib目錄下的全部jar包,以及hadoop根目錄下的支持jar包拷貝到項目WEB-INF目錄下的lib目錄中。
注意!!!完成上面數據的收集和分析以後,如今讀取的內容須要從通過MR處理以後存儲在HDFS的sina_new_ouputwenjianjia內讀取。
1 package org.liky.sina.test; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.io.MapFile.Reader; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapred.MapFileOutputFormat; 9 import org.junit.Test; 10 11 public class TestCaseSina { 12 13 @Test 14 public void test() throws Exception { 15 Configuration conf = new Configuration(); 16 Path path = new Path("hdfs://localhost:9000/output_news_map/"); 17 FileSystem fs = path.getFileSystem(conf); 18 Reader reader = MapFileOutputFormat.getReaders(fs, path, conf)[0]; 19 Text value = (Text) reader.get(new Text("印度"), new Text()); 20 21 System.out.println(value); 22 23 } 24 }
【Ⅱ】 編寫index首頁
將以前寫好的DAO代碼也拷貝到項目中,以便之後查詢數據庫使用。
以後編寫一個jsp頁面,用來接收用戶輸入的查詢關鍵字。
此處我就從簡了 O(∩_∩)O
1 <body> 2 <center> 3 <form action="SearchServlet" method="post"> 4 請輸入查詢關鍵字: 5 <input type="text" name="keyword"> 6 <input type="submit" value="查詢"> 7 </form> 8 </center> 9 </body>
【Ⅲ】處理HDFS查詢的操做
根據設置好的路徑,創建一個SearchServlet,並完成doGet和 doPost方法。
在這個Servlet中會用處處理HDFS查詢的操做方法,所以咱們須要單獨聲明一個HDFSUtils工具類,來幫助咱們實現查詢的功能。
1 package org.liky.sina.utils; 2 3 import java.io.IOException; 4 import java.util.Set; 5 import java.util.TreeSet; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.io.MapFile.Reader; 11 import org.apache.hadoop.io.Text; 12 import org.apache.hadoop.mapred.MapFileOutputFormat; 13 14 public class HDFSUtils { 15 //鏈接hadoop的配置 16 private static Configuration conf = new Configuration(); 17 //建立須要讀取數據的hdfs路徑 18 private static Path path = new Path( 19 "hdfs://localhost:9000/output_news_map/"); 20 21 private static FileSystem fs = null; 22 23 static { 24 try { 25 fs = path.getFileSystem(conf); 26 } catch (IOException e) { 27 e.printStackTrace(); 28 } 29 } 30 31 public static Integer[] getIdsByKeyword(String keyword) throws Exception { 32 33 Reader reader = MapFileOutputFormat.getReaders(fs, path, conf)[0]; 34 Text value = (Text) reader.get(new Text(keyword), new Text()); 35 //set存放關鍵詞搜索的一組id 36 Set<Integer> set = new TreeSet<Integer>(); 37 String[] strs = value.toString().split(","); 38 39 for (String str : strs) { 40 set.add(Integer.parseInt(str)); 41 } 42 43 return set.toArray(new Integer[0]); 44 } 45 46 }
【Ⅳ】servlet類搜索結果向頁面傳遞
在Servlet中經過調用HDFSUtils和以前寫過的DAO方法,便可查詢到結果並設置向頁面傳遞。
此處將關鍵詞搜索的結果呈現到result.jsp界面,因此下面就是編寫該界面呈現最終結果。
1 package org.liky.sina.servlet; 2 3 import java.io.IOException; 4 import java.util.List; 5 6 import javax.servlet.ServletException; 7 import javax.servlet.http.HttpServlet; 8 import javax.servlet.http.HttpServletRequest; 9 import javax.servlet.http.HttpServletResponse; 10 11 import org.liky.sina.dbc.DataBaseConnection; 12 import org.liky.sina.factory.DAOFactory; 13 import org.liky.sina.utils.HDFSUtils; 14 import org.liky.sina.vo.News; 15 16 public class SearchServlet extends HttpServlet { 17 18 public void doGet(HttpServletRequest request, HttpServletResponse response) 19 throws ServletException, IOException { 20 this.doPost(request, response); 21 } 22 23 public void doPost(HttpServletRequest request, HttpServletResponse response) 24 throws ServletException, IOException { 25 // 接收提交的查詢關鍵字參數 26 // 先處理亂碼 27 request.setCharacterEncoding("UTF-8"); 28 // 接收參數 29 String keyword = request.getParameter("keyword"); 30 // 根據關鍵字進行查詢。 31 try { 32 Integer[] ids = HDFSUtils.getIdsByKeyword(keyword); 33 // 根據這些id來查詢出相應的結果 34 List<News> allNews = DAOFactory.getINewsDAOInstance( 35 new DataBaseConnection()).findByIds(ids); 36 37 // 將結果傳遞迴頁面顯示 38 request.setAttribute("allNews", allNews); 39 40 // 切換到頁面上 41 request.getRequestDispatcher("/result.jsp").forward(request, 42 response); 43 } catch (Exception e) { 44 e.printStackTrace(); 45 } 46 47 } 48 }
【Ⅴ】結果呈現,實現分頁
最後,咱們須要在頁面上將結果呈現出來,在web根目錄下編寫result.jsp文件。
能夠經過JSTL + EL來完成內容的輸出。
1 <%@page import="org.liky.sina.vo.News"%> 2 <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%> 3 <% 4 String path = request.getContextPath(); 5 String basePath = request.getScheme() + "://" 6 + request.getServerName() + ":" + request.getServerPort() 7 + path + "/"; 8 %> 9 10 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> 11 <html> 12 <head> 13 <base href="<%=basePath%>"> 14 15 <title>新浪新聞搜索</title> 16 </head> 17 18 <body> 19 <center> 20 <% 21 List<News> allNews = (List<News>)request.getAttribute("allNews"); 22 %> 23 <table width="80%"> 24 <% 25 for (News n : allNews) { 26 27 %> 28 <tr> 29 <td> 30 <a href="<%=n.getUrl() %>" target="_blank"><%=n.getTitle() %></a> <br> 31 <%=n.getDescription() %> 32 <hr/> 33 </td> 34 </tr> 35 <% 36 } 37 38 %> 39 </table> 40 41 <% 42 int cp = (Integer)request.getAttribute("currentPage"); 43 int allPages = (Integer)request.getAttribute("allPages"); 44 %> 45 <form id="split_page_form" action="SearchServlet" method="post"> 46 <input type="hidden" name="currentPage" id="cp" value="<%=cp %>" /> 47 <input type="button" <%=cp == 1?"disabled":"" %> value="首頁" onclick="changeCp(1);"> 48 <input type="button" <%=cp == 1?"disabled":"" %> value="上一頁" onclick="changeCp(<%=cp - 1 %>);"> 49 <input type="button" <%=cp == allPages?"disabled":"" %> value="下一頁" onclick="changeCp(<%=cp + 1 %>);"> 50 <input type="button" <%=cp == allPages?"disabled":"" %> value="尾頁" onclick="changeCp(<%=allPages %>);"> 51 第 <%=cp %> 頁 / 共 <%=allPages %> 頁 52 <br> 53 請輸入查詢關鍵字:<input type="text" name="keyword" value="<%=request.getParameter("keyword")%>"> 54 <input type="submit" value="查詢"> 55 </form> 56 <script type="text/javascript"> 57 function changeCp(newcp) { 58 // 改變當前頁數 59 document.getElementById("cp").value = newcp; 60 // 提交表單 61 document.getElementById("split_page_form").submit(); 62 } 63 </script> 64 65 </center> 66 </body> 67 </html>
總結:
這個項目看起來很簡單,可是囊括了不少知識,包括
1’ java的多線程處理,接口及方法實現;
2‘ 數據庫的基礎操做及代碼鏈接與表操做;
3’ 網絡爬蟲進行數據的收集,包含兩種方法(僅我會的,這個是難點):
(1)java代碼實現(正則表達式或者Jsoup)
(2)Heritrix工具實現(抑或其餘工具)
4‘ Hadoop的基本配置和HDFS於eclipse的配置(後續闡述);
5’ HDFS的文件存取、MapReduce方法的編寫(這個是難點);
6‘ DAO設計模式的代碼實現;
7’ Jsp/Servlet的基礎知識,以及JSTL和EL的瞭解
該項目實現的功能包含:
1‘ 數據收集(網絡爬蟲);
2’ 數據保存(DAO,數據庫及HDFS);
3‘ 數據分析/規約(MapReduce);
4’ 搜索引擎(jsp/servlet,jstl/el);
5‘ web呈現(web項目)
這是一個簡單的大數據搜索引擎的實現,綜合性較強,須要多多閱讀學習。
此外,還有一些缺陷未能實現,會有些麻煩,一個在編碼格式上,一個在搜索的關鍵字交叉搜索上。