【spring boot】在spring boot下使用多線程

 

使用場景:java

方法處理到某一步,須要將信息交給另外一個線程去處理!!spring

===================================================================================緩存

第一種:最簡單的Runnable多線程

public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);
        Runnable runnable = dealMsg(msg);
    //將返回的runnable對象傳入,並start()啓動線程
     new Thread(runnable).start(); }
複製代碼
//建立一個Runnable,重寫run方法
public Runnable dealMsg(String msg){ Runnable runnable = new Runnable() { @Override public void run() { System.out.println("新開線程中處理:"+msg); } }; return runnable; }
複製代碼

 

====================================================================================================dom

第二種:本身建立JDK線程池,交給spring管理,而後將任務交給線程池便可異步

1.建立線程池,交給spring管理async

package com.sxd.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ThreadConfig {

    /**
     *newFixedThreadPool
     建立一個指定工做線程數量的線程池。每當提交一個任務就建立一個工做線程,若是工做線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。

     newCachedThreadPool
     建立一個可緩存的線程池。這種類型的線程池特色是: 
     1).工做線程的建立數量幾乎沒有限制(其實也有限制的,數目爲Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。 
     2).若是長時間沒有往線程池中提交任務,即若是工做線程空閒了指定的時間(默認爲1分鐘),則該工做線程將自動終止。終止後,若是你又提交了新的任務,則線程池從新建立一個工做線程。

     newSingleThreadExecutor
     建立一個單線程化的Executor,即只建立惟一的工做者線程來執行任務,若是這個線程異常結束,會有另外一個取代它,保證順序執行(我以爲這點是它的特點)。單工做線程最大的特色是可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的 。

     newScheduleThreadPool
     建立一個定長的線程池,並且支持定時的以及週期性的任務執行,相似於Timer。
     * @return
     */
    @Bean
    public ExecutorService getExecutorTools(){
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        return  executorService;
    }

}
複製代碼
package com.sxd.util;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class ThreadConfig {

    /**
     *newFixedThreadPool
     建立一個指定工做線程數量的線程池。每當提交一個任務就建立一個工做線程,若是工做線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。

     newCachedThreadPool
     建立一個可緩存的線程池。這種類型的線程池特色是: 
     1).工做線程的建立數量幾乎沒有限制(其實也有限制的,數目爲Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。 
     2).若是長時間沒有往線程池中提交任務,即若是工做線程空閒了指定的時間(默認爲1分鐘),則該工做線程將自動終止。終止後,若是你又提交了新的任務,則線程池從新建立一個工做線程。

     newSingleThreadExecutor
     建立一個單線程化的Executor,即只建立惟一的工做者線程來執行任務,若是這個線程異常結束,會有另外一個取代它,保證順序執行(我以爲這點是它的特點)。單工做線程最大的特色是可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的 。

     newScheduleThreadPool
     建立一個定長的線程池,並且支持定時的以及週期性的任務執行,相似於Timer。
     * @return
     */
    @Bean
    public ExecutorService getExecutorTools(){
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        return  executorService;
    }

}
複製代碼

2.使用它ide

import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Component
public class Consumer1 {


    @Resource
    private ExecutorService executorService;

    
    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);


        /**
         * 分類1:能夠返回值的 Callable
         */
        Future fal  = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
                return "處理成功!";
            }
        });

        try {
            System.out.println(fal.get());
        }catch (Exception e){
            System.out.println(e);
        }

        /**
         * 分類2:不會返回值的 Runnable
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

        /**
         * 分類3:也能夠這樣
         */
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

    }




}

 

複製代碼
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Component
public class Consumer1 {


    @Resource
    private ExecutorService executorService;

    
    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);


        /**
         * 分類1:能夠返回值的 Callable
         */
        Future fal  = executorService.submit(new Callable<String>() {
            @Override
            public String call() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
                return "處理成功!";
            }
        });

        try {
            System.out.println(fal.get());
        }catch (Exception e){
            System.out.println(e);
        }

        /**
         * 分類2:不會返回值的 Runnable
         */
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

        /**
         * 分類3:也能夠這樣
         */
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":"+msg);
            }
        });

    }




}
複製代碼

 

====================================================================================================post

第三種:使用spring封裝的線程池spa

1.建立線程配置類【

@ComponentScan("com.sxd") 標明會在哪一個包下使用多線程

package com.sxd.util;

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@ComponentScan("com.sxd")
@EnableAsync
// 線程配置類
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的處理流程
    // 當池子大小小於corePoolSize,就新建線程,並處理請求
    // 當池子大小等於corePoolSize,把請求放入workQueue中,池子裏的空閒線程就去workQueue中取任務並處理
    // 當workQueue放不下任務時,就新建線程入池,並處理請求,若是池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來作拒絕處理
    // 當池子的線程數大於corePoolSize時,多餘的線程會等待keepAliveTime長時間,若是無請求可處理就自行銷燬

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);// 最小線程數
        taskExecutor.setMaxPoolSize(10);// 最大線程數
        taskExecutor.setQueueCapacity(25);// 等待隊列

        taskExecutor.initialize();

        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
複製代碼
package com.sxd.util;

import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@ComponentScan("com.sxd")
@EnableAsync
// 線程配置類
public class AsyncTaskConfig implements AsyncConfigurer {

    // ThredPoolTaskExcutor的處理流程
    // 當池子大小小於corePoolSize,就新建線程,並處理請求
    // 當池子大小等於corePoolSize,把請求放入workQueue中,池子裏的空閒線程就去workQueue中取任務並處理
    // 當workQueue放不下任務時,就新建線程入池,並處理請求,若是池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來作拒絕處理
    // 當池子的線程數大於corePoolSize時,多餘的線程會等待keepAliveTime長時間,若是無請求可處理就自行銷燬

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);// 最小線程數
        taskExecutor.setMaxPoolSize(10);// 最大線程數
        taskExecutor.setQueueCapacity(25);// 等待隊列

        taskExecutor.initialize();

        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
複製代碼

2.建立線程任務執行類

package com.sxd.util;

import java.util.Random;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
// 線程執行任務類
public class AsyncTaskService {

    Random random = new Random();// 默認構造方法

    @Async
    // 代表是異步方法
    // 無返回值
    public void executeAsyncTask(String msg) {
        System.out.println(Thread.currentThread().getName()+"開啓新線程執行" + msg);
    }

    /**
     * 異常調用返回Future
     *
     * @param i
     * @return
     * @throws InterruptedException
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) throws InterruptedException {
        System.out.println("input is " + i);
        Thread.sleep(1000 * random.nextInt(i));

        Future<String> future = new AsyncResult<String>("success:" + i);// Future接收返回值,這裏是String類型,能夠指明其餘類型

        return future;
    }
}
複製代碼
package com.sxd.util;

import java.util.Random;
import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
// 線程執行任務類
public class AsyncTaskService {

    Random random = new Random();// 默認構造方法

    @Async
    // 代表是異步方法
    // 無返回值
    public void executeAsyncTask(String msg) {
        System.out.println(Thread.currentThread().getName()+"開啓新線程執行" + msg);
    }

    /**
     * 異常調用返回Future
     *
     * @param i
     * @return
     * @throws InterruptedException
     */
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) throws InterruptedException {
        System.out.println("input is " + i);
        Thread.sleep(1000 * random.nextInt(i));

        Future<String> future = new AsyncResult<String>("success:" + i);// Future接收返回值,這裏是String類型,能夠指明其餘類型

        return future;
    }
}
複製代碼

3.使用它

@Component
public class Consumer1 {


    @Resource
    private AsyncTaskService asyncTaskService;


    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);

        asyncTaskService.executeAsyncTask(msg);

    }
    
}
複製代碼
@Component
public class Consumer1 {


    @Resource
    private AsyncTaskService asyncTaskService;


    public void test(String msg){
        System.out.println(Thread.currentThread().getName()+":"+msg);

        asyncTaskService.executeAsyncTask(msg);

    }
    
}
複製代碼

 

====================================================================================================

第四種:在代碼中啓動異步處理最簡單的代碼

複製代碼
public void test(){
    new Thread(()->doReplace(replaceLog)).start();         
}

public void doReplace(String replaceLog){
                  
            //異步處理的業務
}
複製代碼

 

======================================

就這麼多,再補充噻!!

相關文章
相關標籤/搜索