Hello,你們好,我是樓下小黑哥~java
若是給你一個包含一億行數據的超大文件,讓你在一週以內將數據轉化導入生產數據庫,你會如何操做?shell
上面的問題實際上是小黑哥前段時間接到一個真實的業務需求,將一個老系統歷史數據經過線下文件的方式遷移到新的生產系統。數據庫
因爲老闆們已經敲定了新系統上線時間,因此只留給小黑哥一週的時間將歷史數據導入生產系統。小程序
因爲時間緊,而數據量又超大,因此小黑哥設計的過程想到一下解決辦法:多線程
歡迎關注個人公衆號:小黑十一點半,得到平常乾貨推送。若是您對個人專題內容感興趣,也能夠關注個人博客:studyidea.cn併發
首先咱們能夠寫個小程序,或者使用拆分命令 split
將這個超大文件拆分一個個小文件。異步
-- 將一個大文件拆分紅若干個小文件,每一個文件 100000 行 split -l 100000 largeFile.txt -d -a 4 smallFile_
這裏之因此選擇先將大文件拆分,主要考慮到兩個緣由:async
第一若是程序直接讀取這個大文件,假設讀取一半的時候,程序忽然宕機,這樣就會直接丟失文件讀取的進度,又須要從新開頭讀取。ide
而文件拆分以後,一旦小文件讀取結束,咱們能夠將小文件移動一個指定文件夾。ui
這樣即便應用程序宕機重啓,咱們從新讀取時,只須要讀取剩餘的文件。
第二,一個文件,只能被一個應用程序讀取,這樣就限制了導入的速度。
而文件拆分以後,咱們能夠採用多節點部署的方式,水平擴展。每一個節點讀取一部分文件,這樣就能夠成倍的加快導入速度。
當咱們拆分完文件,接着咱們就須要讀取文件內容,進行導入。
以前拆分的時候,設置每一個小文件包含 10w 行的數據。因爲擔憂一會兒將 10w 數據讀取應用中,致使堆內存佔用太高,引發頻繁的 Full GC,因此下面採用流式讀取的方式,一行一行的讀取數據。
固然了,若是拆分以後文件很小,或者說應用的堆內存設置很大,咱們能夠直接將文件加載到應用內存中處理。這樣相對來講簡單一點。
逐行讀取的代碼以下:
File file = ... try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line=iterator.nextLine(); convertToDB(line); } }
上面代碼使用 commons-io
中的 LineIterator
類,這個類底層使用了 BufferedReader
讀取文件內容。它將其封裝成迭代器模式,這樣咱們能夠很方便的迭代讀取。
若是當前使用 JDK1.8 ,那麼上述操做更加簡單,咱們能夠直接使用 JDK 原生的類 Files
將文件轉成 Stream
方式讀取,代碼以下:
Files.lines(Paths.get("文件路徑"), Charset.defaultCharset()).forEach(line -> { convertToDB(line); });
其實仔細看下 Files#lines
底層源碼,其實原理跟上面的 LineIterator
相似,一樣也是封裝成迭代器模式。
上述讀取的代碼寫起來不難,可是存在效率問題,主要是由於只有單線程在導入,上一行數據導入完成以後,才能繼續操做下一行。
爲了加快導入速度,那咱們就多來幾個線程,併發導入。
多線程咱們天然將會使用線程池的方式,相關代碼改造以下:
File file = ...; ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, // 文件數量,假設文件包含 10W 行 new ArrayBlockingQueue<>(10*10000), // guava 提供 new ThreadFactoryBuilder().setNameFormat("test-%d").build()); try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line = iterator.nextLine(); executorService.submit(() -> { convertToDB(line); }); } }
上述代碼中,每讀取到一行內容,就會直接交給線程池來執行。
咱們知道線程池原理以下:
因爲咱們上述線程池設置的核心線程數爲 5,很快就到達了最大核心線程數,後續任務只能被加入隊列。
爲了後續任務不被線程池拒絕,咱們能夠採用以下方案:
以上兩種方案都存在一樣的問題,第一種是至關於將文件全部內容加載到內存,將會佔用過多內存。
而第二種建立過多的線程,一樣也會佔用過多內存。
一旦內存佔用過多,GC 沒法清理,就可能會引發頻繁的 Full GC,甚至致使 OOM,致使程序導入速度過慢。
解決這個問題,咱們能夠以下兩種解決方案:
CountDownLatch
批量執行CountDownLatch
批量執行JDK 提供的 CountDownLatch
,可讓主線程等待子線程都執行完成以後,再繼續往下執行。
利用這個特性,咱們能夠改造多線程導入的代碼,主體邏輯以下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { // 存儲每一個任務執行的行數 List<String> lines = Lists.newArrayList(); // 存儲異步任務 List<ConvertTask> tasks = Lists.newArrayList(); while (iterator.hasNext()) { String line = iterator.nextLine(); lines.add(line); // 設置每一個線程執行的行數 if (lines.size() == 1000) { // 新建異步任務,注意這裏須要建立一個 List tasks.add(new ConvertTask(Lists.newArrayList(lines))); lines.clear(); } if (tasks.size() == 10) { asyncBatchExecuteTask(tasks); } } // 文件讀取結束,可是可能還存在未被內容 tasks.add(new ConvertTask(Lists.newArrayList(lines))); // 最後再執行一次 asyncBatchExecuteTask(tasks); }
這段代碼中,每一個異步任務將會導入 1000 行數據,等積累了 10 個異步任務,而後將會調用 asyncBatchExecuteTask
使用線程池異步執行。
/** * 批量執行任務 * * @param tasks */ private static void asyncBatchExecuteTask(List<ConvertTask> tasks) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); for (ConvertTask task : tasks) { task.setCountDownLatch(countDownLatch); executorService.submit(task); } // 主線程等待異步線程 countDownLatch 執行結束 countDownLatch.await(); // 清空,從新添加任務 tasks.clear(); }
asyncBatchExecuteTask
方法內將會建立 CountDownLatch
,而後主線程內調用 await
方法等待全部異步線程執行結束。
ConvertTask
異步任務邏輯以下:
/** * 異步任務 * 等數據導入完成以後,必定要調用 countDownLatch.countDown() * 否則,這個主線程將會被阻塞, */ private static class ConvertTask implements Runnable { private CountDownLatch countDownLatch; private List<String> lines; public ConvertTask(List<String> lines) { this.lines = lines; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { for (String line : lines) { convertToDB(line); } } finally { countDownLatch.countDown(); } } }
ConvertTask
任務類邏輯就很是簡單,遍歷全部行,將其導入到數據庫中。全部數據導入結束,調用 countDownLatch#countDown
。
一旦全部異步線程執行結束,調用 countDownLatch#countDown
,主線程將會被喚醒,繼續執行文件讀取。
雖然這種方式解決上述問題,可是這種方式,每次都須要積累必定任務數才能開始異步執行全部任務。
另外每次都須要等待全部任務執行結束以後,才能開始下一批任務,批量執行消耗的時間等於最慢的異步任務消耗的時間。
這種方式線程池中線程存在必定的閒置時間,那有沒有辦法一直壓榨線程池,讓它一直在幹活呢?
回到最開始的問題,文件讀取導入,其實就是一個生產者-消費者消費模型。
主線程做爲生產者不斷讀取文件,而後將其放置到隊列中。
異步線程做爲消費者不斷從隊列中讀取內容,導入到數據庫中。
一旦隊列滿載,生產者應該阻塞,直到消費者消費任務。
其實咱們使用線程池的也是一個生產者-消費者消費模型,其也使用阻塞隊列。
那爲何線程池在隊列滿載的時候,不發生阻塞?
這是由於線程池內部使用 offer
方法,這個方法在隊列滿載的時候不會發生阻塞,而是直接返回 。
那咱們有沒有辦法在線程池隊列滿載的時候,阻塞主線程添加任務?
實際上是能夠的,咱們自定義線程池拒絕策略,當隊列滿時改成調用 BlockingQueue.put
來實現生產者的阻塞。
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } };
這樣一旦線程池滿載,主線程將會被阻塞。
使用這種方式以後,咱們能夠直接使用上面提到的多線程導入的代碼。
ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("test-%d").build(), (r, executor) -> { if (!executor.isShutdown()) { try { // 主線程將會被阻塞 executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } }); File file = new File("文件路徑"); try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line = iterator.nextLine(); executorService.submit(() -> convertToDB(line)); } }
一個超大的文件,咱們能夠採用拆分文件的方式,將其拆分紅多份文件,而後部署多個應用程序提升讀取速度。
另外讀取過程咱們還可使用多線程的方式併發導入,不過咱們須要注意線程池滿載以後,將會拒絕後續任務。
咱們能夠經過擴展線程池,自定義拒絕策略,使讀取主線程阻塞。
好了,今天文章內容就到這裏,不知道各位有沒有其餘更好的解決辦法,歡迎留言討論。
歡迎關注個人公衆號:小黑十一點半,得到平常乾貨推送。若是您對個人專題內容感興趣,也能夠關注個人博客:studyidea.cn