1、需求html
1.定時抓取固定網站新聞標題、內容、發表時間和來源。java
2.程序須要支持分佈式、多線程mysql
2、設計程序員
1.網站是固定,可是將來也可能添加新的網站去抓取,每一個網站內容節點設計都不同,這樣就須要支持動態可配置來新增網站以方便將來的擴展,這樣就須要每次都須要開發介入。sql
2.網站html節點的結構可能發生變化,因此也要支持提取節點可配置。apache
3.怎樣支持分佈式?暫時最簡單的想法就是:多機器部署程序,還有新搞一臺或者部署程序其中一臺製做一個定時任務,定時開啓每臺機器應該抓取哪一個網站,暫時不能支持同一個網站同時能夠支持被多臺機器同時抓取,這樣會比較麻煩,要用到分佈式隊列。因此暫時一個網站同時只會被單臺機器抓取。數組
4.多線程,怎樣多線程?多線程抓取我這邊有兩個實現:多線程
(1)一個線程抓取一個網站,維護一個本身的url隊列作廣度抓取,同時抓取多個網站。如圖:閉包
(2)多個線程同時抓取不一樣的網站。如圖:併發
以上兩張辦法其實各有優勢,也給有缺點,看咱們怎麼取捨了。
方法1:每一個線程建立一個本身的隊列,圖中的queue能夠不用concurrentQueue,優勢:不涉及到控制併發,每一個網站一個線程抓取一個網站,抓取完畢即自動回收銷燬線程。控制方便。缺點:線程數不能夠擴展,例如當只有3個網站,你最多隻能開3個線程來抓取,不能開更多,有必定的侷限性。
方法2:N個線程同時抓取N個網站,線程數和網站數目不掛鉤,優勢:線程數能夠調整而且和和抓取網站數量無關。3個網站咱們能夠開4個5個或者10個這個能夠根據您的硬件資源進行調整。缺點:須要控制併發,而且要控制何時銷燬線程(thread1空閒,而且queue爲空不表明任務能夠結束,可能thread2結果還沒返回),當被抓取的網站響應較慢時,會拖慢整個爬蟲進度。
3、實現
抓取方式最終仍是選擇了方法二,由於線程數可配置!
使用技術:
jfinal 用了以後才發現這東西不適合,可是因爲項目進度問題,仍是使用了。
maven項目管理
jetty server
mysql
eclipse 開發
項目須要重點攻破的難點:
(1)合理的控制N個線程正常的抓取網站,而且當全部線程工做都完成了而且須要抓取的隊列爲空時,N個線程同時退出銷燬。
(2)不一樣網站設計節點不同,須要經過配置解決各個網站須要抓取的URL和抓取節點內容在html節點的位置。
(3)個性化內容處理,因爲html結構設計問題,抓取的內容可能有些多餘的html標籤,或者多餘的內容該怎麼處理。
實現1:線程管理創建一個線程中心管理控制器,控制器負責線程的銷燬。如圖:
(1)建立一箇中心管理器,管理器存放N個線程的signal數組標記,用來標記線程是否空閒,而且建立N個標記的線程開關。用來同時結束N個線程。
(2)線程在開始抓取請求連接時把idle置爲false,抓取連接完畢以後繼續循環取隊列(循環時候判斷開關是否爲true),當從隊列poll結果爲空時,把線程置爲idle=true,而且sleep 1S(看我的愛好)。
(3)線程中心中心管理器調度一個定時檢測任務1s鍾檢測一次 線程的signal數組標記和queue.size(),當queue.size()==0而且數組標記所有都標記空閒的時候,把線程開關switcher所有置爲false(關閉,這樣線程會結束while退出)
如圖下狀況,4個線程的狀態後線程便可自行退出:
核心控制功能和線程功能代碼以下:
CoreTaskController.java
import java.util.Arrays; import java.util.Timer; import java.util.TimerTask; import org.apache.log4j.Logger; import com.crawler.core.conf.CrawlerConf; /** * 任務核心控制器 * @author Jacky * 2015-9-30 */ public class CoreTaskController { public static Logger log = Logger.getLogger(CoreTaskController.class); /** * 信號量,用來標記當前線程是否空閒 * 0表示空閒 * 1表示繁忙 * CrawlerConf.getMaxT()表示獲取最大線程數 */ public static volatile int[] signal = new int[CrawlerConf.getMaxT()];; public static volatile boolean totalSwitcher = true; /** * 線程開關 * 1表示打開 * 0表示關閉 */ public static volatile int[] switcher = new int[CrawlerConf.getMaxT()]; public static Timer timer; public static void init() { // 開啓全部線程開關 for(int i=0;i<switcher.length;i++) { switcher[i] = 1; } } public static void startSc() { if(null != timer) { timer.cancel(); } timer = new Timer(); timer.schedule(new JobTask(), 5000,1000); } static class JobTask extends TimerTask { @Override public void run() { // 當總開關被關閉,當即結束全部線程 if(!totalSwitcher) { // 關閉線程開關 for(int i=0;i<switcher.length;i++) { switcher[i] = 0; // 中止調度此任務 this.cancel(); CoreTaskController.timer.cancel(); } UrlQueue.concurrentSet.clear(); return; } /** * UrlQueue.concurrentQueue 就是用來存放 * url的隊列 */ if(UrlQueue.concurrentQueue.size() == 0 ) { boolean stop = true; for(int i : signal) { stop = stop & (i == 0); } if(stop) { // 關閉線程開關 for(int i=0;i<switcher.length;i++) { switcher[i] = 0; // 中止調度此任務 this.cancel(); CoreTaskController.timer.cancel(); } UrlQueue.concurrentSet.clear(); } } log.warn("time task 調度中..signal="+Arrays.toString(signal)+",urlQueueSize="+UrlQueue.concurrentQueue.size()+",switcher="+Arrays.toString(switcher)); } } }
ConcurrentCrawlerTask.java 線程任務代碼
import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import com.crawler.core.conf.HttpConf; import com.crawler.core.conf.UrlConf; import com.crawler.core.conf.WebsiteConf; import com.crawler.system.model.OriginContent; /** * 爬蟲任務類 * @author Jacky * 2015-9-29 */ public class ConcurrentCrawlerTask extends Thread { Logger log = Logger.getLogger(ConcurrentCrawlerTask.class); private int index; public ConcurrentCrawlerTask(int index,String taskName) { super(taskName); this.index = index; } @Override public void run() { UrlConf urlConf = null; System.out.println("開啓線程"+index); // 有個線程總開關,若是總開關關閉直接結束全部線程循環, // 每一個線程有個自動開關,任務控制器檢測到全部線程空閒,而且隊列爲空自動把開關改成關閉。 while(CoreTaskController.totalSwitcher && ((urlConf = UrlQueue.concurrentQueue.poll()) != null || (null == urlConf && (CoreTaskController.switcher[index] == 1)))) { if(urlConf != null) { working(); try { if(urlConf.getDepth() <= urlConf.getConf().getDepth() && !UrlQueue.concurrentSet.contains(urlConf.getHttpUrl())) { // 匹配URL是否須要抓取,這些非核心能夠無視 Pattern pattern = Pattern.compile(urlConf.getConf().getUrlReg().trim()); Matcher m = pattern.matcher(urlConf.getHttpUrl().trim()); if(urlConf.getDepth() == 1 || m.matches()) { HttpClient httpClient = new HttpClient(); httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(HttpConf.TIME_OUT); GetMethod getMethod = new GetMethod(urlConf.getHttpUrl()); initHttpParam(getMethod, urlConf.getConf()); try { int httpCode = httpClient.executeMethod(getMethod); if (httpCode != HttpStatus.SC_OK) { log.error("request url:"+urlConf.getHttpUrl() + " failed! http code = "+httpCode); continue; } BufferedReader reader = new BufferedReader(new InputStreamReader(getMethod.getResponseBodyAsStream(),urlConf.getConf().getCharset())); StringBuilder stringBuilder = new StringBuilder(); String str = null; while((str = reader.readLine())!=null){ stringBuilder.append(str); } //.. //.. } catch(Exception e) { log.error("http error!", e); } } // 用於保存該連接是否已經被訪問過 UrlQueue.concurrentSet.add(urlConf.getHttpUrl()); } //System.out.println("queue size:" + UrlQueue.concurrentQueue.size()+",depth:"+urlConf.getDepth()); } catch(Exception e) { System.out.println(e.getCause()); log.error("unknow error.",e.getCause()); } } else { idle(); System.out.println("線程"+index+"空閒"); log.info("thread:"+index+" 空閒中."); try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } } System.out.println("線程"+index+",任務結束"); } private void working() { CoreTaskController.signal[index] = 1; } private void idle() { CoreTaskController.signal[index] = 0; } /** * 初始化請求參數 * @param getMethod * @param conf */ private void initHttpParam(GetMethod getMethod,WebsiteConf conf) { getMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, HttpConf.SO_TIMEOUT); getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,new DefaultHttpMethodRetryHandler()); getMethod.getParams().setParameter(HttpMethodParams.USER_AGENT, HttpConf.USER_AGENT); getMethod.getParams().setParameter(HttpMethodParams.HTTP_CONTENT_CHARSET, conf.getCharset()); getMethod.getParams().setParameter(HttpMethodParams.HTTP_URI_CHARSET, conf.getCharset()); getMethod.getParams().setParameter("refer", HttpConf.REFER); } @Override public void interrupt() { idle(); super.interrupt(); } @Override protected void finalize() throws Throwable { idle(); super.finalize(); } public static void main(String[] args) { Pattern p = Pattern.compile("^http://news.carnoc.com/(cache/)?(list/){1}(\\w(/\\w)*)+.html$"); Matcher m = p.matcher("http://news.carnoc.com/cache/list/news_hotlist_1.html"); System.out.println(m.matches()); } }
實現二:關於抓取各類不一樣網站節點的問題,最好是能配置。java有不少工具能夠解析html,如:htmlparser、htmlcleaner等。可是多網站作到能夠配置仍是比較麻煩的,當時我就想到可否使用xpath,可是java原聲帶的xpath對xml格式要求很是嚴格,你們都知道html結構不是嚴謹的xml格式,html沒有要求嚴格的閉包,因此這個是行不通的。後來通過查閱資料,htmlcleaner支持xpath語法,這真是一個天大的好消息,由於沒有它咱們就得本身寫一大堆配置文件,本身根據配置文件來解析html,很是耗時。有了htmlCleaner,經過配置xpath便可找到節點。下面是個cleaner使用簡單的例子:
HtmlCleaner cleaner = new HtmlCleaner(); TagNode root = cleaner.clean(respStr); //=======================開始蒐集數據================================= // 新聞標題 // conf.getNewsTitleX()是讀取了我配置文件配置的newsTitle的xpath語法 Object title[] = root.evaluateXPath(conf.getNewsTitleX()); if(title.length > 0 && checkType(title[0])) { this.title = ((TagNode)title[0]).getText().toString().trim(); }
下面是我其中一個網站的配置文件:
crawler.name = hkzx crawler.charset = utf-8 crawler.depth = 4 crawler.url.index = http://www.aviationnow.com.cn/Page/News_Search.aspx crawler.url.default.head=http://www.aviationnow.com.cn/Page/ crawler.url.reg =^http://www.aviationnow.com.cn/Page/News_details.aspx\\?id=[0-9a-z-]+$ crawler.html.title =//body//div[@class='NewsDetails_Title']//span[@id='d_title'] ##注意看這裏,這就是取title的表達式 對應java中的conf.getNewsTitleX() crawler.html.content = //body//div[@class='NewsDetails_Text'] crawler.html.pubtime =//body//div[@class='NewsDetails_date'] crawler.html.from = //body//div[@class='NewsDetails_source'] crawler.html.readcount = crawler.html.tag = crawler.html.keywords =
實現三:在以上工做都完成以後,內容能夠抓取到,時間也能夠抓取到,可是抓取的時候遇到一個問題,因爲網站設計緣由,有些程序員會把一些文字如:發表者、發表時間、來源等放到一個標籤裏面,而後這個標籤裏面又包含其餘標籤。
<div class='article_info'><span>發表者:<span>張三</span> 來源:測試網站 發表時間:2015-10-10 00:00:00</span></div>
若是要取來源xpath只能定位到節點「//body//div[@class='article_info']//span[1]」 ,取出來的innerHtml是 「發表者:<span>張三</span> 來源:測試網站 發表時間:2015-10-10 00:00:00」這種狀況後來想一想也只能個性化處理了。設置一個filter接口,經過配置文件配置提取某個字段的時候 最字段進行過濾,好比string.subString("空格")[1]來取來源,後來想一想其實配置文件中支持配置groovy腳本也是至關不錯的,哈哈
開發中須要注意的事項:
1.提取網站url的時候,有些新聞發佈網站a標籤和iframe標籤的連接不會寫全相似:http://xxxx.com/article/1234556, 而是直接平級連接如<a href='./11212121' >文章1</a>,這種狀況須要處理好
2.被抓取的網站html結構隨時發生變動,須要經過查看內容抓取正確來判斷網站結構是否發生變化。
3.異步加載的數據一時間沒想到怎麼去抓
4.要設置訪問連接指紋,訪問過的就不要再訪問了避免循環抓取。數據量巨大的時候可使用bloomfilter,不大的時候直接hashset就夠了。