簡介: Reactive 編程模型有哪些價值?它的原理是什麼?如何正確使用?本文做者將根據他學習和使用的經歷,分享 Reactive 的概念、規範、價值和原理。歡迎同窗們共同探討、斧正。html
Reactive 直接翻譯的意思式反應式,反應性。咋一看,彷佛不太好懂。java
舉個例子:在 Excel 裏,C 單元格上設置函數 Sum(A+B),當你改變單元格 A 或者單元格 B 的數值時,單元格 C 的值同時也會發生變化。這種行爲就是 Reactive。react
在計算機編程領域,Reactive 通常指的是 Reactive programming。指的是一種面向數據流並傳播事件的異步編程範式(asynchronous programming paradigm)。算法
先舉個例子你們感覺一下:spring
public static void main(String[] args) { FluxProcessor<Integer, Integer> publisher = UnicastProcessor.create(); publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe(); publisher.onNext(1); // print 'receive event: 1' publisher.onNext(2); // print 'receive event: 2' }
代碼 1編程
以上例代碼(使用 Reactor 類庫)爲例,publisher 產生了數據流 (1,2),而且傳播給了 OnNext 事件, 上例中 lambda 響應了該事件,輸出了相應的信息。上例代碼中生成數據流和註冊/執行 lambda 是在同一線程中,但也能夠在不一樣線程中。緩存
注:若是上述代碼執行邏輯有些疑惑,能夠暫時將 lambda 理解成 callback 就能夠了。網絡
對於 Reactive 如今你應該大體有一點感受了,可是 Reactive 有什麼價值,有哪些設計原則,估計你仍是有些模糊。這就是 Reactive Manifesto 要解決的疑問了。多線程
使用 Reactive 方式構建的系統具備如下特徵:架構
即時響應性 (Responsive)
只要有可能, 系統就會及時地作出響應。即時響應是可用性和實用性的基石, 而更加劇要的是,即時響應意味着能夠快速地檢測到問題而且有效地對其進行處理。即時響應的系統專一於提供快速而一致的響應時間, 確立可靠的反饋上限, 以提供一致的服務質量。這種一致的行爲轉而將簡化錯誤處理、 創建最終用戶的信任並促使用戶與系統做進一步的互動。
回彈性 (Resilient)
系統在出現失敗時依然保持即時響應性。這不只適用於高可用的、 任務關鍵型系統——任何不具有回彈性的系統都將會在發生失敗以後丟失即時響應性。回彈性是經過複製、 遏制、 隔離以及委託來實現的。失敗的擴散被遏制在了每一個組件內部, 與其餘組件相互隔離, 從而確保系統某部分的失敗不會危及整個系統,並能獨立恢復。每一個組件的恢復都被委託給了另外一個(外部的)組件, 此外,在必要時能夠經過複製來保證高可用性。(所以)組件的客戶端再也不承擔組件失敗的處理。
彈性 (Elastic)
系統在不斷變化的工做負載之下依然保持即時響應性。反應式系統能夠對輸入(負載)的速率變化作出反應,好比經過增長或者減小被分配用於服務這些輸入(負載)的資源。這意味着設計上並無爭用點和中央瓶頸, 得以進行組件的分片或者複製, 並在它們之間分佈輸入(負載)。經過提供相關的實時性能指標, 反應式系統能支持預測式以及反應式的伸縮算法。這些系統能夠在常規的硬件以及軟件平臺上實現成本高效的彈性。
消息驅動 (Message Driven)
反應式系統依賴異步的消息傳遞,從而確保了鬆耦合、隔離、位置透明的組件之間有着明確邊界。這一邊界還提供了將失敗做爲消息委託出去的手段。使用顯式的消息傳遞,能夠經過在系統中塑造並監視消息流隊列, 並在必要時應用回壓, 從而實現負載管理、 彈性以及流量控制。使用位置透明的消息傳遞做爲通訊的手段, 使得跨集羣或者在單個主機中使用相同的結構成分和語義來管理失敗成爲了可能。非阻塞的通訊使得接收者能夠只在活動時才消耗資源, 從而減小系統開銷。
注:
知道了 Reactive 的概念,特徵和價值後,是否有相關的產品或者框架來幫助咱們構建 Reactive 式系統呢?在早些時候有一些類庫 (Rxjava 1.x, Rx.Net) 可使用,可是規範並不統一,因此後來 Netfilx, Pivotal 等公司就制定了一套規範指導你們便於實現它(該規範也是受到早期產品的啓發),這就是 Reactive Stream 的做用。
Reactive Stream 是一個使用非阻塞 back pressure(回壓)實現異步流式數據處理的標準。目前已經在 JVM 和 JavaScript 語言中實現同一套語意的規範;以及嘗試在各類涉及到序列化和反序列化的傳輸協議(TCP, UDP, HTTP and WebSockets)基礎上,定義傳輸 reactive 數據流的網絡協議。
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
當遇到未預料數據流時,依然能夠在可控資源消耗下保持系統的可用性。
控制在一個異步邊界的流式數據交換。例如傳遞一個數據到另一個線程或者線程池,確保接收方沒有 buffer(緩存)任意數量的數據。而 back pressure(回壓)是解決這種場景的不可或缺的特性。
此標準只描述經過回壓來實現異步流式數據交換的必要的行爲和實體,最小接口,例以下方的 Publisher, Subscriber。Reactive Streams 只關注在這些組件之間的流式數據中轉,並不關注流式數據自己的組裝,分割,轉換等行爲, 例如 map, zip 等 operator。Reactive Streams 規範包括:
Publisher
產生一個數據流(可能包含無限數據), Subscriber 們能夠根據它們的須要消費這些數據。
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Subscriber
Publisher 建立的元素的接收者。監聽指定的事件,例如 OnNext, OnComplete, OnError 等。
publicinterface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscription
是 Publisher 和 Subscriber 一對一的協調對象。Subscriber 能夠經過它來向 Publisher 取消數據發送或者 request 更多數據。
public interface Subscription { public void request(long n); public void cancel(); }
Processor
同時具有 Publisher 和 Subscriber 特徵。代碼1中 FluxProcessor 既能夠發送數據(OnNext),也能夠接收數據 (doOnNext)。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Thread
Callback
Future
Reactive Extensions (Rx)
Coroutines
Reactive 的實現原理我的認爲仍是回調,kotlin 協程實現原理一樣也是回調。但實現回掉的方式不同。一個是經過事件傳播, 一個是經過狀態機。但 cooutine 編程的易用性明顯強於 Rx,後面有空我會專門寫篇文章介紹 kotlin coroutine 的實現原理。
有了 Reactive Stream 這個規範,就會有相應實現該規範的類庫。Reactor 就是其中之一。
Reactor 是遵照 Reactive Stream 規範構建非阻塞應用的 Java 語言 Reactive 類庫,已經在 spring 5 中集成,與他類似的類庫有 RxJava2, RxJs, JDK9 Flow 等。
阿里內部的 Faas 系統目前使用 Reactor 來構建整個系統,包括函數應用和各類核心應用(邏輯架構)。根據咱們壓測結果顯示,使用 Reactive 方式構建的系統確實會有這些特色:
另外從原理上,我認爲資源利用率和吞吐量也會高於非反應式的應用。
阿里內部的 Faas 系統主要作了兩件事情:
涉及到 IO 的地方几乎全異步化。例如中間件(HSF, MetaQ 等提供異步 API)調用。
IO 線程模型變化。使用較少(通常 CPU 核數)線程處理全部的請求。
傳統 Java 應用 IO 線程模型
參考 Netty 中 Reactor IO (worker thread pool) 模型,下方僞代碼(kotlin)進行了簡化。
// 非阻塞讀取客戶端請求數據(in), 讀取成功後執行lambda. inChannel.read(in) { workerThreadPool.execute{ // 阻塞處理業務邏輯(process), 業務邏輯在worker線程池中執行,同步執行完後,再向客戶端返回輸出(out) val out = process(in) outChannel.write(out) } }
Reactive 應用 IO 線程模型
IO 線程也能夠執行業務邏輯 (process),能夠不須要 worker 線程池。
// 非阻塞讀取客戶端請求數據(in), 讀取成功後執行lambda inChannel.read(in) { // IO線程執行業務邏輯(process), 而後向客戶端返回輸出(out). 這要求業務處理流程必須是非阻塞的. process(in){ out-> outChannel.write(out) { // this lambda is executed when the writing completes ... } } }
以 Reactive 方式構建的系統有不少值得學習和發揮價值的地方,但坦白講 Reactive programing 方式目前接受程度並不高。特別是使用 Java 語言開發同窗,我我的也感同身受,由於這和 Java 面向命令控制流程的編程思惟方式有較大差別。因此這裏以 Reactor (Java) 學習爲例:
反應式的系統有不少優勢,可是完整構建反應式的系統卻並不容易。不只僅是語言上的差別,還有一些組件就不支持非阻塞式的調用方式,例如:JDBC。可是有一些開源組織正在推進這些技術進行革新,例如:R2DBC。另外,爲了方便構建反應式系統,一些組織/我的適配了一些主流技術組件 reactor-core, reactor-netty, reactor-rabbimq, reactor-kafka 等,來方便完整構建反應式系統。
當你的系統從底層到上層,從系統內部到依賴外部都變成了反應式,這就造成了 Reactive 架構。
這種架構價值有多大?將來可期。
參考
https://www.reactivemanifesto.org/
https://www.reactive-streams.org/
https://kotlinlang.org/docs/tutorials/coroutines/async-programming.html
https://projectreactor.io/docs/core/release/reference/index.html