Spring Boot配置線程池使用多線程插入數據

前言:

最近在工做中須要將一大批數據導入到數據庫中,由於種種緣由這些數據不能使用同步數據的方式來進行復制,而是提供了一批文本,文本里面有不少行url地址,須要的字段都包含在這些url中。最開始是使用的正常的普通方式去寫入,可是量太大了,因此就嘗試使用多線程來寫入。下面咱們就來介紹一下怎麼使用多線程進行導入。spring

1.文本格式

格式就是相似於這種格式的url,固然這裏只是舉個例子,大概有300多個文本,每一個文本里面有大概25000條url,而每條url要插入兩個表,這個量仍是有點大的,單線程跑的很是慢。數據庫

  • https://www.test.com/?type=1&code=123456&goodsId=321

2.springboot配置線程池

咱們須要建立一個ExecutorConfig類來設置線程池的各類配置。springboot

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static Logger logger = LogManager.getLogger(ExecutorConfig.class.getName());

    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心線程數
        executor.setCorePoolSize(5);
        //配置最大線程數
        executor.setMaxPoolSize(10);
        //配置隊列大小
        executor.setQueueCapacity(400);
        //配置線程池中的線程的名稱前綴
        executor.setThreadNamePrefix("thread-");
        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
}

3.建立異步任務接口

咱們須要建立一個接口,再這個接口裏面聲明瞭咱們須要調用的異步方法多線程

public interface AsyncService {

    /**
     *  執行異步任務
     */
    void writeTxt();
}

4.建立異步實現類

再建立一個異步類實現上面的異步接口,重寫接口裏面的方法,最重要的是咱們須要在方法上加@Async("asyncServiceExecutor")註解,它是剛剛咱們在線程池配置類的裏的那個配製方法的名字,加上這個後每次執行這個方法都會開啓一個線程放入線程池中。我下面這個方法是開啓多線程遍歷文件夾中的文件而後爲每一個文件都複製一個副本出來。異步

@Service
public class AsyncServiceImpl implements AsyncService {
    private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName());

    @Async("asyncServiceExecutor")
    public void writeTxt(String fileName){
        logger.info("線程-" + Thread.currentThread().getId() + "在執行寫入");
        try {
            File file = new File(fileName);

            List<String> lines = FileUtils.readLines(file);

            File copyFile = new File(fileName + "_copy.txt");
            lines.stream().forEach(string->{
                try {
                    FileUtils.writeStringToFile(copyFile,string,"utf8",true);
                    FileUtils.writeStringToFile(copyFile,"\r\n","utf8",true);
                } catch (IOException e) {
                    logger.info(e.getMessage());
                }
            });
        }catch (Exception e) {
            logger.info(e.getMessage());
        }
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class BootApplicationTests {

@Autowired
private AsyncService asyncService;

@Test
public void write() {
    File file = new File("F://ac_code_1//test.txt");
    try {
        FileUtils.writeStringToFile(file, "ceshi", "utf8");
        FileUtils.writeStringToFile(file, "\r\n", "utf8");
        FileUtils.writeStringToFile(file, "ceshi2", "utf8");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

5.修改成阻塞式

上面的步驟已經基本實現了多線程的操做,可是當我真的開始導入數據的時候又發現一個問題,就是每次運行後纔剛開始導入就自動中止了,緣由是我在Junit中運行了代碼後它雖然開始導入了,可是由於數據不少時間很長,而Juint跑完主線程的邏輯後就把整個JVM都關掉了,因此導入了一點點就中止了,上面的測試方法之因此沒問題是由於幾個文件的複製速度很快,在主線程跑完以前就跑完了,因此看上去沒問題。最開始我用了一個最笨的方法,直接在主線程最後調用Thread.sleep()方法,雖然有效果可是這也太low了,並且你也無法判斷到底數據導完沒有。因此我又換了一個方式。async

6.使用countDownLatch阻塞主線程

CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程執行完後再執行。它可使主線程一直等到全部的子線程執行完以後再執行。咱們修改下代碼,建立一個CountDownLatch實例,大小是全部運行線程的數量,而後在異步類的方法中的finally裏面對它進行減1,在主線程最後調用await()方法,這樣就能確保全部的子線程運行完後主線程纔會繼續執行。ide

@RunWith(SpringRunner.class)
@SpringBootTest
public class BootApplicationTests {

    private final CountDownLatch countDownLatch = new CountDownLatch(10);

    @Autowired
    private AsyncService asyncService;

    @Test
    public void mainWait() {
        try {
            for (int i = 0; i < 10; i++) {
                asyncService.mainWait(countDownLatch);
            }
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
@Service
public class AsyncServiceImpl implements AsyncService {
    private static Logger logger = LogManager.getLogger(AsyncServiceImpl.class.getName());

    @Override
    @Async("asyncServiceExecutor")
    public void mainWait(CountDownLatch countDownLatch) {
        try {
            System.out.println("線程" + Thread.currentThread().getId() + "開始執行");
            for (int i=1;i<1000000000;i++){
                Integer integer = new Integer(i);
                int l = integer.intValue();
                for (int x=1;x<10;x++){
                    Integer integerx = new Integer(x);
                    int j = integerx.intValue();
                }
            }
            System.out.println("線程" + Thread.currentThread().getId() + "執行結束");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            countDownLatch.countDown();
        }
    }
}

7.導入代碼

雖然上面的多線程是重點,不過仍是把導入數據的代碼展現出來給你們參考一下,固然這是簡化版,真實的要比這個多了不少判斷,不過那都是基於業務需求作的判斷。工具

@RunWith(value = SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
    private static Log logger = LogFactory.getLog(ApplicationTests.class);
    
    private final CountDownLatch countDownLatch;

    @Autowired
    AsyncService asyncService;

    @Test
    public void writeCode() {
        try {
            File file = new File("F:\\ac_code_1");
            File[] files = file.listFiles();
            //計數器數量就等於文件數量,由於每一個文件會開一個線程
            countDownLatch = new CountDownLatch(files.length);

            Arrays.stream(files).forEach(file1 -> {
                File child = new File(file1.getAbsolutePath());
                String fileName = child.getAbsolutePath();
                logger.info(asyncService.writeCode(fileName,countDownLatch));
            });
            countDownLatch.await();
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
@Service
public class AsyncServiceImpl implements AsyncService {
    private static Log logger = LogFactory.getLog(AsyncServiceImpl.class);

    @Autowired
    IExampleService exampleService;

    @Override
    @Async("asyncServiceExecutor")
    public String writeCode(String fileName,CountDownLatch countDownLatch) {
        logger.info("線程-" + Thread.currentThread().getId() + "在導入-" + fileName);
        try {
            File file = new File(fileName);
            List<String> list = FileUtils.readLines(file);
            for (String string : list) {
                String[] parmas = string.split(",");
                ExampleVo vo = new ExampleVo();
                vo.setParam1(parmas[0]);
                vo.setParam1(parmas[1]);
                vo.setParam1(parmas[2]);
                exampleService.save(vo);
            }
            return "導入完成-" + fileName;
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }finally {
            //導入完後減1
            countDownLatch.countDown();
        }
    }
}

總結:

到這裏就已經講完了多線程插入數據的方法,目前這個方法還很簡陋。由於是每一個文件都開一個線程性能消耗比較大,並且若是線程池的線程配置太多了,頻繁切換反而會變得很慢,你們若是有更好的辦法均可以留言討論。性能

相關文章
相關標籤/搜索