Lucene搜索引擎+HDFS+MR完成垂直搜索

  介於上一篇的java實現網絡爬蟲基礎之上,這一篇的思想是將網絡收集的數據保存到HDFS和數據庫(Mysql)中;而後用MR對HDFS的數據進行索引處理,處理成倒排索引;搜索時先用HDFS創建好的索引來搜索對應的數據ID,根據ID從數據庫中提取數據,呈現到網頁上。javascript

   這是一個完整的集合網絡爬蟲、數據庫、HDFS、MapReduce、DAO設計模式、JSP/Servlet的項目,完成了數據收集、數據分析、數據索引並分頁呈現。html

  完整的代碼呈現,但願認真仔細閱讀。java

------>node

目錄:mysql

一、搜索引擎闡述git

二、數據庫表創建web

三、使用DAO設計模式進行數據庫操做正則表達式

【Ⅰ】數據庫鏈接類DataBaseConnectionsql

【Ⅱ】表單元素的封裝類News數據庫

【Ⅲ】編寫DAO接口INewsDAO

【Ⅳ】DAO接口的實現類NewsDAOImp類

【Ⅴ】工廠類DAOFactory類

四、網絡爬蟲實現★★ 【參考博客《java實現網絡爬蟲》《Heritrix實現網絡爬蟲》

五、MR(MapReduce)對HDFS數據進行索引處理★★

六、實現搜索引擎

【Ⅰ】建立web項目,編寫測試用例,測試是否能夠讀取HDFS的數據內容

【Ⅱ】 編寫index首頁

【Ⅲ】處理HDFS查詢的操做

【Ⅳ】servlet類搜索結果向頁面傳遞

【Ⅴ】結果呈現,實現分頁

七、總結

------>

 

一、搜索引擎闡述

搜索引擎的執行流程:

1) 經過爬蟲來將數據下載到本地

2) 將數據提取出來保存到HDFS和數據庫中(MySQL

3) 經過MR來對HDFS的數據進行索引處理,處理成爲倒排索引

4) 搜索時先使用HDFS創建好的索引來搜索對應的數據ID,再根據ID來從MySQL數據庫中提取數據的具體信息。

5) 能夠完成分頁等操做。

 

倒排索引對應的就是正排索引,正排索引指的就是MySQL數據庫中id的索引。

而倒排索引的目的是能夠根據關鍵字查詢出該關鍵字對應的數據id

 

這裏就須要用到MySQL數據庫,以及經過Java EE版的Eclipse來完成網站的開發。

爲了開發起來更方便,咱們這裏使用MyEclipse來完成。

二、數據庫表創建

先安裝好MySQL數據庫。

 

安裝時,注意編碼選擇gbk

 

經過控制檯的mysql -u用戶名 –p密碼 便可登陸mysql數據庫。

以後使用show databases能夠看到全部的數據庫。

使用create database 能夠創建一個新的庫。

使用 use 庫名 ,能夠切換到另外一個庫。

使用show tables能夠看到一個庫下的全部表。

以後就能夠經過普通的sql語句來創建表和進行數據的操做了。

 

在進行數據庫操做時,企業開發中一定要使用DAOData Access Object)設計模式

組成以下:

1) DataBaseConnection:創建數據庫鏈接

2) VO:與表對應的數據對象

3) DAO接口:規範操做方法

4) DAOImpl:針對DAO接口進行方法實現。

5) Factory:用來創建DAO接口對象。

 

首先根據需求,將數據庫表創建出來,這裏只需創建一個簡單的news新聞表,用於存儲網絡上爬取得數據。

1 CREATE TABLE news (
2     id                int                primary key ,
3     title            varchar(200)    not null,
4     description        text            ,
5     url                varchar(200)
6 );

 

三、使用DAO設計模式進行數據庫操做

根據上述的DAO設計模式,咱們須要編寫相關操做類,來完成數據庫的操做。

【Ⅰ】  數據庫鏈接類DataBaseConnection,須要導入jar包:

 1 package org.liky.sina.dbc;
 2 
 3 import java.sql.Connection;
 4 import java.sql.DriverManager;
 5 import java.sql.SQLException;
 6 
 7 /**
 8  * 鏈接數據庫mysql的sina_news
 9  * @author k04
10  *
11  */
12 public class DataBaseConnection {
  //此處能夠試着兩種表達加載類的方法
13 // private static final String DBORIVER="org.git.mm.mysql.Driver"; 14 private static final String DBORIVER="com.mysql.jdbc.Driver"; 15 private static final String DBURL="jdbc:mysql://localhost:3306/sina_news"; 16 private static final String DBUSER="root"; 17 private static final String DBPASSWORD="admin"; 18 19 private Connection conn; 20 /** 21 * 建立數據庫鏈接 22 * @return 23 */ 24 public Connection getConnection(){ 25 try { 26 if(conn==null||conn.isClosed()){ 27 //創建一個新的鏈接 28 Class.forName(DBORIVER); 29 conn=DriverManager.getConnection(DBURL, DBUSER, DBPASSWORD); 30 //System.out.println("success to connect!"); 31 } 32 }catch (ClassNotFoundException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } catch (SQLException e) { 36 // TODO Auto-generated catch block 37 e.printStackTrace(); 38 } 39 return conn; 40 } 41 /* 42 * 關閉鏈接 43 */ 44 public void close(){ 45 if(conn!=null){ 46 try{ 47 conn.close(); 48 }catch(SQLException e){ 49 e.printStackTrace(); 50 } 51 } 52 } 53 54 }

 【Ⅱ】表單元素的封裝類News,根據建表時設定的四種元素id,title,description,url,將網絡爬取得內容完整的導入數據庫中,此處能夠用shift+Alt+S在Eclipse快捷建立封裝類:

 1 package org.liky.sina.vo;
 2 /**
 3  * news封裝類
 4  * @author k04
 5  *
 6  */
 7 public class News {
 8     private Integer id;
 9     private String title;
10     private String description;
11     private String url;
12     
13     
14     public News() {
15     }
16     public News(Integer id,String title,String description,String url) {
17         this.id=id;
18         this.title=title;
19         this.description=description;
20         this.url=url;
21     }
22     
23     
24     public Integer getId() {
25         return id;
26     }
27     public void setId(Integer id) {
28         this.id = id;
29     }
30     public String getTitle() {
31         return title;
32     }
33     public void setTitle(String title) {
34         this.title = title;
35     }
36     public String getDescription() {
37         return description;
38     }
39     public void setDescription(String description) {
40         this.description = description;
41     }
42     public String getUrl() {
43         return url;
44     }
45     public void setUrl(String url) {
46         this.url = url;
47     }
48 }

 

 【Ⅲ】編寫DAO接口INewsDAO,存放數據庫操做類的方法名:

 1 package org.liky.sina.dao;
 2 /**
 3  * 接口,呈現三個方法
 4  */
 5 import java.util.List;
 6 import org.liky.sina.vo.News;
 7 
 8 public interface INewsDAO {
 9     /**
10      * 添加數據
11      * @param news    要添加的對象
12      * @throws Exception
13      */
14     public void doCreate(News news)throws Exception;
15     /**
16      * 根據主鍵id查詢數據
17      * 
18      */
19     public News findById(int id)throws Exception;
20     /**
21      * 根據一組id查詢全部結果
22      * @param ids    全部要查詢的id
23      * @return    查詢到的數據
24      * 由於索引是根據熱詞查到一堆的id
25      */
26     public List<News> findByIds(int[] ids)throws Exception;
27     
28 }

 

 【Ⅳ】DAO接口的實現類NewsDAOImp類:

 1 package org.liky.sina.dao.impl;
 2 /**
 3  * 繼承INewsDAO接口
 4  * 實現三個方法,插入數據,查找指定id數據,查找一組id數據
 5  */
 6 import java.sql.PreparedStatement;
 7 import java.sql.ResultSet;
 8 import java.util.ArrayList;
 9 import java.util.List;
10 
11 import org.liky.sina.dao.INewsDAO;
12 import org.liky.sina.dbc.DataBaseConnection;
13 import org.liky.sina.vo.News;
14 
15 public class NewsDAOImpl implements INewsDAO {
16     //聲明一個數據庫鏈接類對象
17     private DataBaseConnection dbc;
18     
19     
20     //構造器,參數爲數據庫鏈接類對象
21     public NewsDAOImpl(DataBaseConnection dbc) {
22         this.dbc=dbc;
23         
24     }
25 
26     @Override
27     public void doCreate(News news) throws Exception {
28         // TODO Auto-generated method stub
29         String sql="INSERT INTO news (id,title,description,url) VALUES (?,?,?,?)";
30         PreparedStatement pst=dbc.getConnection().prepareStatement(sql);
31         //設置參數
32         pst.setInt(1, news.getId());
33         pst.setString(2, news.getTitle());
34         pst.setString(3, news.getDescription());
35         pst.setString(4, news.getUrl());
36         
37         pst.executeUpdate();
38         System.out.println("create success.");
39     }
40 
41     @Override
42     public News findById(int id) throws Exception {
43         // TODO Auto-generated method stub
44         String sql="SELECT id,title,description,url FROM news WHERE id = ?";
45         PreparedStatement pst=dbc.getConnection().prepareStatement(sql);
46         pst.setInt(1, id);
47         ResultSet rs=pst.executeQuery();
48         News news=null;
49         //將符合id的數據遍歷寫入news並返回
50         if(rs.next()){
51             news=new News();
52             news.setId(rs.getInt(1));
53             news.setTitle(rs.getString(2));
54             news.setDescription(rs.getString(3));
55             news.setUrl(rs.getString(4));
56         }
57         //System.out.println("find success.");
58         return news;
59     }
60 
61     @Override
62     public List<News> findByIds(int[] ids) throws Exception {
63         // TODO Auto-generated method stub
64         StringBuilder sql=new StringBuilder("SELECT id,title,description,url FROM news WHERE id IN (");
65         //將id寫入ids,並用逗號隔開
66         if(ids!=null&&ids.length>0){
67             for(int id:ids){
68                 sql.append(id);
69                 sql.append(",");
70             }
71             //截取最後一個逗號,並補上括號
72             String resultSQL=sql.substring(0, sql.length()-1)+")";
73             
74             PreparedStatement pst=dbc.getConnection().prepareStatement(resultSQL);
75             ResultSet rs=pst.executeQuery();
76             //存取一組id到鏈表中
77             List<News> list=new ArrayList<>();
78             while(rs.next()){
79                 News news=new News();
80                 news.setId(rs.getInt(1));
81                 news.setTitle(rs.getString(2));
82                 news.setDescription(rs.getString(3));
83                 news.setUrl(rs.getString(4));
84                 list.add(news);
85             }
86         }
87         //System.out.println("find success.");
88         return null;
89     }
90 
91 }

 

【Ⅴ】工廠類DAOFactory類,此類寫入了數據庫鏈接類參數,返回DAO實現類對象:

   java中,咱們一般有如下幾種建立對象的方式:

       (1) 使用new關鍵字直接建立對象;

       (2) 經過反射機制建立對象;

       (3) 經過clone()方法建立對象;

       (4) 經過工廠類建立對象。

 1 package org.liky.sina.factory;
 2 /**
 3  * 工廠類
 4  * 輸入一個鏈接數據庫對象的參數,返回數據庫表操做的類
 5  */
 6 import org.liky.sina.dao.INewsDAO;
 7 import org.liky.sina.dao.impl.NewsDAOImpl;
 8 import org.liky.sina.dbc.DataBaseConnection;
 9 
10 public class DAOFactory {
11     public static INewsDAO getINewsDAOInstance(DataBaseConnection dbc){
12         return new NewsDAOImpl(dbc);
13     }
14 }

 

四、網絡爬蟲實現

  如今編寫整個項目的重點,編寫URLDemo類,在爬蟲中進行數據庫的操做以及HDFS的寫入:

  a'  關於此類,在網頁解析時用了簡單的Jsoup,並無如《java網絡爬蟲》用正則表達式,因此須要導入jsoup的jar包  

  b'  關於HDFS在eclipse的配置以及本機的鏈接,我後續博客會闡述,也能夠網絡查詢方法;

  c'  這個類也是執行類,我收集的是新浪新聞網的數據,爬取深度爲5,設置線程數5,而且篩選了只有連接含有「sian.news.com.cn」的。

  d'  網絡爬蟲我講了兩種方法:(1)java代碼實現網絡爬蟲

                    (2)Heritrix工具實現網絡爬蟲

  此處我仍是選擇了直接寫代碼實現,自由度高也方便讀寫存取。

  1 package org.liky.sina.craw;
  2 
  3 import java.util.ArrayList;
  4 import java.util.HashMap;
  5 import java.util.HashSet;
  6 import java.util.List;
  7 import java.util.Map;
  8 import java.util.Set;
  9 
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.fs.FSDataOutputStream;
 12 import org.apache.hadoop.fs.FileSystem;
 13 import org.apache.hadoop.fs.Path;
 14 import org.jsoup.Jsoup;
 15 import org.jsoup.nodes.Document;
 16 import org.jsoup.nodes.Element;
 17 import org.jsoup.select.Elements;
 18 import org.liky.sina.dao.INewsDAO;
 19 import org.liky.sina.dbc.DataBaseConnection;
 20 import org.liky.sina.factory.DAOFactory;
 21 import org.liky.sina.vo.News;
 22 
 23 /**
 24  * 爬蟲開始進行數據庫操做以及HDFS寫入
 25  * 
 26  * @author k04
 27  * 
 28  */
 29 public class URLDemo {
 30     // 該對象的構造方法會默認加載hadoop中的兩個配置文件,hdfs-site.xml和core-site.xml
 31     // 這兩個文件包含訪問hdfs所需的參數值
 32     private static Configuration conf = new Configuration();
 33 
 34     private static int id = 1;
 35 
 36     private static FileSystem fs;
 37 
 38     private static Path path;
 39 
 40     // 等待爬取的url
 41     private static List<String> allWaitUrl = new ArrayList<>();
 42     // 已經爬取的url
 43     private static Set<String> allOverUrl = new HashSet<>();
 44     // 記錄全部url的深度,以便在addUrl方法內判斷
 45     private static Map<String, Integer> allUrlDepth = new HashMap<>();
 46     // 爬取網頁的深度
 47     private static int maxDepth = 5;
 48     // 聲明object獨享幫助進行線程的等待操做
 49     private static Object obj = new Object();
 50     // 設置總線程數
 51     private static final int MAX_THREAD = 20;
 52     // 記錄空閒的線程數
 53     private static int count = 0;
 54 
 55     // 聲明INewsDAO對象,
 56     private static INewsDAO dao;
 57 
 58     static {
 59         dao = DAOFactory.getINewsDAOInstance(new DataBaseConnection());
 60     }
 61 
 62     public static void main(String args[]) {
 63         // 爬取的目標網址
 64         String strUrl = "http://news.sina.com.cn/";
 65 
 66         // 爬取第一個輸入的url
 67         addUrl(strUrl, 0);
 68         // 創建多個線程
 69         for (int i = 0; i < MAX_THREAD; i++) {
 70             new URLDemo().new MyThread().start();
 71         }
 72 
 73         // DataBaseConnection dc=new DataBaseConnection();
 74         // dc.getConnection();
 75 
 76     }
 77 
 78     public static void parseUrl(String strUrl, int depth) {
 79         // 先判斷當前url是否爬取過
 80         // 判斷深度是否符合要求
 81         if (!(allOverUrl.contains(strUrl) || depth > maxDepth)) {
 82             System.out.println("當前執行的  " + Thread.currentThread().getName()
 83                     + "  爬蟲線程處理爬取: " + strUrl);
 84 
 85             try {
 86                 // 用jsoup進行數據爬取
 87                 Document doc = Jsoup.connect(strUrl).get();
 88                 // 經過doc接受返回的結果
 89                 // 提取有效的title和description
 90                 String title = doc.title();
 91                 Element descE = doc.getElementsByAttributeValue("name",
 92                         "description").first();
 93                 String desc = descE.attr("content");
 94 
 95                 // System.out.println(title + " --> " + desc);
 96 
 97                 // 若是有效,則驚醒保存
 98                 if (title != null && desc != null && !title.trim().equals("")
 99                         && !desc.trim().equals("")) {
100                     // 須要生成一個id,以便放入數據庫中,所以id也要加入到HDFS中,便於後續索引
101                     News news = new News();
102                     news.setId(id++);
103                     news.setTitle(title);
104                     news.setDescription(desc);
105                     news.setUrl(strUrl);
106                     // 添加到數據庫語句
107                     dao.doCreate(news);
108                     // 向HDFS保存數據
109                     path = new Path("hdfs://localhost:9000/sina_news_input/"
110                             + System.currentTimeMillis() + ".txt");
111                     fs = path.getFileSystem(conf);
112                     FSDataOutputStream os = fs.create(path);
113                     // 進行內容輸出,此處須要用news.getId(),否則數據庫和HDFS的id會不相同,由於多線程的運行
114                     os.writeUTF(news.getId() + "\r\n" + title + "\r\n" + desc);
115                     os.close();
116 
117                     // 解析全部超連接
118                     Elements aEs = doc.getElementsByTag("a");
119                     // System.out.println(aEs);
120                     if (aEs != null && aEs.size() > 0) {
121                         for (Element aE : aEs) {
122                             String href = aE.attr("href");
123                             System.out.println(href);
124                             // 截取網址,並給出篩選條件!!!
125                             if ((href.startsWith("http:") || href
126                                     .startsWith("https:"))
127                                     && href.contains("news.sina.com.cn")) {
128                                 // 調用addUrl()方法
129                                 addUrl(href, depth + 1);
130                             }
131                         }
132                     }
133 
134                 }
135 
136             } catch (Exception e) {
137 
138             }
139             // 吧當前爬完的url放入到偶爾中
140             allOverUrl.add(strUrl);
141             System.out.println(strUrl + "爬去完成,已經爬取的內容量爲:" + allOverUrl.size()
142                     + "剩餘爬取量爲:" + allWaitUrl.size());
143 
144             // 判斷是否集合中海油其餘的內容須要進行爬取,若是有,則進行線程的喚醒
145             if (allWaitUrl.size() > 0) {
146                 synchronized (obj) {
147                     obj.notify();
148                 }
149             } else {
150                 System.out.println("爬取結束...");
151                 System.exit(0);
152             }
153 
154         }
155     }
156 
157     /**
158      * url加入到等待隊列中 並判斷是否已經放過,若沒有就放入allUrlDepth中
159      * 
160      * @param href
161      * @param depth
162      */
163     public static synchronized void addUrl(String href, int depth) {
164         // 將url放入隊列中
165         allWaitUrl.add(href);
166         // 判斷url是否已經存在
167         if (!allUrlDepth.containsKey(href)) {
168             allUrlDepth.put(href, depth + 1);
169         }
170     }
171 
172     /**
173      * 獲取等待隊列下一個url,並從等待隊列中移除
174      * 
175      * @return
176      */
177     public static synchronized String getUrl() {
178         if (allWaitUrl.size() > 0) {
179             String nextUrl = allWaitUrl.get(0);
180             allWaitUrl.remove(0);
181             return nextUrl;
182         }
183         return null;
184     }
185 
186     /**
187      * 用多線程進行url爬取
188      * 
189      * @author k04
190      * 
191      */
192     public class MyThread extends Thread {
193 
194         @Override
195         public void run() {
196             // 編寫一個死循環,以便線程能夠一直存在
197             while (true) {
198                 //
199 
200                 String url = getUrl();
201                 if (url != null) {
202                     // 調用該方法爬取url的數據
203                     parseUrl(url, allUrlDepth.get(url));
204                 } else {
205                     System.out.println("當前線程準備就緒,等待鏈接爬取:" + this.getName());
206                     // 線程+1
207                     count++;
208                     // 創建一個對象,幫助線程進入等待狀態wait()
209                     synchronized (obj) {
210                         try {
211                             obj.wait();
212                         } catch (Exception e) {
213                             e.printStackTrace();
214                         }
215                         // 線程-1
216                         count--;
217                     }
218                 }
219             }
220         }
221 
222     }
223 
224 }

 如今執行上述類URLDemo,該類執行流程:

    1‘  讀取源連接,分配線程任務;

    2’  數據根據DAO設計模式寫入到數據庫中;

    3‘  數據上傳到HDFS的input文件夾中;

 

   執行完能夠查看HDFS和數據庫的結果:

    

 

 

關於此處,仍是有些許瑕疵,雙方的數據量並不相等,主要是多線程爬取的時候,id的傳輸並不相同!

 

將重複的數據進行清洗

當數據爬取完成後,咱們會發現有不少重複的數據,這能夠經過SQL語句來進行清洗的操做。

 

SQL中可使用group by關鍵字來完成分組函數的處理。

咱們能夠先經過該函數來測試一下。

測試後會發現真正有效數據量應該是4634條。

這就須要咱們經過創建一張新表,把全部不重複的數據加入到新表中。

mysql支持create table時經過select語句來查詢出一些結果做爲新表的結構和數據。

CREATE TABLE new_news SELECT id,title,description,url FROM news WHERE id IN (SELECT min(id) FROM news GROUP BY title)

 

 

五、MR(MapReduce)對HDFS數據進行索引處理

下面就能夠開始編寫MR程序。

Map格式,要求key爲關鍵字,valueid

因爲一個關鍵字可能會對應多個id,因此id之間咱們想使用,來分隔。因此keyvalue的類型都應該是StringText

  1 package org.liky.sina.index;
  2 /**
  3  * 編寫MR程序。
  4     Map格式,要求key爲關鍵字,value爲id。
  5     因爲一個關鍵字可能會對應多個id,因此id之間咱們想使用,來分隔。因此key和value的類型都應該是String(Text)
  6  */
  7 import java.io.IOException;
  8 import java.util.*;
  9 import java.util.regex.*;
 10 import jeasy.analysis.MMAnalyzer;
 11 import org.apache.hadoop.conf.*;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.io.*;
 14 import org.apache.hadoop.mapred.*;
 15 import org.apache.hadoop.util.*;
 16 
 17 
 18 public class IndexCreater extends Configured implements Tool {
 19 
 20     public static void main(String[] args) throws Exception {
 21 int res = ToolRunner.run(new Configuration(), new IndexCreater(), 22 new String[] { "hdfs://localhost:9000/sina_news_input/", 23 "hdfs://localhost:9000/output_news_map/" });  24         System.exit(res);
 25     }
 26 
 27     
 28     
 29     //Mapper接口中的後兩個泛型參數,第一個表示返回後Map的key的類型,第二個表示返回後的value類型
 30     public static class MapClass extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text>{
 31         
 32         private static Pattern p;
 33         
 34         static{
 35             System.out.println("開始Map操做.....");
 36             p=Pattern.compile("\\d+");
 37         }
 38         
 39         private int id;
 40         private int line=1;
 41         
 42         private static MMAnalyzer mm=new MMAnalyzer();
 43         
 44         //輸出的詞
 45         private Text word=new Text();
 46 
 47         //map過程的核心方法
 48         @Override
 49         public void map(LongWritable key, Text value,
 50                 OutputCollector<Text, Text> output, Reporter reporter)
 51                 throws IOException {
 52             if (line == 1) {
 53                 // 讀取的是第一行,咱們就須要將第一行的id保留下來
 54                 line++;
 55                 Matcher m = p.matcher(value.toString());
 56                 if (m.find()) {
 57                     id = Integer.parseInt(m.group());
 58                 }
 59             } else {
 60                 String tempStr = value.toString();
 61                 // 按空格將單詞拆分出來
 62                 // StringTokenizer itr = new StringTokenizer(line);
 63                 // 使用分詞器來進行詞組的拆分
 64                 String[] results = mm.segment(tempStr, "|").split("\\|");
 65                 // 每一個單詞記錄出現了1次
 66                 for (String temp : results) {
 67                     word.set(temp.toLowerCase());
 68                     output.collect(word, new Text(id + ""));
 69                 }
 70             }
 71             
 72         }
 73         
 74         
 75         
 76 
 77     }
 78     
 79     
 80     //對全部的結果進行規約,合併
 81     //Reducer中也有泛型,前兩個表示Map過程輸出的結果類型,後兩個表示Reduce處理後輸出的類型
 82     public static class Reduce extends MapReduceBase implements Reducer<Text,Text,Text,Text>{
 83 
 84         static{
 85             System.out.println("開始reduce操做.....");
 86         }
 87         @Override
 88         public void reduce(Text key, Iterator<Text> values,
 89                 OutputCollector<Text, Text> output, Reporter repoter)
 90                 throws IOException {
 91             //將全部key值相同的結果,求和
 92             StringBuilder result=new StringBuilder();
 93             while(values.hasNext()){
 94                 //存在一個key相同的,加入result
 95                 String temp=values.next().toString();
 96                 if(!result.toString().contains(temp+",")){
 97                 result.append(temp+",");
 98             }
 99                 
100             }
101             //將其規約
102             output.collect(key, new Text(result.substring(0, result.length()-1)));
103             //輸出key相同的id值
104             System.out.println(key+"---->"+result);
105         }
106         
107         
108         
109     }
110 
111     static int printUsage() {
112         System.out
113                 .println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
114         ToolRunner.printGenericCommandUsage(System.out);
115         return -1;
116     }
117 
118     
119     @Override
120     public int run(String[] args) throws Exception {
121         // TODO Auto-generated method stub
122         JobConf conf = new JobConf(getConf(), IndexCreater.class);
123         conf.setJobName("wordcount");
124 
125         // 輸出結果的Map的key值類型
126         conf.setOutputKeyClass(Text.class);
127         // 輸出結果的Map的value值類型
128         conf.setOutputValueClass(Text.class);
129 
130         conf.setMapperClass(MapClass.class);
131         conf.setCombinerClass(Reduce.class);
132         conf.setReducerClass(Reduce.class);
133 
134         List<String> other_args = new ArrayList<String>();
135         for (int i = 0; i < args.length; ++i) {
136             try {
137                 if ("-m".equals(args[i])) {
138                     conf.setNumMapTasks(Integer.parseInt(args[++i]));
139                 } else if ("-r".equals(args[i])) {
140                     conf.setNumReduceTasks(Integer.parseInt(args[++i]));
141                 } else {
142                     other_args.add(args[i]);
143                 }
144             } catch (NumberFormatException except) {
145                 System.out.println("ERROR: Integer expected instead of "
146                         + args[i]);
147                 return printUsage();
148             } catch (ArrayIndexOutOfBoundsException except) {
149                 System.out.println("ERROR: Required parameter missing from "
150                         + args[i - 1]);
151                 return printUsage();
152             }
153         }
154         // Make sure there are exactly 2 parameters left.
155         if (other_args.size() != 2) {
156             System.out.println("ERROR: Wrong number of parameters: "
157                     + other_args.size() + " instead of 2.");
158             return printUsage();
159         }
160         // 設置輸出結果按照什麼格式保存,以便後續使用。
161         conf.setOutputFormat(MapFileOutputFormat.class);
162         // 輸入文件的HDFS路徑
163         FileInputFormat.setInputPaths(conf, other_args.get(0));
164         // 輸出結果的HDFS路徑
165         FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
166 
167         JobClient.runJob(conf);
168 
169         return 0;
170     }
171 
172 }

 

 六、實現搜索引擎

【Ⅰ】建立web項目,編寫測試用例,測試是否能夠讀取HDFS的數據內容

在安裝好的MyEclipse開發工具中,開始編寫搜索引擎展現部分的內容。

這裏先使用普通的JSP + Servlet的模式來完成程序的編寫。

首先創建一個普通的Web項目。

以後,在裏面編寫一個測試用例,測試是否能夠讀取HDFS中的數據內容,注意須要先將hadoop/lib目錄下的全部jar包,以及hadoop根目錄下的支持jar包拷貝到項目WEB-INF目錄下的lib目錄中。

注意!!!完成上面數據的收集和分析以後,如今讀取的內容須要從通過MR處理以後存儲在HDFS的sina_new_ouputwenjianjia內讀取。

 1 package org.liky.sina.test;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FileSystem;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.io.MapFile.Reader;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapred.MapFileOutputFormat;
 9 import org.junit.Test;
10 
11 public class TestCaseSina {
12 
13     @Test
14     public void test() throws Exception {
15         Configuration conf = new Configuration();
16         Path path = new Path("hdfs://localhost:9000/output_news_map/");
17         FileSystem fs = path.getFileSystem(conf);
18         Reader reader = MapFileOutputFormat.getReaders(fs, path, conf)[0];
19         Text value = (Text) reader.get(new Text("印度"), new Text());
20         
21         System.out.println(value);
22 
23     }
24 }

 

【Ⅱ】 編寫index首頁

將以前寫好的DAO代碼也拷貝到項目中,以便之後查詢數據庫使用。

以後編寫一個jsp頁面,用來接收用戶輸入的查詢關鍵字。

 此處我就從簡了   O(∩_∩)O

1 <body>
2     <center>
3         <form action="SearchServlet" method="post">
4             請輸入查詢關鍵字:
5             <input type="text" name="keyword"> 
6             <input type="submit" value="查詢">
7         </form>    
8     </center>
9 </body>

 

 【Ⅲ】處理HDFS查詢的操做  

根據設置好的路徑,創建一個SearchServlet,並完成doGetdoPost方法

在這個Servlet中會用處處理HDFS查詢的操做方法,所以咱們須要單獨聲明一個HDFSUtils工具類,來幫助咱們實現查詢的功能。

 

 1 package org.liky.sina.utils;
 2 
 3 import java.io.IOException;
 4 import java.util.Set;
 5 import java.util.TreeSet;
 6 
 7 import org.apache.hadoop.conf.Configuration;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.MapFile.Reader;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapred.MapFileOutputFormat;
13 
14 public class HDFSUtils {
15         //鏈接hadoop的配置
16     private static Configuration conf = new Configuration();
17        //建立須要讀取數據的hdfs路徑
18     private static Path path = new Path(
19             "hdfs://localhost:9000/output_news_map/");
20 
21     private static FileSystem fs = null;
22 
23     static {
24         try {
25             fs = path.getFileSystem(conf);
26         } catch (IOException e) {
27             e.printStackTrace();
28         }
29     }
30 
31     public static Integer[] getIdsByKeyword(String keyword) throws Exception {
32 
33         Reader reader = MapFileOutputFormat.getReaders(fs, path, conf)[0];
34         Text value = (Text) reader.get(new Text(keyword), new Text());
35                //set存放關鍵詞搜索的一組id
36         Set<Integer> set = new TreeSet<Integer>();
37         String[] strs = value.toString().split(",");
38 
39         for (String str : strs) {
40             set.add(Integer.parseInt(str));
41         }
42         
43         return set.toArray(new Integer[0]);
44     }
45 
46 }

 

 

 【Ⅳ】servlet類搜索結果向頁面傳遞

Servlet中經過調用HDFSUtils和以前寫過的DAO方法,便可查詢到結果並設置向頁面傳遞。

此處將關鍵詞搜索的結果呈現到result.jsp界面,因此下面就是編寫該界面呈現最終結果。

 

 1 package org.liky.sina.servlet;
 2 
 3 import java.io.IOException;
 4 import java.util.List;
 5 
 6 import javax.servlet.ServletException;
 7 import javax.servlet.http.HttpServlet;
 8 import javax.servlet.http.HttpServletRequest;
 9 import javax.servlet.http.HttpServletResponse;
10 
11 import org.liky.sina.dbc.DataBaseConnection;
12 import org.liky.sina.factory.DAOFactory;
13 import org.liky.sina.utils.HDFSUtils;
14 import org.liky.sina.vo.News;
15 
16 public class SearchServlet extends HttpServlet {
17 
18     public void doGet(HttpServletRequest request, HttpServletResponse response)
19             throws ServletException, IOException {
20         this.doPost(request, response);
21     }
22 
23     public void doPost(HttpServletRequest request, HttpServletResponse response)
24             throws ServletException, IOException {
25         // 接收提交的查詢關鍵字參數
26         // 先處理亂碼
27         request.setCharacterEncoding("UTF-8");
28         // 接收參數
29         String keyword = request.getParameter("keyword");
30         // 根據關鍵字進行查詢。
31         try {
32             Integer[] ids = HDFSUtils.getIdsByKeyword(keyword);
33             // 根據這些id來查詢出相應的結果
34             List<News> allNews = DAOFactory.getINewsDAOInstance(
35                     new DataBaseConnection()).findByIds(ids);
36 
37             // 將結果傳遞迴頁面顯示
38             request.setAttribute("allNews", allNews);
39 
40             // 切換到頁面上
41             request.getRequestDispatcher("/result.jsp").forward(request,
42                     response);
43         } catch (Exception e) {
44             e.printStackTrace();
45         }
46 
47     }
48 }

 

 

【Ⅴ】結果呈現,實現分頁

 

最後,咱們須要在頁面上將結果呈現出來,在web根目錄下編寫result.jsp文件。

能夠經過JSTL + EL來完成內容的輸出。

 

 1 <%@page import="org.liky.sina.vo.News"%>
 2 <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
 3 <%
 4     String path = request.getContextPath();
 5     String basePath = request.getScheme() + "://"
 6             + request.getServerName() + ":" + request.getServerPort()
 7             + path + "/";
 8 %>
 9 
10 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
11 <html>
12 <head>
13 <base href="<%=basePath%>">
14 
15 <title>新浪新聞搜索</title>
16 </head>
17 
18 <body>
19     <center>
20         <%
21             List<News> allNews = (List<News>)request.getAttribute("allNews");
22         %>
23         <table width="80%">
24             <%
25                 for (News n : allNews) {
26             
27             %>
28                 <tr>
29                     <td>
30                         <a href="<%=n.getUrl() %>" target="_blank"><%=n.getTitle() %></a> <br>
31                         <%=n.getDescription() %>
32                         <hr/>
33                     </td>
34                 </tr>
35             <%
36                 }
37                 
38             %>
39         </table>
40         
41         <%
42             int cp = (Integer)request.getAttribute("currentPage");
43             int allPages = (Integer)request.getAttribute("allPages");
44         %>
45         <form id="split_page_form" action="SearchServlet" method="post">
46             <input type="hidden"  name="currentPage" id="cp" value="<%=cp %>" />
47             <input type="button" <%=cp == 1?"disabled":"" %> value="首頁" onclick="changeCp(1);">
48             <input type="button" <%=cp == 1?"disabled":"" %> value="上一頁" onclick="changeCp(<%=cp - 1 %>);">
49             <input type="button" <%=cp == allPages?"disabled":"" %> value="下一頁" onclick="changeCp(<%=cp + 1 %>);">
50             <input type="button" <%=cp == allPages?"disabled":"" %> value="尾頁" onclick="changeCp(<%=allPages %>);">
51             第 <%=cp %>  頁 / 共 <%=allPages %>52             <br>
53             請輸入查詢關鍵字:<input type="text" name="keyword" value="<%=request.getParameter("keyword")%>">
54             <input type="submit" value="查詢">
55         </form>
56         <script type="text/javascript">
57             function changeCp(newcp) {
58                 // 改變當前頁數
59                 document.getElementById("cp").value = newcp;
60                 // 提交表單
61                 document.getElementById("split_page_form").submit();
62             }
63         </script>
64         
65     </center>
66 </body>
67 </html>

 

 

 

 

 

  這個項目看起來很簡單,可是囊括了不少知識,包括

      1’  java的多線程處理,接口及方法實現;

      2‘  數據庫的基礎操做及代碼鏈接與表操做;

      3’  網絡爬蟲進行數據的收集,包含兩種方法(僅我會的,這個是難點):

          (1)java代碼實現(正則表達式或者Jsoup)

          (2)Heritrix工具實現(抑或其餘工具)

      4‘  Hadoop的基本配置和HDFS於eclipse的配置(後續闡述);

      5’  HDFS的文件存取、MapReduce方法的編寫(這個是難點);

      6‘  DAO設計模式的代碼實現;

      7’  Jsp/Servlet的基礎知識,以及JSTL和EL的瞭解

   該項目實現的功能包含:

      1‘  數據收集(網絡爬蟲);

      2’  數據保存(DAO,數據庫及HDFS);

      3‘  數據分析/規約(MapReduce);

      4’  搜索引擎(jsp/servlet,jstl/el);

      5‘  web呈現(web項目)

  這是一個簡單的大數據搜索引擎的實現,綜合性較強,須要多多閱讀學習。

  此外,還有一些缺陷未能實現,會有些麻煩,一個在編碼格式上,一個在搜索的關鍵字交叉搜索上。

相關文章
相關標籤/搜索