Java多線程爬蟲實現

1、需求html

        1.定時抓取固定網站新聞標題、內容、發表時間和來源。java

        2.程序須要支持分佈式、多線程mysql

 

2、設計程序員

      1.網站是固定,可是將來也可能添加新的網站去抓取,每一個網站內容節點設計都不同,這樣就須要支持動態可配置來新增網站以方便將來的擴展,這樣就須要每次都須要開發介入。sql

       2.網站html節點的結構可能發生變化,因此也要支持提取節點可配置。apache

       3.怎樣支持分佈式?暫時最簡單的想法就是:多機器部署程序,還有新搞一臺或者部署程序其中一臺製做一個定時任務,定時開啓每臺機器應該抓取哪一個網站,暫時不能支持同一個網站同時能夠支持被多臺機器同時抓取,這樣會比較麻煩,要用到分佈式隊列。因此暫時一個網站同時只會被單臺機器抓取。數組

       4.多線程,怎樣多線程?多線程抓取我這邊有兩個實現:多線程

               (1)一個線程抓取一個網站,維護一個本身的url隊列作廣度抓取,同時抓取多個網站。如圖:閉包

                    

               (2)多個線程同時抓取不一樣的網站。如圖:併發

              以上兩張辦法其實各有優勢,也給有缺點,看咱們怎麼取捨了。

              方法1:每一個線程建立一個本身的隊列,圖中的queue能夠不用concurrentQueue,優勢:不涉及到控制併發,每一個網站一個線程抓取一個網站,抓取完畢即自動回收銷燬線程。控制方便。缺點:線程數不能夠擴展,例如當只有3個網站,你最多隻能開3個線程來抓取,不能開更多,有必定的侷限性。

              方法2:N個線程同時抓取N個網站,線程數和網站數目不掛鉤,優勢:線程數能夠調整而且和和抓取網站數量無關。3個網站咱們能夠開4個5個或者10個這個能夠根據您的硬件資源進行調整。缺點:須要控制併發,而且要控制何時銷燬線程(thread1空閒,而且queue爲空不表明任務能夠結束,可能thread2結果還沒返回),當被抓取的網站響應較慢時,會拖慢整個爬蟲進度。

 

3、實現

       抓取方式最終仍是選擇了方法二,由於線程數可配置!

          使用技術:

                    jfinal 用了以後才發現這東西不適合,可是因爲項目進度問題,仍是使用了。

                    maven項目管理

                    jetty server

                    mysql

                    eclipse 開發

          項目須要重點攻破的難點:

                    (1)合理的控制N個線程正常的抓取網站,而且當全部線程工做都完成了而且須要抓取的隊列爲空時,N個線程同時退出銷燬。

                    (2)不一樣網站設計節點不同,須要經過配置解決各個網站須要抓取的URL和抓取節點內容在html節點的位置。

                    (3)個性化內容處理,因爲html結構設計問題,抓取的內容可能有些多餘的html標籤,或者多餘的內容該怎麼處理。

         實現1:線程管理創建一個線程中心管理控制器,控制器負責線程的銷燬。如圖:

                    (1)建立一箇中心管理器,管理器存放N個線程的signal數組標記,用來標記線程是否空閒,而且建立N個標記的線程開關。用來同時結束N個線程。

                    (2)線程在開始抓取請求連接時把idle置爲false,抓取連接完畢以後繼續循環取隊列(循環時候判斷開關是否爲true),當從隊列poll結果爲空時,把線程置爲idle=true,而且sleep 1S(看我的愛好)。

                    (3)線程中心中心管理器調度一個定時檢測任務1s鍾檢測一次 線程的signal數組標記和queue.size(),當queue.size()==0而且數組標記所有都標記空閒的時候,把線程開關switcher所有置爲false(關閉,這樣線程會結束while退出)

                     如圖下狀況,4個線程的狀態後線程便可自行退出:

核心控制功能和線程功能代碼以下:

   CoreTaskController.java

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.log4j.Logger;

import com.crawler.core.conf.CrawlerConf;


/**
 * 任務核心控制器
 * @author Jacky
 * 2015-9-30
 */
public class CoreTaskController {
	
	public static Logger log = Logger.getLogger(CoreTaskController.class);
	/**
	 * 信號量,用來標記當前線程是否空閒
	 * 0表示空閒
	 * 1表示繁忙
	 * CrawlerConf.getMaxT()表示獲取最大線程數
	 */
	public static volatile int[] signal = new int[CrawlerConf.getMaxT()];;
	
	public static volatile boolean totalSwitcher = true;
	
	/**
	 * 線程開關
	 * 1表示打開
	 * 0表示關閉
	 */
	public static volatile int[] switcher = new int[CrawlerConf.getMaxT()];
	
	public static Timer timer;
	
	public static void init() {
		// 開啓全部線程開關
		for(int i=0;i<switcher.length;i++) {
			switcher[i] = 1;
		}
	}
	
	public static void startSc() {
		if(null != timer) {
			timer.cancel();
		}
		timer = new Timer();
		timer.schedule(new JobTask(), 5000,1000);
	}
	
	static class JobTask extends TimerTask {

		@Override
		public void run() {
			// 當總開關被關閉,當即結束全部線程
			if(!totalSwitcher) {
				// 關閉線程開關
				for(int i=0;i<switcher.length;i++) {
					switcher[i] = 0;
					// 中止調度此任務
					this.cancel();
					CoreTaskController.timer.cancel();
				}
				UrlQueue.concurrentSet.clear();
				return;
			}
			/**
			* UrlQueue.concurrentQueue 就是用來存放
			* url的隊列
			*/
			if(UrlQueue.concurrentQueue.size() == 0 ) {
				boolean stop = true;
				for(int i : signal) {
					stop = stop & (i == 0);
				}
				if(stop) {
					// 關閉線程開關
					for(int i=0;i<switcher.length;i++) {
						switcher[i] = 0;
						// 中止調度此任務
						this.cancel();
						CoreTaskController.timer.cancel();
					}
					UrlQueue.concurrentSet.clear();
				}
			}
			log.warn("time task 調度中..signal="+Arrays.toString(signal)+",urlQueueSize="+UrlQueue.concurrentQueue.size()+",switcher="+Arrays.toString(switcher));
		}
		
	}
	
}

  ConcurrentCrawlerTask.java 線程任務代碼

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.crawler.core.conf.HttpConf;
import com.crawler.core.conf.UrlConf;
import com.crawler.core.conf.WebsiteConf;
import com.crawler.system.model.OriginContent;

/**
 * 爬蟲任務類
 * @author Jacky
 * 2015-9-29
 */
public class ConcurrentCrawlerTask extends Thread {
	
	Logger log = Logger.getLogger(ConcurrentCrawlerTask.class);
	
	private int index;
	
	public ConcurrentCrawlerTask(int index,String taskName) {
		super(taskName);
		this.index = index;
	}

	@Override
	public void run() {
		UrlConf urlConf = null;
		System.out.println("開啓線程"+index);
		// 有個線程總開關,若是總開關關閉直接結束全部線程循環,
		// 每一個線程有個自動開關,任務控制器檢測到全部線程空閒,而且隊列爲空自動把開關改成關閉。
		while(CoreTaskController.totalSwitcher && 
				((urlConf = UrlQueue.concurrentQueue.poll()) != null || (null == urlConf && (CoreTaskController.switcher[index] == 1)))) {
			if(urlConf != null) {
				working();
				try {
					if(urlConf.getDepth() <= urlConf.getConf().getDepth() 
							&& !UrlQueue.concurrentSet.contains(urlConf.getHttpUrl())) {
						// 匹配URL是否須要抓取,這些非核心能夠無視
						Pattern pattern = Pattern.compile(urlConf.getConf().getUrlReg().trim());
						Matcher m = pattern.matcher(urlConf.getHttpUrl().trim());
						if(urlConf.getDepth() == 1 || m.matches()) {
							HttpClient httpClient = new HttpClient();
							httpClient.getHttpConnectionManager().getParams().setConnectionTimeout(HttpConf.TIME_OUT);
							GetMethod getMethod = new GetMethod(urlConf.getHttpUrl());
							initHttpParam(getMethod, urlConf.getConf());
							try {
								int httpCode = httpClient.executeMethod(getMethod);
								if (httpCode != HttpStatus.SC_OK) {
									log.error("request url:"+urlConf.getHttpUrl() + " failed! http code = "+httpCode);
									continue;
								}
								BufferedReader reader = new BufferedReader(new InputStreamReader(getMethod.getResponseBodyAsStream(),urlConf.getConf().getCharset()));  
								StringBuilder stringBuilder = new StringBuilder();  
								String str = null;
								while((str = reader.readLine())!=null){  
									stringBuilder.append(str);  
								}  
								//..
								//..
								
							} catch(Exception e) {
								log.error("http error!", e);
							}
					   }
					   // 用於保存該連接是否已經被訪問過
					   UrlQueue.concurrentSet.add(urlConf.getHttpUrl());
					}
					//System.out.println("queue size:" + UrlQueue.concurrentQueue.size()+",depth:"+urlConf.getDepth());
				} catch(Exception e) {
					System.out.println(e.getCause());
					log.error("unknow error.",e.getCause());
				}
			} else {
				idle();
				System.out.println("線程"+index+"空閒");
				log.info("thread:"+index+" 空閒中.");
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					break;
				}
			}
		}
		System.out.println("線程"+index+",任務結束");
	}
	
	private void working() {
		CoreTaskController.signal[index] = 1;
	}
	
	private void idle() {
		CoreTaskController.signal[index] = 0;
	}
	
	/**
	 * 初始化請求參數
	 * @param getMethod
	 * @param conf
	 */
	private void initHttpParam(GetMethod getMethod,WebsiteConf conf) {
		getMethod.getParams().setParameter(HttpMethodParams.SO_TIMEOUT, HttpConf.SO_TIMEOUT);
		getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,new DefaultHttpMethodRetryHandler());
		getMethod.getParams().setParameter(HttpMethodParams.USER_AGENT, HttpConf.USER_AGENT);
		getMethod.getParams().setParameter(HttpMethodParams.HTTP_CONTENT_CHARSET, conf.getCharset());
		getMethod.getParams().setParameter(HttpMethodParams.HTTP_URI_CHARSET, conf.getCharset());
		getMethod.getParams().setParameter("refer", HttpConf.REFER);
	}
	
	@Override
	public void interrupt() {
		idle();
		super.interrupt();
	}

	@Override
	protected void finalize() throws Throwable {
		idle();
		super.finalize();
	}

	public static void main(String[] args) {
		Pattern p = Pattern.compile("^http://news.carnoc.com/(cache/)?(list/){1}(\\w(/\\w)*)+.html$");
		Matcher m = p.matcher("http://news.carnoc.com/cache/list/news_hotlist_1.html");
		System.out.println(m.matches());
	}
}

            實現二:關於抓取各類不一樣網站節點的問題,最好是能配置。java有不少工具能夠解析html,如:htmlparser、htmlcleaner等。可是多網站作到能夠配置仍是比較麻煩的,當時我就想到可否使用xpath,可是java原聲帶的xpath對xml格式要求很是嚴格,你們都知道html結構不是嚴謹的xml格式,html沒有要求嚴格的閉包,因此這個是行不通的。後來通過查閱資料,htmlcleaner支持xpath語法,這真是一個天大的好消息,由於沒有它咱們就得本身寫一大堆配置文件,本身根據配置文件來解析html,很是耗時。有了htmlCleaner,經過配置xpath便可找到節點。下面是個cleaner使用簡單的例子:

HtmlCleaner cleaner = new HtmlCleaner();
TagNode root = cleaner.clean(respStr);
//=======================開始蒐集數據=================================
// 新聞標題
// conf.getNewsTitleX()是讀取了我配置文件配置的newsTitle的xpath語法
Object title[] = root.evaluateXPath(conf.getNewsTitleX());
if(title.length > 0 && checkType(title[0])) {
    this.title = ((TagNode)title[0]).getText().toString().trim();
}

下面是我其中一個網站的配置文件:

crawler.name = hkzx
crawler.charset = utf-8
crawler.depth = 4


crawler.url.index = http://www.aviationnow.com.cn/Page/News_Search.aspx
crawler.url.default.head=http://www.aviationnow.com.cn/Page/
crawler.url.reg =^http://www.aviationnow.com.cn/Page/News_details.aspx\\?id=[0-9a-z-]+$
crawler.html.title =//body//div[@class='NewsDetails_Title']//span[@id='d_title']           ##注意看這裏,這就是取title的表達式 對應java中的conf.getNewsTitleX()
crawler.html.content = //body//div[@class='NewsDetails_Text']
crawler.html.pubtime =//body//div[@class='NewsDetails_date']
crawler.html.from = //body//div[@class='NewsDetails_source']
crawler.html.readcount =
crawler.html.tag = 
crawler.html.keywords =

 

        實現三:在以上工做都完成以後,內容能夠抓取到,時間也能夠抓取到,可是抓取的時候遇到一個問題,因爲網站設計緣由,有些程序員會把一些文字如:發表者、發表時間、來源等放到一個標籤裏面,而後這個標籤裏面又包含其餘標籤。

       

<div class='article_info'><span>發表者:<span>張三</span> 來源:測試網站 發表時間:2015-10-10 00:00:00</span></div>

        若是要取來源xpath只能定位到節點「//body//div[@class='article_info']//span[1]」 ,取出來的innerHtml是 「發表者:<span>張三</span> 來源:測試網站 發表時間:2015-10-10 00:00:00」這種狀況後來想一想也只能個性化處理了。設置一個filter接口,經過配置文件配置提取某個字段的時候 最字段進行過濾,好比string.subString("空格")[1]來取來源,後來想一想其實配置文件中支持配置groovy腳本也是至關不錯的,哈哈

        

        開發中須要注意的事項:

            1.提取網站url的時候,有些新聞發佈網站a標籤和iframe標籤的連接不會寫全相似:http://xxxx.com/article/1234556,  而是直接平級連接如<a href='./11212121' >文章1</a>,這種狀況須要處理好

            2.被抓取的網站html結構隨時發生變動,須要經過查看內容抓取正確來判斷網站結構是否發生變化。

            3.異步加載的數據一時間沒想到怎麼去抓

            4.要設置訪問連接指紋,訪問過的就不要再訪問了避免循環抓取。數據量巨大的時候可使用bloomfilter,不大的時候直接hashset就夠了。

相關文章
相關標籤/搜索