一塊兒來看 rxjs

更新日誌

  • 2018-05-26 校訂
  • 2016-12-03 初版翻譯

過去你錯過的 Reactive Programming 的簡介

你好奇於這名爲Reactive Programming(反應式編程)的新事物, 更確切地說,你想了解它各類不一樣的實現(好比 [Rx*], [Bacon.js], RAC 以及其它各類各樣的框架或庫)javascript

學習它比較困難, 由於比較缺好的學習材料(譯者注: 原文寫就時, RxJs 還在 v4 版本, 彼時社區對 RxJs 的探索還不夠完善). 我在開始學習的時候, 試圖找過教程, 不過能找到的實踐指南屈指可數, 並且這些教程只不過隔靴搔癢, 並不能幫助你作真正瞭解 RxJs 的基本概念. 若是你想要理解其中一些函數, 每每代碼庫自帶的文檔幫不到你. 說白了, 你能一下看懂下面這種文檔麼:前端

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])java

按照將元素的索引合併的方法, 把一個 "observable 隊列 " 中的做爲一個新的隊列加入到 "observable 隊列的隊列" 中, 而後把 "observable 隊列的隊列" 中的一個 "observable 隊列" 轉換成一個 "僅從最近的 'observable 隊列' 產生的值構成的一個新隊列."react

這是都是什麼鬼?git

我讀了兩本書, 一本只是畫了個大體的藍圖, 另外一本則是一節一節教你 "如何使用 Reactive Libarary" . 最後我以一種艱難的方式來學習 Reactive Programming: 一遍寫, 一遍理解. 在我就任於 Futurice 的時候, 我第一次在一個真實的項目中使用它, 我在遇到問題時, 獲得了來自同事的支持.github

學習中最困難的地方是 以 Reactive(反應式) 的方式思考. 這意思就是, 放下你以往熟悉的編程中的命令式和狀態化思惟習慣, 鼓勵本身以一種不一樣的範式去思考. 至今我還沒在網上找到任何這方面的指南, 而我認爲世界上應該有一個說明如何以 Reactive(反應式) 的方式思考的教程, 這樣你才知道要如何開始使用它. 在閱讀完本文後以後. 請繼續閱讀代碼庫自帶的文檔來指引你以後的學習. 我但願, 這篇文檔對你有所幫助.web

"什麼是 Reactive Programming(反應式編程)?"

在網上能夠找到大量對此糟糕的解釋和定義. Wikipedia 的 意料之中地泛泛而談和過於理論化. Stackoverflow 的 聖經般的答案也絕對不適合初學者. Reactive Manifesto 聽起來就像是要給你公司的項目經理或者是老闆看的東西. 微軟的 Rx 術語 "Rx = Observables + LINQ + Schedulers" 也讀起來太繁重, 太微軟了, 以致於你看完後仍然一臉懵逼. 相似於 "reactive" 和 "propagation" 的術語傳達出的含義給人感受無異於你之前用過的 MV* 框架和趁手的語言已經作到的事情. 咱們現有的框架視圖固然是會對數據模型作出反應, 任何的變化固然也是要冒泡的. 要否則, 什麼東西都不會被渲染出來嘛.ajax

因此, 讓咱們撇開那些無用的說辭, 嘗試去了解本質.sql

Reactive programming(反應式編程) 是在以異步數據流來編程

固然, 這也不是什麼新東西. 事件總線或者是典型的點擊事件確實就是異步事件流, 你能夠對其進行 observe(觀察) 或者作些別的事情. 不過, Reactive 是比之更優秀的思惟模型. 你可以建立任何事物的數據流, 而不僅是從點擊和懸浮事件中. "流" 是廣泛存在的, 一切均可能是流: 變量, 用戶輸入, 屬性, 緩存, 數據結構等等. 好比, 想象你的 Twitter 時間線會成爲點擊事件一樣形式的數據流.數據庫

熟練掌握該思惟模型以後, 你還會接觸到一個使人驚喜的函數集, 其中包含對任何的數據流進行合併、建立或者從中篩選數據的工具. 它充分展示了 "函數式" 的魅力所在. 一個流能夠做爲另外一個流的輸入. 甚至多個流能夠做爲另外一個流的輸入. 你能夠合併兩個流. 你能夠篩選出一個僅包含你須要的數據的另外一個流. 你能夠從一個流映射數據值到另外一個流.

讓咱們基於 "流是 Reactive 的中心" 這個設想, 來細緻地作看一下整個思惟模型, 就從咱們熟知的 "點擊一個按鈕" 事件流開始.

Click event stream

每一個流是一個按時序不間斷的事件序列. 它可能派發出三個東西: (某種類型的)一個數值, 一個錯誤, 或者一個 "完成" 信號. 說到 "完成" , 舉個例子, 當包含了這個按鈕的當前窗口/視圖關閉時, 也就是 "完成" 信號發生時.

咱們僅能異步地捕捉到這些事件: 經過定義三種函數, 分別用來捕捉派發出的數值、錯誤以及 "完成" 信號. 有時候後二者能夠被忽略, 你只需定義用來捕捉數值的函數. 咱們把對流的 "偵聽" 稱爲訂閱(subscribing), 咱們定義的這三種函數合起來就是觀察者, 流則是被觀察的主體(或者叫"被觀察者"). 這正是設計模式中的觀察者模式.

描述這種方式的另外一種方式用 ASCII 字符來畫個導圖, 在本教程的後續的部分也能看到這種圖形.

--a---b-c---d---X---|-> a, b, c, d 表明被派發出的值 X 表明錯誤 | 表明"完成"信號 ---> 則是時間線

這些都是是老生常談了, 爲了避免讓你感到無聊, 如今來點新鮮的東西: 咱們將原生的點擊事件流進行變換, 來建立新的點擊事件流.

首先, 咱們作一個計數流, 來指明一個按鈕被點擊了多少次. 在通常的 Reactive 庫中, 每一個流都附帶了許多諸如mapfilterscan 等的方法. 當你調用這些方法之一(好比好比clickStream.map(f))時, 它返回一個基於 clickStream 的新的流. 它沒有對原生的點擊事件作任何修改. 這種(不對原有流做任何修改的)特性叫作immutability(不可變性), 而它和 Reactive(反應式) 這個概念的契合度之高比如班戟和糖漿(譯者注: 班戟就是薄煎餅, 該稱呼多見於中國廣東地區. 此句意爲 immutability 與 Reactive 兩個概念高度契合). 這樣的流容許咱們進行鏈式調用, 好比clickStream.map(f).scan(g):

clickStream: ---c----c--c----c------c--> vvvvv map(c becomes 1) vvvv ---1----1--1----1------1--> vvvvvvvvv scan(+) vvvvvvvvv counterStream: ---1----2--3----4------5-->

map(f) 方法根據你提供的函數f替換每一個被派發的元素造成一個新的流. 在上例中, 咱們將每次點擊都映射爲計數 1. scan(g) 方法則在內部運行x = g(accumulated, current), 以某種方式連續聚合處理該流以前全部的值, 在該例子中, g 就是簡單的加法. 而後, 一次點擊發生時, counterStream 就派發一個點擊數的總值.

爲了展現 Reactive 真正的能力, 咱們假設你想要作一個 "雙擊事件" 的流. 或者更厲害的, 咱們假設咱們想要獲得一個 "三擊事件流" , 甚至推廣到更廣泛的狀況, "多擊流". 如今, 深呼吸, 想象一下按照傳統的命令式和狀態化思惟習慣要如何完成這項工做? 我敢說那會煩死你了, 它必須包含各類各樣用來保持狀態的變量, 以及一些對週期性工做的處理.

然而, 以 Reactive 的方式, 它會很是簡單. 事實上, 這個邏輯只不過是四行代碼. 不過讓咱們如今忘掉代碼.不管你是個初學者仍是專家, 藉助導圖來思考, 纔是理解和構建流最好的方法.

Multiple clicks stream

圖中的灰色方框是將一個流轉換成另外一個流的方法. 首先, 每通過 "250毫秒" 的 "事件靜默" (簡單地說, 這是在 buffer(stream.throttle(250ms)) 完成的. (如今先)沒必要擔憂對這點的細節的理解, 咱們主要是演示下 Reactive 的能力.), 咱們就獲得了一個 "點擊動做" 的列表, 即, 轉換的結果是一個列表的流, 而從這個流中咱們應用 map() 將每一個列表映射成對應該隊列的長度的整數值. 最後, 咱們使用 filter(x >= 2) 方法忽略掉全部的 1. 如上: 這 3 步操做將產生咱們指望的流. 咱們以後能夠訂閱("偵聽")它, 並按咱們但願的處理方式處理流中的數據.

我但願你感覺到了這種方式的美妙. 這個例子只是一次不過揭示了冰山一角: 你能夠將相同的操做應用到不一樣種類的流上, 好比 API 返回的流中. 除此之外, 還有許多有效的函數.

"爲何我應該採用反應式編程?"

Reactive Programming (反應式編程) 提高了你代碼的抽象層次, 你能夠更多地關注用於定義業務邏輯的事件之間的互相依賴, 而沒必要寫大量的細節代碼來處理事件. RP(反應式編程)的代碼會更簡潔明瞭.

在現代網頁應用和移動應用中, 這種好處是顯而易見的, 這些場景下, 與數據事件關聯的大量 UI 事件須要被高頻地交互. 10 年前, 和 web 頁面的交互只是很基礎地提交一個長長的表單給後端, 而後執行一次簡單的從新渲染. 在這 10 年間, App 逐漸變得更有實時性: 修改表單中的單個字段可以自動觸發一次到後端的保存動做, 對某個內容的 "點贊" 須要實時反饋到其餘相關的用戶......

現今的 App 有大量的實時事件, 它們共同做用, 以帶給用戶良好的體驗. 咱們要能簡潔處理這些事件的工具, 而 Reactive Programming 方式咱們想要的.

舉例說明如何以反應式編程的方式思考

如今咱們進入到實戰. 一個真實的手把手教你如何以 RP(反應式編程) 的方式來思考的例子. 注意這裏不是隨處抄來的例子, 不是半吊子解釋的概念. 到這篇教程結束爲止, 咱們會在寫出真正的功能性代碼的同時, 理解咱們作的每一個動做.

我選擇了 JavaScript 和 RxJS 做爲工具, 緣由是, JavaScript 是當下最爲人熟知的語言, 而 [Rx*] 支持多數語言和平臺 (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等). , 不管你的工具是什麼, 你能夠從這篇教程中收益.

實現一個"建議關注"盒子

在 Twitter 上, 有一個 UI 元素是建議你能夠關注的其它帳戶.

Twitter Who to follow suggestions box

咱們將着重講解如何模仿出它的核心特性:

  • 在頁面啓動時, 從 API 中加載帳戶數據, 並展現三個推薦關注者
  • 在點擊"刷新"時, 加載另外的三個推薦關注的帳戶, 造成新三行
  • 在點擊一個帳戶的 "x" 按鈕時, 清除該帳戶並展現一個新的
  • 每一行顯示帳戶的頭像和到他們主頁的連接

咱們能夠忽略其它的特性和按鈕, 它們都是次要的. 另外, Twitter 最近關閉了非認證請求接口, 做爲替代, 咱們使用 [Github 的 API] 來構建這個關注別人 UI.(注: 到本稿的最新的校訂爲止, github 的該接口對非認證用戶啓用了一段時間內訪問頻次限制)

若是你想盡早看一下完整的代碼, 請點擊[樣例代碼].

請求和回覆

你如何用 Rx 處理這個問題?

首先, (幾乎) 萬物皆可爲流 .這是 "Rx 口訣". 讓咱們從最容易的特性開始: "在頁面啓動時, 從 API 中加載帳戶數據". 這沒什麼可貴, 只須要(1) 發一個請求, (2) 讀取回復, (3) 渲染回覆的中的數據. 因此咱們直接把咱們咱們的請求當作流. 一開始就用流也許很有"殺雞焉用牛刀"的意味, 但爲了理解, 咱們須要從基本的例子開始.

在應用啓動的時候, 咱們只須要一個請求, 所以若是咱們將它做爲一個數據流, 它將會只有一個派發的值. 咱們知道以後咱們將有更多的請求, 但剛開始時只有一個.

--a------|-> 其中 a 是字符串 'https://api.github.com/users'

這是一個將請求的 URL 的流. 不管請求什麼時候發生, 它會告訴咱們兩件事: 請求發生的時刻和內容. 請求執行之時就是事件派發之時, 請求的內容就是被派發的值: 一個 URL 字符串.

建立這樣一個單值流對 [Rx*] 來講很是簡單, 官方對於流的術語, 是 "Observable"(可被觀察者), 顧名思義它是可被觀察的, 但我以爲這名字有點傻, 因此我稱呼它爲 _流_.

var requestStream = Rx.Observable.just('https://api.github.com/users');

但如今, 這只是一個字符串流, 不包含其餘操做, 因此咱們須要要在值被派發的時候作一些事情. 這依靠對流的訂閱.

requestStream.subscribe(function(requestUrl) { // 執行該請求 jQuery.getJSON(requestUrl, function(responseData) { // ... }); }

注意咱們使用了 jQuery Ajax 回調(咱們假定你應已對此有了解)來處理請求操做的異步性. 但稍等, Rx 就是處理 異步 數據流的. 難道這個請求的回覆不就是一個在將來某一刻會帶回返回數據的流麼? 從概念上講, 它看起來就是的, 咱們來嘗試寫一下.

requestStream.subscribe(function(requestUrl) { // 執行該請求 var responseStream = Rx.Observable.create(function (observer) { jQuery.getJSON(requestUrl) .done(function(response) { observer.onNext(response); }) .fail(function(jqXHR, status, error) { observer.onError(error); }) .always(function() { observer.onCompleted(); }); }); responseStream.subscribe(function(response) { // 對回覆作一些處理 }); }

Rx.Observable.create() 所作的是自定義一個流, 這個流會通知其每一個觀察者(或者說其"訂閱者" )有數據產生 (onNext()) 或發生了錯誤 (onError()). 咱們須要作的僅僅是包裝 jQuery Ajax Promise. 稍等, 這難道是說 Promise 也是一個 Observable?

 

是的. Observable 就是一個 Promise++ 對象. 在 Rx 中, 經過運行 var stream = Rx.Observable.fromPromise(promise) 你就能夠把一個 Promise 轉換成一個 Observable. 僅有的區別在於 Observables 不符合 Promises/A+ 標準, 但他們在概念上是不衝突的. 一個 Promise 就是一個僅派發一個值的 Observable. Rx 流就是容許屢次返回值的 Promise.

這個例子很能夠的, 它展現了 Observable 是如何至少有 Promise 的能力. 所以若是你喜歡 Promise, 請注意 Rx Observable 也能夠作到一樣的事.

如今回到咱們的例子上, 也許你已經注意到了, 咱們在一箇中 subscribe() 調用了另外一個 subscribe(), 這有點像回調地獄. 另外, responseStream 的建立也依賴於 requestStream. 但正如前文所述, 在 Rx 中有簡單的機制來最流做變換並支持從其餘流建立一個新的流, 接下來咱們來作這件事.

到目前爲止, 你應該知道的對流進行變換的一個基礎方法是 map(f), 將 "流 A" 中的每個元素做 f() 處理, 而後在 "流 B" 中生成一一對應的值. 若是咱們這樣處理咱們的請求和回覆流, 咱們能夠把請求 URL 映射到回覆的 Promise (被當作是流) 中.

var responseMetastream = requestStream .map(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });

這下咱們建立了一個叫作 元流 (流的流) 的奇怪的東西. 沒必要對此感到疑惑, 元流, 就是其中派發值是流的流. 你能夠把它想象成 指針): 每一個被派發的值都是對其它另外一個流的 指針 . 在咱們的例子中, 每一個請求的 URL 都被映射爲一個指針, 指向一個個包含 URL 對應的返回數據的 promise 流.

Response metastream

這個元流看上去有點讓人迷惑, 並且對咱們根本沒什麼用. 咱們只是想要一個簡單的回覆流, 其中每一個派發的值都應是一個 JSON 對象, 而不是一個包含 JSON 對象的 Promise. 如今來認識 Flatmap: 它相似於 map(), 但它是把 "分支" 流中派發出的的每一項值在 "主幹" 流中派發出來, 如此, 它就能夠對元流進行扁平化處理.(譯者注: 這裏, "分支" 流指的是元流中每一個被派發的值, "主幹" 流是指這些值有序構成的流, 因爲元流中的每一個值都是流, 做者不得不用 "主幹" 和 "分支" 這樣的比喻來描述元流與其值的關係). 在此, Flatmap 並非起到了"修正"的做用, 元流也並非一個 bug, 相反, 它們正是 Rx 中處理異步回覆流的工具.

var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });

Response stream

漂亮. 由於回覆流是依據請求流定義的, 設想以後
有更多的發生在請求流中的事件, 不難想象, 就會有對應的發生在回覆流中的的回覆事件:

requestStream:  --a-----b--c------------|-> responseStream: -----A--------B-----C---|-> (小寫的是一個請求, 大寫的是一個回覆)

如今咱們終於獲得了回覆流, 咱們就能夠渲染接收到的數據

responseStream.subscribe(function(response) { // 按你設想的方式渲染 `response` 爲 DOM });

整理一下到目前爲止的代碼, 以下:

var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); }); responseStream.subscribe(function(response) { // 按你設想的方式渲染 `response` 爲 DOM });

刷新按鈕

如今咱們注意到, 回覆中的 JSON 是一個包含 100 個用戶的列表. [Github 的 API] 只容許咱們指定一頁的偏移量, 而不能指定讀取的一頁中的項目數量, 因此咱們只用到 3 個數據對象, 剩下的 97 個只能浪費掉. 咱們暫時忽略這個問題, 以後咱們會看到經過緩存回覆來處理它.

每次刷新按鈕被點擊的時候, 請求流應該派發一個新的 URL, 所以咱們會獲得一個新的回覆. 咱們須要兩樣東西: 一個刷新按鈕的點擊事件流(口訣: 萬物皆可成流), 而且咱們須要改變請求流以依賴刷新點擊流. 好在, RxJs 擁有從事件監聽器產生 Observable 的工具.

var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

既然刷新點擊事件自身不帶任何 API URL, 咱們須要映射每次點擊爲一個實際的 URL. 如今咱們將請求流改爲刷新點擊流, 這個流被映射爲每次帶有隨機的偏移參數的、到 API 的請求.

var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

若是我直接這樣寫, 也不作自動化測試, 那這段代碼其實有個特性沒實現. 即請求不會在頁面加載完時發生, 只有當刷新按鈕被點擊的時候纔會. 但其實, 兩種行爲咱們都須要: 刷新按鈕被點擊的時候的請求, 或者是頁面剛打開時的請求.

兩種場景下須要不一樣的流:

var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

但咱們如何才能"合併"這二者爲同一個呢? 有一個 merge() 方法. 用導圖來解釋的話, 它看起來像是這樣的.

stream A: ---a--------e-----o-----> stream B: -----B---C-----D--------> vvvvvvvvv merge vvvvvvvvv ---a-B---C--e--D--o----->

那咱們要作的事就變得很容易了:

var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream );

也有另一種更乾淨的、不須要中間流的寫法:

var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .merge(Rx.Observable.just('https://api.github.com/users'));

甚至再短、再有可讀性一點:

var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .startWith('https://api.github.com/users');

startWith() 會照你猜的那樣去工做: 給流一個起點. 不管你的輸入流是怎樣的, 帶 startWith(x) 的輸出流總會以 x 做爲起點. 但我這樣作還不夠 [DRY], 我把 API 字符串寫了兩次. 一種修正的作法是把 startWith() 用在 refreshClickStream 上, 這樣能夠從"模擬"在頁面加載時一次刷新點擊事件.

var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

漂亮. 若是你如今回頭去看我說 "有個特性沒實現" 的那一段, 你應該能看出那裏的代碼和這裏的代碼的區別僅僅是多了一個 startWith().

使用流來創建"3個推薦關注者"的模型

到如今爲止, 咱們只是寫完了一個發生在回覆流的 subscribe() 中的 推薦關注者 的 UI. 對於刷新按鈕, 咱們要解決一個問題: 一旦你點擊了"刷新", 如今的三個推薦關注者仍然沒有被清理. 新的推薦關注者只在請求內回覆後才能拿到, 不過爲了讓 UI 看上去使人溫馨, 咱們須要在刷新按鈕被點擊的時候就清理當前的推薦關注者.

refreshClickStream.subscribe(function() { // 清理 3 個推薦關注者的 DOM 元素 });

稍等一下. 這樣作不太好, 由於這樣咱們就有兩個會影響到推薦關注者的 DOM 元素的 subscriber (另外一個是 responseStream.subscribe()), 這聽起來不符合 Separation of concerns. 還記得 Reactive 口訣嗎?

Mantra

在 "萬物皆可爲流" 的指導下, 咱們把推薦關注者構建爲一個流, 其中每一個派發出來的值都是一個包含了推薦關注人數據的 JSON 對象. 咱們會對三個推薦關注者的數據分別作這件事. 像這樣來寫:

var suggestion1Stream = responseStream .map(function(listUsers) { // 從列表中隨機獲取一個用戶 return listUsers[Math.floor(Math.random()*listUsers.length)]; });

至於獲取另外兩個用戶的流, 即 suggestion2Stream 和 suggestion3Stream, 只須要把 suggestion1Stream 複製一遍就好了. 這不夠 [DRY], 不過對咱們的教程而言, 這樣能讓咱們的示例簡單些, 同時我認爲, 思考如何在這個場景下避免重複編寫 suggestion[N]Stream 也是個好的思惟練習, 就留給讀者去考慮吧.

咱們讓渲染的過程發生在回覆流的 subscribe() 中, 而是這樣作:

suggestion1Stream.subscribe(function(suggestion) { // 渲染第 1 個推薦關注者 });

回想以前咱們說的 "刷新的時候, 清理推薦關注者", 咱們能夠簡單地將刷新單擊事件映射爲 "null" 數據(它表明當前的推薦關注者爲空), 而且在 suggestion1Stream 作這項工做, 以下:

var suggestion1Stream = responseStream .map(function(listUsers) { // 從列表中隨機獲取一個用戶 return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) );

在渲染的時候, 咱們把 null 解釋爲 "沒有數據", 隱藏它的 UI 元素.

suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // 隱藏第 1 個推薦關注者元素 } else { // 顯示第 1 個推薦關注者元素並渲染數據 } });

整個情景是這樣的:

refreshClickStream: ----------o--------o----> requestStream: -r--------r--------r----> responseStream: ----R---------R------R--> suggestion1Stream: ----s-----N---s----N-s--> suggestion2Stream: ----q-----N---q----N-q--> suggestion3Stream: ----t-----N---t----N-t-->

其中 N 表示 null

(譯者注: 注意, 當 refreshClickStream 產生新值, 即用戶進行點擊時, null 的產生老是馬上發生在 refreshClickStream 以後; 而 refreshClickStream => requestStream => responseStream, responseStream 中的值, 是發給 API 接口的異步請求的結果, 這個結果的產生每每會須要花一點時間, 必然在 null 以後, 所以能夠達到 "爲了讓 UI 看上去使人溫馨, 咱們須要在刷新按鈕被點擊的時候就清理當前的推薦關注者" 的效果).

稍微完善一下, 咱們會在頁面啓動的時候也會渲染 "空" 推薦關注人. 爲此能夠 startWith(null) 放在推薦關注人的流裏:

var suggestion1Stream = responseStream .map(function(listUsers) { // 從列表中隨機獲取一個用戶 return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

最後咱們獲得的流:

refreshClickStream: ----------o---------o----> requestStream: -r--------r---------r----> responseStream: ----R----------R------R--> suggestion1Stream: -N--s-----N----s----N-s--> suggestion2Stream: -N--q-----N----q----N-q--> suggestion3Stream: -N--t-----N----t----N-t-->

關閉推薦關注人, 並利用已緩存的回覆數據

目前還有一個特性沒有實現. 每一個推薦關注人格子應該有它本身的 'x' 按鈕來關閉它, 而後加載另外一個數據來代替. 也許你的第一反應是, 用一種簡單方法: 在點擊關閉按鈕的時候, 發起一個請求, 而後更新這個推薦人:

var close1Button = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click'); // close2Button 和 close3Button 重複此過程 var requestStream = refreshClickStream.startWith('startup click') .merge(close1ClickStream) // 把關閉按鈕加在這裏 .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });

然而這沒不對. (因爲 refreshClickStream 影響了全部的推薦人流, 因此)該過程會關閉而且從新加載_全部的_推薦關注人, 而不是僅更新咱們想關掉的那一個. 這裏有不少方式來解決這個問題, 爲了玩點炫酷的, 咱們會重用以前的回覆數據中別的推薦人. API 返回的數據每頁包含 100 個用戶, 但咱們每次只用到其中的 3 個, 因此咱們有不少有效的刷新數據能夠用, 不必再請求新的.

再一次的, 讓咱們用流的思惟來思考. 當一個 'close1'點擊事件發生的時候, 咱們使用 responseStream中 最近被派發的 回覆來從回覆的用戶列表中隨機獲取一個用戶. 以下:

requestStream: --r---------------> responseStream: ------R-----------> close1ClickStream: ------------c-----> suggestion1Stream: ------s-----s----->

在 [Rx*] 中, 有一個合成器方法叫作 combineLatest, 彷佛能夠完成咱們想作的事情. 它把兩個流 A 和 B 做爲其輸入, 而當其中任何一個派發值的時候, combineLatest 會把二者最近派發的值 a 和 b 按照 c = f(x,y) 的方法合併處理再輸出, 其中 f 是你能夠定義的方法. 用圖來解釋也許更清楚:

stream A: --a-----------e--------i--------> stream B: -----b----c--------d-------q----> vvvvvvvv combineLatest(f) vvvvvvv ----AB---AC--EC---ED--ID--IQ----> 在該例中, f 是一個轉換爲全大寫的函數

咱們能夠把 combineLatest() 用在 close1ClickStream 和 responseStream 上, 所以一旦 "關閉按鈕1" 被點擊(致使 close1ClickStream 產生新值), 咱們都能獲得最新的返回數據, 並在 suggestion1Stream中產生一個新的值. 因爲 combineLatest() 的對稱性的, 任什麼時候候, 只要 responseStream 派發了一個新的回覆, 它也將合併最新的一次 '關閉按鈕1被點擊' 事件來產生一個新的推薦關注人. 這個特性很是有趣, 由於它容許咱們簡化咱們以前的 suggestion1Stream , 以下:

var suggestion1Stream = close1ClickStream .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

在上述思考中, 還有一點東西被遺漏. combineLatest() 使用了兩個數據源中最近的數據, 可是若是這些源中的某個從未派發過任何東西, combineLatest() 就不能產生一個數據事件到輸出流. 若是你再細看上面的 ASCII 圖, 你會發現當第一個流派發 a 的時候, 不會有任何輸出. 只有當第二個流派發 b 的時候才能產生一個輸出值.

有幾種方式來解決該問題, 咱們仍然採起最簡單的一種, 就是在頁面啓動的時候模擬一次對 '關閉按鈕1' 按鈕的點擊:

var suggestion1Stream = close1ClickStream.startWith('startup click') // 把對"關閉按鈕1"的點擊的模擬加在這裏 .combineLatest(responseStream, function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);

總結整理

如今咱們的工做完成了. 完整的代碼以下所示:

var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); var closeButton1 = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click'); // close2 和 close3 是一樣的邏輯 var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var responseStream = requestStream .flatMap(function (requestUrl) { return Rx.Observable.fromPromise($.ajax({url: requestUrl})); }); var suggestion1Stream = close1ClickStream.startWith('startup click') .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null); // suggestion2Stream 和 suggestion3Stream 是一樣的邏輯 suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // 隱藏第 1 個推薦關注者元素 } else { // 顯示第 1 個推薦關注者元素並渲染數據 } });

你能夠在這裏查看完整的[樣例代碼]

很慚愧, 這只是一個微小的代碼示例, 但它的信息量很大: 它着重表現了, 如何對關注點進行適當的隔離, 從而對不一樣流進行管理, 甚至充分利用了返回數據的流. 這樣的函數式風格使得代碼像聲明式多於像命令式: 咱們並不用給出一個要執行的的結構化序列, 咱們只是經過定義流之間的關係來表達系統中每件事物是什麼. 舉例來講, 經過 Rx, 咱們告訴計算機 _suggestion1Stream 就是 點擊關閉按鈕1 的流, 與最近一個API返回的(用戶中隨機選擇的一個)的用戶的流, 刷新時產生 null 的流, 和應用啓動時產生 null 的流的合併流_.

回想一下那些你熟稔的流程控制的語句(好比 ifforwhile), 以及 Javascript 應用中隨處可見的基於回調的控制流. (只要你願意, )你甚至能夠在上文的 subscribe() 中不寫 if 和 else, 而是(在 observable 上)使用 filter()(這一塊我就不寫實現細節了, 留給你做爲練習). 在 Rx 中, 有不少流處理方法, 好比 mapfilterscanmergecombineLateststartWith, 以及很是多用於控制一個事件驅動的程序的流的方法. 這個工具集讓你用更少的代碼而寫出更強大的效果.

接下來還有什麼?

若是你願意用 [Rx*] 來作反應式編程, 請花一些時間來熟悉這個 函數列表, 其中涉及如何變換, 合併和建立 Observables (被觀察者). 若是你想以圖形的方式理解這些方法, 能夠看一下 彈珠圖解 RxJava. 一旦你對理解某物有困難的時候, 試着畫一畫圖, 基於圖來思考, 看一下函數列表, 再繼續思考. 以個人經驗, 這樣的學習流程很是有用.

一旦你熟悉瞭如何使用 [Rx] 進行變成, 理解冷熱酸甜, 想吃就吃...哦不, 冷熱 Observables 就頗有必要了. 反正就算你跳過了這一節, 你也會回來從新看的, 勿謂言之不預也. 建議經過學習真正的函數式編程來磨練你的技巧, 而且熟悉影響各類議題, 好比"影響 [Rx] 的反作用"什麼的.

不過, 實現了反應式編程的庫並不是並不是只有 [Rx]. [Bacon.js] 的運行機制就很直觀, 理解它不像理解 [Rx] 那麼難; [Elm Language] 在特定的應用場景有很強的生命裏: 它是一種會編譯到 Javascript + HTML + CSS 的反應式編程語言, 它的特點在於 [time travelling debugger]. 這些都很不錯.

Rx 在嚴重依賴事件的前端應用中表現優秀. 但它不僅是隻爲客戶端應用服務的, 在接近數據庫的後端場景中也大有可爲. 實際上, [RxJava 正是激活 Netflex 服務端併發能力的關鍵]. Rx 不是一個嚴格限於某種特定類型應用的框架或者是語言. 它實際上是一種範式, 你能夠在任何事件驅動的軟件中實踐它.

 


原文連接本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索