如何在 Spring 異步調用中傳遞上下文

什麼是異步調用?

異步調用是相對於同步調用而言的,同步調用是指程序按預約順序一步步執行,每一步必須等到上一步執行完後才能執行,異步調用則無需等待上一步程序執行完便可執行。異步調用指,在程序在執行時,無需等待執行的返回值便可繼續執行後面的代碼。在咱們的應用服務中,有不少業務邏輯的執行操做不須要同步返回(如發送郵件、冗餘數據表等),只須要異步執行便可。java

本文將介紹 Spring 應用中,如何實現異步調用。在異步調用的過程當中,會出現線程上下文信息的丟失,咱們該如何解決線程上下文信息的傳遞。git

Spring 應用中實現異步

Spring 爲任務調度與異步方法執行提供了註解支持。經過在方法或類上設置 @Async 註解,可以使得方法被異步調用。調用者會在調用時當即返回,而被調用方法的實際執行是交給 Spring 的 TaskExecutor 來完成的。因此被註解的方法被調用的時候,會在新的線程中執行,而調用它的方法會在原線程中執行,這樣能夠避免阻塞,以及保證任務的實時性。github

引入依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
複製代碼

引入 Spring 相關的依賴便可。web

入口類

@SpringBootApplication
@EnableAsync
public class AsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }

複製代碼

入口類增長了 @EnableAsync 註解,主要是爲了掃描範圍包下的全部 @Async 註解。spring

對外的接口

這裏寫了一個簡單的接口:bash

@RestController
@Slf4j
public class TaskController {

    @Autowired
    private TaskService taskService;

    @GetMapping("/task")
    public String taskExecute() {
        try {
            taskService.doTaskOne();
            taskService.doTaskTwo();
            taskService.doTaskThree();
        } catch (Exception e) {
           log.error("error executing task for {}",e.getMessage());
        }
        return "ok";
    }
}
複製代碼

調用 TaskService 執行三個異步方法。微信

Service 方法

@Component
@Slf4j
//@Async
public class TaskService {

    @Async
    public void doTaskOne() throws Exception {
        log.info("開始作任務一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務一,耗時:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskTwo() throws Exception {
        log.info("開始作任務二");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務二,耗時:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskThree() throws Exception {
        log.info("開始作任務三");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務三,耗時:" + (end - start) + "毫秒");
    }
}
複製代碼

@Async 能夠用於類上,標識該類的全部方法都是異步方法,也能夠單獨用於某些方法。每一個方法都會 sleep 1000 ms。多線程

結果展現

運行結果以下:架構

能夠看到 TaskService 中的三個方法是異步執行的,接口的結果快速返回,日誌信息異步輸出。異步調用,經過開啓新的線程調用的方法,不影響主線程。異步方法實際的執行交給了 Spring 的 TaskExecutor 來完成。app

Future:獲取異步執行的結果

在上面的測試中咱們也能夠發現主調用方法並無等到調用方法執行完就結束了當前的任務。若是想要知道調用的三個方法所有執行完該怎麼辦呢,下面就能夠用到異步回調。

異步回調就是讓每一個被調用的方法返回一個 Future 類型的值,Spring 中提供了一個 Future 接口的子類:AsyncResult,因此咱們能夠返回 AsyncResult 類型的值。

public class AsyncResult<V> implements ListenableFuture<V> {

	private final V value;

	private final ExecutionException executionException;
	//...
}
複製代碼

AsyncResult 實現了 ListenableFuture 接口,該對象內部有兩個屬性:返回值和異常信息。

public interface ListenableFuture<T> extends Future<T> {
    void addCallback(ListenableFutureCallback<? super T> var1);

    void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}
複製代碼

ListenableFuture 接口繼承自 Future,在此基礎上增長了回調方法的定義。Future 接口定義以下:

public interface Future<V> {
	// 是否能夠打斷當前正在執行的任務
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 任務取消的結果
    boolean isCancelled();
	
	// 異步方法中最後返回的那個對象中的值 
	V get() throws InterruptedException, ExecutionException;
	// 用來判斷該異步任務是否執行完成,若是執行完成,則返回 true,若是未執行完成,則返回false
    boolean isDone();
	// 與 get() 同樣,只不過這裏參數中設置了超時時間
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
複製代碼

#get() 方法,在執行的時候是須要等待回調結果的,阻塞等待。若是不設置超時時間,它就阻塞在那裏直到有了任務執行完成。咱們設置超時時間,就能夠在當前任務執行過久的狀況下中斷當前任務,釋放線程,這樣就不會致使一直佔用資源。

#cancel(boolean) 方法,參數是一個 boolean 類型的值,用來傳入是否能夠打斷當前正在執行的任務。若是參數是 true 且當前任務沒有執行完成 ,說明能夠打斷當前任務,那麼就會返回 true;若是當前任務尚未執行,那麼無論參數是 true 仍是 false,返回值都是 true;若是當前任務已經完成,那麼無論參數是 true 仍是 false,那麼返回值都是 false;若是當前任務沒有完成且參數是 false,那麼返回值也是 false。即:

  1. 若是任務還沒執行,那麼若是想取消任務,就必定返回 true,與參數無關。
  2. 若是任務已經執行完成,那麼任務必定是不能取消的,因此此時返回值都是false,與參數無關。
  3. 若是任務正在執行中,那麼此時是否取消任務就看參數是否容許打斷(true/false)。

獲取異步方法返回值的實現

public Future<String> doTaskOne() throws Exception {
        log.info("開始作任務一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務一,耗時:" + (end - start) + "毫秒");
        return new AsyncResult<>("任務一完成,耗時" + (end - start) + "毫秒");
    }
    //...其餘兩個方法相似,省略
複製代碼

咱們將 task 方法的返回值改成 Future<String>,將執行的時間拼接爲字符串返回。

@GetMapping("/task")
    public String taskExecute() {
        try {
            Future<String> r1 = taskService.doTaskOne();
            Future<String> r2 = taskService.doTaskTwo();
            Future<String> r3 = taskService.doTaskThree();
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("execute all tasks");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
           log.error("error executing task for {}",e.getMessage());
        }

        return "ok";
    }
複製代碼

在調用異步方法以後,能夠經過循環判斷異步方法是否執行完成。結果正如咱們所預期,future 所 get 到的是 AsyncResult 返回的字符串。

配置線程池

前面是最簡單的使用方法,使用默認的 TaskExecutor。若是想使用自定義的 Executor,能夠結合 @Configuration 註解的配置方式。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class TaskPoolConfig {

    @Bean("taskExecutor") // bean 的名稱,默認爲首字母小寫的方法名
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 核心線程數(默認線程數)
        executor.setMaxPoolSize(20); // 最大線程數
        executor.setQueueCapacity(200); // 緩衝隊列數
        executor.setKeepAliveSeconds(60); // 容許線程空閒時間(單位:默認爲秒)
        executor.setThreadNamePrefix("taskExecutor-"); // 線程池名前綴
        // 線程池對拒絕任務的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
複製代碼

線程池的配置很靈活,對核心線程數、最大線程數等屬性進行配置。其中,rejection-policy,當線程池已經達到最大線程數的時候,如何處理新任務。可選策略有 CallerBlocksPolicy、CallerRunsPolicy 等。CALLER_RUNS:不在新線程中執行任務,而是由調用者所在的線程來執行。咱們驗證下,線程池的設置是否生效,在 TaskService 中,打印當前的線程名稱:

public Future<String> doTaskOne() throws Exception {
        log.info("開始作任務一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務一,耗時:" + (end - start) + "毫秒");
        log.info("當前線程爲 {}", Thread.currentThread().getName());
        return new AsyncResult<>("任務一完成,耗時" + (end - start) + "毫秒");
    }
複製代碼

經過結果能夠看到,線程池配置的線程名前綴已經生效。在 Spring @Async 異步線程使用過程當中,須要注意的是如下的用法會使 @Async 失效:

  • 異步方法使用 static 修飾;
  • 異步類沒有使用 @Component 註解(或其餘註解)致使 Spring 沒法掃描到異步類;
  • 異步方法不能與被調用的異步方法在同一個類中;
  • 類中須要使用 @Autowired 或 @Resource 等註解自動注入,不能手動 new 對象;
  • 若是使用 Spring Boot 框架必須在啓動類中增長 @EnableAsync 註解。

線程上下文信息傳遞

不少時候,在微服務架構中的一次請求會涉及多個微服務。或者一個服務中會有多個處理方法,這些方法有多是異步方法。有些線程上下文信息,如請求的路徑,用戶惟一的 userId,這些信息會一直在請求中傳遞。若是不作任何處理,咱們看下是否可以正常獲取這些信息。

@GetMapping("/task")
    public String taskExecute() {
        try {
            Future<String> r1 = taskService.doTaskOne();
            Future<String> r2 = taskService.doTaskTwo();
            Future<String> r3 = taskService.doTaskThree();

            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
            log.info("當前線程爲 {},請求方法爲 {},請求路徑爲:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("execute all tasks");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
            log.error("error executing task for {}", e.getMessage());
        }

        return "ok";
    }
複製代碼

在 Spring Boot Web 中咱們能夠經過 RequestContextHolder 很方便的獲取 request。在接口方法中,輸出請求的方法和請求的路徑。

public Future<String> doTaskOne() throws Exception {
        log.info("開始作任務一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務一,耗時:" + (end - start) + "毫秒");
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        log.info("當前線程爲 {},請求方法爲 {},請求路徑爲:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
        return new AsyncResult<>("任務一完成,耗時" + (end - start) + "毫秒");
    }
複製代碼

同時在 TaskService 中,驗證是否是也能輸出請求的信息。運行程序,結果以下:

在 TaskService 中,每一個異步線程的方法獲取 RequestContextHolder 中的請求信息時,報了空指針異常。這說明了請求的上下文信息未傳遞到異步方法的線程中。RequestContextHolder 的實現,裏面有兩個 ThreadLocal 保存當前線程下的 request。

//獲得存儲進去的request
    private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
            new NamedThreadLocal<RequestAttributes>("Request attributes");
    //可被子線程繼承的request
    private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
            new NamedInheritableThreadLocal<RequestAttributes>("Request context");
複製代碼

再看 #getRequestAttributes() 方法,至關於直接獲取 ThreadLocal 裏面的值,這樣就使得每一次獲取到的 Request 是該請求的 request。如何將上下文信息傳遞到異步線程呢?Spring 中的 ThreadPoolTaskExecutor 有一個配置屬性 TaskDecoratorTaskDecorator 是一個回調接口,採用裝飾器模式。裝飾模式是動態的給一個對象添加一些額外的功能,就增長功能來講,裝飾模式比生成子類更爲靈活。所以 TaskDecorator 主要用於任務的調用時設置一些執行上下文,或者爲任務執行提供一些監視/統計。

public interface TaskDecorator {

	Runnable decorate(Runnable runnable);
}
複製代碼

#decorate 方法,裝飾給定的 Runnable,返回包裝的 Runnable 以供實際執行。

下面咱們定義一個線程上下文拷貝的 TaskDecorator

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

public class ContextDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}
複製代碼

實現較爲簡單,將當前線程的 context 裝飾到指定的 Runnable,最後重置當前線程上下文。

在線程池的配置中,增長回調的 TaskDecorator 屬性的配置:

@Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        // 增長 TaskDecorator 屬性的配置
        executor.setTaskDecorator(new ContextDecorator());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
複製代碼

通過如上配置,咱們再次運行服務,並訪問接口,控制檯日誌信息以下:

由結果可知,線程的上下文信息傳遞成功。

小結

本文結合示例講解了 Spring 中實現異步方法,獲取異步方法的返回值。並介紹了配置 Spring 線程池的方式。最後介紹如何在異步多線程中傳遞線程上下文信息。線程上下文傳遞在分佈式環境中會常常用到,好比分佈式鏈路追蹤中須要一次請求涉及到的 TraceId、SpanId。簡單來講,須要傳遞的信息可以在不一樣線程中。異步方法是咱們在平常開發中用來多線程處理業務邏輯,這些業務邏輯不須要嚴格的執行順序。用好異步解決問題的同時,更要用對異步多線程的方式。

源碼地址

推薦閱讀

微服務合集

訂閱最新文章,歡迎關注個人公衆號

微信公衆號
相關文章
相關標籤/搜索