從crawler4j源碼中看wait與notify

這是我參與更文挑戰的第1天,活動詳情查看: 更文挑戰java

引言

crawler4j 是一個開源的 Java 爬蟲框架,且擁有4k多個 star ,相信其源碼值得我去研究,因此才寫下這篇文章。若有錯誤歡迎聯繫我指正!git

其實本文的重點不在於研究 crawler4j 源碼中的各類邏輯、細節等,主要仍是以 crawler4j 這個例子來看 Java 中 waitnotify 的使用,看看熱門開源項目裏是如何使用如何編碼的。github

想快速瞭解的話,你能夠直接看核心邏輯部分,也能夠直接看究極簡單版wait/notify使用markdown

正文

crawler4j 中最重要的兩個類莫過於 CrawlControllerWebCrawler 了,一個是用於設置與開啓爬蟲,而另外一個則是爬蟲的核心實現類。這裏討論的代碼基本都在 CrawlController 類中。session

熟悉的同窗都知道,在開啓 controller 時通常有兩個用法,以下:框架

// 用法1:阻塞式,當爬蟲線程都結束後纔會執行這行之後的代碼
controller.start(factory, numberOfCrawlers);
複製代碼
// 用法2:非阻塞式,在 start 之後, waitUntilFinish 之前的代碼都會馬上執行,在 waitUntilFinish 處阻塞
controller.startNonBlocking(factory, numberOfCrawlers);
// 這中間的代碼都會異步執行
controller.waitUntilFinish();
複製代碼

這個的源碼部分就是本文重點要討論的 waitnotify 的使用。異步

兩個重要變量

首先,在 CrawlController 中定義了兩個這個功能須要的重要變量:ide

/** * Is the crawling of this session finished? */
protected boolean finished;

protected final Object waitingLock = new Object();

複製代碼
  • finished 用於判斷這次爬取是否已結束
  • waitingLock 則是用於加鎖

阻塞式 start 方法

爲了只關注重要內容,其餘部分代碼我以註釋的形式帶過。oop

咱們調用 start 方法的入口在這裏post

/** * Start the crawling session and wait for it to finish. * * @param crawlerFactory * factory to create crawlers on demand for each thread * @param numberOfCrawlers * the number of concurrent threads that will be contributing in * this crawling session. * @param <T> Your class extending WebCrawler */
public <T extends WebCrawler> void start(WebCrawlerFactory<T> crawlerFactory, int numberOfCrawlers) {
    this.start(crawlerFactory, numberOfCrawlers, true);
}
複製代碼

它會去調用另外一個有更多參數的 start 方法,多的參數就是 isBlocking ,這個參數表示是否須要阻塞,具體做用在下面這個 start 方法的註釋中給出

protected <T extends WebCrawler> void start(final WebCrawlerFactory<T> crawlerFactory, final int numberOfCrawlers, boolean isBlocking) {

    // 根據提供的工廠類 crawlerFactory 構造指定數量的線程並使它們開始運行

    // 建立一個監控線程 monitorThread 以下
    Thread monitorThread = new Thread(new Runnable() {

        @Override
        public void run() {
            try {
                synchronized (waitingLock) {

                    while (true) {
                        // 設置的監控循環週期
                        sleep(config.getThreadMonitoringDelaySeconds());
                        boolean someoneIsWorking = false;
                        
                        // 第一部分: 
                        // 觀察每一個 爬蟲線程 是否正常運行,若沒有正常運行則採起相應措施
                        // 第一部分的代碼省略,有興趣能夠去 github 看
                    
                        // 第二部分: 
                        // 查看是否還有正在工做的線程,若沒有則準備退出並關閉資源
                        // 這個部分也是咱們常常看到的 "It looks like no thread is working, waiting for ..." 等 打印日誌的所在源碼部分
                        // 在關閉時會調用 notifyAll
                        if (!someoneIsWorking && shutOnEmpty) {
                            // 再次確保無線程工做且隊列中無 URL 等待爬取

                            // 釋放資源

                            waitingLock.notifyAll();

                            // 釋放資源
                        }
                    }
                }
            } catch (Throwable e) {
                if (config.isHaltOnError()) {
                    // 發生了某個錯誤
                    setError(e);
                    synchronized (waitingLock) {
                        // 釋放資源

                        waitingLock.notifyAll();

                        // 釋放資源
                    }
                } else {
                    logger.error("Unexpected Error", e);
                }
            }
        }

    });

    monitorThread.start();

    // 若是須要阻塞,那麼就調用 waitUntilFinish 這個方法,代碼執行到這就會阻塞住
    if (isBlocking) {
        waitUntilFinish();
    }
}
複製代碼

從代碼中能夠看到,阻塞的地方在最後幾行,也就是監控線程開啓後的 waitUntilFinish 方法。

監控線程在監控到線程都運行完後,調用 waitingLock.notifyAll() 從而使這裏的阻塞結束,那麼這是如何作到的呢,咱們再來看 waitUntilFinish 方法。

waitUntilFinish 方法如何使 start 阻塞

這個方法的源碼很短,我直接放出來。

/** * Wait until this crawling session finishes. */
public void waitUntilFinish() {
    while (!finished) {
        synchronized (waitingLock) {
            if (config.isHaltOnError()) {
                Throwable t = getError();
                if (t != null && config.isHaltOnError()) {
                    if (t instanceof RuntimeException) {
                        throw (RuntimeException)t;
                    } else if (t instanceof Error) {
                        throw (Error)t;
                    } else {
                        throw new RuntimeException("error on monitor thread", t);
                    }
                }
            }
            if (finished) {
                return;
            }
            try {
                // 主動讓出並等待鎖資源
                waitingLock.wait();
            } catch (InterruptedException e) {
                logger.error("Error occurred", e);
            }
        }
    }
}
複製代碼

首先,在 start 方法和 waitUntilFinish 方法中都有 synchronized 來修飾關鍵的代碼塊,而且爭奪的都是同一個鎖 waitingLock。這意味着一方執行,就會有另外一方被阻塞。咱們但願的是 waitUntilFinish 一直被阻塞,直到爬蟲線程都執行完(也就是 start 方法中對應的 synchronized 方法塊裏的內容)後,再讓 waitUntilFinish 方法結束。這也就是源碼中對這部分的處理,同時也是 wait 和 notify 使用的思想。

核心邏輯

再來理一遍源碼這塊的邏輯:

  1. monitorThread 的 run 方法中使用 synchronized 獲取鎖 waitingLock,並循環檢查是否全部爬蟲線程、爬蟲任務都執行完畢。
  2. waitUntilFinish 使用 synchronized 獲取鎖 waitingLock,並根據變量 isFinished 檢查爬取過程是否結束,若結束則直接返回;若未結束,則調用 wait 方法讓出資源給 monitorThread 的 run 方法。
  3. 即便 waitUntilFinish 在調用 wait 方法後又獲取到了鎖 waitingLock ,它也會根據爬取是否結束 isFinished 來判斷是否要再次進入循環調用 wait 方法。
  4. monitorThread 在檢查到全部爬蟲線程、爬蟲任務都執行完畢後,調用 notifyAll 方法(和 notify 同樣,只是對全部競爭鎖資源的線程都發送通知)來讓 waitUntilFinish 繼續從 wait 處執行下去
  5. waitUntilFinish 得到鎖資源,並從調用 wait 方法後的代碼繼續向下執行,在循環判斷 isFinished 時發現爬取過程結束了,則直接返回,整個過程結束

其中也還有不少細節沒有說起,如延時的設置、循環監控週期、資源的釋放等等,因爲不是本文關注的重點內容,能夠本身參照源碼理解一下。

透過現象看本質

不難看出其實本質就是某個線程調用 wait 方法主動讓出鎖給另外一個線程,而後另外一個線程在執行完任務後調用 notify/notifyAll 來通知它執行完了以讓它開始搶佔鎖

其中還有一些細節:

  • 調用 notify 後,調用 wait 的線程並不會立刻得到鎖資源,而是等調用 notify 的那個線程釋放鎖資源後它才能得到,也就是說即便線程調用了 notify 方法,可能也要等到他退出 synchronized 代碼塊後,其餘線程才能得到鎖資源
  • 調用 wait 釋放鎖,又從新得到鎖後,代碼會從 wait 方法下面的那一行繼續向下執行,而不會去回到 synchronized 代碼塊開始的地方執行,這也是爲何源碼中要使用 while 循環去重複獲取鎖資源。由於若是沒有這層循環而該線程在釋放鎖後從新獲取鎖時其實爬取過程還沒結束(也就是 isFinished 是 False),那 waitUntilFinish 就會直接結束
  • wait 其實能夠設置超時時長 wait(long timeout),在 timeout 時間後喚醒本身,這就至關於 timeout 時間後有人來通知他能夠去搶鎖資源了

究極簡易版實現

爲了加深理解,本身動手實現一下 crawler4j 這個機制的究極簡易版以下(注意只是實現 wait/notify 機制):

package thread_practice;

public class WaitNotify {

    private final Object waitingLock = new Object();
    private boolean isFinished = false;

    public void start() {
        synchronized (waitingLock) {
            isFinished = false;
            System.out.println("doing sth...");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done.");
            isFinished = true;
            waitingLock.notifyAll();
        }
    }

    public void waitUntilFinish() {
        synchronized (waitingLock) {
            if (isFinished) return;

            try {
                waitingLock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        WaitNotify wn = new WaitNotify();
        new Thread(() -> wn.start()).start();
        wn.waitUntilFinish();
        System.out.println("continue another thing...");
    }
}
複製代碼

執行程序5秒以內:

執行程序5秒以後:

能夠看到主線程確實是阻塞在了 wn.waitUntilFinish() 這個地方,在5秒以後才繼續執行下去。 其邏輯和前面幾節個人解釋同樣,是隻提取了核心部分的簡化版。

總結

本文結合 crawler4j 中實際使用的例子對實際場景中如何使用 wait 與 notify 進行了介紹與討論,也根據 crawler4j 中的場景實現了一個簡易版功能。線程之間的通訊離不開 wait 與 notify ,固然也不止 wait 與 notify ,我會在之後對這方面作更深刻的研究。

本文有什麼錯誤歡迎聯繫我指正。

擴展

這裏只是單個線程通知單個線程任務執行完畢,若是是多個線程通知單個線程的場景怎麼處理呢?

  • 若是是多個線程都執行完了,才通知某個線程,那能夠參照 crawler4j 的方式,使用一個監控線程去循環檢查全部線程是否執行完
  • 若是是多個線程中的某個執行完了就要通知,如何實現?
相關文章
相關標籤/搜索