批量SQL優化實戰

有時在工做中,咱們須要將大量的數據持久化到數據庫中,若是數據量很大的話直接插入的執行速度很是慢,而且因爲插入操做也沒有太多可以進行sql優化的地方,因此只能從程序代碼的角度進行優化。因此本文將嘗試使用幾種不一樣方式對插入操做進行優化,看看如何可以最大程度的縮短SQL執行時間。java

以插入1000條數據爲例,首先進行數據準備,用於插入數據庫測試:sql

private List<Order> prepareData(){
    List<Order> orderList=new ArrayList<>();
    for (int i = 1; i <= 1000; i++) {
        Order order=new Order();
        order.setId(Long.valueOf(i));
        order.setOrderNumber("A");
        order.setMoney(100D);
        order.setTenantId(1L);
        orderList.add(order);
    }
    return orderList;
}

直接插入

首先測試直接插入1000條數據:數據庫

public void noBatch() {
    List<Order> orderList = prepareData();
    long startTime = System.currentTimeMillis();
    for (Order order : orderList) {
        orderMapper.insert(order);
    }
    System.out.println("總耗時: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s");
}

執行時間以下:服務器

mybatis-plus 批量插入

接下來,使用mybatis-plus的批量查詢,咱們本身的Service接口須要繼承IService接口:mybatis

public interface SqlService extends IService<Order> {
}

在實現類SqlServiceImpl中直接調用saveBatch方法:多線程

public void plusBatch() {
    List<Order> orderList = prepareData();
    long startTime = System.currentTimeMillis();
    saveBatch(orderList);
    System.out.println("總耗時: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s");
}

執行代碼,查看運行時間:app

能夠發現,使用mybatis-plus的批量插入並無比循環單條插入顯著縮短期,因此來查看一下saveBatch方法的源碼:框架

@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveBatch(Collection<T> entityList, int batchSize) {
    String sqlStatement = sqlStatement(SqlMethod.INSERT_ONE);
    return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
}

其中調用了executeBatch方法:less

protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
    Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
    return !CollectionUtils.isEmpty(list) && executeBatch(sqlSession -> {
        int size = list.size();
        int i = 1;
        for (E element : list) {
            consumer.accept(sqlSession, element);
            if ((i % batchSize == 0) || i == size) {
                sqlSession.flushStatements();
            }
            i++;
        }
    });
}

在for循環中,consumer的accept執行的是sqlSession的insert操做,這一階段都是對sql的拼接,只有到最後當for循環執行完成後,纔會將數據批量刷新到數據庫中。也就是說,以前咱們向數據庫服務器發起了1000次請求,可是使用批量插入,只須要發起一次請求就能夠了。若是拋出異常,則會進行回滾,不會向數據庫中寫入數據。可是雖然減小了數據庫請求的次數,對於縮短執行時間並無顯著的提高。ide

並行流

Stream是JAVA8中用於處理集合的關鍵抽象概念,能夠進行復雜的查找、過濾、數據映射等操做。而並行流Parallel Stream,能夠將整個數據內容分紅多個數據塊,並使用多個線程分別處理每一個數據塊的流。在大量數據的插入操做中,不存在數據的依賴的耦合關係,所以能夠進行拆分使用並行流進行插入。測試插入的代碼以下:

public void stream(){
    List<Order> orderList = prepareData();
    long startTime = System.currentTimeMillis();
    orderList.parallelStream().forEach(order->orderMapper.insert(order));
    System.out.println("總耗時: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s");
}

仍是先對上面的代碼進行測試:

能夠發現速度比以前快了不少,這是由於並行流底層使用了Fork/Join框架,具體來講使用了「分而治之」的思想,對任務進行了拆分,使用不一樣線程進行執行,最後彙總(對Fork/Join不熟悉的同窗能夠回顧一下請求合併與分而治之這篇文章,裏面介紹了它的基礎使用)。並行流在底層使用了ForkJoinPool線程池,從ForkJoinPool的默認構造函數中看出,它擁有的默認線程數量等於計算機的邏輯處理器數量:

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}

也就是說,若是咱們服務器是邏輯8核的話,那麼就會有8個線程來同時執行插入操做,大大縮短了執行的時間。而且ForkJoinPool線程池爲了提升任務的並行度和吞吐量,採用了任務竊取機制,可以進一步的縮短執行的時間。

Fork/Join

在並行流中,建立的ForkJoinPool的線程數量是固定的,那麼經過手動修改線程池中線程的數量,可否進一步的提升執行效率呢?通常而言,在線程池中,設置線程數量等於處理器數量就能夠了,由於若是建立過多線程,線程頻繁切換上下文也會額外消耗時間,反而會增長執行的整體時間。可是對於批量SQL的插入操做,沒有複雜的業務處理邏輯,僅僅是須要頻繁的與數據庫進行交互,屬於I/O密集型操做。而對於I/O密集型操做,程序中存在大量I/O等待佔據時間,致使CPU使用率較低。因此咱們嘗試增長線程數量,來看一下可否進一步縮短執行時間呢?

定義插入任務,由於不須要返回,直接繼承RecursiveAction父類。size是每一個隊列中包含的任務數量,在構造方法中傳入,若是一個隊列中的任務數量大於它那麼就繼續進行拆分,直到任務數量足夠小:

public class BatchInsertTask<E> extends RecursiveAction {
    private List<E> list;
    private BaseMapper<E> mapper;
    private int size;

    public BatchInsertTask(List<E> list, BaseMapper<E> mapper, int size) {
        this.list = list;
        this.mapper = mapper;
        this.size = size;
    }

    @Override
    protected void compute() {
        if (list.size() <= size) {
            list.stream().forEach(item -> mapper.insert(item));
        } else {
            int middle = list.size() / 2;
            List<E> left = list.subList(0, middle);
            List<E> right = list.subList(middle, list.size());
            BatchInsertTask<E> leftTask = new BatchInsertTask<>(left, mapper, size);
            BatchInsertTask<E> rightTask = new BatchInsertTask<>(right, mapper, size);
            invokeAll(leftTask, rightTask);
        }
    }
}

使用ForkJoinPool運行上面定義的任務,線程池中的線程數取CPU線程的2倍,將執行的SQL條數均分到每一個線程的執行隊列中:

public class BatchSqlUtil {
    public static <E> void runSave(List<E> list, BaseMapper<E> mapper) {
        int processors = getProcessors();
        ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
        int size = (int) Math.ceil((double)list.size() / processors);
        BatchInsertTask<E> task = new BatchInsertTask<E>(list, mapper, size);
        forkJoinPool.invoke(task);
    }

    private static int getProcessors() {
        int processors = Runtime.getRuntime().availableProcessors();
        return processors<<=1;
    }
}

啓動測試代碼:

public void batch() {
    List<Order> orderList = prepareData();
    long startTime = System.currentTimeMillis();
    BatchSqlUtil.runSave(orderList,orderMapper);
    System.out.println("總耗時: " + (System.currentTimeMillis() - startTime) / 1000.0 + "s");
}

查看運行時間:

能夠看到,經過增長ForkJoinPool中的線程,能夠進一步的縮短批量插入的時間。

若是文章對您有所幫助,歡迎關注公衆號 碼農參上

相關文章
相關標籤/搜索