爲了應對高併發場景下到服務端編程需求,微軟最早提出了一種異步編程到方案Reactive Programming
,也就是反應式編程。react
以後在Java社區就出現了RxJava和Akka Stream等技術方案,讓Java平臺在反應式編程上有了多種選擇。算法
函數式編程編程
反應式編程通常是基於函數式編程實現的,函數式編程有以下特色:安全
反應式編程是一種基於數據流,傳遞變化,聲明式的編程範式。服務器
事件驅動網絡
思想是組件之間交互經過鬆耦合的生產者和消費者來實現的,而且事件以異步,非阻塞方式進行發送和接收。數據結構
事件驅動是系統經過推模式實現的,也就是生產者在消息產生時推送數據給消費者進行處理,而不是讓消費者不斷輪詢或等待數據實現的。併發
響應及時框架
因爲反應式是異步的,好比進行數據處理的話,在交出任務以後就快速返回,而不是阻塞的等待任務執行完畢再返回。任務的執行給到後臺線程執行,等任務處理完成以後返回,好比Java8的CompletableFuture。異步
事件彈性
事件驅動系統是鬆耦合的,上下游之間不是直接依賴,可是在Debug時成本更高一些。
Spring Reactor是Pivotal基於反應式編程實現的一種方案。是一種非阻塞,事件驅動的編程方案,使用函數式編程實現。
觀察者模式
反應式編程和命令式編程在迭代器上的實現:
背壓
若是Publisher發佈消息太快,超過Subscriber處理速度該怎麼辦?響應式編程引入了背壓概念,使得Subscriber可以控制消費消息的速度。
在Java生態中,Netflix的RxJava,TypeSafe的Scala,Akaka,pivatol的Sping,Reactor都是反應式編程的框架。
Stream不是集合元素,不是數據結構,也不保存數據,只是關於算法和計算,更像一種能夠編程的迭代器。
Stream能夠並行操做,迭代器只能命令式的,串型操做。 並行操做是將數據分紅多段,每個在不一樣線程中處理,最後將結果一塊兒輸出。這樣能夠大大利用硬件資源。
Reactor主要模塊基於Netty實現:
核心類:
反應式編程概念總結:
Reactor使用方式上基本分爲三步:
Reactor編程須要先建立出Mono或Flux。
同步調用結果建立對象
Mono<String> helloWorld = Mono.just("Hello World"); // 能夠指定序列中包含的所有元素 Flux<String> fewWords = Flux.just("Hello","World"); Flux<String> manyWords = Flux.fromIterable(words);
這種方式通常用在通過一系列非IO型操做後,獲得一個對應的對象,當須要將這個對象交給IO操做時,能夠經過這種方式轉換成Mono或Flux。
異步調用結果建立
若是異步獲得結果,好比CompletableFuture能夠建立一個Mono:
Mono.fromFuture(completableFuture);
若是這個異步調用不返回CompletableFuture,而有本身的回調方法,那麼可使用:
static<T>Mono<T>create(Consumer<MonoSink<T>>callback)
Mono.create(sink ->{ ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url,String.class); entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>(){ @Override public void onSuccess(ResponseEntity<String> result){ sink.success(result.getBody()); } @Override public void onFailure(Throwable ex) { sink.error(ex); } }); });
在進行Mono和Flux處理階段,通常使用filter
,map
,flatMap
,then
,zip
,reduce
等。
map,flatMap,then 三個頻率使用比較高。
then
是下一步意思,表明執行順序的下一步,不表示下一步依賴於上一步。then方法參數只是一個Mono,入參不是上一步的執行結果。
flatMap和map的參數是Function,是上一步執行的結果。
flatMap和map
傳統的命令式編程
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
對應的反應式編程
Mono.just(params) .flatMap(v -> doStep1(v)) .flatMap(v -> doStep2(v)) .flatMap(v -> doStep3(v));
flatMap入參Function的返回值要求是Mono對象。map的入參Function只要求返回一個普通對象。 對於一些返回值是Mono的方法,想將調用串聯起鏈式調用,必須使用flatMap,而不是map。
通常使用Mono.zip,Tuple2等。
傳統編程方式併發執行是經過線程池+Future方式實現的。可是在作Future.get時是阻塞的。 Reactor中使用Mono和Flux中的zip方法以下:
Mono<CustomType1> item1Mono = ...; Mono<CustomType2> item2Mono = ...; Mono.zip(items -> { CustomType1 item1 = CustomType1.class.cast(items[0]); CustomType2 item2 = CustomType2.class.cast(items[1]); // Do merge return mergeResult; }, item1Mono, item2Mono);
這樣item1Mono 和 item2Mono 過程是並行執行的。
使用zip方法時須要作類型強轉換,類型強轉換是不安全的
通常使用:Flux.fromIterable(),Flux.reduce()方法。
好比:
Data initData = ...; List<SubData> list = ...; Flux.fromIterable(list) .reduce(initData,(data,itemInList) -> { // Do something on data and itemInList return data; });
直接消費的Mono和Flux就是調用subscriber方法,其餘的WebFlux接口能夠直接返回框架的Response輸出就能夠了。
Serverlet3.1支持了異步處理方式,Servlet線程不須要一直阻塞的等待任務執行。Servlet在接收到請求後,將請求委託給業務線程完成,本身則直接返回繼續接收新的請求。
因此Servlet3.1適用於那些業務處理很是耗時場景,這樣能夠減小服務器資源佔用,能夠提升併發處理速度,可是對於自己響應較爲迅速的應用來講收益不大。
WebFlux的異步處理是基於Reactor實現的,是將輸入流適配成Mono或Flux進行統一處理。
在最新的Spring Cloud Gateway中也是基於Netty和WebFlux實現的。
Flux和Mono
Flux和Mono屬於事件發佈者,相似於生產者,爲消費者提供訂閱接口。在實現發生時,Flux和Mono會回調消費者對應的方法通知消費者處理事件。 Flux能夠觸發多個事件,Mono只觸發一個事件。
Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println ); Mono.fromCallable(System::currentTimeMillis) .flatMap(time -> Mono.first(serviceA.findRecent(time), serviceB.findRecent(time))) .timeout(Duration.ofSeconds(3), errorHandler::fallback) .doOnSuccess(r -> serviceM.incrementSuccess()) .subscribe(System.out::println);
選型注意
若是在框架中使用了WebFlux,他依賴的安全認證,數據訪問都必須使用Reactive API,在存儲層目前Reactive只支持MongoDB,Redis和Couchbase等幾種不支持事務管理的NoSql,須要注意。
WebFlux並不能將接口耗時減小,只是能夠減小線程擴展,提高系統的吞吐和伸縮能力。因爲其爲異步非阻塞Web框架,因此適用於IO密集型服務,好比咱們交易網關這種。
WebFlux支持兩種編程模式:
能夠經過Reactive Streams實現背壓控制。
ServerRequest和ServerResponse是JDK8友好訪問底層HTTP消息的不可變接口。徹底是響應式的。
在使用lambda寫處理函數時,若是多個處理函數可能缺少可讀性且不易於維護。能夠將相關處理函數分組到一個處理程序或控制器類中。