本文主要研究下reactive streams的flux的parallel運行方式.react
在一些涉及IO操做,好比讀取文件,訪問數據庫等,一般建議使用異步線程以parallel模式運行,以提高性能。數據庫
@Test public void testParallelRunOn(){ Flux.range(1, 1000) .log() .parallel(8) .runOn(Schedulers.parallel()) //parallel flux .sequential() //必須使用sequential來將這些異步線程的執行結果聚集成一個stream .map(e -> { LOGGER.info("map thread:{},e:{}",Thread.currentThread().getName(),e); return e*10; }) .subscribe(e -> { LOGGER.info("subscribe thread:{},e:{}",Thread.currentThread().getName(),e); }); }
部分輸出異步
2:38:53.949 [main] INFO reactor.Flux.Range.1 - | onNext(13) 22:38:53.949 [parallel-2] INFO com.example.demo.ParallelTest - subscribe thread:parallel-2,e:120 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(14) 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:13 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:130 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(15) 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:14 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:140 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(16) 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:15 22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:150 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(17) 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:16 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(18) 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:160 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:17 22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(19) 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:170 22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:18