記錄:一個爬蟲程序的優化過程

這兩天手癢用jsoup擼了個抓取圖片爬蟲java

 

第一版:多線程

ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 6, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
for (int j = 1; j <= 總頁數; j++) {
    executor.execute(()->{
        // 1.抓取網頁,得到圖片url
        // 2.根據url保存圖片
        // 3.保存後記錄成功和失敗的信息到本地txt
	});
}

程序看起來沒有什麼問題,只開了6線程操做,開始沒敢開太多線程,怕被網站拉黑。。併發

可是運行起來太慢了,一夜只爬了10個多G,目前分析問題主要有兩點:異步

    1.併發操做本地txt,會拖慢單個任務執行的速度優化

    2.線程沒有充分利用網站

首先看下操做文件方法吧,所用方法來自NIO:url

Files.write(log, attr.getBytes("utf8"), StandardOpenOption.APPEND);

經過查看源碼發現,該方法會構造一個OutputStream去調用write方法,而write方法上有synchronized,多線程操做無疑會轉爲重量鎖spa

那麼想要記錄日誌的話,最好是讓它們沒有線程競爭的狀況下再去操做文件;線程

 

而後是優化多線程操做,相比於獲取url,下載圖片確定是要比它更慢的,若是先統一獲取url,而後根據url再去下載圖片是否會更好?日誌

 

第一次優化:

// 用於記錄全部url
Queue<String> queue = new ConcurrentLinkedQueue<String>();
// 用於記錄全部日誌
Queue<String> logQueue = new ConcurrentLinkedQueue<String>();
// 全部任務
List<Consumer> allTasks = new ArrayList<>();
for (int j = 1; j <= 總頁數; j++) {
    allTasks.add(t ->{
    	// 得到url,放入queue中
    });
}
// 使用ForkJoin並行執行記錄url的任務
BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> {
    tasks.forEach(t->t.accept(null));
});
// 將全部url並行執行下載
List<String> list = queue.stream().collect(Collectors.toList());
BatchTaskRunner.execute(list, taskPerThread, tasks -> {
    tasks.forEach(
        // 1.下載文件
        // 2.將url成功或失敗放到logQueue中
    );
});
// 最後再記錄日誌
logQueue.forEach(
    // 將全部日誌保存到本地txt中
);

這裏主要分爲三步:

    1.並行執行任務,抓取url放入queue

    2.並行執行下載,從queue中取url

    3.從logQueue中保存日誌到本地

分析:先是抓取全部url,而後再去並行執行保存;將保存日誌放到最後,保存了圖片後最後的日誌反而可有可無了,可是運行時候我發現仍是存在問題:

我去,爲何必定要先放url再去處理啊!!放的同時也取任務,最後剩餘的任務再並行執行不是更快!

 

好吧,有了這個想法,直接開幹:

第二次優化:

/****************************第二次增長的邏輯start**************************************/
// 控制主線程執行
CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);
// 用於消費queue的線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(12, 12, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
// 用於自旋時的開關
volatile boolean flag = false;
/****************************第二次增長的邏輯end**************************************/


// 用於記錄全部url
Queue<String> queue = new ConcurrentLinkedQueue<String>();
// 用於記錄全部日誌
Queue<String> logQueue = new ConcurrentLinkedQueue<String>();
// 全部任務
List<Consumer> allTasks = new ArrayList<>();


for (int j = 1; j <= 總頁數; j++) {
    allTasks.add(t ->{
    	// 得到url,放入queue中
    });
}
// 開了一個線程去執行,主要是爲了讓它異步去操做
new Thread(()->{
    // 使用ForkJoin並行執行記錄url的任務
    // finally中調用countDownLatch.countDown()
    BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> {
        tasks.forEach(t->t.accept(null));
    });
}).start();

// 一邊抓取一邊消費
for (int i = 0; i < 12; i++) {
    executor.execute(()->{
    	try {
    		takeQueue(); // 從queue得到url並消費,若是信號量歸零則將flag置爲true
    	} catch (InterruptedException e) {
    				
    	}
    });
}
for(;;) {
    if(flag) {
    	break;
    }
    Thread.sleep(10000);
}
countDownLatch.await();
executor.shutdownNow();
// 都取完了,就沒必要再去並行執行了
if(queue.size() == 0) {
    return;
}
// 將全部url並行執行下載
List<String> list = queue.stream().collect(Collectors.toList());
BatchTaskRunner.execute(list, taskPerThread, tasks -> {
    tasks.forEach(
        // 1.下載文件
        // 2.將url成功或失敗放到logQueue中
    );
});
// 最後再記錄日誌
logQueue.forEach(
    // 將全部日誌保存到本地txt中
);

其中的takeQueue方法邏輯:

void takeQueue() throws InterruptedException {
		for(;;) {
			long count = countDownLatch.getCount();
			// 未歸零則一直去消費
			if(count > 0) {
				String poll = queue.poll();
				if(poll != null) {
					consumer.accept(poll); // 根據url去下載
				}else {
					Thread.sleep(3000);
				}
			} else {
				flag = true;
				return;
			}
		}
	}

大概擼了個邏輯,日誌什麼的已經不重要了。。。

主線程自旋,保存url同時去併發下載,若是保存url的邏輯執行完了隊列中還有url,則並行去下載

看着線程都用上了,感受爽多了

 

即便在消費,queue中對象仍是愈來愈多

 

大概邏輯寫好了,代碼還能夠再優化一下,有想法的歡迎留言

相關文章
相關標籤/搜索