聊聊reactive streams的parallel flux

本文主要研究下reactive streams的flux的parallel運行方式.react

目的

在一些涉及IO操做,好比讀取文件,訪問數據庫等,一般建議使用異步線程以parallel模式運行,以提高性能。數據庫

實例

@Test
    public void testParallelRunOn(){
        Flux.range(1, 1000)
                .log()
                .parallel(8)
                .runOn(Schedulers.parallel()) //parallel flux
                .sequential() //必須使用sequential來將這些異步線程的執行結果聚集成一個stream
                .map(e -> {
                    LOGGER.info("map thread:{},e:{}",Thread.currentThread().getName(),e);
                    return e*10;
                })
                .subscribe(e -> {
                    LOGGER.info("subscribe thread:{},e:{}",Thread.currentThread().getName(),e);
                });
    }

部分輸出異步

2:38:53.949 [main] INFO reactor.Flux.Range.1 - | onNext(13)
22:38:53.949 [parallel-2] INFO com.example.demo.ParallelTest - subscribe thread:parallel-2,e:120
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(14)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:13
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:130
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(15)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:14
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:140
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(16)
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - map thread:parallel-5,e:15
22:38:53.950 [parallel-5] INFO com.example.demo.ParallelTest - subscribe thread:parallel-5,e:150
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(17)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:16
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(18)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:160
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:17
22:38:53.950 [main] INFO reactor.Flux.Range.1 - | onNext(19)
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - subscribe thread:parallel-8,e:170
22:38:53.950 [parallel-8] INFO com.example.demo.ParallelTest - map thread:parallel-8,e:18

小結

  • parallel來指定線程池線程個數
  • runOn啓動parallel flux
  • sequential將異步線程池執行結果聚集成一個stream

doc

相關文章
相關標籤/搜索