這是我參與更文挑戰的第1天,活動詳情查看: 更文挑戰java
crawler4j 是一個開源的 Java 爬蟲框架,且擁有4k多個 star ,相信其源碼值得我去研究,因此才寫下這篇文章。若有錯誤歡迎聯繫我指正!git
其實本文的重點不在於研究 crawler4j 源碼中的各類邏輯、細節等,主要仍是以 crawler4j 這個例子來看 Java 中 wait 與 notify 的使用,看看熱門開源項目裏是如何使用如何編碼的。github
想快速瞭解的話,你能夠直接看核心邏輯部分,也能夠直接看究極簡單版wait/notify使用markdown
crawler4j 中最重要的兩個類莫過於 CrawlController 和 WebCrawler 了,一個是用於設置與開啓爬蟲,而另外一個則是爬蟲的核心實現類。這裏討論的代碼基本都在 CrawlController 類中。session
熟悉的同窗都知道,在開啓 controller 時通常有兩個用法,以下:框架
// 用法1:阻塞式,當爬蟲線程都結束後纔會執行這行之後的代碼
controller.start(factory, numberOfCrawlers);
複製代碼
// 用法2:非阻塞式,在 start 之後, waitUntilFinish 之前的代碼都會馬上執行,在 waitUntilFinish 處阻塞
controller.startNonBlocking(factory, numberOfCrawlers);
// 這中間的代碼都會異步執行
controller.waitUntilFinish();
複製代碼
這個的源碼部分就是本文重點要討論的 wait 與 notify 的使用。異步
首先,在 CrawlController 中定義了兩個這個功能須要的重要變量:ide
/** * Is the crawling of this session finished? */
protected boolean finished;
protected final Object waitingLock = new Object();
複製代碼
爲了只關注重要內容,其餘部分代碼我以註釋的形式帶過。oop
咱們調用 start 方法的入口在這裏post
/** * Start the crawling session and wait for it to finish. * * @param crawlerFactory * factory to create crawlers on demand for each thread * @param numberOfCrawlers * the number of concurrent threads that will be contributing in * this crawling session. * @param <T> Your class extending WebCrawler */
public <T extends WebCrawler> void start(WebCrawlerFactory<T> crawlerFactory, int numberOfCrawlers) {
this.start(crawlerFactory, numberOfCrawlers, true);
}
複製代碼
它會去調用另外一個有更多參數的 start 方法,多的參數就是 isBlocking
,這個參數表示是否須要阻塞,具體做用在下面這個 start 方法的註釋中給出
protected <T extends WebCrawler> void start(final WebCrawlerFactory<T> crawlerFactory, final int numberOfCrawlers, boolean isBlocking) {
// 根據提供的工廠類 crawlerFactory 構造指定數量的線程並使它們開始運行
// 建立一個監控線程 monitorThread 以下
Thread monitorThread = new Thread(new Runnable() {
@Override
public void run() {
try {
synchronized (waitingLock) {
while (true) {
// 設置的監控循環週期
sleep(config.getThreadMonitoringDelaySeconds());
boolean someoneIsWorking = false;
// 第一部分:
// 觀察每一個 爬蟲線程 是否正常運行,若沒有正常運行則採起相應措施
// 第一部分的代碼省略,有興趣能夠去 github 看
// 第二部分:
// 查看是否還有正在工做的線程,若沒有則準備退出並關閉資源
// 這個部分也是咱們常常看到的 "It looks like no thread is working, waiting for ..." 等 打印日誌的所在源碼部分
// 在關閉時會調用 notifyAll
if (!someoneIsWorking && shutOnEmpty) {
// 再次確保無線程工做且隊列中無 URL 等待爬取
// 釋放資源
waitingLock.notifyAll();
// 釋放資源
}
}
}
} catch (Throwable e) {
if (config.isHaltOnError()) {
// 發生了某個錯誤
setError(e);
synchronized (waitingLock) {
// 釋放資源
waitingLock.notifyAll();
// 釋放資源
}
} else {
logger.error("Unexpected Error", e);
}
}
}
});
monitorThread.start();
// 若是須要阻塞,那麼就調用 waitUntilFinish 這個方法,代碼執行到這就會阻塞住
if (isBlocking) {
waitUntilFinish();
}
}
複製代碼
從代碼中能夠看到,阻塞的地方在最後幾行,也就是監控線程開啓後的 waitUntilFinish 方法。
監控線程在監控到線程都運行完後,調用 waitingLock.notifyAll()
從而使這裏的阻塞結束,那麼這是如何作到的呢,咱們再來看 waitUntilFinish 方法。
這個方法的源碼很短,我直接放出來。
/** * Wait until this crawling session finishes. */
public void waitUntilFinish() {
while (!finished) {
synchronized (waitingLock) {
if (config.isHaltOnError()) {
Throwable t = getError();
if (t != null && config.isHaltOnError()) {
if (t instanceof RuntimeException) {
throw (RuntimeException)t;
} else if (t instanceof Error) {
throw (Error)t;
} else {
throw new RuntimeException("error on monitor thread", t);
}
}
}
if (finished) {
return;
}
try {
// 主動讓出並等待鎖資源
waitingLock.wait();
} catch (InterruptedException e) {
logger.error("Error occurred", e);
}
}
}
}
複製代碼
首先,在 start 方法和 waitUntilFinish 方法中都有 synchronized 來修飾關鍵的代碼塊,而且爭奪的都是同一個鎖 waitingLock。這意味着一方執行,就會有另外一方被阻塞。咱們但願的是 waitUntilFinish 一直被阻塞,直到爬蟲線程都執行完(也就是 start 方法中對應的 synchronized 方法塊裏的內容)後,再讓 waitUntilFinish 方法結束。這也就是源碼中對這部分的處理,同時也是 wait 和 notify 使用的思想。
再來理一遍源碼這塊的邏輯:
其中也還有不少細節沒有說起,如延時的設置、循環監控週期、資源的釋放等等,因爲不是本文關注的重點內容,能夠本身參照源碼理解一下。
不難看出其實本質就是某個線程調用 wait 方法主動讓出鎖給另外一個線程,而後另外一個線程在執行完任務後調用 notify/notifyAll 來通知它執行完了以讓它開始搶佔鎖。
其中還有一些細節:
爲了加深理解,本身動手實現一下 crawler4j 這個機制的究極簡易版以下(注意只是實現 wait/notify 機制):
package thread_practice;
public class WaitNotify {
private final Object waitingLock = new Object();
private boolean isFinished = false;
public void start() {
synchronized (waitingLock) {
isFinished = false;
System.out.println("doing sth...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done.");
isFinished = true;
waitingLock.notifyAll();
}
}
public void waitUntilFinish() {
synchronized (waitingLock) {
if (isFinished) return;
try {
waitingLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
WaitNotify wn = new WaitNotify();
new Thread(() -> wn.start()).start();
wn.waitUntilFinish();
System.out.println("continue another thing...");
}
}
複製代碼
執行程序5秒以內:
執行程序5秒以後:
能夠看到主線程確實是阻塞在了 wn.waitUntilFinish()
這個地方,在5秒以後才繼續執行下去。 其邏輯和前面幾節個人解釋同樣,是隻提取了核心部分的簡化版。
本文結合 crawler4j 中實際使用的例子對實際場景中如何使用 wait 與 notify 進行了介紹與討論,也根據 crawler4j 中的場景實現了一個簡易版功能。線程之間的通訊離不開 wait 與 notify ,固然也不止 wait 與 notify ,我會在之後對這方面作更深刻的研究。
本文有什麼錯誤歡迎聯繫我指正。
這裏只是單個線程通知單個線程任務執行完畢,若是是多個線程通知單個線程的場景怎麼處理呢?