WebMagic 爬蟲框架淺析

引言

好久以前由於爬蟲需求就接觸過 WebMagic,可是一直停留在簡單使用階段。近來公司項目也有爬蟲需求,並且須要對爬蟲框架作一些定製開發,便以此爲契機深刻學習 WebMagic 的設計思想及實現原理。html

概述

WebMagic 是國內知名開發者黃億華開源的一個 Java 爬蟲框架。WebMagic 的架構設計參照了Scrapy,目標是儘可能的模塊化,並體現爬蟲的功能特色。WebMagic 的結構分爲Downloader、PageProcessor、Scheduler、Pipeline 四大組件,並由 Spider 將它們彼此組織起來。這四大組件對應爬蟲生命週期中的下載、處理、管理和持久化等功能。 html5

圖片

Scheduler

Scheduler 是 WebMagic中的 URL 調度器,負責從 Spider 處理收集 (push) 須要抓取的 URL (Page 的 targetRequests)、並 poll 出將要被處理的 URL 給 Spider,同時還負責對 URL 判斷是否進行錯誤重試、及去重處理、以及總頁面數、剩餘頁面數統計等。 Scheduler 實現類主要有 DuplicateRemovedScheduler、PriorityScheduler、QueueScheduler,拓展包還有 RedisScheduler、FileCacheQueueScheduler。雖然實現類很多,可是原理都差很少,WebMagic 默認實現是 QueueScheduler,便以此分析。 java

圖片
Scheduler 接口定義了 Scheduler 最基礎的功能:添加一個請求,獲取一個請求。

public interface Scheduler {

    /** * add a url to fetch * * @param request request * @param task task */
    public void push(Request request, Task task);

    /** * get an url to crawl * * @param task the task of spider * @return the url to crawl */
    public Request poll(Task task);

}
複製代碼

MonitorableScheduler 接口定義了獲取剩餘請求數和總請求數的方法。git

public interface MonitorableScheduler extends Scheduler {

    public int getLeftRequestsCount(Task task);

    public int getTotalRequestsCount(Task task);

}
複製代碼

DuplicateRemovedScheduler 抽象類實現了通用的 push 模板方法,並在 push 方法內部判斷錯誤重試、去重處理等。github

public abstract class DuplicateRemovedScheduler implements Scheduler {

    protected Logger logger = LoggerFactory.getLogger(getClass());
    // 去重策略實現類,關鍵點在於 private Set<String> urls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());經過 Set 和 ConcurrentHashMap 的特性實現去重及併發安全
    private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();

    public DuplicateRemover getDuplicateRemover() {
        return duplicatedRemover;
    }

    public DuplicateRemovedScheduler setDuplicateRemover(DuplicateRemover duplicatedRemover) {
        this.duplicatedRemover = duplicatedRemover;
        return this;
    }

    // 通用 push 模版方法
    @Override
    public void push(Request request, Task task) {
        logger.trace("get a candidate url {}", request.getUrl());
        if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) {
            logger.debug("push to queue {}", request.getUrl());
            pushWhenNoDuplicate(request, task);
        }
    }

    // 若是設置了回收重試則不須要去重處理
    protected boolean shouldReserved(Request request) {
        return request.getExtra(Request.CYCLE_TRIED_TIMES) != null;
    }
    // 若是是 POST 請求則不須要去重處理,由於 POST 請求不是冪等的,POST 請求沒有加入到 Set 中去重,因此也不會計入請求數統計中
    protected boolean noNeedToRemoveDuplicate(Request request) {
        return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());
    }

    protected void pushWhenNoDuplicate(Request request, Task task) {

    }
}
複製代碼

QueueScheduler 的實現很簡單,維護一個 LinkedBlockingQueue 便可,獲取剩餘請求數即隊列的 size,獲取總請求數即 HashSetDuplicateRemover 維護的 Set 集合的 size。web

@ThreadSafe
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
    private BlockingQueue<Request> queue = new LinkedBlockingQueue();

    public QueueScheduler() {
    }

    public void pushWhenNoDuplicate(Request request, Task task) {
        this.queue.add(request);
    }

    public Request poll(Task task) {
        return (Request)this.queue.poll();
    }

    public int getLeftRequestsCount(Task task) {
        return this.queue.size();
    }

    public int getTotalRequestsCount(Task task) {
        return this.getDuplicateRemover().getTotalRequestsCount(task);
    }
}
複製代碼

Downloader

Downloader 是負責請求 URL 獲取返回值(HTML、Json、Jsonp 等)的一個組件,同時也會處理 POST 重定向、Https 驗證、IP 代理、判斷失敗重試等 正則表達式

圖片
Downloader 接口定義了下載和設置線程數的方法。

public interface Downloader {

    /** * Downloads web pages and store in Page object. * * @param request request * @param task task * @return page */
    public Page download(Request request, Task task);

    /** * Tell the downloader how many threads the spider used. * @param threadNum number of threads */
    public void setThread(int threadNum);
}
複製代碼

AbstractDownloader 抽象類提供了更上層的 download 方法實現及定義了成功失敗的回調方法。安全

public abstract class AbstractDownloader implements Downloader {

    /** * A simple method to download a url. * * @param url url * @return html */
    public Html download(String url) {
        return download(url, null);
    }

    /** * A simple method to download a url. * * @param url url * @param charset charset * @return html */
    public Html download(String url, String charset) {
        Page page = download(new Request(url), Site.me().setCharset(charset).toTask());
        return (Html) page.getHtml();
    }

    protected void onSuccess(Request request) {
    }

    protected void onError(Request request) {
    }

}
複製代碼

HttpClientDownloader 類是 WebMagic Downloader 的默認實現,主要功能是根據配置生成 HttpClient 實例請求網絡,將請求、結果封裝成 Page 對象,並調用相應的回調方法。網絡

經過 Site 獲取域名,而後經過域名判斷是否在 httpClients 這個 map 中已存在 HttpClient 實例,若是存在則重用,不然經過 httpClientGenerator 建立一個新的實例,而後加入到 httpClients這個 map 中並返回。注意爲了確保線程安全性,這裏用到了線程安全的雙重判斷機制。多線程

private CloseableHttpClient getHttpClient(Site site) {
    if (site == null) {
        return httpClientGenerator.getClient(null);
    }
    String domain = site.getDomain();
    CloseableHttpClient httpClient = httpClients.get(domain);
    if (httpClient == null) {
        synchronized (this) {
            httpClient = httpClients.get(domain);
            if (httpClient == null) {
                httpClient = httpClientGenerator.getClient(site);
                httpClients.put(domain, httpClient);
            }
        }
    }
    return httpClient;
}
複製代碼

WebMagic threadNum 既是線程池的線程數,也是 HttpClient ConnectionManager 的鏈接數,這裏設置的就是鏈接數。

@Override
public void setThread(int thread) {
    httpClientGenerator.setPoolSize(thread);
}
複製代碼
public HttpClientGenerator setPoolSize(int poolSize) {
    connectionManager.setMaxTotal(poolSize);
    return this;
}
複製代碼

HttpClientDownloader 優先獲取 Site 對象的 charset,若是爲空會智能檢測字符編碼,首先判斷 httpResponse.getEntity().getContentType().getValue() 是否含有好比 charset=utf-8, 不然用 Jsoup 解析內容,判斷是提取 meta 標籤,而後判斷針對 HTML4 中 和 HTML5 中 分狀況判斷出字符編碼.

private String getHtmlCharset(String contentType, byte[] contentBytes) throws IOException {
    String charset = CharsetUtils.detectCharset(contentType, contentBytes);
    if (charset == null) {
        charset = Charset.defaultCharset().name();
        logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
    }
    return charset;
}
複製代碼
public static String detectCharset(String contentType, byte[] contentBytes) throws IOException {
    String charset;
    // charset
    // 一、encoding in http header Content-Type
    charset = UrlUtils.getCharset(contentType);
    if (StringUtils.isNotBlank(contentType) && StringUtils.isNotBlank(charset)) {
        logger.debug("Auto get charset: {}", charset);
        return charset;
    }
    // use default charset to decode first time
    Charset defaultCharset = Charset.defaultCharset();
    String content = new String(contentBytes, defaultCharset);
    // 二、charset in meta
    if (StringUtils.isNotEmpty(content)) {
        Document document = Jsoup.parse(content);
        Elements links = document.select("meta");
        for (Element link : links) {
            // 2.一、html4.01 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
            String metaContent = link.attr("content");
            String metaCharset = link.attr("charset");
            if (metaContent.indexOf("charset") != -1) {
                metaContent = metaContent.substring(metaContent.indexOf("charset"), metaContent.length());
                charset = metaContent.split("=")[1];
                break;
            }
            // 2.二、html5 <meta charset="UTF-8" />
            else if (StringUtils.isNotEmpty(metaCharset)) {
                charset = metaCharset;
                break;
            }
        }
    }
    logger.debug("Auto get charset: {}", charset);
    // 三、todo use tools as cpdetector for content decode
    return charset;
}
複製代碼

download() 方法就是常規的 HttpClient 操做請求網絡,handleResponse() 方法將請求、結果封裝成 Page 對象,而後調用相應的回調方法,最後將 HttpClient 的鏈接和代理釋放掉。

@Override
public Page download(Request request, Task task) {
    if (task == null || task.getSite() == null) {
        throw new NullPointerException("task or site can not be null");
    }
    CloseableHttpResponse httpResponse = null;
    CloseableHttpClient httpClient = getHttpClient(task.getSite());
    Proxy proxy = proxyProvider != null ? proxyProvider.getProxy(task) : null;
    HttpClientRequestContext requestContext = httpUriRequestConverter.convert(request, task.getSite(), proxy);
    Page page = Page.fail();
    try {
        httpResponse = httpClient.execute(requestContext.getHttpUriRequest(), requestContext.getHttpClientContext());
        page = handleResponse(request, request.getCharset() != null ? request.getCharset() : task.getSite().getCharset(), httpResponse, task);
        onSuccess(request);
        logger.info("downloading page success {}", request.getUrl());
        return page;
    } catch (IOException e) {
        logger.warn("download page {} error", request.getUrl(), e);
        onError(request);
        return page;
    } finally {
        if (httpResponse != null) {
            //ensure the connection is released back to pool
            EntityUtils.consumeQuietly(httpResponse.getEntity());
        }
        if (proxyProvider != null && proxy != null) {
            proxyProvider.returnProxy(proxy, page, task);
        }
    }
}
複製代碼

PageProcessor

PageProcessor 接口定義了 process() 頁面分析的方法還有 getSite() 提供 HttpClient 請求相關配置的方法。

public interface PageProcessor {

    /** * process the page, extract urls to fetch, extract the data and store * * @param page page */
    public void process(Page page);

    /** * get the site settings * * @return site * @see Site */
    public Site getSite();
}
複製代碼

這裏的頁面分析主要指HTML頁面的分析,頁面分析能夠說是垂直爬蟲最複雜的一部分。Selector 是 WebMagic 爲了簡化頁面抽取開發的獨立模塊,整合了 CSS Selector、XPath 和正則表達式,並能夠進行鏈式的抽取,很容易就實現強大的功能。

圖片

圖片
圖片
接口:

  • Selector:定義了根據字符串選擇單個元素和選擇多個元素的方法。
  • ElementSelector:定義了根據 Jsoup Element選擇單個、多個元素的方法。主要用於 CSS、Xpath 選擇器。

抽象類:

  • BaseElementSelector 實現類前面說的兩個接口,主要用於 CSS、Xpath 選擇器繼承。模板化接口方法,並定義了一些選擇元素的方法由子類實現。

實現類:

  • CssSelector:CSS 選擇器的實現類,繼承 BaseElementSelector。基本實現都是基於Jsoup 的 CSS 選擇接口。
  • XpathSelector:Xpath 選擇器的實現類,繼承 BaseElementSelector。基本實現都是採用做者本身基於 Jsoup 實現的 Xsoup 的相關接口。
  • RegexSelector:正則表達式選擇器的實現類,僅實現了Selector接口。

源碼:
源碼就不做分析了,基本就是基本的 Java 正則 API 和 Jsoup API 的封裝調用。

Pipeline

Pipeline其實也是容易被忽略的一部分。你們都知道持久化的重要性,可是不少框架都選擇直接在頁面抽取的時候將持久化一塊兒完成,例如crawer4j。可是Pipeline真正的好處是,將頁面的在線分析和離線處理拆分開來,能夠在一些線程裏進行下載,另外一些線程裏進行處理和持久化。

Pipeline 接口很簡單,只有一個 process() 方法,參數是 PageProcessor 的解析結果及任務 task 對象,實現類主要有 ConsolePipeline、FilePipeline、ResultItemsCollectorPipeline 等,把解析結果拼接起來輸出到控制檯、文件或者保存到內存集合對象中。源碼很簡單也就不展開分析了。

public interface Pipeline {

    /** * Process extracted results. * * @param resultItems resultItems * @param task task */
    public void process(ResultItems resultItems, Task task);
}
複製代碼
@ThreadSafe
public class FilePipeline extends FilePersistentBase implements Pipeline {

    private Logger logger = LoggerFactory.getLogger(getClass());

    /** * create a FilePipeline with default path"/data/webmagic/" */
    public FilePipeline() {
        setPath("/data/webmagic/");
    }

    public FilePipeline(String path) {
        setPath(path);
    }

    @Override
    public void process(ResultItems resultItems, Task task) {
        String path = this.path + PATH_SEPERATOR + task.getUUID() + PATH_SEPERATOR;
        try {
            PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(getFile(path + DigestUtils.md5Hex(resultItems.getRequest().getUrl()) + ".html")),"UTF-8"));
            printWriter.println("url:\t" + resultItems.getRequest().getUrl());
            for (Map.Entry<String, Object> entry : resultItems.getAll().entrySet()) {
                if (entry.getValue() instanceof Iterable) {
                    Iterable value = (Iterable) entry.getValue();
                    printWriter.println(entry.getKey() + ":");
                    for (Object o : value) {
                        printWriter.println(o);
                    }
                } else {
                    printWriter.println(entry.getKey() + ":\t" + entry.getValue());
                }
            }
            printWriter.close();
        } catch (IOException e) {
            logger.warn("write file error", e);
        }
    }
}
複製代碼

結語

WebMagic 由四大組件(Downloader、PageProcessor、Scheduler、Pipeline)構成,核心代碼很是簡單,主要是將這些組件結合並完成多線程的任務。在WebMagic 中,基本上能夠對爬蟲的功能作任何定製。

@123lxw123, 本文版權屬於再惠研發團隊,歡迎轉載,轉載請保留出處。

相關文章
相關標籤/搜索