微服務的異步調用

微服務的異步調用

三豐 soft張三丰 java

微服務的異步調用

異步調用

一個能夠無需等待被調用函數的返回值就讓操做繼續進行的方法。web

異步調用就是你 喊 你朋友吃飯 ,你朋友說知道了 ,待會忙完去找你 ,你就去作別的了。同步調用就是你 喊 你朋友吃飯 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,大家一塊兒去。spring

jdk1.8以前的Future

jdk併發包裏的Future表明了將來的某個結果,當咱們向線程池中提交任務的時候會返回該對象,能夠經過future得到執行的結果,可是jdk1.8以前的Future有點雞肋,並不能實現真正的異步,須要阻塞的獲取結果,或者不斷的輪詢。promise

一般咱們但願當線程執行完一些耗時的任務後,可以自動的通知咱們結果,很遺憾這在原生jdk1.8以前是不支持的,可是咱們能夠經過第三方的庫實現真正的異步回調。多線程

public class JavaFuture {
    public static void main(String[] args) throws Throwable, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> f = executor.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                System.out.println("task started!");
                longTimeMethod();
                System.out.println("task finished!");
                return "hello";
            }
        });

        //此處get()方法阻塞main線程
        System.out.println(f.get());
        System.out.println("main thread is blocked");
    }
}

若是想得到耗時操做的結果,能夠經過get()方法獲取,可是該方法會阻塞當前線程,咱們能夠在作完剩下的某些工做的時候調用get()方法試圖去獲取結果。併發

也能夠調用非阻塞的方法isDone來肯定操做是否完成,isDone這種方式有點兒相似下面的過程:
微服務的異步調用app

jdk1.8開始的Future

直到jdk1.8纔算真正支持了異步操做,jdk1.8中提供了lambda表達式,使得java向函數式語言又靠近了一步。藉助jdk原生的CompletableFuture能夠實現異步的操做,同時結合lambada表達式大大簡化了代碼量。代碼例子以下:異步

package netty_promise;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class JavaPromise {
    public static void main(String[] args) throws Throwable, ExecutionException {
        // 兩個線程的線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //jdk1.8以前的實現方式
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println("task started!");
                try {
                    //模擬耗時操做
                    longTimeMethod();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "task finished!";
            }
        }, executor);

        //採用lambada的實現方式
        future.thenAccept(e -> System.out.println(e + " ok"));

        System.out.println("main thread is running");
    }
}

實現方式相似下圖:
微服務的異步調用async

Spring的異步方法

先把longTimeMethod 封裝到Spring的異步方法中,這個異步方法的返回值是Future的實例。這個方法必定要寫在Spring管理的類中,注意註解@Async。ide

@Service
public class AsynchronousService{
  @Async
  public Future springAsynchronousMethod(){
    Integer result = longTimeMethod();
    return new AsyncResult(result);
  }
}

其餘類調用這個方法。這裏注意,必定要其餘的類,若是在同類中調用,是不生效的。

@Autowired
private AsynchronousService asynchronousService;

public void useAsynchronousMethod(){
    Future future = asynchronousService.springAsynchronousMethod();
    future.get(1000, TimeUnit.MILLISECONDS);
}

其實Spring只不過在原生的Future中進行了一次封裝,咱們最終得到的仍是Future實例。

ThreadPoolTaskExecutor

當咱們須要實現併發、異步等操做時,一般都會使用到ThreadPoolTaskExecutor。
微服務的異步調用

當一個任務被提交到線程池時,首先查看線程池的核心線程是否都在執行任務,否就選擇一條線程執行任務,是就執行第二步。查看核心線程池是否已滿,不滿就建立一條線程執行任務,不然執行第三步。查看任務隊列是否已滿,不滿就將任務存儲在任務隊列中,不然執行第四步。查看線程池是否已滿,不滿就建立一條線程執行任務,不然就按照策略處理沒法執行的任務。

在ThreadPoolExecutor中表現爲:

若是當前運行的線程數小於corePoolSize,那麼就建立線程來執行任務(執行時須要獲取全局鎖)。若是運行的線程大於或等於corePoolSize,那麼就把task加入BlockQueue。若是建立的線程數量大於BlockQueue的最大容量,那麼建立新線程來執行該任務。若是建立線程致使當前運行的線程數超過maximumPoolSize,就根據飽和策略來拒絕該任務。

TaskDecorator

public interface TaskDecorator A callback interface for a decorator to be applied to any Runnable about to be executed. Note that such a decorator is not necessarily being applied to the user-supplied Runnable/Callable but rather to the actual execution callback (which may be a wrapper around the user-supplied task). The primary use case is to set some execution context around the task's invocation, or to provide some monitoring/statistics for task execution.

意思就是說這是一個執行回調方法的裝飾器,主要應用於傳遞上下文,或者提供任務的監控/統計信息。看上去正好能夠應用於咱們這種場景。多線程的場景下要多注意。

解決方案

上文中的錯誤信息涉及到RequestAttributes 和SecurityContext,他們都是經過ThreadLocal來保存線程數據,在同步方法中沒有問題,使用線程池異步調用時,咱們能夠經過配合線程池的TaskDecorator裝飾器拷貝上下文傳遞。

注意 線程池中的線程是可複用的,使用ThreadLocal須要注意內存泄露問題,因此線程執行完成後須要在finally方法中移除上下文對象。

代碼以下

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import javax.annotation.Nonnull;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Bean("ttlExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 設置線程池核心容量
        executor.setCorePoolSize(20);
        // 設置線程池最大容量
        executor.setMaxPoolSize(100);
        // 設置任務隊列長度
        executor.setQueueCapacity(200);
        // 設置線程超時時間
        executor.setKeepAliveSeconds(60);
        // 設置線程名稱前綴
        executor.setThreadNamePrefix("ttl-executor-");
        // 設置任務丟棄後的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 設置任務的裝飾
        executor.setTaskDecorator(new ContextCopyingDecorator());
        executor.initialize();
        return executor;
    }

    static class ContextCopyingDecorator implements TaskDecorator {
        @Nonnull
        @Override
        public Runnable decorate(@Nonnull Runnable runnable) {
            RequestAttributes context = RequestContextHolder.currentRequestAttributes();
            SecurityContext securityContext = SecurityContextHolder.getContext();
            return () -> {
                try {
                    RequestContextHolder.setRequestAttributes(context);
                    SecurityContextHolder.setContext(securityContext);
                    runnable.run();
                } finally {
                    SecurityContextHolder.clearContext();
                    RequestContextHolder.resetRequestAttributes();
                }
            };
        }
    }
}
相關文章
相關標籤/搜索