聊聊reactor異步線程的變量傳遞

本文主要研究下reactor異步線程的變量傳遞html

threadlocal的問題

在傳統的請求/應答同步模式中,使用threadlocal來傳遞上下文變量是很是方便的,能夠免得在每一個方法參數添加公用的變量,好比當前登陸用戶。可是業務方法可能使用了async或者在其餘線程池中異步執行,這個時候threadlocal的做用就失效了。java

這個時候的解決辦法就是採起propagation模式,即在同步線程與異步線程銜接處傳播這個變量。react

TaskDecorator

好比spring就提供了TaskDecorator,經過實現這個接口,能夠本身控制傳播那些變量。例如:web

class MdcTaskDecorator implements TaskDecorator {
 
  @Override
  public Runnable decorate(Runnable runnable) {
    // Right now: Web thread context !
    // (Grab the current thread MDC data)
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
      try {
        // Right now: @Async thread context !
        // (Restore the Web thread context's MDC data)
        MDC.setContextMap(contextMap);
        runnable.run();
      } finally {
        MDC.clear();
      }
    };
  }
}
這裏注意在finally裏頭clear

配置這個taskDecoratorspring

@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
 
  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
  }

}
完整實例詳見 Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads

Reactor Context

spring5引入webflux,其底層是基於reactor,那麼reactor如何進行上下文變量的傳播呢?官方提供了Context對象來替代threadlocal。segmentfault

其特性以下:api

  • 相似map的kv操做,好比put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable,即同一個key,後面put不會覆蓋
  • 提供getOrDefault,getOrEmpty方法
  • Context與做用鏈上的每一個Subscriber綁定
  • 經過subscriberContext(Context)來訪問
  • Context的做用是自底向上

實例

設置及讀取

@Test
    public void testSubscriberContext(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World")
                .verifyComplete();
    }
這裏從最底部的subscriberContext設置message值爲World,而後flatMap裏頭經過subscriberContext來訪問。

自底向上

@Test
    public void testContextSequence(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                //NOTE 這個subscriberContext設置的過高了
                .subscriberContext(ctx -> ctx.put(key, "World"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));

        StepVerifier.create(r)
                .expectNext("Hello Stranger")
                .verifyComplete();
    }
因爲這個例子的subscriberContext設置的過高了,不能做用在flatMap裏頭的Mono.subscriberContext()

不可變

@Test
    public void testContextImmutable(){
        String key = "message";

        Mono<String> r = Mono.subscriberContext()
                .map( ctx -> ctx.put(key, "Hello"))
                //這裏返回了一個新的,所以上面的設置失效了
                .flatMap( ctx -> Mono.subscriberContext())
                .map( ctx -> ctx.getOrDefault(key,"Default"));

        StepVerifier.create(r)
                .expectNext("Default")
                .verifyComplete();
    }
subscriberContext永遠返回一個新的

多個連續的subscriberContext

@Test
    public void testReadOrder(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor")
                .verifyComplete();
    }
operator只會讀取離它最近的一個context

flatMap間的subscriberContext

@Test
    public void testContextBetweenFlatMap(){
        String key = "message";
        Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello Reactor World")
                .verifyComplete();
    }
flatMap讀取離它最近的context

flatMap中的subscriberContext

@Test
    public void testContextInFlatMap(){
        String key = "message";
        Mono<String> r =
                Mono.just("Hello")
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                        )
                        .flatMap( s -> Mono.subscriberContext()
                                .map( ctx -> s + " " + ctx.get(key))
                                .subscriberContext(ctx -> ctx.put(key, "Reactor"))
                        )
                        .subscriberContext(ctx -> ctx.put(key, "World"));

        StepVerifier.create(r)
                .expectNext("Hello World Reactor")
                .verifyComplete();
    }
這裏第一個flatMap沒法讀取第二個flatMap內部的context

小結

reactor經過提供Context來實現了相似同步線程threadlocal的功能,很是強大,值得好好琢磨。異步

doc

相關文章
相關標籤/搜索