##1. WebMagic概覽 WebMagic項目代碼分爲核心和擴展兩部分。核心部分(webmagic-core)是一個精簡的、模塊化的爬蟲實現,而擴展部分則包括一些便利的、實用性的功能。WebMagic的架構設計參照了Scrapy,目標是儘可能的模塊化,並體現爬蟲的功能特色。css
這部分提供很是簡單、靈活的API,在基本不改變開發模式的狀況下,編寫一個爬蟲。html
擴展部分(webmagic-extension)提供一些便捷的功能,例如註解模式編寫爬蟲等。同時內置了一些經常使用的組件,便於爬蟲開發。web
另外WebMagic還包括一些外圍擴展和一個正在開發的產品化項目webmagic-avalon。 ##2. 核心組件 ###2.1 結構圖 ###2.2 四大組件正則表達式
##3. 源碼分析(主類Spider) ###3.1 各組件初始化及可擴展 ####3.1.1
初始化Scheduler:(默認QueueScheduler) protected Scheduler scheduler = new QueueScheduler();
redis
採用新的Scheduler:數據庫
public Spider setScheduler(Scheduler scheduler) { checkIfRunning(); Scheduler oldScheduler = this.scheduler; this.scheduler = scheduler; if (oldScheduler != null) { Request request; while ((request = oldScheduler.poll(this)) != null) { //複製原來的url到新的scheduler this.scheduler.push(request, this); } } return this; }
####3.1.2
初始化Downloader:(默認HttpClientDownloader)緩存
protected void initComponent() { if (downloader == null) { //用戶沒有自定義Downloader this.downloader = new HttpClientDownloader(); } if (pipelines.isEmpty()) { //用戶沒有自定義Pipeline pipelines.add(new ConsolePipeline()); } downloader.setThread(threadNum); if (threadPool == null || threadPool.isShutdown()) { if (executorService != null && !executorService.isShutdown()) { threadPool = new CountableThreadPool(threadNum, executorService); } else { threadPool = new CountableThreadPool(threadNum); } } if (startRequests != null) { for (Request request : startRequests) { scheduler.push(request, this); } startRequests.clear(); } startTime = new Date(); }
####3.1.3 初始化Pipeline:(默認ConsolePipeline)
####3.1.4 初始化PageProcessor:(用戶自定義完成) ###3.2 如何實現多線程 ####3.2.1
初始化線程池(默認Executors.newFixedThreadPool(threadNum)) Executors.newFixedThreadPool做用:建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待.安全
public CountableThreadPool(int threadNum) { this.threadNum = threadNum; this.executorService = Executors.newFixedThreadPool(threadNum); }
####3.2.2
多線程併發控制多線程
public void execute(final Runnable runnable) { if (threadAlive.get() >= threadNum) { try { reentrantLock.lock();//同步鎖 下面爲保護代碼塊 while (threadAlive.get() >= threadNum) { try { condition.await(); } catch (InterruptedException e) { } } } finally { reentrantLock.unlock(); } } threadAlive.incrementAndGet(); executorService.execute(new Runnable() { @Override public void run() { try { runnable.run(); } finally { try { reentrantLock.lock(); threadAlive.decrementAndGet(); //線程數量減小一個時,經過signal()方法通知前面condition.await()的線程 condition.signal(); } finally { reentrantLock.unlock(); } } } }); }
####3.2.3 Java中的ReentrantLock和synchronized兩種鎖定機制的對比架構
ReentrantLock默認狀況下爲不公平鎖
private ReentrantLock lock = new ReentrantLock(true); //公平鎖 try { lock.lock(); //若是被其它資源鎖定,會在此等待鎖釋放,達到暫停的效果 //操做 } finally { lock.unlock(); }``` 不公平鎖與公平鎖的區別: 公平狀況下,操做會排一個隊按順序執行,來保證執行順序。(會消耗更多的時間來排隊) 不公平狀況下,是無序狀態容許插隊,jvm會自動計算如何處理更快速來調度插隊。(若是不關心順序,這個速度會更快) #### 3.2.4 AtomicInteger && CAS >AtomicInteger,一個提供原子操做的Integer的類。在Java語言中,++i和i++操做並非線程安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則經過一種線程安全的加減操做接口。 首先要說一下,AtomicInteger類compareAndSet經過原子操做實現了CAS操做,最底層基於彙編語言實現 CAS是Compare And Set的一個簡稱,以下理解: 1,已知當前內存裏面的值current和預期要修改爲的值new傳入 2,內存中AtomicInteger對象地址對應的真實值(由於有可能別修改)real與current對比,相等表示real未被修改過,是「安全」的,將new賦給real結束而後返回;不相等說明real已經被修改,結束並從新執行1直到修改爲功 ####3.2.5 程序如何終止
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { Request request = scheduler.poll(this); //當scheduler內目標URL爲空時 if (request == null) { //已經沒有線程在運行了, exitWhenComplete默認爲true if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); }
上述while循環結束,則程序完成任務並終止 ###3.3 HttpClient使用http鏈接池發送http請求 將用戶設置的線程數設置爲httpclient最大鏈接池數
public void setThread(int thread) { httpClientGenerator.setPoolSize(thread); }
public HttpClientGenerator setPoolSize(int poolSize) { // 將最大鏈接數增長爲poolSize connectionManager.setMaxTotal(poolSize); return this; }
###3.4 URL在Scheduler中去重 將下載結果頁面中的連接抽取出來並放入scheduler中
public void push(Request request, Task task) { logger.trace("get a candidate url {}", request.getUrl()); if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) { logger.debug("push to queue {}", request.getUrl()); pushWhenNoDuplicate(request, task); } }
####3.4.1 redischeduler URL去重複 ```boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());``` RedisScheduler 中判斷url是否重複的方法,由於一個Spider就是對應只有一個UUID,故上述的判斷則是:判斷當前的url是不是uuid集合的元素 >System.out.println(jedis.sismember("sname", "minxr"));// 判斷 minxr是不是sname集合的元素 ####3.4.2 bloomFilter URL去重複 ```boolean isDuplicate = bloomFilter.mightContain(getUrl(request));``` ####3.4.3 hashset URL去重複 ```public boolean isDuplicate(Request request, Task task) { return !urls.add(getUrl(request)); }``` 優勢: 1)節約緩存空間(空值的映射),再也不須要空值映射。 2)減小數據庫或緩存的請求次數。 3)提高業務的處理效率以及業務隔離性。 缺點: 1)存在誤判的機率。 2)傳統的Bloom Filter不能做刪除操做。 ###3.5 抽取部分API | 方法 | 說明 | 示例 | | ------------- |:-------------:| -----:| | xpath(String xpath) | 使用XPath選擇| html.xpath("//div[@class='title']")| | $(String selector) | 使用Css選擇器選擇| html.$("div.title") | | css(String selector) | 功能同$(),使用Css選擇器選擇 | html.css("div.title") | |regex(String regex) | 使用正則表達式抽取 | html.regex("\(.*?)\") | | replace(String regex, String replacement) | 替換內容 | html.replace("\","") | 這部分抽取API返回的都是一個Selectable接口,意思是說,抽取是支持鏈式調用的。 ###3.6 代理池 ####3.6.1 代理池初始化:
//從以往保存的本地文件中讀取代理信息做爲新的代理池 public SimpleProxyPool() { this(null, true); } //以往保存的本地文件中讀取代理+用戶輸入的httpProxyList合併爲新的代理池 public SimpleProxyPool(List<String[]> httpProxyList) { this(httpProxyList, true); } //以往保存的本地文件中讀取代理+用戶輸入的httpProxyList合併爲新的代理池(後者可認爲操控) public SimpleProxyPool(List<String[]> httpProxyList, boolean isUseLastProxy) { if (httpProxyList != null) { addProxy(httpProxyList.toArray(new String[httpProxyList.size()][])); } if (isUseLastProxy) { if (!new File(proxyFilePath).exists()) { setFilePath(); } readProxyList(); timer.schedule(saveProxyTask, 0, saveProxyInterval); } }
####3.6.2 httpProxyList怎麼傳值
String[] source = { "::0.0.0.1:0", "::0.0.0.2:0", "::0.0.0.3:0", "::0.0.0.4:0" }; for (String line : source) { httpProxyList.add(new String[] {line.split(":")[0], line.split(":")[1], line.split(":")[2], line.split(":")[3] }); }
####3.6.3 本地文件Proxy獲存儲與獲取:定時任務
if (isUseLastProxy) { if (!new File(proxyFilePath).exists()) { setFilePath(); } readProxyList(); timer.schedule(saveProxyTask, 0, saveProxyInterval); }
saveProxyTask()函數負責把最新的代理池ip寫入到本地指定文件 ####3.6.4 使用DelayQueue管理Proxy 目的:能夠根據compareTo方法制定的優先取出代理池中使用間隔較短的代理(一開始默認都爲1.5s)優先取出並執行. 目前代理池的策略是: * 1. 在添加時鏈接相應端口作校驗 * 2. 每一個代理有1.5S的使用間隔 * 3. 每次失敗後,下次取出代理的時間改成1.5S*失敗次數 * 4. 若是代理失敗次數超過20次,則直接丟棄
public void returnProxy(HttpHost host, int statusCode) { Proxy p = allProxy.get(host.getAddress().getHostAddress()); if (p == null) { return; } switch (statusCode) { //成功 case Proxy.SUCCESS: p.setReuseTimeInterval(reuseInterval); p.setFailedNum(0); p.setFailedErrorType(new ArrayList<Integer>()); p.recordResponse(); p.successNumIncrement(1); break; //失敗 case Proxy.ERROR_403: // banned,try longer interval p.fail(Proxy.ERROR_403); p.setReuseTimeInterval(reuseInterval * p.getFailedNum()); logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0); break; //代理被禁 case Proxy.ERROR_BANNED: p.fail(Proxy.ERROR_BANNED); p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum()); logger.warn("this proxy is banned >>>> " + p.getHttpHost()); logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0); break; //404 case Proxy.ERROR_404: // p.fail(Proxy.ERROR_404); // p.setReuseTimeInterval(reuseInterval * p.getFailedNum()); break; default: p.fail(statusCode); break; } //當前代理失敗次數超過20:reviveTime = 2 * 60 * 60 * 1000; if (p.getFailedNum() > 20) { p.setReuseTimeInterval(reviveTime); logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size()); return; } //檢驗代理ip符合下列要求的: if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) { if (!ProxyUtils.validateProxy(host)) { p.setReuseTimeInterval(reviveTime); logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size()); return; } } try { proxyQueue.put(p); } catch (InterruptedException e) { logger.warn("proxyQueue return proxy error", e); } }
使用Socket來校驗代理是否有效,客戶端爲本地.建立與代理的鏈接
public static boolean validateProxy(HttpHost p) { if (localAddr == null) { logger.error("cannot get local IP"); return false; } boolean isReachable = false; Socket socket = null; try { socket = new Socket(); socket.bind(new InetSocketAddress(localAddr, 0)); InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort()); socket.connect(endpointSocketAddr, 3000); logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p); isReachable = true; } catch (IOException e) { logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { logger.warn("Error occurred while closing socket of validating proxy", e); } } } return isReachable; }