多線程批量數據導入示例——基礎版

前言

當遇到大量數據導入時,爲了提升處理的速度,能夠選擇使用多線程來批量處理這些處理。常見的場景有:java

  1. 大文件導入數據庫(這個文件不必定是標準的CSV可導入文件或者須要在內存中通過必定的處理)
  2. 數據同步(從第三方接口拉取數據處理後寫入本身的數據庫)

以上的場景有一個共性,這類數據導入的場景簡單來講就是將數據從一個數據源移動到另一個數據源,而其中一定能夠分爲兩步數據庫

  1. 數據讀取:從數據源讀取數據到內存
  2. 數據寫入:將內存中的數據寫入到另一個數據源,可能存在數據處理

並且數據讀取的速度通常會比數據寫入的速度快不少,即讀取快,寫入慢多線程

設計思路

因爲場景的特色是讀取快,寫入慢,若是是使用多線程處理,建議是數據寫入部分改造爲多線程。而數據讀取能夠改形成批量讀取數據。簡單來講就是兩個要點:測試

  1. 批量讀取數據
  2. 多線程寫入數據

示例

多線程批量處理最簡單的方案是使用線程池來進行處理,下面會經過一個模擬批量讀取和寫入的服務,以及對這個服務的多線程寫入調用做爲示例,展現如何多線程批量數據導入。優化

模擬服務

import java.util.concurrent.atomic.AtomicLong;

/**
 * 數據批量寫入用的模擬服務
 *
 * @author RJH
 * create at 2019-04-01
 */
public class MockService {
    /**
     * 可讀取總數
     */
    private long canReadTotal;

    /**
     * 寫入總數
     */
    private AtomicLong writeTotal=new AtomicLong(0);

    /**
     * 寫入休眠時間(單位:毫秒)
     */
    private final long sleepTime;

    /**
     * 構造方法
     *
     * @param canReadTotal
     * @param sleepTime
     */
    public MockService(long canReadTotal, long sleepTime) {
        this.canReadTotal = canReadTotal;
        this.sleepTime = sleepTime;
    }

    /**
     * 批量讀取數據接口
     *
     * @param num
     * @return
     */
    public synchronized long readData(int num) {
        long readNum;
        if (canReadTotal >= num) {
            canReadTotal -= num;
            readNum = num;
        } else {
            readNum = canReadTotal;
            canReadTotal = 0;
        }
        //System.out.println("read data size:" + readNum);
        return readNum;
    }

    /**
     * 寫入數據接口
     */
    public void writeData() {
        try {
            // 休眠必定時間模擬寫入速度慢
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 寫入總數自增
        System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
    }

    /**
     * 獲取寫入的總數
     *
     * @return
     */
    public long getWriteTotal() {
        return writeTotal.get();
    }

}

批量數據處理器

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 基於線程池的多線程批量寫入處理器
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandler {

    private ExecutorService executorService;

    private MockService service;
    /**
     * 每次批量讀取的數據量
     */
    private int batch;
    /**
     * 線程個數
     */
    private int threadNum;

    public SimpleBatchHandler(MockService service, int batch,int threadNum) {
        this.service = service;
        this.batch = batch;
        //使用固定數目的線程池
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 開始處理
     */
    public void startHandle() {
        // 開始處理的時間
        long startTime = System.currentTimeMillis();
        System.out.println("start handle time:" + startTime);
        long readData;
        while ((readData = service.readData(batch)) != 0) {// 批量讀取數據,知道讀取不到數據才中止
            for (long i = 0; i < readData; i++) {
                executorService.execute(() -> service.writeData());
            }
        }
        // 關閉線程池
        executorService.shutdown();
        while (!executorService.isTerminated()) {//等待線程池中的線程執行完

        }
        // 結束時間
        long endTime = System.currentTimeMillis();
        System.out.println("end handle time:" + endTime);
        // 總耗時
        System.out.println("total handle time:" + (endTime - startTime) + "ms");
        // 寫入總數
        System.out.println("total write num:" + service.getWriteTotal());
    }

}

測試類

/**
 * SimpleBatchHandler的測試類
 * @author RJH
 * create at 2019-04-01
 */
public class SimpleBatchHandlerTest {

    public static void main(String[] args) {
        // 總數
        long total=100000;
        // 休眠時間
        long sleepTime=100;
        // 每次拉取的數量
        int batch=100;
        // 線程個數
        int threadNum=16;
        MockService mockService=new MockService(total,sleepTime);
        SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
        handler.startHandle();
    }
}

運行結果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分輸出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

在單線程狀況下的執行時間應該爲total*sleepTime,即10000000ms,而改造爲多線程後執行時間爲648447msthis

示例問題

本示例存在一些問題,會在後續的博客中對本示例進行優化,同時分享給你們如何解決這些問題。atom

相關文章
相關標籤/搜索