本系列爲本人Java編程方法論 響應式解讀系列的Webflux
部分,現分享出來,前置知識Rxjava2 ,Reactor的相關解讀已經錄製分享視頻,併發布在b站,地址以下:java
Rxjava源碼解讀與分享:www.bilibili.com/video/av345…react
Reactor源碼解讀與分享:www.bilibili.com/video/av353…程序員
NIO源碼解讀相關視頻分享: www.bilibili.com/video/av432…web
NIO源碼解讀視頻相關配套文章:數據庫
BIO到NIO源碼的一些事兒之NIO 下 之 Selector BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 上 BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 下服務器
Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 上網絡
其中,Rxjava與Reactor做爲本人書中內容將不對外開放,你們感興趣能夠花點時間來觀看視頻,本人對着兩個庫進行了全面完全細緻的解讀,包括其中的設計理念和相關的方法論,也但願你們能夠留言糾正我其中的錯誤。
隨着Servlet 3.1
的引入,經過Spring MVC
便可以實現非阻塞行爲。 可是,因爲Servlet API
依然包含幾個阻塞的接口。一樣,咱們在應用程序設計的API中也可能會使用到阻塞,而該API原本是被設定爲非阻塞。 在這種狀況下,相關阻塞API的使用確定會下降應用程序性能。 咱們來看下面這段代碼:
@GetMapping
void onResponse(){
try{
//some logic here
}catch(Exception e){
//sendError() is a blocking API
response.sendError(500);
}
}
複製代碼
這段代碼使用在Spring MVC中,Spring容器針對這個錯誤而對相應頁面的渲染則是阻塞的。以下:
@Controller
public class MyCustomErrorController implements ErrorController {
@RequestMapping(path = "/error")
public String greeting() {
return "myerror";
}
@Override
public String getErrorPath() {
return "/error";
}
}
複製代碼
此處渲染的頁面爲myerror.jsp
,具體代碼就不貼了。固然,咱們確定有辦法來異步解決這個錯誤處理問題,但咱們出錯的可能性就會變大,要知道,咱們最終仍是要通過Servlet
對象的,而Servlet
相關api有阻塞的也有非阻塞的,咱們來經過一張圖來方便理解。
當產生請求訪問時事件時,則該事件處理流向如上圖所示(咱們只關注進入到Servlet容器的處理階段),能夠知道,這個過程尤爲是Filter鏈這裏,都是能夠發生IO阻塞的,再根據上一節所講內容,咱們可使用一張圖來展現咱們能夠肯定的非阻塞IO。
Spring MVC
中在所寫代碼邏輯中作到完美的無阻塞,咱們依然沒法改變與避免
Servlet 3.1+
中那些架構設計層面的缺陷,
Servlet
的相關阻塞API咱們依然會用到。那麼咱們是否是可使用netty來避免這樣的情形?因而咱們就能夠將目光放到
Spring WebFlux
之上。
咱們業務端來說,絕大多數程序員對於併發的操做並不在行的,也就很難寫出性能很好並且符合規範的代碼,這也形成了在Spring web MVC
下,咱們很難針對本身的業務進行合理的異步化操做。好比,咱們每每會將I/O
操做與當前執行線程進行綁定到一塊兒,也就是生產和消費兩種業務綁定在一塊兒,這樣,即使咱們異步,二者也是在同一個線程中進行,這樣,假如併發量很大的狀況下,異步化會產生大量的線程,CPU
會在切換線程上消耗更多的性能,這是咱們所不肯看到的,而RxJava
和Reactor
給咱們提供了很好的調度API
,如Reactor中的publishOn
,RxJava中的observeOn
,能夠保證咱們將生產和消費分離,同時,做爲生產或消費線程所在的線程池,其每每是針對於使用了這個線程池的多個訂閱服務,這樣,每個線程均可能同時爲多個訂閱關係服務,一個單獨的訂閱關係並不會一直佔有這個線程,當有元素下發時,將會根據訂閱者請求數量和元素產生的速度以及是否有多個線程在處理此訂閱關係的下發元素,使用調度器的話,這裏拿Reactor中的publishOn
來說,當上遊只支持同步的話(FluxPublishOn.PublishOnSubscriber#onSubscribe
內調用源的requestFusion
方法判斷),那就始終在同一個線程內消費(FluxPublishOn.PublishOnSubscriber#trySchedule
內進行判斷,經過WIP
控制),當咱們定義好publishOn
中隊列大小後,每當隊列內元素消耗完畢,而後上游元素產生太慢,就會跳出當前消費線程,直到有新元素下發時,就再次從線程池中拿到一個線程消費。讀者假如此處有疑問,請回顧本書以前內容(因書並未出版,可回顧本人相關分享視頻)。 這樣服務器的性能就能夠獲得最大程度的利用。這個咱們在Spring MVC
中確實很難自行實現,比較複雜。 另外,經過Reactor
對於背壓的實現,咱們能夠作到相似消息中間件對於消息的積壓,不至於數據在網絡傳輸的過程當中丟失,這樣就能夠更好的應對高併發場景下的訪問需求。 接下來,咱們就來對Webflux
下的背壓使用進行一波大體的說明。
爲了幫助理解Backpressure
在WebFlux
使用時底層的工做原理,咱們有必要回顧一下默認使用的TCP/IP
傳輸層。咱們知道,瀏覽器和服務器之間的正常通訊(服務器到服務器之間的通訊一般也是同樣)是經過TCP
鏈接完成的(一樣包括WebFlux
中的WebClient
和服務器之間的通訊)。同時,咱們會從Reactive Streams
規範的角度來回顧一下背壓的含義,以便更好的針對背壓進行控制。
在Reactive Streams
中,背壓包括兩部分,一部分是接收端的消息積壓,另外一部分是消費者能夠經過發出通知來表達該消費者能夠消耗多少元素,以此來進行需求調節。整個過程是操做的元素對象,那麼,在這裏,咱們就碰到一個棘手的問題:TCP
是針對字節抽象而不是邏輯元素抽象。 咱們一般所說的背壓控制是指制向或者從網絡發送或接收的邏輯元素的數量。而TCP本身的流程控制是基於字節而不是邏輯元素。
由上,可知道,在WebFlux
的實現中,背壓經過數據傳輸流程控制來調節,但它不會暴露接收方的實際需求。 咱們能夠經過下圖來觀察其中的交互流程:
上圖顯示了兩個微服務之間的通訊,其中左側發送數據流,右側對該流進行消費。接下來對上圖整個過程進行簡要說明:
WebFlux
中,它將邏輯對象元素轉換爲字節流並將它們傳輸到TCP網絡或從TCP網絡接收字節流並轉換爲邏輯對象元素。正如咱們從上圖中能夠看到的那樣,接收者的需求與發送者的需求不一樣(這裏指圖中的request
請求的邏輯元素)。這也就意味着二者的需求是相互獨立的,也就是說,在WebFlux中,咱們能夠經過業務邏輯(服務)交互來展示需求,但不多會暴露服務A與服務B交互的相關背壓細節。 也就是說,webflux
中的背壓設計並無對數據發送服務端進行按需設計,這點可能與咱們所指望的有所出入,不是那麼完美,顯得有失公平。
若是咱們想很簡單的對背壓進行控制,咱們能夠經過Reactor
的相關操做來控制請求數量,也能夠在自定義訂閱者的時候進行限定,這裏咱們經過Flux
下的limitRate(n)
來實現。首先咱們先來看下其實現思路,其實就是一個調度操做,只不過咱們以前有講,publishOn
本身是一箇中間存儲站,它將上下游進行分離下游的請求數量在這裏進行管理,publishOn
本身有一個每次向上遊請求的數量限制,關於publishOn
操做源碼細節,能夠回顧以前相關章節內容(因書並未出版,可回顧本人相關分享視頻)。也就是說,咱們只須要在publishOn
之上封裝一個API
來實現便可:
//reactor.core.publisher.Flux#limitRate(int)
public final Flux<T> limitRate(int prefetchRate) {
return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
}
複製代碼
假如咱們有一個包含questions
的源,由於解決問題的能力有限,想要對其進行限流,因而咱們就能夠進行以下操做:
@PostMapping("/questions")
public Mono<Void> postAllQuestions(Flux<Question> questionsFlux) {
return questionService.process(questionsFlux.limitRate(10))
.then();
}
複製代碼
咱們熟悉publishOn
後,能夠知道limitRate()
操做會首先從上游獲取10個元素存到其內定義的隊列中。這意味着即便咱們定義的訂閱者所設定的請求元素數量爲Long.MAX_VALUE
,limitRate
操做也會將此需求拆分爲一塊一塊去請求下發。此處涉及的源碼以下,你們可對照理解:
//reactor.core.publisher.FluxPublishOn.PublishOnSubscriber#runAsync
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = REQUESTED.addAndGet(this, -e);
}
s.request(e);
e = 0L;
}
複製代碼
上面是提交的數據的分塊處理,咱們有時候會涉及到數據庫請求數據的處理,好比查詢,同時將所發送數據進行限流逐步發送,能夠進行以下操做:
@GetMapping("/questions")
public Flux<Question> getAllQuestions() {
return questionService.retreiveAll()
.limitRate(10);
}
複製代碼
由此,咱們也能理解背壓在webflux
中的做用機制了。對於這些特性,Spring MVC
也就很難提供了。
相信你們也明確感覺到了使用Spring WebFlux
的好處了,也知道爲什麼會要求使用Servlet 3.1+
,同時對於webflux
中背壓的做用有了更清晰的認知。不過,咱們須要注意的是,經過官方文檔可知,Spring Webflux
能夠在Servlet Container
或Netty
上運行,而本書更關心Spring Webflux
基於Netty
服務器的運行。那麼,接下來,咱們將接觸Reactor-netty
的內在細節。