Java 多線程爬蟲及分佈式爬蟲架構探索

這是 Java 爬蟲系列博文的第五篇,在上一篇 Java 爬蟲服務器被屏蔽,不要慌,我們換一臺服務器 中,咱們簡單的聊反爬蟲策略和反反爬蟲方法,主要針對的是 IP 被封及其對應辦法。前面幾篇文章咱們把爬蟲相關的基本知識都講的差很少啦。這一篇咱們來聊一聊爬蟲架構相關的內容。java

前面幾章內容咱們的爬蟲程序都是單線程,在咱們調試爬蟲程序的時候,單線程爬蟲沒什麼問題,可是當咱們在線上環境使用單線程爬蟲程序去採集網頁時,單線程就暴露出了兩個致命的問題:git

  • 採集效率特別慢,單線程之間都是串行的,下一個執行動做須要等上一個執行完才能執行
  • 對服務器的CUP等利用率不高,想一想咱們的服務器都是 8核16G,32G 的只跑一個線程會不會太浪費啦

線上環境不可能像咱們本地測試同樣,不在意採集效率,只要能正確提取結果就行。在這個時間就是金錢的年代,不可能給你時間去慢慢的採集,因此單線程爬蟲程序是行不通的,咱們須要將單線程改爲多線程的模式,來提高採集效率和提升計算機利用率。github

多線程的爬蟲程序設計比單線程就要複雜不少,可是與其餘業務在高併發下要保證數據安全又不一樣,多線程爬蟲在數據安全上到要求不是那麼的高,由於每一個頁面均可以被看做是一個獨立體。要作好多線程爬蟲就必須作好兩點:第一點就是統一的待採集 URL 維護,第二點就是 URL 的去重, 下面咱們簡單的來聊一聊這兩點。正則表達式

維護待採集的 URL

多線程爬蟲程序就不能像單線程那樣,每一個線程獨自維護這本身的待採集 URL,若是這樣的話,那麼每一個線程採集的網頁將是同樣的,你這就不是多線程採集啦,你這是將一個頁面採集的屢次。基於這個緣由咱們就須要將待採集的 URL 統一維護,每一個線程從統一 URL 維護處領取採集 URL ,完成採集任務,若是在頁面上發現新的 URL 連接則添加到 統一 URL 維護的容器中。下面是幾種適合用做統一 URL 維護的容器:redis

  • JDK 的安全隊列,例如 LinkedBlockingQueue
  • 高性能的 NoSQL,好比 Redis、Mongodb
  • MQ 消息中間件

URL 的去重

URL 的去重也是多線程採集的關鍵一步,由於若是不去重的話,那麼咱們將採集到大量重複的 URL,這樣並無提高咱們的採集效率,好比一個分頁的新聞列表,咱們在採集第一頁的時候能夠獲得 二、三、四、5 頁的連接,在採集第二頁的時候又會獲得 一、三、四、5 頁的連接,待採集的 URL 隊列中將存在大量的列表頁連接,這樣就會重複採集甚至進入到一個死循環當中,因此就須要 URL 去重。URL 去重的方法就很是多啦,下面是幾種經常使用的 URL 去重方式:數據庫

  • 將 URL 保存到數據庫進行去重,好比 redis、MongoDB
  • 將 URL 放到哈希表中去重,例如 hashset
  • 將 URL 通過 MD5 以後保存到哈希表中去重,相比於上面一種,可以節約空間
  • 使用 布隆過濾器(Bloom Filter)去重,這種方式可以節約大量的空間,就是不那麼準確。

關於多線程爬蟲的兩個核心知識點咱們都知道啦,下面我畫了一個簡單的多線程爬蟲架構圖,以下圖所示:安全

多線程爬蟲架構圖

上面咱們主要了解了多線程爬蟲的架構設計,接下來咱們不妨來試試 Java 多線程爬蟲,咱們以採集虎撲新聞爲例來實戰一下 Java 多線程爬蟲,Java 多線程爬蟲中設計到了 待採集 URL 的維護和 URL 去重,因爲咱們這裏只是演示,因此咱們就使用 JDK 內置的容器來完成,咱們使用 LinkedBlockingQueue 做爲待採集 URL 維護容器,HashSet 做爲 URL 去重容器。下面是 Java 多線程爬蟲核心代碼,詳細代碼以上傳 GitHub,地址在文末:服務器

/** * 多線程爬蟲 */
public class ThreadCrawler implements Runnable {
    // 採集的文章數
    private final AtomicLong pageCount = new AtomicLong(0);
    // 列表頁連接正則表達式
    public static final String URL_LIST = "https://voice.hupu.com/nba";
    protected Logger logger = LoggerFactory.getLogger(getClass());
    // 待採集的隊列
    LinkedBlockingQueue<String> taskQueue;
    // 採集過的連接列表
    HashSet<String> visited;
    // 線程池
    CountableThreadPool threadPool;
    /** * * @param url 起始頁 * @param threadNum 線程數 * @throws InterruptedException */
    public ThreadCrawler(String url, int threadNum) throws InterruptedException {
        this.taskQueue = new LinkedBlockingQueue<>();
        this.threadPool = new CountableThreadPool(threadNum);
        this.visited = new HashSet<>();
        // 將起始頁添加到待採集隊列中
        this.taskQueue.put(url);
    }

    @Override
    public void run() {
        logger.info("Spider started!");
        while (!Thread.currentThread().isInterrupted()) {
	        // 從隊列中獲取待採集 URL
            final String request = taskQueue.poll();
            // 若是獲取 request 爲空,而且當前的線程採已經沒有線程在運行
            if (request == null) {
                if (threadPool.getThreadAlive() == 0) {
                    break;
                }
            } else {
                // 執行採集任務
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processRequest(request);
                        } catch (Exception e) {
                            logger.error("process request " + request + " error", e);
                        } finally {
                            // 採集頁面 +1
                            pageCount.incrementAndGet();
                        }
                    }
                });
            }
        }
        threadPool.shutdown();
        logger.info("Spider closed! {} pages downloaded.", pageCount.get());
    }

    /** * 處理採集請求 * @param url */
    protected void processRequest(String url) {
        // 判斷是否爲列表頁
        if (url.matches(URL_LIST)) {
	        // 列表頁解析出詳情頁連接添加到待採集URL隊列中
            processTaskQueue(url);
        } else {
	        // 解析網頁
            processPage(url);
        }
    }
    /** * 處理連接採集 * 處理列表頁,將 url 添加到隊列中 * * @param url */
    protected void processTaskQueue(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            // 詳情頁連接
            Elements elements = doc.select(" div.news-list > ul > li > div.list-hd > h4 > a");
            elements.stream().forEach((element -> {
                String request = element.attr("href");
                // 判斷該連接是否存在隊列或者已採集的 set 中,不存在則添加到隊列中
                if (!visited.contains(request) && !taskQueue.contains(request)) {
                    try {
                        taskQueue.put(request);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
            // 列表頁連接
            Elements list_urls = doc.select("div.voice-paging > a");
            list_urls.stream().forEach((element -> {
                String request = element.absUrl("href");
                // 判斷是否符合要提取的列表連接要求
                if (request.matches(URL_LIST)) {
                    // 判斷該連接是否存在隊列或者已採集的 set 中,不存在則添加到隊列中
                    if (!visited.contains(request) && !taskQueue.contains(request)) {
                        try {
                            taskQueue.put(request);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /** * 解析頁面 * * @param url */
    protected void processPage(String url) {
        try {
            Document doc = Jsoup.connect(url).get();
            String title = doc.select("body > div.hp-wrap > div.voice-main > div.artical-title > h1").first().ownText();

            System.out.println(Thread.currentThread().getName() + " 在 " + new Date() + " 採集了虎撲新聞 " + title);
            // 將採集完的 url 存入到已經採集的 set 中
            visited.add(url);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        try {
            new ThreadCrawler("https://voice.hupu.com/nba", 5).run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

咱們用 5 個線程去採集虎撲新聞列表頁看看效果若是?運行該程序,獲得以下結果:微信

多線程採集結果

結果中能夠看出,咱們啓動了 5 個線程採集了 61 頁頁面,一共耗時 2 秒鐘,能夠說效果仍是不錯的,咱們來跟單線程對比一下,看看差距有多大?咱們將線程數設置爲 1 ,再次啓動程序,獲得以下結果:多線程

單線程運行結果

能夠看出單線程採集虎撲 61 條新聞花費了 7 秒鐘,耗時差很少是多線程的 4 倍,你想一想這可只是 61 個頁面,頁面更多的話,差距會愈來愈大,因此多線程爬蟲效率仍是很是高的。

分佈式爬蟲架構

分佈式爬蟲架構是一個大型採集程序才須要使用的架構,通常狀況下使用單機多線程就能夠解決業務需求,反正我是沒有分佈式爬蟲項目的經驗,因此這一塊我也沒什麼能夠講的,可是咱們做爲技術人員,咱們須要對技術保存熱度,雖然不用,可是瞭解瞭解也無妨,我查閱了很多資料得出了以下結論:

分佈式爬蟲架構跟咱們多線程爬蟲架構在思路上來講是同樣的,咱們只須要在多線程的基礎上稍加改進就能夠變成一個簡單的分佈式爬蟲架構。由於分佈式爬蟲架構中爬蟲程序部署在不一樣的機器上,因此咱們待採集的 URL 和 採集過的 URL 就不能存放在爬蟲程序機器的內存中啦,咱們須要將它統一在某臺機器上維護啦,好比存放在 Redis 或者 MongoDB 中,每臺機器都從這上面獲取採集連接,而不是從 LinkedBlockingQueue 這樣的內存隊列中取連接啦,這樣一個簡單的分佈式爬蟲架構就出現了,固然這裏面還會有不少細節問題,由於我沒有分佈式架構的經驗,我也無從提及,若是你有興趣的話,歡迎交流。

源代碼:源代碼

文章不足之處,望你們多多指點,共同窗習,共同進步

最後

打個小廣告,歡迎掃碼關注微信公衆號:「平頭哥的技術博文」,一塊兒進步吧。

平頭哥的技術博文
相關文章
相關標籤/搜索