WebMagic源碼分析

Webmagic

##1. WebMagic概覽 WebMagic項目代碼分爲核心和擴展兩部分。核心部分(webmagic-core)是一個精簡的、模塊化的爬蟲實現,而擴展部分則包括一些便利的、實用性的功能。WebMagic的架構設計參照了Scrapy,目標是儘可能的模塊化,並體現爬蟲的功能特色。css

這部分提供很是簡單、靈活的API,在基本不改變開發模式的狀況下,編寫一個爬蟲。html

擴展部分(webmagic-extension)提供一些便捷的功能,例如註解模式編寫爬蟲等。同時內置了一些經常使用的組件,便於爬蟲開發。web

另外WebMagic還包括一些外圍擴展和一個正在開發的產品化項目webmagic-avalon。 ##2. 核心組件 ###2.1 結構圖 webmagic ###2.2 四大組件正則表達式

  • 1.Downloader:下載器
  • 2.PageProcessor:抽取器
  • 3.Scheduler:調度器
  • 4.Pipeline:結果處理器

##3. 源碼分析(主類Spider) ###3.1 各組件初始化及可擴展 ####3.1.1
初始化Scheduler:(默認QueueScheduler) protected Scheduler scheduler = new QueueScheduler(); redis

採用新的Scheduler:數據庫

public Spider setScheduler(Scheduler scheduler) {
        checkIfRunning();
        Scheduler oldScheduler = this.scheduler;
        this.scheduler = scheduler;
        if (oldScheduler != null) {
            Request request;
            while ((request = oldScheduler.poll(this)) != null) {
            //複製原來的url到新的scheduler
                this.scheduler.push(request, this);
            }
        }
        return this;
    }

####3.1.2
初始化Downloader:(默認HttpClientDownloader)緩存

protected void initComponent() {
        if (downloader == null) {
        //用戶沒有自定義Downloader
            this.downloader = new HttpClientDownloader();
        }
        if (pipelines.isEmpty()) {
        //用戶沒有自定義Pipeline
            pipelines.add(new ConsolePipeline());
        }
        downloader.setThread(threadNum);
        if (threadPool == null || threadPool.isShutdown()) {
            if (executorService != null && !executorService.isShutdown()) {
                threadPool = new CountableThreadPool(threadNum, executorService);
            } else {
                threadPool = new CountableThreadPool(threadNum);
            }
        }
        if (startRequests != null) {
            for (Request request : startRequests) {
                scheduler.push(request, this);
            }
            startRequests.clear();
        }
        startTime = new Date();
    }

####3.1.3 初始化Pipeline:(默認ConsolePipeline)
####3.1.4 初始化PageProcessor:(用戶自定義完成) ###3.2 如何實現多線程 ####3.2.1
初始化線程池(默認Executors.newFixedThreadPool(threadNum)) Executors.newFixedThreadPool做用:建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待.安全

public CountableThreadPool(int threadNum) {
        this.threadNum = threadNum;
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

####3.2.2
多線程併發控制多線程

public void execute(final Runnable runnable) {
        if (threadAlive.get() >= threadNum) {
            try {
                reentrantLock.lock();//同步鎖  下面爲保護代碼塊
                while (threadAlive.get() >= threadNum) {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                    }
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        threadAlive.incrementAndGet();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } finally {
                    try {
                        reentrantLock.lock();
                        threadAlive.decrementAndGet();
                        //線程數量減小一個時,經過signal()方法通知前面condition.await()的線程
                        condition.signal();
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            }
        });
    }

####3.2.3 Java中的ReentrantLock和synchronized兩種鎖定機制的對比架構

ReentrantLock默認狀況下爲不公平鎖

private ReentrantLock lock = new ReentrantLock(true); //公平鎖
        try {
            lock.lock(); //若是被其它資源鎖定,會在此等待鎖釋放,達到暫停的效果
            //操做
        } finally {
            lock.unlock();
        }```
        不公平鎖與公平鎖的區別:
公平狀況下,操做會排一個隊按順序執行,來保證執行順序。(會消耗更多的時間來排隊)
不公平狀況下,是無序狀態容許插隊,jvm會自動計算如何處理更快速來調度插隊。(若是不關心順序,這個速度會更快)  
  
#### 3.2.4 AtomicInteger && CAS
  >AtomicInteger,一個提供原子操做的Integer的類。在Java語言中,++i和i++操做並非線程安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則經過一種線程安全的加減操做接口。
首先要說一下,AtomicInteger類compareAndSet經過原子操做實現了CAS操做,最底層基於彙編語言實現  
CAS是Compare And Set的一個簡稱,以下理解:  
1,已知當前內存裏面的值current和預期要修改爲的值new傳入  
2,內存中AtomicInteger對象地址對應的真實值(由於有可能別修改)real與current對比,相等表示real未被修改過,是「安全」的,將new賦給real結束而後返回;不相等說明real已經被修改,結束並從新執行1直到修改爲功  

####3.2.5 程序如何終止

while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { Request request = scheduler.poll(this); //當scheduler內目標URL爲空時 if (request == null) { //已經沒有線程在運行了, exitWhenComplete默認爲true if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); }

上述while循環結束,則程序完成任務並終止  
###3.3 HttpClient使用http鏈接池發送http請求
將用戶設置的線程數設置爲httpclient最大鏈接池數

public void setThread(int thread) { httpClientGenerator.setPoolSize(thread); }

 

public HttpClientGenerator setPoolSize(int poolSize) { // 將最大鏈接數增長爲poolSize connectionManager.setMaxTotal(poolSize); return this; }

###3.4 URL在Scheduler中去重  
將下載結果頁面中的連接抽取出來並放入scheduler中

public void push(Request request, Task task) { logger.trace("get a candidate url {}", request.getUrl()); if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) { logger.debug("push to queue {}", request.getUrl()); pushWhenNoDuplicate(request, task); } }

####3.4.1 redischeduler URL去重複

```boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());```  




RedisScheduler 中判斷url是否重複的方法,由於一個Spider就是對應只有一個UUID,故上述的判斷則是:判斷當前的url是不是uuid集合的元素  

>System.out.println(jedis.sismember("sname", "minxr"));// 判斷 minxr是不是sname集合的元素   

####3.4.2 bloomFilter URL去重複
```boolean isDuplicate = bloomFilter.mightContain(getUrl(request));```


####3.4.3  hashset  URL去重複
```public boolean isDuplicate(Request request, Task task) {
    return !urls.add(getUrl(request));
}```  



優勢:  
1)節約緩存空間(空值的映射),再也不須要空值映射。  
2)減小數據庫或緩存的請求次數。  
3)提高業務的處理效率以及業務隔離性。  
缺點:  
1)存在誤判的機率。  
2)傳統的Bloom Filter不能做刪除操做。
###3.5 抽取部分API
| 方法        | 說明           | 示例  |
| ------------- |:-------------:| -----:|
| xpath(String xpath)      | 使用XPath選擇| html.xpath("//div[@class='title']")|
| $(String selector)    | 使用Css選擇器選擇|   html.$("div.title") |
| css(String selector)      |   功能同$(),使用Css選擇器選擇  |  html.css("div.title") |
|regex(String regex)    |  使用正則表達式抽取    |  html.regex("\(.*?)\") |
| replace(String regex, String replacement)    |  替換內容    |  html.replace("\","")  |
這部分抽取API返回的都是一個Selectable接口,意思是說,抽取是支持鏈式調用的。
###3.6 代理池
####3.6.1 代理池初始化:

//從以往保存的本地文件中讀取代理信息做爲新的代理池 public SimpleProxyPool() { this(null, true); } //以往保存的本地文件中讀取代理+用戶輸入的httpProxyList合併爲新的代理池 public SimpleProxyPool(List<String[]> httpProxyList) { this(httpProxyList, true); } //以往保存的本地文件中讀取代理+用戶輸入的httpProxyList合併爲新的代理池(後者可認爲操控) public SimpleProxyPool(List<String[]> httpProxyList, boolean isUseLastProxy) { if (httpProxyList != null) { addProxy(httpProxyList.toArray(new String[httpProxyList.size()][])); } if (isUseLastProxy) { if (!new File(proxyFilePath).exists()) { setFilePath(); } readProxyList(); timer.schedule(saveProxyTask, 0, saveProxyInterval); } }

####3.6.2 httpProxyList怎麼傳值

String[] source = { "::0.0.0.1:0", "::0.0.0.2:0", "::0.0.0.3:0", "::0.0.0.4:0" }; for (String line : source) { httpProxyList.add(new String[] {line.split(":")[0], line.split(":")[1], line.split(":")[2], line.split(":")[3] }); }

####3.6.3 本地文件Proxy獲存儲與獲取:定時任務

if (isUseLastProxy) { if (!new File(proxyFilePath).exists()) { setFilePath(); } readProxyList(); timer.schedule(saveProxyTask, 0, saveProxyInterval); }

saveProxyTask()函數負責把最新的代理池ip寫入到本地指定文件  
  
####3.6.4 使用DelayQueue管理Proxy  
目的:能夠根據compareTo方法制定的優先取出代理池中使用間隔較短的代理(一開始默認都爲1.5s)優先取出並執行.  
目前代理池的策略是:

* 1. 在添加時鏈接相應端口作校驗  
* 2. 每一個代理有1.5S的使用間隔  
* 3. 每次失敗後,下次取出代理的時間改成1.5S*失敗次數  
* 4. 若是代理失敗次數超過20次,則直接丟棄

public void returnProxy(HttpHost host, int statusCode) { Proxy p = allProxy.get(host.getAddress().getHostAddress()); if (p == null) { return; } switch (statusCode) { //成功 case Proxy.SUCCESS: p.setReuseTimeInterval(reuseInterval); p.setFailedNum(0); p.setFailedErrorType(new ArrayList<Integer>()); p.recordResponse(); p.successNumIncrement(1); break; //失敗 case Proxy.ERROR_403: // banned,try longer interval p.fail(Proxy.ERROR_403); p.setReuseTimeInterval(reuseInterval * p.getFailedNum()); logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0); break; //代理被禁 case Proxy.ERROR_BANNED: p.fail(Proxy.ERROR_BANNED); p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum()); logger.warn("this proxy is banned >>>> " + p.getHttpHost()); logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0); break; //404 case Proxy.ERROR_404: // p.fail(Proxy.ERROR_404); // p.setReuseTimeInterval(reuseInterval * p.getFailedNum()); break; default: p.fail(statusCode); break; } //當前代理失敗次數超過20:reviveTime = 2 * 60 * 60 * 1000; if (p.getFailedNum() > 20) { p.setReuseTimeInterval(reviveTime); logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size()); return; } //檢驗代理ip符合下列要求的: if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) { if (!ProxyUtils.validateProxy(host)) { p.setReuseTimeInterval(reviveTime); logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size()); return; } } try { proxyQueue.put(p); } catch (InterruptedException e) { logger.warn("proxyQueue return proxy error", e); } }

使用Socket來校驗代理是否有效,客戶端爲本地.建立與代理的鏈接

public static boolean validateProxy(HttpHost p) { if (localAddr == null) { logger.error("cannot get local IP"); return false; } boolean isReachable = false; Socket socket = null; try { socket = new Socket(); socket.bind(new InetSocketAddress(localAddr, 0)); InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort()); socket.connect(endpointSocketAddr, 3000); logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p); isReachable = true; } catch (IOException e) { logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p); } finally { if (socket != null) { try { socket.close(); } catch (IOException e) { logger.warn("Error occurred while closing socket of validating proxy", e); } } } return isReachable; }

相關文章
相關標籤/搜索