這兩天手癢用jsoup擼了個抓取圖片爬蟲java
第一版:多線程
ThreadPoolExecutor executor = new ThreadPoolExecutor(6, 6, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200)); for (int j = 1; j <= 總頁數; j++) { executor.execute(()->{ // 1.抓取網頁,得到圖片url // 2.根據url保存圖片 // 3.保存後記錄成功和失敗的信息到本地txt }); }
程序看起來沒有什麼問題,只開了6線程操做,開始沒敢開太多線程,怕被網站拉黑。。併發
可是運行起來太慢了,一夜只爬了10個多G,目前分析問題主要有兩點:異步
1.併發操做本地txt,會拖慢單個任務執行的速度優化
2.線程沒有充分利用網站
首先看下操做文件方法吧,所用方法來自NIO:url
Files.write(log, attr.getBytes("utf8"), StandardOpenOption.APPEND);
經過查看源碼發現,該方法會構造一個OutputStream去調用write方法,而write方法上有synchronized,多線程操做無疑會轉爲重量鎖spa
那麼想要記錄日誌的話,最好是讓它們沒有線程競爭的狀況下再去操做文件;線程
而後是優化多線程操做,相比於獲取url,下載圖片確定是要比它更慢的,若是先統一獲取url,而後根據url再去下載圖片是否會更好?日誌
第一次優化:
// 用於記錄全部url Queue<String> queue = new ConcurrentLinkedQueue<String>(); // 用於記錄全部日誌 Queue<String> logQueue = new ConcurrentLinkedQueue<String>(); // 全部任務 List<Consumer> allTasks = new ArrayList<>(); for (int j = 1; j <= 總頁數; j++) { allTasks.add(t ->{ // 得到url,放入queue中 }); } // 使用ForkJoin並行執行記錄url的任務 BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> { tasks.forEach(t->t.accept(null)); }); // 將全部url並行執行下載 List<String> list = queue.stream().collect(Collectors.toList()); BatchTaskRunner.execute(list, taskPerThread, tasks -> { tasks.forEach( // 1.下載文件 // 2.將url成功或失敗放到logQueue中 ); }); // 最後再記錄日誌 logQueue.forEach( // 將全部日誌保存到本地txt中 );
這裏主要分爲三步:
1.並行執行任務,抓取url放入queue
2.並行執行下載,從queue中取url
3.從logQueue中保存日誌到本地
分析:先是抓取全部url,而後再去並行執行保存;將保存日誌放到最後,保存了圖片後最後的日誌反而可有可無了,可是運行時候我發現仍是存在問題:
我去,爲何必定要先放url再去處理啊!!放的同時也取任務,最後剩餘的任務再並行執行不是更快!
好吧,有了這個想法,直接開幹:
第二次優化:
/****************************第二次增長的邏輯start**************************************/ // 控制主線程執行 CountDownLatch countDownLatch = new CountDownLatch(totalPageSize); // 用於消費queue的線程池 ThreadPoolExecutor executor = new ThreadPoolExecutor(12, 12, 0, TimeUnit.SECONDS, new SynchronousQueue<>()); // 用於自旋時的開關 volatile boolean flag = false; /****************************第二次增長的邏輯end**************************************/ // 用於記錄全部url Queue<String> queue = new ConcurrentLinkedQueue<String>(); // 用於記錄全部日誌 Queue<String> logQueue = new ConcurrentLinkedQueue<String>(); // 全部任務 List<Consumer> allTasks = new ArrayList<>(); for (int j = 1; j <= 總頁數; j++) { allTasks.add(t ->{ // 得到url,放入queue中 }); } // 開了一個線程去執行,主要是爲了讓它異步去操做 new Thread(()->{ // 使用ForkJoin並行執行記錄url的任務 // finally中調用countDownLatch.countDown() BatchTaskRunner.execute(allTasks, taskPerThread, tasks -> { tasks.forEach(t->t.accept(null)); }); }).start(); // 一邊抓取一邊消費 for (int i = 0; i < 12; i++) { executor.execute(()->{ try { takeQueue(); // 從queue得到url並消費,若是信號量歸零則將flag置爲true } catch (InterruptedException e) { } }); } for(;;) { if(flag) { break; } Thread.sleep(10000); } countDownLatch.await(); executor.shutdownNow(); // 都取完了,就沒必要再去並行執行了 if(queue.size() == 0) { return; } // 將全部url並行執行下載 List<String> list = queue.stream().collect(Collectors.toList()); BatchTaskRunner.execute(list, taskPerThread, tasks -> { tasks.forEach( // 1.下載文件 // 2.將url成功或失敗放到logQueue中 ); }); // 最後再記錄日誌 logQueue.forEach( // 將全部日誌保存到本地txt中 );
其中的takeQueue方法邏輯:
void takeQueue() throws InterruptedException { for(;;) { long count = countDownLatch.getCount(); // 未歸零則一直去消費 if(count > 0) { String poll = queue.poll(); if(poll != null) { consumer.accept(poll); // 根據url去下載 }else { Thread.sleep(3000); } } else { flag = true; return; } } }
大概擼了個邏輯,日誌什麼的已經不重要了。。。
主線程自旋,保存url同時去併發下載,若是保存url的邏輯執行完了隊列中還有url,則並行去下載
看着線程都用上了,感受爽多了
即便在消費,queue中對象仍是愈來愈多
大概邏輯寫好了,代碼還能夠再優化一下,有想法的歡迎留言