很明顯你是有興趣學習這種被稱做響應式編程的新技術纔來看這篇文章的。html
學習響應式編程是很困難的一個過程,特別是在缺少優秀資料的前提下。剛開始學習時,我試過去找一些教程,並找到了爲數很少的實用教程,可是它們都流於表面,從沒有圍繞響應式編程構建起一個完整的知識體系。庫的文檔每每也沒法幫助你去了解它的函數。不信的話能夠看一下這個:前端
經過合併元素的指針,將每個可觀察的元素序列放射到一個新的可觀察的序列中,而後將多個可觀察的序列中的一個轉換成一個只從最近的可觀察序列中產生值得可觀察的序列。java
天啊。react
我看過兩本書,一本只是講述了一些概念,而另外一本則糾結於如何使用響應式編程庫。我最終放棄了這種痛苦的學習方式,決定在開發中一邊使用響應式編程,一邊理解它。在 Futurice 工做期間,我嘗試在真實項目中使用響應式編程,而且當我遇到困難時,獲得了同事們的幫助。jquery
在學習過程當中最困難的一部分是 以響應式編程的方式思考 。這意味着要放棄命令式且帶狀態的編程習慣,而且要強迫你的大腦以一種不一樣的方式去工做。在互聯網上我找不到任何關於這方面的教程,而我以爲這世界須要一份關於怎麼以響應式編程的方式思考的實用教程,這樣你就有足夠的資料去起步。庫的文檔沒法爲你的學習提供指引,而我但願這篇文章能夠。git
在互聯網上有着一大堆糟糕的解釋與定義。Wikipedia 一如既往的空泛與理論化。Stackoverflow 的權威答案明顯不適合初學者。Reactive Manifesto 看起來是你展現給你公司的項目經理或者老闆們看的東西。微軟的 Rx terminology "Rx = Observables + LINQ + Schedulers" 過於重量級且微軟味十足,只會讓大部分人困惑。相對於你所使用的 MV* 框架以及鍾愛的編程語言,"Reactive" 和 "Propagation of change" 這些術語並無傳達任何有意義的概念。框架的 Views 層固然要對 Models 層做出反應,改變固然會傳播。若是沒有這些,就沒有東西會被渲染了。github
因此不要再扯這些廢話了。ajax
一方面,這並非什麼新東西。Event buses 或者 Click events 本質上就是異步事件流,你能夠監聽並處理這些事件。響應式編程的思路大概以下:你能夠用包括 Click 和 Hover 事件在內的任何東西建立 Data stream。Stream 廉價且常見,任何東西均可以是一個 Stream:變量、用戶輸入、屬性、Cache、數據結構等等。舉個例子,想像一下你的 Twitter feed 就像是 Click events 那樣的 Data stream,你能夠監聽它並相應的做出響應。數據庫
在這個基礎上,你還有使人驚豔的函數去組合、建立、過濾這些 Streams。這就是函數式魔法的用武之地。Stream 能接受一個,甚至多個 Stream 爲輸入。你能夠融合兩個 Stream,也能夠從一個 Stream 中過濾出你感興趣的 Events 以生成一個新的 Stream,還能夠把一個 Stream 中的數據值 映射到一個新的 Stream 中。編程
既然 Stream 在響應式編程中如此重要,那麼咱們就應該好好的瞭解它們,就從咱們熟悉的"Clicks on a button" Event stream 開始。
Stream 就是一個按時間排序的 Events 序列,它能夠放射三種不一樣的 Events:(某種類型的)Value、Error 或者一個" Completed" Signal。考慮一下"Completed"發生的時機,例如,當包含這個按鈕的窗口或者視圖被關閉時。
經過分別爲 Value、Error、"Completed"定義事件處理函數,咱們將會異步地捕獲這些 Events。有時能夠忽略 Error 與"Completed",你只須要定義 Value 的事件處理函數就行。監聽一個 Stream 也被稱做是訂閱 ,而咱們所定義的函數就是觀察者,Stream則是被觀察者,其實就是 Observer Design Pattern。
上面的示意圖也可使用ASCII重畫爲下圖,在下面的部分教程中咱們會使用這幅圖:
--a---b-c---d---X---|-> a, b, c, d are emitted values X is an error | is the 'completed' signal ---> is the timeline
既然已經開始對響應式編程感到熟悉,爲了避免讓你以爲無聊,咱們能夠嘗試作一些新東西:咱們將會把一個 Click event stream 轉爲新的 Click event stream。
首先,讓咱們作一個能記錄一個按鈕點擊了多少次的計數器 Stream。在常見的響應式編程庫中,每一個Stream都會有多個方法,如 map
, filter
, scan
, 等等。當你調用其中一個方法時,例如 clickStream.map(f)
,它就會基於原來的 Click stream 返回一個新的 Stream 。它不會對原來的 Click steam 做任何修改。這個特性稱爲不可變性,它對於響應式編程 Stream,就若是汁對於薄煎餅。咱們也能夠對方法進行鏈式調用,如 clickStream.map(f).scan(g)
:
clickStream: ---c----c--c----c------c--> vvvvv map(c becomes 1) vvvv ---1----1--1----1------1--> vvvvvvvvv scan(+) vvvvvvvvv counterStream: ---1----2--3----4------5-->
map(f)
會根據你提供的 f
函數把原 Stream 中的 Value 分別映射到新的 Stream 中。在咱們的例子中,咱們把每一次 Click 都映射爲數字 1。scan(g)
會根據你提供的 g
函數把 Stream 中的全部 Value 聚合成一個 Value x = g(accumulated, current)
,這個示例中 g
只是一個簡單的添加函數。而後,每 Click 一次, counterStream
就會把點擊的總次數發給它的觀察者。
爲了展現響應式編程真正的實力,讓咱們假設你想獲得一個包含「雙擊」事件的 Stream。爲了讓它更加有趣,假設咱們想要的這個 Stream 要同時考慮三擊(Triple clicks),或者更加寬泛,連擊(兩次或更多)。深呼吸一下,而後想像一下在傳統的命令式且帶狀態的方式中你會怎麼實現。我敢打賭代碼會像一堆亂麻,而且會使用一些變量保存狀態,同時也有一些計算時間間隔的代碼。
而在響應式編程中,這個功能的實現就很是簡單。事實上,這邏輯只有 4 行代碼。但如今咱們先無論那些代碼。用圖表的方式思考是理解怎樣構建Stream的最好方法,不管你是初學者仍是專家。
灰色的方框是用來轉換 Stream 函數的。首先,簡而言之,咱們把連續 250 ms 內的 Click 都積累到一個列表中(就是buffer(stream.throttle(250ms)
作的事。不要在乎這些細節,咱們只是展現一下響應式編程而已)。結果是一個列表的 Stream ,而後咱們使用 map()
把每一個列表映射爲一個整數,即它的長度。最終,咱們使用 filter(x >= 2)
把整數 1 給過濾掉。就這樣,3 個操做就生成了咱們想要的 Stream。而後咱們就能夠訂閱(「監聽」)這個 Stream,並以咱們所但願的方式做出反應。
我但願你能感覺到這個示例的優美之處。這個示例只是冰山一角:你能夠把一樣的操做應用到不一樣種類的 Stream 上,例如,一個 API 響應的 Stream;另外一方面,還有不少其它可用的函數。
響應式編程提升了代碼的抽象層級,因此你能夠只關注定義了業務邏輯的那些相互依賴的事件,而非糾纏於大量的實現細節。RP 的代碼每每會更加簡明。
特別是在開發如今這些有着大量與數據事件相關的 UI events 的高互動性 Webapps、手機 apps 的時候,RP 的優點就更加明顯。10年前,網頁的交互就只是提交一個很長的表單到後端,而在前端只產生簡單的渲染。Apps 就表現得更加的實時了:修改一個表單域就能自動地把修改後的值保存到後端,爲一些內容"點贊"時,會實時的反應到其它在線用戶那裏等等。
如今的 Apps 有着大量各類各樣的實時 Events,以給用戶提供一個交互性較高的體驗。咱們須要工具去應對這個變化,而響應式編程就是一個答案。
讓咱們作一些實踐。一個真實的例子一步一步的指導咱們以 RP 的方式思考。不是虛構的例子,也沒有隻解釋了一半的概念。學完教程以後,咱們將寫出真實可用的代碼,並作到知其然,知其因此然。
在這個教程中,我將會使用 JavaScript 和 RxJS 做爲工具 ,由於JavaScript是如今最多人會的語言,而 Rx* library family 有多種語言版本,並支持多種平臺(.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等)。因此,不管你用的是什麼工具,你都能從下面這個教程中受益。
在 Twitter 上,這個代表其餘帳戶的 UI 元素看起來是這樣的:
咱們將會重點模擬它的核心功能,以下:
咱們能夠忽略其它的特性和按鈕,由於它們是次要的。同時,由於 Twitter 最近關閉了對非受權用戶的 API,咱們將會爲 Github 實現這個推薦界面,而非 Twitter。這是Github獲取用戶的API。
若是你想先看一下最終效果,這裏有完成後的代碼 http://jsfiddle.net/staltz/8jFJH/48/。
在 Rx 中你該怎麼處理這個問題呢? 好吧,首先,(幾乎) 全部的東西均可以轉爲一個Stream 。這就是Rx的咒語。讓咱們先從最簡單的特性開始:"在啓動時,從API加載3個賬戶的數據"。這並無什麼特別,就只是簡單的(1)發出一個請求,(2)收到一個響應,(3)渲染這個響應。因此,讓咱們繼續,並用Stream表明咱們的請求。一開始可能會以爲殺雞用牛刀,但咱們應當從最基本的開始,對吧?
在啓動的時候,咱們只須要發出一個請求,因此若是咱們把它轉爲一個Data stream的話,那就是一個只有一個Value的Stream。稍後,咱們知道將會有多個請求發生,但如今,就只有一個請求。
--a------|-> Where a is the string 'https://api.github.com/users'
這是一個咱們想向其發出請求的 URL 的 Stream。每當一個請求事件發生時,它會告訴咱們兩件事:"何時"與"什麼東西"。"何時"這個請求會被執行,就是何時這個 Event 會被映射。"什麼東西"會被請求,就是這個映射出來的值:一個包含 URL 的 String。
在 RX 中,建立只有一個值的 Stream 是很是簡單的。官方把一個 Stream 稱做「Observable」,由於它能夠被觀察,可是我發現那是個很愚蠢的名子,因此我把它叫作 Stream*。
var requestStream = Rx.Observable.just('https://api.github.com/users');
可是如今,那只是一個包含了String的Stream,並無其餘操做,因此咱們須要以某種方式使那個值被映射。就是經過subscribing 這個 Stream。
requestStream.subscribe(function(requestUrl) { // execute the request jQuery.getJSON(requestUrl, function(responseData) { // ... }); }
留意一下咱們使用了 jQuery 的 Ajax 函數(咱們假設你已經知道 should know already)去處理異步請求操做。但先等等,Rx 能夠用來處理異步 Data stream。那這個請求的響應就不能看成一個包含了將會到達的數據的 Stream 嗎?固然,從理論上來說,應該是能夠的,因此咱們嘗試一下。
requestStream.subscribe(function(requestUrl) { // execute the request var responseStream = Rx.Observable.create(function (observer) { jQuery.getJSON(requestUrl) .done(function(response) { observer.onNext(response); }) .fail(function(jqXHR, status, error) { observer.onError(error); }) .always(function() { observer.onCompleted(); }); }); responseStream.subscribe(function(response) { // do something with the response }); }
Rx.Observable.create()
所作的事就是經過顯式的通知每個 Observer (或者說是「Subscriber」) Data events(onNext()
)或者 Errors ( onError()
)來建立你本身的 Stream。而咱們所作的就只是把 jQuery Ajax Promise 包裝起來而已。打擾一下,這意味者Promise本質上就是一個Observable?
是的。
Observable 就是 Promise++。在 Rx 中,你能夠用 var stream = Rx.Observable.fromPromise(promise)
輕易的把一個 Promise 轉爲 Observable,因此咱們就這樣子作吧。惟一的不一樣就是 Observable 並不遵循 Promises/A+,但概念上沒有衝突。Promise 就是隻有一個映射值的 Observable。Rx Stream 比 Promise 更進一步的是容許返回多個值。
這樣很是不錯,並展示了 Observables 至少有 Promise 那麼強大。因此若是你相信 Promise 宣傳的那些東西,那麼也請留意一下 Rx Observables 能勝任些什麼。
如今回到咱們的例子,若是你已經注意到了咱們在 subscribe()
內又調用了另一個 subscribe()
,這相似於 Callback hell。一樣,你應該也注意到 responseStream
是創建在 requestStream
之上的。就像你以前瞭解到的那樣,在 Rx 內有簡單的機制能夠從其它 Stream 中轉換並建立出新的 Stream,因此咱們也應該這樣子作。
你如今須要知道的一個基本的函數是 [map(f)](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypemapselector-thisarg)
,它分別把 f()
應用到 Stream A 中的每個值中,並把返回的值放進 Stream B 裏。若是咱們也對請求 Stream 與響應 Stream 進行一樣的處理,咱們能夠把 Request URL 映射爲響應 Promise(而 Promise 能夠轉爲 Streams)。
var responseMetastream = requestStream .map(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });
而後,咱們將會創造一個叫作" Metastream "的怪物:包含 Stream 的 Stream。暫時不須要懼怕。Metastream 就是一個 Stream,其中映射的值仍是另一個 Stream。你能夠把它想像爲 pointers:每一個映射的值都是一個指向其它 Stream 的指針。在咱們的例子裏,每一個請求 URL 都會被映射一個指向包含響應 Promise stream 的指針。
Response 的 Metastream 看起來會讓人困惑,而且看起來也沒有幫到咱們什麼。咱們只想要一個簡單的響應 stream,其中每一個映射的值應該是 JSON 對象,而不是一個 JSON 對象的'Promise'。是時候介紹 (Mr. Flatmap)(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#rxobservableprototypeflatmapselector-resultselector) 了:它是 map()
的一個版本,經過把應用到"trunk" Stream 上的全部操做都應用到"branch" Stream 上,能夠"flatten" Metastream。Flatmap 並非用來"修復" Metastream 的,由於 Metastream 也不是一個漏洞,這只是一些用來處理 Rx 中的異步響應的工具。
var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });
很好。由於響應stream是根據請求 stream定義的,因此 若是 咱們後面在請求 stream上發起更多的請求的話,在響應 stream上咱們將會獲得相應的響應事件,就像預期的那樣:
requestStream: --a-----b--c------------|-> responseStream: -----A--------B-----C---|-> (lowercase is a request, uppercase is its response)
如今,咱們終於有了一個響應 stream,因此能夠把收到的數據渲染出來了:
responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });
把目前爲止全部的代碼放到一塊兒就是這樣:
var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); }); responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });
我以前並無提到返回的 JSON 是一個有着 100 個用戶數據的列表。由於這個 API 只容許咱們設置偏移量,而沒法設置返回的用戶數,因此咱們如今是隻用了 3 個用戶的數據而浪費了另外 97 個的數據。這個問題暫時能夠忽略,稍後咱們會學習怎麼緩存這些數據。
每點擊一次刷新按鈕,請求 stream 就會映射一個新的 URL,同時咱們也能獲得一個新的響應。咱們須要兩樣東西:一個是刷新按鈕上 Click events 組成的 Stream(咒語:一切都能是 Stream),同時咱們須要根據刷新 click stream 而改變請求 stream。幸運的是,RxJS 提供了從 Event listener 生成 Observable 的函數。
var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
既然刷新 click event 自己並無提供任何要請求的 API URL,咱們須要把每一次的 Click 都映射爲一個實際的 URL。如今,咱們把刷新 click stream 改成新的請求 stream,其中每個 Click 都分別映射爲帶有隨機偏移量的 API 端點。
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
由於我比較笨而且也沒有使用自動化測試,因此我剛把以前作好的一個特性毀掉了。如今在啓動時不會再發出任何的請求,而只有在點擊刷新按鈕時纔會。額...這兩個行爲我都須要:不管是點擊刷新按鈕時仍是剛打開頁面時都該發出一個請求。
咱們知道怎麼分別爲這兩種狀況生成 Stream:
var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
但咱們怎樣才能把這兩個"融合"爲一個呢?好吧,有 merge()
函數。這就是它作的事的圖解:
stream A: ---a--------e-----o-----> stream B: -----B---C-----D--------> vvvvvvvvv merge vvvvvvvvv ---a-B---C--e--D--o----->
這樣就簡單了:
var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream );
還有一個更加簡潔的可選方案,不須要使用中間變量。
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .merge(Rx.Observable.just('https://api.github.com/users'));
甚至能夠更簡短,更具備可讀性:
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .startWith('https://api.github.com/users');
startWith()
函數作的事和你預期的徹底同樣。不管你輸入的 Stream 是怎樣,startWith(x)
輸出的 Stream 一開始都是 x 。可是還不夠 DRY,我重複了 API 終端 string。一種修復的方法是去掉 refreshClickStream
最後的startWith()
,並在一開始的時候"模擬"一次刷新 Click。
var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
很好。若是你把以前我"毀掉了的版本"的代碼和如今的相比,就會發現惟一的不一樣是加了 startWith()
函數。
到如今爲止,咱們只是談及了這個推薦 UI 元素在 responeStream 的 subscribe()
內執行的渲染步驟。對於刷新按鈕,咱們還有一個問題:當你點擊‘刷新’ 時,當前存在的三個推薦並不會被清除。新的推薦會在響應到達後出現,爲了讓 UI 看起來舒服一些,當點擊刷新時,咱們須要清理掉當前的推薦。
refreshClickStream.subscribe(function() { // clear the 3 suggestion DOM elements });
不,別那麼快,朋友。這樣很差,咱們如今有兩個訂閱者會影響到推薦的 DOM 元素(另一個是responseStream.subscribe()
),並且這樣徹底不符合 Separation of concerns。還記得響應式編程的咒語麼?
因此讓咱們把顯示的推薦設計成一個 stream,其中每個映射的值都是包含了推薦內容的 JSON 對象。咱們以此把三個推薦內容分開來。如今第一個推薦看起來是這樣子的:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; });
其餘的, suggestion2Stream
和 suggestion3Stream
能夠簡單的拷貝 suggestion1Stream
的代碼來使用。這不是 DRY,它會讓咱們的例子變得更加簡單一些,加之我以爲這是一個能夠幫助考慮如何減小重複的良好實踐。
咱們不在 responseStream 的 subscribe() 中處理渲染了,咱們這麼處理:
suggestion1Stream.subscribe(function(suggestion) { // render the 1st suggestion to the DOM });
回到"當刷新時,清理掉當前的推薦",咱們能夠很簡單的把刷新點擊映射爲 null
,而且在 suggestion1Stream
中包含進來,以下:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) );
當渲染時,null
解釋爲"沒有數據",因此把 UI 元素隱藏起來。
suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // hide the first suggestion DOM element } else { // show the first suggestion DOM element // and render the data } });
如今的示意圖:
refreshClickStream: ----------o--------o----> requestStream: -r--------r--------r----> responseStream: ----R---------R------R--> suggestion1Stream: ----s-----N---s----N-s--> suggestion2Stream: ----q-----N---q----N-q--> suggestion3Stream: ----t-----N---t----N-t-->
其中,N
表明了 null
做爲一種補充,咱們也能夠在一開始的時候就渲染「空的」推薦內容。這經過把 startWith(null)
添加到 Suggestion stream 就完成了:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
如今結果是:
refreshClickStream: ----------o---------o----> requestStream: -r--------r---------r----> responseStream: ----R----------R------R--> suggestion1Stream: -N--s-----N----s----N-s--> suggestion2Stream: -N--q-----N----q----N-q--> suggestion3Stream: -N--t-----N----t----N-t-->
還有一個功能須要實現。每個推薦,都該有本身的"X"按鈕以關閉它,而後在該位置加載另外一個推薦。最初的想法,點擊任何關閉按鈕時都須要發起一個新的請求:
var close1Button = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click'); // and the same for close2Button and close3Button var requestStream = refreshClickStream.startWith('startup click') .merge(close1ClickStream) // we added this .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
這個沒有效果。這將會關閉而且從新加載 全部 的推薦,而不是僅僅處理咱們點擊的那一個。有一些不同的方法能夠解決,而且讓它變得更加有趣,咱們能夠經過複用以前的請求來解決它。API 的響應頁面有 100 個用戶,而咱們僅僅使用其中的三個,因此還有不少的新數據可使用,無須從新發起請求。
一樣的,咱們用Stream的方式來思考。當點擊'close1'時,咱們想要用 responseStream
最近的映射從響應列表中獲取一個隨機的用戶,如:
requestStream: --r---------------> responseStream: ------R-----------> close1ClickStream: ------------c-----> suggestion1Stream: ------s-----s----->
在 Rx* 中, 叫作鏈接符函數的 combineLatest
彷佛實現了咱們想要的功能。它接受兩個 Stream,A 和 B 做爲輸入,當其中一個 Stream 發射一個值時, combineLatest
把最近兩個發射的值 a 和 b 從各自的 Stream 中取出而且返回一個 c = f(x,y)
,其中 f
爲你定義的函數。用圖來表示更好:
stream A: --a-----------e--------i--------> stream B: -----b----c--------d-------q----> vvvvvvvv combineLatest(f) vvvvvvv ----AB---AC--EC---ED--ID--IQ----> where f is the uppercase function
咱們能夠在 close1ClickStream
和 responseStream
上使用 combineLatest(),因此不管何時當一個按鈕被點擊時,咱們能夠得到最新的響應發射值,而且在 suggestion1Stream
上產生一個新的值。另外一方面,combineLatest() 是對稱的,當一個新的響應在 responseStream
發射時,它將會把最後的'關閉 1'的點擊事件一塊兒合併來產生一個新的推薦。這是有趣的,由於它容許咱們把以前的 suggestion1Stream
代碼簡化成下邊這個樣子:
var suggestion1Stream = close1ClickStream .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
還有一個問題須要解決。combineLatest() 使用最近的兩個數據源,可是當其中一個來源沒發起任何事件時,combineLatest() 沒法在 Output stream 中產生一個 Data event。從上邊的 ASCII 圖中,你能夠看到,當第一個 Stream 發射值 a
時,這個值時並無任何輸出產生,只有當第二個 Stream 發射值 b
時纔有值輸出。
有多種方法能夠解決這個問題,咱們選擇最簡單的一種,一開始在'close 1'按鈕上模擬一個點擊事件:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this .combineLatest(responseStream, function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
終於完成了,全部的代碼合在一塊兒是這樣子:
var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); var closeButton1 = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click'); // and the same logic for close2 and close3 var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var responseStream = requestStream .flatMap(function (requestUrl) { return Rx.Observable.fromPromise($.ajax({url: requestUrl})); }); var suggestion1Stream = close1ClickStream.startWith('startup click') .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null); // and the same logic for suggestion2Stream and suggestion3Stream suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // hide the first suggestion DOM element } else { // show the first suggestion DOM element // and render the data } });
你能夠查看這個最終效果 http://jsfiddle.net/staltz/8jFJH/48/
這段代碼雖然短小,但實現了很多功能:它適當的使用 Separation of concerns 實現了對 Multiple events 的管理,甚至緩存了響應。函數式的風格讓代碼看起來更加 Declarative 而非 Imperative:咱們並不是給出一組指令去執行,而是經過定義 Stream 之間的關係 定義這是什麼 。舉個例子,咱們使用 Rx 告訴計算機 *suggestion1Stream*
是 由 'close 1' Stream 與最新響應中的一個用戶合併而來,在程序剛運行或者刷新時則是 null
。
留意一下代碼中並無出現如 if
、 for
、 while
這樣的控制語句,或者通常 JavaScript 應用中典型的基於回調的控制流。若是你想使用 filter()
,上面的 subscribe()
中甚至能夠不用 if
、 else
(實現細節留給讀者做爲練習)。在 Rx 中,咱們有着像 map
、 filter
、 scan
、 merge
、 combineLatest
、 startWith
這樣的 Stream 函數,甚至更多相似的函數去控制一個事件驅動(Event-driven)的程序。這個工具集讓你能夠用更少的代碼實現更多的功能。
若是你以爲 Rx* 會成爲你首選的響應式編程庫,花點時間去熟悉這個big list of functions,它包括瞭如何轉換、合併、以及建立 Observable。若是你想經過圖表去理解這些函數,請看 RxJava's very useful documentation with marble diagrams。不管何時你遇到問題,畫一下這些圖,思考一下,看一下這一大串函數,而後繼續思考。以我我的經驗,這樣效果很明顯。
一旦你開始使用 Rx 去編程,頗有必要去理解 Cold vs Hot Observables 中的概念。若是忽略了這些,你一不當心就會被它坑了。我提醒過你了。經過學習真正的函數式編程去提高本身的技能,並熟悉那些會影響到 Rx 的問題,好比反作用。
可是響應式編程不只僅是 Rx。還有相對容易理解的 Bacon.js,它沒有 Rx 那些怪癖。Elm Language 則以它本身的方式支持 RP:它是一門會編譯成 Javascript + HTML + CSS 的響應式編程語言 ,並有一個 time travelling debugger。很是厲害。
Rx 在須要處理大量事件的 Frontend 和 Apps 中很是有用。但它不只僅能用在客戶端,在後端或者與數據庫交互時也很是有用。事實上,RxJava 是實現Netflix's API服務器端併發的一個重要組件 。Rx 並非一個只能在某種應用或者語言中使用的 Framework。它本質上是一個在開發任何 Event-driven 軟件中都能使用的編程範式。
若是教程幫到你了,請支持。