本文主要研究下reactor異步線程的變量傳遞html
在傳統的請求/應答同步模式中,使用threadlocal來傳遞上下文變量是很是方便的,能夠免得在每一個方法參數添加公用的變量,好比當前登陸用戶。可是業務方法可能使用了async或者在其餘線程池中異步執行,這個時候threadlocal的做用就失效了。java
這個時候的解決辦法就是採起propagation模式,即在同步線程與異步線程銜接處傳播這個變量。react
好比spring就提供了TaskDecorator,經過實現這個接口,能夠本身控制傳播那些變量。例如:web
class MdcTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // Right now: Web thread context ! // (Grab the current thread MDC data) Map<String, String> contextMap = MDC.getCopyOfContextMap(); return () -> { try { // Right now: @Async thread context ! // (Restore the Web thread context's MDC data) MDC.setContextMap(contextMap); runnable.run(); } finally { MDC.clear(); } }; } }
這裏注意在finally裏頭clear
配置這個taskDecoratorspring
@EnableAsync @Configuration public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setTaskDecorator(new MdcTaskDecorator()); executor.initialize(); return executor; } }
完整實例詳見 Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
spring5引入webflux,其底層是基於reactor,那麼reactor如何進行上下文變量的傳播呢?官方提供了Context對象來替代threadlocal。segmentfault
其特性以下:api
@Test public void testSubscriberContext(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete(); }
這裏從最底部的subscriberContext設置message值爲World,而後flatMap裏頭經過subscriberContext來訪問。
@Test public void testContextSequence(){ String key = "message"; Mono<String> r = Mono.just("Hello") //NOTE 這個subscriberContext設置的過高了 .subscriberContext(ctx -> ctx.put(key, "World")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger"))); StepVerifier.create(r) .expectNext("Hello Stranger") .verifyComplete(); }
因爲這個例子的subscriberContext設置的過高了,不能做用在flatMap裏頭的Mono.subscriberContext()
@Test public void testContextImmutable(){ String key = "message"; Mono<String> r = Mono.subscriberContext() .map( ctx -> ctx.put(key, "Hello")) //這裏返回了一個新的,所以上面的設置失效了 .flatMap( ctx -> Mono.subscriberContext()) .map( ctx -> ctx.getOrDefault(key,"Default")); StepVerifier.create(r) .expectNext("Default") .verifyComplete(); }
subscriberContext永遠返回一個新的
@Test public void testReadOrder(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor") .verifyComplete(); }
operator只會讀取離它最近的一個context
@Test public void testContextBetweenFlatMap(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor World") .verifyComplete(); }
flatMap讀取離它最近的context
@Test public void testContextInFlatMap(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) ) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) .subscriberContext(ctx -> ctx.put(key, "Reactor")) ) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World Reactor") .verifyComplete(); }
這裏第一個flatMap沒法讀取第二個flatMap內部的context
reactor經過提供Context來實現了相似同步線程threadlocal的功能,很是強大,值得好好琢磨。異步