| 導語 反應式編程是在命令式編程、面向對象編程以後出現的一種新的編程模型,是一種以更優雅的方式,經過異步和數據流來構建事務關係的編程模型。本文包括反應式編程的概述和 RxPy 實戰,以及怎樣去理解反應式編程才能更好的把它融入到咱們的編程工做中,把反應式編程變成咱們手中的利器。html
1. 反應式編程概述
1.1 背影趨勢
在 google 趨勢中搜索反應式編程,能夠看到其趨勢在 2013 年後一直是往上走的。如圖1所示:前端
[ 圖1 google 趨勢搜索結果 ]vue
爲啥呢?爲啥是 2013 年纔有明顯的變化,由於2013 年後纔有能夠大範圍使用的框架和庫出現,纔有人專門投入去佈道反應式編程這個事情。java
在範圍縮小到中國,這個結果有點意思了,如圖 2 所示:react
[ 圖2 google趨勢搜索結果 ]git
在中國主要是北上廣深和杭州,說明什麼,這些技術仍是一線城市的開發同窗纔會使用,查看左下角主要是主題都是java相關,查看右上角,浙江省用得比較多,說明阿里是主要的使用方。github
1.2 定義
反應式編程又叫響應式編程,在維基百科中,其屬於聲明式編程,數據流。數據庫
其定義爲:編程
反應式編程 (reactive programming) 是一種基於數據流 (data stream) 和 變化傳遞 (propagation of change) 的聲明式 (declarative) 的編程範式。後端
換句話說:使用異步數據流進行編程,這意味着能夠在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值經過數據流進行傳播。
反應式編程提升了代碼的抽象級別,能夠只關注定義了業務邏輯的那些相互依賴的事件。
1.3 Rx的發展
反應式編程最着名的實現是 ReactiveX,其爲 Reactive Extensions 的縮寫,通常簡寫爲 Rx ,發展歷程如圖 3 所示:
[ 圖3 Rx來歷 ]
微軟 2009 年 以 .Net 的一個響應式擴展的方式創造了Rx,其藉助可觀測的序列提供一種簡單的方式來建立異步的,基於事件驅動的程序。2012 年 Netflix 爲了應對不斷增加的業務需求開始將 .NET Rx 遷移到 JVM 上面。並於 2013 年 2 月份正式向外發佈了 RxJava 。
1.4 反應式宣言
在 2014 年 9 月 16 號,反應式宣言正式發佈了 2.0 版本。在 2.0 以前,這份宣言的中文翻譯標題,其實是」響應式宣言「,而非」反應式宣言「
在反應式宣言中的 」Reactive「 其實是指一個副詞,表示系統老是會積極主動、甚至是智能地對內外的變化作出反應。因此這裏叫反應式編程會更貼切一些.
反應式宣言是一份構建現代雲擴展架構的參考方案框架。這個框架主要使用消息驅動的方法來構建系統,在形式上能夠達到彈性和回彈性,最後能夠產生即時響應性的價值。如圖 4 所示:
[ 圖4 反應式編程 ]
反應式系統具備如圖所示的4個特性:
-
即時響應性,對用戶有反應: 對用戶有反應咱們才說響應,通常咱們說的響應,基本上都說得針對跟用戶來交互。只要有可能,系統就會及時響應。
-
回彈性,對失敗有反應: 應用失敗了系統不能無動於衷,不能等着它掛掉,要有反應,使其具有可恢復性。可恢復性能夠經過複製、監控、隔離和委派等方式實現。在可恢復性的系統中,故障被包含在每一個組件中,各組件之間相互隔離,從而容許系統的某些部分出故障而且在不連累整個系統的前提下進行恢復。當某個模塊出現問題時,須要將這個問題控制在必定範圍內,這便須要使用隔絕的技術,避免雪崩等相似問題的發生。或是將出現故障部分的任務委託給其餘模塊。回彈性主要是系統對錯誤的容忍。
-
彈性,對容量和壓力變化有反應: 在不一樣的工做負載下,系統保持響應。系統能夠根據輸入的工做負載,動態地增長或減小系統使用的資源。這意味着系統在設計上能夠經過分片、複製等途徑來動態申請系統資源並進行負載均衡,從而去中心化,避免節點瓶頸。若是沒有狀態的話,就進行水平擴展,若是存在狀態,就使用分片技術,將數據分至不一樣的機器上。
-
消息驅動,對輸入有反應: 響應系統的輸入,也能夠叫作消息驅動。反應式系統依賴異步消息傳遞機制,從而在組件之間創建邊界,這些邊界能夠保證組件之間的鬆耦合、隔離性、位置透明性,還提供了以消息的形式把故障委派出去的手段。
前三種特性(即時響應性, 回彈性, 彈性)更多的是跟你的架構選型有關,咱們能夠很容易理解像 Microservices、Docker 和 K8s 這樣的技術對創建反應式系統的重要性。
1.5 回壓
這裏要特別要提一下回壓(Backpressure), Backpressure 實際上是一種現象,在數據流從上游生產者向下遊消費者傳輸的過程當中,上游生產速度大於下游消費速度,致使下游的 Buffer 溢出,這種現象就叫作 Backpressure 出現。這句話的重點不在於」上游生產速度大於下游消費速度」,而在於」Buffer 溢出」。回壓和 Buffer 是一對相生共存的概念,只有設置了 Buffer,纔有回壓出現;只要設置了 Buffer,必定存在出現回壓的風險。
好比咱們開發一個後端服務,有一個 Socket 不斷地接收來自用戶的請求來把用戶須要的數據返回給用戶。咱們服務所能承受的同時訪問用戶數是有上限的,假設最多隻能承受 10000 的併發,再多的話服務器就有當掉的風險了。對於超過 10000 的用戶,程序會直接丟棄。那麼對於這個案例 10000 就是咱們設置的 Buffer,當超過 10000 的請求產生時,就形成了回壓的產生;而咱們程序的丟棄行爲,就是對於回壓的處理。
對於回壓咱們通常有兩種處理方式,一種就是上面舉例中的拒絕或丟棄,這是否認應答的方式,另外一種是確定應答,先收下來,而後再慢慢處理。
1.6 Rx適用場景
[圖5 適用場景 ]
Rx 適用於前端,跨平臺,後端等場景,其中在Angular 2.x,vue,react版本中已經有了Rx的實現可使用,而且做爲其核心的特性在宣傳;Rx支持多達18種語言,在各平臺均可以使用,具備很強的跨平臺特性;在後端,經過異步調用,簡單的併發實現,能夠實現鬆耦合的架構。
1.7 哪些語言或框架支持反應式編程
18種語言Rx系統的框架出現比較早,已經發布了v2版本了,Rx* 系列語言支持以下:
Java: RxJava
JavaScript: RxJS
C#: Rx.NET
C#(Unity): UniRx
Scala: RxScala
Clojure: RxClojure
C++: RxCpp
Lua: RxLua
Ruby: Rx.rb
Python: RxPY
Go: RxGo
Groovy: RxGroovy
JRuby: RxJRuby
Kotlin: RxKotlin
Swift: RxSwift
PHP: RxPHP
Elixir: reaxive
Dart: RxDart
框架支持:
RxCocoa: RxCocoa是RxSwift的一部分,主要是UI相關的Rx封裝
RxAndroid: RxAndroid 源於RxJava,是一個實現異步操做的庫,具備簡潔的鏈式代碼,提供強大的數據變換。
RxNetty: RxNetty 是一個響應式、實時、非阻塞的網絡編程庫,基於 Netty 這個著名的事件驅動網絡庫的強大功能。支持Tcp/Udp/Http/Https。支持>RxJava。RxNetty 在 NetFlix公司的各類產品中獲得了普遍的應用。
Reactor: Reactor相對出生較晚,有發展前景Akka,scala系,用戶基礎薄弱
1.8. 哪些公司在用Rx
[ 圖6 哪些公司在用Rx ]
2. RxRy入門
2.1 Rx組成
Rx的組成包括5部分,被觀察者或者叫發射源,觀察者/訂閱者或者叫接收源,訂閱,調度器,操做符。
Observable<Data> 被觀察者能夠被觀察者訂閱,被觀察者將數據push給全部的訂閱者
Subscriber /Observer
Subscription 訂閱能夠被取消訂閱
Schedulers 調度器是Rx的線程池,操做中執行的任務能夠指定線程池,咱們能夠經過subscribeOn來指定Observable的任務在某線程池中執行Observable
也能夠經過observeOn來指定訂閱者/觀察者們,在哪一個線程執行onNext, onComplete, onError
Operators 操做符能夠對數據流進行各類操做,包括建立,轉換,過濾,組裝,合併 ,篩選等等
咱們常常用如圖7所示的示例圖來表示數據流動的過程。
[ 圖7 ]
圖中上面這條線表示被觀察者的時間線,表示輸入,從左到右輸入項,中間的各類顏色的塊塊是咱們要觀察的項,最後的豎線表示輸入結束。
Flip是變換過程,對全部的項作變換。下面這條線是變換的結果,也就是輸出,一樣各類顏色的塊塊是要觀察的結果的項,xx表示異常中斷。
2.2 第一次體驗Rx
需求以下:
從輸入框獲取輸入,從第 10 次輸入開始取前5次的輸入,打印出來。
這是一個命令式編程的示例,咱們須要將需求轉換成命令式的描述,引入了計數變量,經過計數變量來跳過輸入,而後再根據計算變量來標記取數的次數,打印出來,代碼如圖8所示:
[ 圖8 ]
換成反應式編程,代碼如圖 9 所示:
[ 圖9]
這是一個反應式的面向數據流的示例,建立流,跳過前 10 個項,取前5次,打印出來。如圖 10 所示爲其數據流動示例。
[ 圖10 ]
圖片來源:
https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava
對比命令式編程和反應式編程,區別以下:
-
命令式編程,重視控制(執行過程),以運算、循環、條件判斷、跳轉來完成任務;計算機爲先的思惟,指令驅動機器作事;容易引入大量狀態變量
-
反應式編程,重視任務的解決(執行結果),關注數據轉換和轉換的組合;人腦思惟,任務驅動,分治;明確的輸入和輸出狀態
Rx主要是作三件事:
-
數據/事件的建立
-
組合/轉換數據流
-
監聽處理結果
下面咱們以文檔+代碼的方式介紹這三件事情。
2.3 建立流
RxPy 有 10 種用於建立 Observable 的操做符,以下:
-
create – 使用 observer 方法,從頭建立一個 Observable,在 observer 方法中檢查訂閱狀態,以便及時中止發射數據或者運算。
-
-
defer — 只有當訂閱者訂閱才建立 Observable,爲每一個訂閱建立一個新的 Observable。
-
-
empty/never/throw — 建立一個什麼都不作直接通知完成的 Observable 建立一個什麼都不作直接通知錯誤的 Observable 建立一個什麼都不作的 Observable
-
-
from — 將一些對象或數據結構轉換爲 Observable
-
-
interval —建立一個按照給定的時間間隔發射從 0 開始的整數序列的 Observable
-
-
just — 將一個對象或對象集轉換爲發射這個或這些對象的 Observable
-
-
range — 建立一個發射指定範圍的整數序列的 Observable
-
-
repeat — 建立一個重複發射特定項或序列的 Observable
-
-
start — 建立一個發射函數返回值的 Observable
-
-
timer — 建立一個在給定的延時以後發射單個數據項的 Observable
create 從頭建立一個 Observable ,在 observer 方法中檢查訂閱狀態,以便及時中止發射數據或者運算。
observer 包含三個基本函數:
-
onNext():基本事件,用於傳遞項。
-
-
onCompleted(): 事件隊列完結。不只把每一個事件單獨處理,還會把它們看作一個隊列。當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
-
-
onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,會發出錯誤消息,同時隊列自動終止,不容許再有事件發出
-
在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,而且是事件序列中的最後一個。若是在隊列中調用了其中一個,就不該該再調用另外一個。
示例代碼見附件
2.4 變換
變換常見的操做符有 6 個:
-
buffer — 緩存,能夠簡單的理解爲緩存,它按期從 Observable 收集數據到一個集合,而後把這些數據集合打包發射,而不是一次發射一個
-
-
flat_map — 扁平映射,將 Observable 發射的數據變換爲 Observables 集合,而後將這些 Observable 發射的數據平坦化的放進一個單獨的 Observable,能夠認爲是一個將嵌套的數據結構展開的過程。
-
-
group_by — 分組,將原來的 Observable 分拆爲 Observable 集合,將原始 Observable 發射的數據按 Key 分組,每個 Observable 發射一組不一樣的數據
-
-
map — 映射,經過對序列的每一項都應用一個函數變換 Observable 發射的數據,實質是對序列中的每一項執行一個函數,函數的參數就是這個數據項
-
-
scan — 掃描,對 Observable 發射的每一項數據應用一個函數,而後按順序依次發射這些值
-
-
window — 窗口,按期未來自 Observable 的數據分拆成一些 Observable 窗口,而後發射這些窗口,而不是每次發射一項。相似於 Buffer,但 Buffer 發射的是數據,Window 發射的是 Observable,每個 Observable 發射原始 Observable 的數據的一個子集
-
其中 flat_map 和 map 是兩個很是重要的操做符,map 的操做很簡單,就是傳入一個函數,這個函數會將數據進行轉換,一個輸入對應一個輸出
flat_map 和 map 不一樣,其返回值是一個 Observable,一個輸入對應多個輸出。
這兩個操做的使用場景很好區分,當轉換過程是同步過程時,使用 map,當轉換過程是異步過程時使用 flat_map。
Group by 在工做中操做數據庫的時候常常用到,就是按某個字段分組,在這裏也是相同的意思,會按傳遞的函數生成的key來分組,注意這裏的返回是一個分組的Observable,不能直接訂閱,須要再作一次處理。
示例代碼見附件
2.5 過濾
過濾用於從 Observable 發射的數據中進行選擇,其常見操做符以下:
-
debounce —只有在空閒了一段時間後才發射數據,通俗的說,就是若是一段時間沒有操做,就執行一次操做
-
-
distinct —去重,過濾重複數據
-
-
element_at — 取值,發射某一項數據
-
-
filter — 過濾,僅發射 Observable 中經過檢測的項
-
-
first — 首項,只發射第一項(或者知足某個條件的第一項)數據
-
-
ignore_elements — 丟棄全部數據,只發射錯誤或正常終止的通知
-
-
last — 末項,只發射最後一項數據
-
-
sample — 取樣,按期發射Observable最近的數據
-
-
skip — 跳過開始的N項數據
-
-
skip_last — 跳過最後的N項數據
-
-
take — 只發射開始的N項數據
-
-
take_last — 只發射最後的N項數據
-
其中最經常使用的是 filter,filter 就是過濾,對於數據流,僅發射經過檢測的項,有點像 SQL 中的 where 條件,只是這裏的條件是一個函數,他會遍歷一個個項,並執行這個函數,看是否知足條件,對於 知足條件的纔會給到輸出流。
示例代碼見附件
2.6 合併
合併操做符或者叫組合操做符,其常見以下:
-
and_/then/when — 經過模式 (And 條件)和計劃 (Then 次序)組合兩個或多個 Observable 發射的數據集
-
-
combine_latest — 當兩個 Observables 中的任何一個發射了一個數據時,經過一個指定的函數組合每一個 Observable 發射的最新數據(一共兩個數據),而後發射這個函數的結果。相似於 zip,可是,不一樣的是 zip 只有在每一個Observable都發射了數據才工做,而 combine_latest 任何一個發射了數據均可以工做,每次與另外一個 Observable 最近的數據壓合。
-
merge — 將多個 Observable 合併爲一個。不一樣於concat,merge不是按照添加順序鏈接,而是按照時間線來鏈接。
-
-
start_with — 在數據序列的開頭增長一項數據。start_with 的內部也是調用了 concat
-
-
switch_latest/ — 將 Observable 發射出來的多個 Observables 轉換爲僅包括最近發射單個項的 Observable
-
-
zip — 使用一個函數組合多個 Observable 發射的數據集合,而後再發射這個結果。若是多個 Observable 發射的數據量不同,則以最少的Observable 爲標準進行壓合。
-
-
concat — 按順序鏈接多個 Observable。
其中 merge 和 concat 都是合併流,區別在於一個是鏈接,一個是合併,鏈接的時候是一個流接另外一個流,合併的流是無序的,原來兩個流的元素交錯,當其中一個結束時,另外一個就算是沒有結束整個合併過程也會中斷。
示例代碼見附件
2.7 條件/布爾
這些操做符可用於單個或多個數據項,也可用於 Observable。其常見以下:
-
all — 判斷全部的數據項是否知足某個條件
-
-
amb — 給定多個 Observable,只讓第一個發射數據的 Observable 發射所有數據,其餘 Observable 將會被忽略。
-
-
contains — 判斷在發射的全部數據項中是否包含指定的數據
-
-
default_if_empty — 若是原始 Observable 正常終止後仍然沒有發射任何數據,就發射一個默認值
-
-
sequence_equal —判斷兩個 Observable 是否按相同的數據序列
-
-
skip_until — 丟棄 Observable 發射的數據,直到第二個 Observable 發送數據。(丟棄條件數據)
-
-
skip_while — 丟棄 Observable 發射的數據,直到一個指定的條件不成立(不丟棄條件數據)
-
-
take_until — 當發射的數據知足某個條件後(包含該數據),或者第二個 Observable 發送完畢,終止第一個 Observable 發送數據。
-
-
take_while — 當發射的數據知足某個條件時(不包含該數據),Observable 終止發送數據。
示例代碼見附件
3. RxPy實戰
實戰包括如下內容:
讀取QQ號碼包並去重統計
從網絡地址中獲取數據
從數據庫獲取數據
文章信息關聯做者名稱
多線程獲取網絡地址中的股票數據並統計記錄數
3.1 讀取文件內容並統計行數
需求描述:
從文件中讀取全部QQ號,並對QQ號去重統計
代碼以下:
若是文件中有多列,或者是某些字符間隔,在返回的時候再多加一個map,作一次拆分便可,不用再寫循環處理,更直接。這裏和前面示例不一樣在於有一個publish。publish 將一個普通的 Observable 轉換爲可鏈接的,可鏈接的Observable 和普通的Observable差很少,不過它並不會在被訂閱時開始發射數據,而是直到使用了 Connect 操做符時纔會開始,這樣能夠更靈活的控制發射數據的時機。好比咱們這裏須要有多個觀察者訂閱的時候。
3.2 從網絡地址中獲取數據
需求描述:
獲取新浪的美股接口數據,並打印出股票名和價格
代碼以下:
3.3 從數據庫獲取數據
需求描述:
從MySQL數據庫中讀取用戶信息並打印出來
代碼以下:
3.4 文章信息關聯做者名稱
需求描述:
將文章信息列表關聯做者名稱
代碼以下:
3.5 多線程獲取網絡地址中的股票數據
需求描述:
以多線程的方式,按列表讀取新浪接口美股的數據
代碼以下:
4. 小結
4.1 一些坑
-
理解 Rx 最關鍵的部分,就是理解 Rx 的流,包括流的源頭(Observable)、操做 (Operation)、和終點 (Subscription)。
-
-
流的初始化函數,只有在被訂閱時,纔會執行。流的操做,只有在有數據傳遞過來時,纔會進行,這⼀切都是異步的。(錯誤的理解了代碼執行時機)
-
-
在沒有弄清楚 Operator 的意思和影響前,不要使用它。
-
-
當心那些不會 complete 的 observable 和收集類型的操做符好比 reduce, to_list, scan 等,必須等到 Observable complete,纔會返回結果。若是發現你的操做鏈條徹底不返回結果,看看是否是在不會 complete 的observable 上使用了收集型的操做符
-
4.2 反應式思考
-
傳統代碼一般是命令式的,順序的,而且一次只關注一個任務,並且還必須協調和管理數據狀態
-
-
現實中的數據都是在運行中的,股市價格一直在變,微博不停的有新的話題出來,抖音不停的有人上傳新的視頻
-
-
現實中也有靜態的數據,好比沒有更新的數據庫,文件等,咱們經過查詢這些靜態數據,將靜態數據建模爲動態的,從而將其與實時的事件流組合到一塊兒,將靜的數據動起來。
-
-
事件驅動和反應式編程的區別:事件驅動式編程圍繞事件展開,反應式編程圍繞數據展開
-
-
當構建傳統基於事件的系統時,咱們常常依賴於狀態機來決定何時從事件中退訂,Rx容許咱們以聲明的方式指定結束條件的事件流,一旦事件流結束,它會清除全部未退訂訂閱
-
-
聲明式編程,專一於要作什麼(what to do),命令式編程,專一於該怎樣作(how to do)
-
反應式編程已經在淘寶有一些應用,好比在淘寶的猜你喜歡,個人淘寶,都已經實踐,其QPS,RT都有較大優化效率,這些點的應用須要對整個業務框架作一次升級 ,主要包括編程框架、中間件,以及業務方的升級等。
其中中間件的升級,包括服務框架(RPC)、網關、緩存、消息(MQ)、DB(JDBC)、限流組件、分佈式跟蹤系統、移動端 Rx 框架等等。這是一個很大的升級。而反應式架構在各個模塊上基本都有成熟的方案,除了個別領域如數據庫,基本沒有特別的瓶頸。
學習反應式編程主要在於思惟轉換,由於以前主要使用同步式命令式編程的思惟寫程序,忽然要換成以流的方式編寫,思惟必需要作轉換,好比如何經過使用相似匹配、過濾和組合等轉換函數構建集合,如何使用功能組成轉換集合等等,當思惟轉變後,一切都會變得很是天然和順滑。
這篇文章從網上找了不少的資料,面網上的資料很是有限,特別是RxPy的,基本只有官方的說明文檔。
謹以此拋磚,但願有更多的同窗能夠了解多一種編程範式,把它融入到咱們的編程工做中,把反應式編程變成咱們手中的利器。
6. 參考資料
Rx(Reactive eXtension)官網 http://reactivex.io/
https://zhuanlan.zhihu.com/p/27678951
https://www.jianshu.com/p/757393ee4a2f
http://www.javashuo.com/article/p-tssgckts-mw.html
《維基:響應式編程》
《響應式架構與 RxJava 在有贊零售的實踐》
《全面異步化:淘寶反應式架構升級