So you're curious in learning this new thing called Reactive Programming, particularly its variant comprising of Rx, Bacon.js, RAC, and others.html
相信大家在學習響應式編程這個新技術的時候都會充滿了好奇,特別是它的一些變體,包括Rx系列、Bacon.js、RAC和其餘的一些變體。前端
Learning it is hard, even harder by the lack of good material. When I started, I tried looking for tutorials. I found only a handful of practical guides, but they just scratched the surface and never tackled the challenge of building the whole architecture around it. Library documentations often don't help when you're trying to understand some function. I mean, honestly, look at this:java
Rx.Observable.prototype.flatMapLatest(selector, [thisArg])react
Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element's index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.jquery
Holy cow.android
學習響應式編程是個困難的過程,尤爲是在當前缺少優秀資料的前提下。起初,我試圖尋找一些教程,卻只找到了少許的實踐指南,並且它們講的都很是淺顯(scratched the surface),歷來沒人接受圍繞着響應式編程創建一個完整知識體系的挑戰。而官方文檔一般也並不能徹底地幫助你理解某些函數,它們一般看起來很繞,不信請看這裏:git
Rx.Observable.prototype.flatMapLatest(selector, [thisArg])github
根據元素下標,將可觀察序列中每一個元素一一映射到一個新的可觀察序列當中,而後...%…………%&¥#@@……&**(暈了)web
天吶,這簡直太繞了!ajax
I've read two books, one just painted the big picture, while the other dived into how to use the Reactive library. I ended up learning Reactive Programming the hard way: figuring it out while building with it. At my work in Futurice I got to use it in a real project, and had the support of some colleagues when I ran into troubles.
The hardest part of the learning journey is thinking in Reactive. It's a lot about letting go of old imperative and stateful habits of typical programming, and forcing your brain to work in a different paradigm. I haven't found any guide on the internet in this aspect, and I think the world deserves a practical tutorial on how to think in Reactive, so that you can get started. Library documentation can light your way after that. I hope this helps you.
我讀過兩本相關的書,一本只是在給你描繪響應式編程的偉大景象,而另外一本卻只是深刻到如何使用響應式庫而已。我在不斷的構建(building)中把響應式編程瞭解的透徹了一些,最後以這種艱難的方式學完了響應式編程。在我工做公司的一個真實項目中我會用到它,當我遇到問題時,還能夠獲得同事的支持。
學習過程當中最難的部分是如何以響應式的方式來思考,更多的意味着要摒棄那些老舊的命令式和狀態式的典型編程習慣,而且強迫本身的大腦以不一樣的範式來運做。我尚未在網絡上找到任何一個教程是從這個層面來剖析的,我以爲這個世界很是值得擁有一個優秀的實踐教程來教你如何以響應式編程的方式來思考,方便引導你開始學習響應式編程。而後看各類庫文檔才能夠給你更多的指引。但願這篇文章可以幫助你快速地進入響應式編程的世界。
There are plenty of bad explanations and definitions out there on the internet. Wikipedia is too generic and theoretical as usual.Stackoverflow's canonical answer is obviously not suitable for newcomers. Reactive Manifesto sounds like the kind of thing you show to your project manager or the businessmen at your company. Microsoft's Rx terminology "Rx = Observables + LINQ + Schedulers" is so heavy and Microsoftish that most of us are left confused. Terms like "reactive" and "propagation of change" don't convey anything specifically different to what your typical MV* and favorite language already does. Of course my framework views react to the models. Of course change is propagated. If it wouldn't, nothing would be rendered.
So let's cut the bullshit.
網絡上有一大堆糟糕的解釋和定義,Wikipedia上一般都是些很是籠統和理論性的解釋,Stackoverflow上的一些規範的回答顯然也不適合新手來參考,Reactive Manifesto看起來也只像是拿給你的PM或者老闆看的東西,微軟的Rx術語"Rx = Observables + LINQ + Schedulers" 也顯得太過沉重,並且充滿了太多微軟式的東西,反而給咱們留下了更多的疑惑。相對於你使用的MV*框架以及你鍾愛的編程語言,"Reactive"和"Propagation of change"這樣的術語並無傳達任何有意義的概念。固然,個人view框架可以從model作出反應,個人改變固然也會傳播,若是沒有這些,個人界面根本就沒有東西可渲染(譯者:最後一句旨在說明微軟的術語化的東西都很是老套、很是教科書化)。
因此,不要再扯這些廢話了。
In a way, this isn't anything new. Event buses or your typical click events are really an asynchronous event stream, on which you can observe and do some side effects. Reactive is that idea on steroids. You are able to create data streams of anything, not just from click and hover events. Streams are cheap and ubiquitous, anything can be a stream: variables, user inputs, properties, caches, data structures, etc. For example, imagine your Twitter feed would be a data stream in the same fashion that click events are. You can listen to that stream and react accordingly.
一方面,這已經不是什麼新事物了。事件總線(Event Buses)或一些典型的點擊事件本質上就是一個異步事件流(asynchronous event stream),這樣你就能夠觀察它的變化並使其作出一些反應(產生一些效果(do some side effects))。響應式是這樣的一個思路:除了點擊(click)和懸停(hover)的事件外,你能夠給任何事物建立數據流。數據流無處不在(Streams are cheap and ubiquitous),任何東西均可以成爲一個數據流,例如變量、用戶輸入、屬性、緩存、數據結構等等。舉個栗子,你能夠把你的微博訂閱功能想象成跟點擊事件同樣的數據流,你能夠監聽這樣的數據流,並作出相應的反應。
On top of that, you are given an amazing toolbox of functions to combine, create and filter any of those streams.That's where the "functional" magic kicks in. A stream can be used as an input to another one. Even multiple streams can be used as inputs to another stream. You can merge two streams. You can filter a stream to get another one that has only those events you are interested in. You can map data values from one stream to another new one.
If streams are so central to Reactive, let's take a careful look at them, starting with our familiar "clicks on a button" event stream.
A stream is a sequence of ongoing events ordered in time. It can emit three different things: a value (of some type), an error, or a "completed" signal. Consider that the "completed" takes place, for instance, when the current window or view containing that button is closed.
We capture these emitted events only asynchronously, by defining a function that will execute when a value is emitted, another function when an error is emitted, and another function when 'completed' is emitted. Sometimes these last two can be omitted and you can just focus on defining the function for values. The "listening" to the stream is called subscribing. The functions we are defining are observers. The stream is the subject (or "observable") being observed. This is precisely the Observer Design Pattern.
An alternative way of drawing that diagram is with ASCII, which we will use in some parts of this tutorial:
--a---b-c---d---X---|-> a, b, c, d are emitted values X is an error | is the 'completed' signal ---> is the timeline
Since this feels so familiar already, and I don't want you to get bored, let's do something new: we are going to create new click event streams transformed out of the original click event stream.
最重要的是,你會擁有一些使人驚豔的函數去結合(combine)、建立(create)和過濾(filter)任何一組數據流。 這就是"函數式編程"的魔力所在。一個數據流能夠做爲另外一個數據流的輸入,甚至多個數據流也能夠做爲另外一個數據流的輸入。你能夠合併(merge)兩個數據流,也能夠過濾(filter)一個數據流獲得另外一個只包含你感興趣的事件的數據流,還能夠映射(map)一個數據流的值到一個新的數據流裏。
若是數據流對於響應式是如此的核心(so central to Reactive),那就讓咱們來仔細的看看它們,先從咱們熟悉的"點擊一個按鈕"的事件流開始
一個數據流是一個按時間排序的即將發生的事件(Ongoing events ordered in time)的序列。如上圖,它能夠發出3種不一樣的事件(上一句已經把它們叫作事件):一個某種類型的值事件,一個錯誤事件和一個完成事件。當一個完成事件發生時,在某些狀況下,咱們可能會作這樣的操做:關閉包含那個按鈕的窗口或者視圖組件。
咱們只能異步的的去捕捉這些被髮出的事件,這樣咱們就能夠在發出一個值事件時執行一個函數,發出錯誤事件時執行一個函數,發出完成事件時執行另外一個函數。有時候你能夠忽略後兩個事件,只需聚焦於如何定義和設計在發出值事件時要執行的函數,監聽這個事件流的過程叫作訂閱,咱們定義的函數叫作觀察者,而事件流就能夠叫作被觀察的主題(或者叫被觀察者)。你應該察覺到了,對的,它就是觀察者模式。
上面的示意圖咱們也能夠用ASCII碼的形式從新畫一遍,請注意,下面的部分教程中咱們會繼續使用這幅圖:
--a---b-c---d---X---|-> a, b, c, d 是值事件 X 是錯誤事件 | 是完成事件 ---> 是時間線(軸)
如今你對響應式編程事件流應該很是熟悉了,爲了避免讓你感到無聊,讓咱們來作一些新的嘗試吧:咱們將建立一個由原始點擊事件流演變而來的一種新的點擊事件流。
First, let's make a counter stream that indicates how many times a button was clicked. In common Reactive libraries, each stream has many functions attached to it, such as map
, filter
, scan
, etc. When you call one of these functions, such asclickStream.map(f)
, it returns a new stream based on the click stream. It does not modify the original click stream in any way. This is a property called immutability, and it goes together with Reactive streams just like pancakes are good with syrup. That allows us to chain functions like clickStream.map(f).scan(g)
:
clickStream: ---c----c--c----c------c--> vvvvv map(c becomes 1) vvvv ---1----1--1----1------1--> vvvvvvvvv scan(+) vvvvvvvvv counterStream: ---1----2--3----4------5-->
The map(f)
function replaces (into the new stream) each emitted value according to a function f
you provide. In our case, we mapped to the number 1 on each click. The scan(g)
function aggregates all previous values on the stream, producing value x = g(accumulated, current)
, where g
was simply the add function in this example. Then, counterStream
emits the total number of clicks whenever a click happens.
首先,讓咱們來建立一個記錄按鈕點擊次數的事件流。在經常使用的響應式庫中,每一個事件流都會附有一些函數,例如 map
,filter
, scan
等,當你調用這其中的一個方法時,好比clickStream.map(f)
,它會返回基於點擊事件流的一個新事件流。它不會對原來的點擊事件流作任何的修改。這種特性叫作不可變性(immutability),並且它能夠和響應式事件流搭配在一塊兒使用,就像豆漿和油條同樣完美的搭配。這樣咱們能夠用鏈式函數的方式來調用,例如:clickStream.map(f).scan(g)
:
clickStream: ---c----c--c----c------c--> vvvvv map(c becomes 1) vvvv ---1----1--1----1------1--> vvvvvvvvv scan(+) vvvvvvvvv counterStream: ---1----2--3----4------5-->
map(f)
函數會根據你提供的f
函數把原事件流中每個返回值分別映射到新的事件流中。在上圖的例子中,咱們把每一次點擊事件都映射成數字1,scan(g)
函數則把以前映射的值彙集起來,而後根據x = g(accumulated, current)
算法來做相應的處理,而本例的g
函數其實就是簡單的加法函數。而後,當一個點擊事件發生時,counterStream
函數則上報當前點擊事件總數。
To show the real power of Reactive, let's just say that you want to have a stream of "double click" events. To make it even more interesting, let's say we want the new stream to consider triple clicks as double clicks, or in general, multiple clicks (two or more). Take a deep breath and imagine how you would do that in a traditional imperative and stateful fashion. I bet it sounds fairly nasty and involves some variables to keep state and some fiddling with time intervals.
Well, in Reactive it's pretty simple. In fact, the logic is just 4 lines of code. But let's ignore code for now. Thinking in diagrams is the best way to understand and build streams, whether you're a beginner or an expert.
Grey boxes are functions transforming one stream into another. First we accumulate clicks in lists, whenever 250 milliseconds of "event silence" has happened (that's what buffer(stream.throttle(250ms))
does, in a nutshell. Don't worry about understanding the details at this point, we are just demoing Reactive for now). The result is a stream of lists, from which we applymap()
to map each list to an integer matching the length of that list. Finally, we ignore 1
integers using the filter(x >= 2)
function. That's it: 3 operations to produce our intended stream. We can then subscribe ("listen") to it to react accordingly how we wish.
I hope you enjoy the beauty of this approach. This example is just the tip of the iceberg: you can apply the same operations on different kinds of streams, for instance, on a stream of API responses; on the other hand, there are many other functions available.
爲了展現響應式編程真正的魅力,咱們假設你有一個"雙擊"事件流,爲了讓它更有趣,咱們假設這個事件流同時處理"三次點擊"或者"屢次點擊"事件,而後深吸一口氣想一想如何用傳統的命令式和狀態式的方式來處理,我敢打賭,這麼作會至關的討厭,其中還要涉及到一些變量來保存狀態,而且還得作一些時間間隔的調整。
而用響應式編程的方式處理會很是的簡潔,實際上,邏輯處理部分只須要四行代碼。可是,當前階段讓咱們現忽略代碼的部分,不管你是新手仍是專家,看着圖表思考來理解和創建事件流將是一個很是棒的方法。
圖中,灰色盒子表示將上面的事件流轉換下面的事件流的函數過程,首先根據250毫秒的間隔時間(event silence, 譯者:無事件發生的時間段,上一個事件發生到下一個事件發生的間隔時間)把點擊事件流一段一隔開,再將每一段的一個或多個點擊事件添加到列表中(這就是這個函數:buffer(stream.throttle(250ms))
所作的事情,當前咱們先不要急着去理解細節,咱們只需專一響應式的部分先)。如今咱們獲得的是多個含有事件流的列表,而後咱們使用了map()
中的函數來算出每個列表長度的整數數值映射到下一個事件流當中。最後咱們使用了過濾filter(x >= 2)
函數忽略掉了小於1
的整數。就這樣,咱們用了3步操做生成了咱們想要的事件流,接下來,咱們就能夠訂閱("監聽")這個事件並做出咱們想要的操做了。
我但願你能感覺到這個示例的優雅之處。固然了,這個示例也只是響應式編程魔力的冰山一角而已,你一樣能夠將這3步操做應用到不一樣種類的事件流中去,例如,一串API響應的事件流。另外一方面,你還有很是多的函數可使用。
Reactive Programming raises the level of abstraction of your code so you can focus on the interdependence of events that define the business logic, rather than having to constantly fiddle with a large amount of implementation details. Code in RP will likely be more concise.
The benefit is more evident in modern webapps and mobile apps that are highly interactive with a multitude of UI events related to data events. 10 years ago, interaction with web pages was basically about submitting a long form to the backend and performing simple rendering to the frontend. Apps have evolved to be more real-time: modifying a single form field can automatically trigger a save to the backend, "likes" to some content can be reflected in real time to other connected users, and so forth.
Apps nowadays have an abundancy of real-time events of every kind that enable a highly interactive experience to the user. We need tools for properly dealing with that, and Reactive Programming is an answer.
響應式編程能夠提升你的代碼抽象級別,好讓你能夠專一於定義與事件相互依存的業務邏輯,而不是把大量精力放在實現細節上,使用響應式編程會讓你的代碼變得更加簡潔。
特別對於如今流行的webapps和mobile apps,這些頻繁與數據事件相關的大量UI事件交互的程序,好處就更加的明顯了。十年前,web頁面的交互是經過提交一個很長的表單數據到後端,而後再作一些簡單的前端渲染操做。而如今的Apps則演變的更具備實時性:僅僅修改一個單獨的表單域就能自動的觸發保存到後端的代碼,就像某個用戶對一些內容點了贊,就可以實時反映到其餘已鏈接的用戶同樣,等等。
當今的Apps都含有豐富的實時事件來保證一個高效的用戶體驗,咱們就須要採用一個合適的工具來處理,那麼響應式編程就正好是咱們想要的答案。
Let's dive into the real stuff. A real-world example with a step-by-step guide on how to think in RP. No synthetic examples, no half-explained concepts. By the end of this tutorial we will have produced real functioning code, while knowing why we did each thing.
I picked JavaScript and RxJS as the tools for this, for a reason: JavaScript is the most familiar language out there at the moment, and the Rx* library family is widely available for many languages and platforms (.NET, Java, Scala, Clojure, JavaScript,Ruby, Python, C++, Objective-C/Cocoa, Groovy, etc). So whatever your tools are, you can concretely benefit by following this tutorial.
讓咱們深刻到一些真實的例子,一個可以一步一步教你如何以響應式編程的方式思考的例子,沒有虛構的示例,沒有隻知其一;不知其二的概念。在這個教程的末尾咱們將產生一些真實的函數代碼,並可以知曉每一步爲何那樣作的緣由(知其然,知其因此然)。
我選了JavaScript和RxJS來做爲本教程的編程語言,緣由是:JavaScript是目前最多人熟悉的語言,而Rx系列的庫對於不少語言和平臺的運用是很是普遍的,例如(.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy等等。因此,不管你用的是什麼語言、庫、工具,你都能從下面這個教程中學到東西(從中受益)。
In Twitter there is this UI element that suggests other accounts you could follow:
We are going to focus on imitating its core features, which are:
We can leave out the other features and buttons because they are minor. And, instead of Twitter, which recently closed its API to the unauthorized public, let's build that UI for following people on Github. There's a Github API for getting users.
The complete code for this is ready at http://jsfiddle.net/staltz/8jFJH/48/ in case you want to take a peak already.
在Twitter裏有一個UI元素向你推薦你能夠關注的用戶,以下圖:
咱們將聚焦於模仿它的主要功能,它們是:
咱們能夠先無論其餘的功能和按鈕,由於它們是次要的。由於Twitter最近關閉了未經受權的公共API調用,咱們將用Github獲取用戶的API代替,而且以此來構建咱們的UI。
若是你想先看一下最終效果,這裏有完成後的代碼。
How do you approach this problem with Rx? Well, to start with, (almost) everything can be a stream. That's the Rx mantra. Let's start with the easiest feature: "on startup, load 3 accounts data from the API". There is nothing special here, this is simply about (1) doing a request, (2) getting a response, (3) rendering the response. So let's go ahead and represent our requests as a stream. At first this will feel like overkill, but we need to start from the basics, right?
On startup we need to do only one request, so if we model it as a data stream, it will be a stream with only one emitted value. Later, we know we will have many requests happening, but for now, it is just one.
--a------|-> Where a is the string 'https://api.github.com/users'
This is a stream of URLs that we want to request. Whenever a request event happens, it tells us two things: when and what. "When" the request should be executed is when the event is emitted. And "what" should be requested is the value emitted: a string containing the URL.
To create such stream with a single value is very simple in Rx*. The official terminology for a stream is "Observable", for the fact that it can be observed, but I find it to be a silly name, so I call it stream.
var requestStream = Rx.Observable.just('https://api.github.com/users');
But now, that is just a stream of strings, doing no other operation, so we need to somehow make something happen when that value is emitted. That's done by subscribing to the stream.
requestStream.subscribe(function(requestUrl) { // execute the request jQuery.getJSON(requestUrl, function(responseData) { // ... }); }
在Rx中是怎麼處理這個問題呢?,在開始以前,咱們要明白,(幾乎)一切均可以成爲一個事件流,這就是Rx的準則(mantra)。讓咱們從最簡單的功能開始:"開始階段,從API加載推薦關注的用戶帳戶數據,而後顯示三個推薦用戶"。其實這個功能沒什麼特殊的,簡單的步驟分爲: (1)發出一個請求,(2)獲取響應數據,(3)渲染響應數據。ok,讓咱們把請求做爲一個事件流,一開始你可能會以爲這樣作有些誇張,但別急,咱們也得從最基本的開始,不是嗎?
開始時咱們只需作一次請求,若是咱們把它做爲一個數據流的話,它只能成爲一個僅僅返回一個值的事件流而已。一下子咱們還會有不少請求要作,但當前,只有一個。
--a------|-> a就是字符串:'https://api.github.com/users'
這是一個咱們要請求的URL事件流。每當發生一個請求時,它將告訴咱們兩件事:何時和作了什麼事(when and what)。何時請求被執行,何時事件就被髮出。而作了什麼就是請求了什麼,也就是請求的URL字符串。
在Rx中,建立返回一個值的事件流是很是簡單的。其實事件流在Rx裏的術語是叫"被觀察者",也就是說它是能夠被觀察的,可是我發現這名字比較傻,因此我更喜歡把它叫作事件流。
var requestStream = Rx.Observable.just('https://api.github.com/users');
但如今,這只是一個字符串的事件流而已,並無作其餘操做,因此咱們須要在發出這個值的時候作一些咱們要作的操做,能夠經過訂閱(subscribing)這個事件來實現。
requestStream.subscribe(function(requestUrl) { // execute the request jQuery.getJSON(requestUrl, function(responseData) { // ... }); }
Notice we are using a jQuery Ajax callback (which we assume you should know already) to handle the asynchronicity of the request operation. But wait a moment, Rx is for dealing with asynchronous data streams. Couldn't the response for that request be a stream containing the data arriving at some time in the future? Well, at a conceptual level, it sure looks like it, so let's try that.
requestStream.subscribe(function(requestUrl) { // execute the request var responseStream = Rx.Observable.create(function (observer) { jQuery.getJSON(requestUrl) .done(function(response) { observer.onNext(response); }) .fail(function(jqXHR, status, error) { observer.onError(error); }) .always(function() { observer.onCompleted(); }); }); responseStream.subscribe(function(response) { // do something with the response }); }
What Rx.Observable.create()
does is create your own custom stream by explicitly informing each observer (or in other words, a "subscriber") about data events (onNext()
) or errors (onError()
). What we did was just wrap that jQuery Ajax Promise.Excuse me, does this mean that a Promise is an Observable?
注意到咱們這裏使用的是JQuery的AJAX回調方法(咱們假設你已經很瞭解JQuery和AJAX了)來的處理這個異步的請求操做。可是,請稍等一下,Rx就是用來處理異步數據流的,難道它就不能處理來自請求(request)在將來某個時間響應(response)的數據流嗎?好吧,理論上是能夠的,讓咱們嘗試一下。
requestStream.subscribe(function(requestUrl) { // execute the request var responseStream = Rx.Observable.create(function (observer) { jQuery.getJSON(requestUrl) .done(function(response) { observer.onNext(response); }) .fail(function(jqXHR, status, error) { observer.onError(error); }) .always(function() { observer.onCompleted(); }); }); responseStream.subscribe(function(response) { // do something with the response }); }
Rx.Observable.create()
操做就是在建立本身定製的事件流,且對於數據事件(onNext()
)和錯誤事件(onError()
)都會顯示的通知該事件每個觀察者(或訂閱者)。咱們作的只是小小的封裝一下jQuery Ajax Promise而已。等等,這是否意味者jQuery Ajax Promise本質上就是一個被觀察者呢(Observable)?
Yes.
Observable is Promise++. In Rx you can easily convert a Promise to an Observable by doing var stream = Rx.Observable.fromPromise(promise)
, so let's use that. The only difference is that Observables are not Promises/A+compliant, but conceptually there is no clash. A Promise is simply an Observable with one single emitted value. Rx streams go beyond promises by allowing many returned values.
This is pretty nice, and shows how Observables are at least as powerful as Promises. So if you believe the Promises hype, keep an eye on what Rx Observables are capable of.
Now back to our example, if you were quick to notice, we have one subscribe()
call inside another, which is somewhat akin to callback hell. Also, the creation of responseStream
is dependent on requestStream
. As you heard before, in Rx there are simple mechanisms for transforming and creating new streams out of others, so we should be doing that.
The one basic function that you should know by now is map(f)
, which takes each value of stream A, applies f()
on it, and produces a value on stream B. If we do that to our request and response streams, we can map request URLs to response Promises (disguised as streams).
var responseMetastream = requestStream .map(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });
Then we will have created a beast called "metastream": a stream of streams. Don't panic yet. A metastream is a stream where each emitted value is yet another stream. You can think of it as pointers: each emitted value is a pointer to another stream. In our example, each request URL is mapped to a pointer to the promise stream containing the corresponding response.
是的。
Promise++就是被觀察者(Observable),在Rx裏你可使用這樣的操做:var stream = Rx.Observable.fromPromise(promise)
,就能夠很輕鬆的將Promise轉換成一個被觀察者(Observable),如此方便,讓咱們如今就開始使用它吧。不一樣的是,這些被觀察者都不能兼容Promises/A+,但理論上並不衝突。一個Promise就是一個只有一個返回值的簡單的被觀察者,而Rx就遠超於Promise,它容許多個值返回。
這樣更好,這樣更突出被觀察者至少比Promise強大,因此若是你相信Promise宣傳的東西,那麼也請留意一下響應式編程能勝任些什麼。
如今回到示例當中,你應該能快速發現,咱們在subscribe()
方法的內部再次調用了subscribe()
方法,這有點相似於回調地獄(callback hell),並且responseStream
的建立也是依賴於requestStream
的。在以前咱們說過,在Rx裏,有不少很簡單的機制來從其餘事件流的轉化並建立出一些新的事件流,那麼,咱們也應該這樣作試試。
如今你須要瞭解的一個最基本的函數是map(f)
,它能夠從事件流A中取出每個值,並對每個值執行f()
函數,而後將產生的新值填充到事件流B。若是將它應用到咱們的請求和響應事件流當中,那咱們就能夠將請求的URL映射到一個響應Promises上了(假裝成數據流)。
var responseMetastream = requestStream .map(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });
而後,咱們創造了一個叫作"metastream"的怪獸:一個裝載了事件流的事件流。先別驚慌,metastream就是每個發出的值都是另外一個事件流的事件流,你看把它想象成一個[指針(pointers)]((https://en.wikipedia.org/wiki/Pointer_(computer_programming))數組:每個單獨發出的值就是一個_指針_,它指向另外一個事件流。在咱們的示例裏,每個請求URL都映射到一個指向包含響應數據的promise數據流。
A metastream for responses looks confusing, and doesn't seem to help us at all. We just want a simple stream of responses, where each emitted value is a JSON object, not a 'Promise' of a JSON object. Say hi to Mr. Flatmap: a version of map()
than "flattens" a metastream, by emitting on the "trunk" stream everything that will be emitted on "branch" streams. Flatmap is not a "fix" and metastreams are not a bug, these are really the tools for dealing with asynchronous responses in Rx.
var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });
Nice. And because the response stream is defined according to request stream, if we have later on more events happening on request stream, we will have the corresponding response events happening on response stream, as expected:
requestStream: --a-----b--c------------|-> responseStream: -----A--------B-----C---|-> (lowercase is a request, uppercase is its response)
Now that we finally have a response stream, we can render the data we receive:
responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });
Joining all the code until now, we have:
var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); }); responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });
一個響應的metastream,看起來確實讓人容易困惑,看樣子對咱們一點幫助也沒有。咱們只想要一個簡單的響應數據流,每個發出的值是一個簡單的JSON對象就行,而不是一個'Promise' 的JSON對象。ok,讓咱們來見識一下另外一個函數:Flatmap,它是map()
函數的另外一個版本,它比metastream更扁平。一切在"主軀幹"事件流發出的事件都將在"分支"事件流中發出。Flatmap並非metastreams的修復版,metastreams也不是一個bug。它倆在Rx中都是處理異步響應事件的好工具、好幫手。
var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); });
很贊,由於咱們的響應事件流是根據請求事件流定義的,若是咱們之後有更多事件發生在請求事件流的話,咱們也將會在相應的響應事件流收到響應事件,就如所期待的那樣:
requestStream: --a-----b--c------------|-> responseStream: -----A--------B-----C---|-> (小寫的是請求事件流, 大寫的是響應事件流)
如今,咱們終於有響應的事件流了,而且能夠用咱們收到的數據來渲染了:
responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });
讓咱們把全部代碼合起來,看一下:
var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream .flatMap(function(requestUrl) { return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); }); responseStream.subscribe(function(response) { // render `response` to the DOM however you wish });
I did not yet mention that the JSON in the response is a list with 100 users. The API only allows us to specify the page offset, and not the page size, so we're using just 3 data objects and wasting 97 others. We can ignore that problem for now, since later on we will see how to cache the responses.
Everytime the refresh button is clicked, the request stream should emit a new URL, so that we can get a new response. We need two things: a stream of click events on the refresh button (mantra: anything can be a stream), and we need to change the request stream to depend on the refresh click stream. Gladly, RxJS comes with tools to make Observables from event listeners.
var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
Since the refresh click event doesn't itself carry any API URL, we need to map each click to an actual URL. Now we change the request stream to be the refresh click stream mapped to the API endpoint with a random offset parameter each time.
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
Because I'm dumb and I don't have automated tests, I just broke one of our previously built features. A request doesn't happen anymore on startup, it happens only when the refresh is clicked. Urgh. I need both behaviors: a request when either a refresh is clicked or the webpage was just opened.
我還沒提到本次響應的JSON數據是含有100個用戶數據的list,這個API只容許指定頁面偏移量(page offset),而不能指定每頁大小(page size),咱們只用到了3個用戶數據而浪費了其餘97個,如今能夠先忽略這個問題,稍後咱們將學習如何緩存響應的數據。
每當刷新按鈕被點擊,請求事件流就會發出一個新的URL值,這樣咱們就能夠獲取新的響應數據。這裏咱們須要兩個東西:點擊刷新按鈕的事件流(準則:一切都能做爲事件流),咱們須要將點擊刷新按鈕的事件流做爲請求事件流的依賴(即點擊刷新事件流會引發請求事件流)。幸運的是,RxJS已經有了能夠從事件監聽者轉換成被觀察者的方法了。
var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
由於刷新按鈕點擊事件不會攜帶將要請求的API的URL,咱們須要將每次的點擊映射到一個實際的URL上,如今咱們將請求事件流轉換成了一個點擊事件流,並將每次的點擊映射成一個隨機的頁面偏移量(offset)參數來組成API的URL。
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
由於我比較笨並且也沒有使用自動化測試,因此我剛把以前作好的一個功能搞爛了。這樣,請求在一開始的時候就不會執行,而只有在點擊事件發生時纔會執行。咱們須要的是兩種狀況都要執行:剛開始打開網頁和點擊刷新按鈕都會執行的請求。
We know how to make a separate stream for each one of those cases:
var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
But how can we "merge" these two into one? Well, there's merge()
. Explained in the diagram dialect, this is what it does:
stream A: ---a--------e-----o-----> stream B: -----B---C-----D--------> vvvvvvvvv merge vvvvvvvvv ---a-B---C--e--D--o----->
咱們知道如何爲每一種狀況作一個單獨的事件流:
var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
可是咱們是否能夠將這兩個合併成一個呢?沒錯,是能夠的,咱們可使用merge()
方法來實現。下圖能夠解釋merge()
函數的用處:
stream A: ---a--------e-----o-----> stream B: -----B---C-----D--------> vvvvvvvvv merge vvvvvvvvv ---a-B---C--e--D--o----->
It should be easy now:
var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream );
There is an alternative and cleaner way of writing that, without the intermediate streams.
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .merge(Rx.Observable.just('https://api.github.com/users'));
如今作起來應該很簡單:
var requestOnRefreshStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream );
還有一個更乾淨的寫法,省去了中間事件流變量:
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .merge(Rx.Observable.just('https://api.github.com/users'));
甚至能夠更簡短,更具備可讀性:
var requestStream = refreshClickStream .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }) .startWith('https://api.github.com/users');
The startWith()
function does exactly what you think it does. No matter how your input stream looks like, the output stream resulting of startWith(x)
will have x
at the beginning. But I'm not DRY enough, I'm repeating the API endpoint string. One way to fix this is by moving the startWith()
close to the refreshClickStream
, to essentially "emulate" a refresh click on startup.
var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
Nice. If you go back to the point where I "broke the automated tests", you should see that the only difference with this last approach is that I added the startWith()
.
startWith()
函數作的事和你預期的徹底同樣。不管你的輸入事件流是怎樣的,使用startWith(x)
函數處理事後輸出的事件流必定是一個x
開頭的結果。可是我沒有老是重複代碼( DRY),我只是在重複API的URL字符串,改進的方法是將startWith()
函數挪到refreshClickStream
那裏,這樣就能夠在啓動時,模擬一個刷新按鈕的點擊事件了。
var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
不錯,若是你倒回到"搞爛了的自動測試"的地方,而後再對比這兩個地方,你會發現我僅僅是加了一個startWith()
函數而已。
Until now, we have only touched a suggestion UI element on the rendering step that happens in the responseStream'ssubscribe()
. Now with the refresh button, we have a problem: as soon as you click 'refresh', the current 3 suggestions are not cleared. New suggestions come in only after a response has arrived, but to make the UI look nice, we need to clean out the current suggestions when clicks happen on the refresh.
refreshClickStream.subscribe(function() { // clear the 3 suggestion DOM elements });
No, not so fast, pal. This is bad, because we now have two subscribers that affect the suggestion DOM elements (the other one being responseStream.subscribe()
), and that doesn't really sound like Separation of concerns. Remember the Reactive mantra?
直到如今,在響應事件流(responseStream)的訂閱(subscribe()
)函數發生的渲染步驟裏,咱們只是稍微說起了一下推薦關注的UI。如今有了刷新按鈕,咱們就會出現一個問題:當你點擊了刷新按鈕,當前的三個推薦關注用戶沒有被清楚,而只要響應的數據達到後咱們就拿到了新的推薦關注的用戶數據,爲了讓UI看起來更漂亮,咱們須要在點擊刷新按鈕的事件發生的時候清楚當前的三個推薦關注的用戶。
refreshClickStream.subscribe(function() { // clear the 3 suggestion DOM elements });
不,老兄,還沒那麼快。咱們又出現了新的問題,由於咱們如今有兩個訂閱者在影響着推薦關注的UI DOM元素(另外一個是responseStream.subscribe()
),這看起來並不符合關注分離(Separation of concerns)原則,還記得響應式編程的原則麼?
So let's model a suggestion as a stream, where each emitted value is the JSON object containing the suggestion data. We will do this separately for each of the 3 suggestions. This is how the stream for suggestion #1 could look like:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; });
The others, suggestion2Stream
and suggestion3Stream
can be simply copy pasted from suggestion1Stream
. This is not DRY, but it will keep our example simple for this tutorial, plus I think it's a good exercise to think how to avoid repetition in this case.
如今,讓咱們把推薦關注的用戶數據模型化成事件流形式,每一個被髮出的值是一個包含了推薦關注用戶數據的JSON對象。咱們將把這三個用戶數據分開處理,下面是推薦關注的1號用戶數據的事件流:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; });
其餘的,如推薦關注的2號用戶數據的事件流suggestion2Stream
和推薦關注的3號用戶數據的事件流suggestion3Stream
均可以方便的從suggestion1Stream
複製粘貼就好。這裏並非重複代碼,只是爲讓咱們的示例更加簡單,並且我認爲這是一個思考如何避免重複代碼的好案例。
Instead of having the rendering happen in responseStream's subscribe(), we do that here:
suggestion1Stream.subscribe(function(suggestion) { // render the 1st suggestion to the DOM });
Back to the "on refresh, clear the suggestions", we can simply map refresh clicks to null
suggestion data, and include that in the suggestion1Stream
, as such:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) );
And when rendering, we interpret null
as "no data", hence hiding its UI element.
suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // hide the first suggestion DOM element } else { // show the first suggestion DOM element // and render the data } });
咱們不在responseStream的subscribe()中處理渲染了,咱們這樣處理:
suggestion1Stream.subscribe(function(suggestion) { // render the 1st suggestion to the DOM });
回到"當刷新時,清楚掉當前的推薦關注的用戶",咱們能夠很簡單的把刷新點擊映射爲沒有推薦數據(null
suggestion data),而且在suggestion1Stream
中包含進來,以下:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) );
當渲染時,咱們將 null
解釋爲"沒有數據",而後把UI元素隱藏起來。
suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // hide the first suggestion DOM element } else { // show the first suggestion DOM element // and render the data } });
The big picture is now:
refreshClickStream: ----------o--------o----> requestStream: -r--------r--------r----> responseStream: ----R---------R------R--> suggestion1Stream: ----s-----N---s----N-s--> suggestion2Stream: ----q-----N---q----N-q--> suggestion3Stream: ----t-----N---t----N-t-->
Where N
stands for null
.
As a bonus, we can also render "empty" suggestions on startup. That is done by adding startWith(null)
to the suggestion streams:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
Which results in:
refreshClickStream: ----------o---------o----> requestStream: -r--------r---------r----> responseStream: ----R----------R------R--> suggestion1Stream: -N--s-----N----s----N-s--> suggestion2Stream: -N--q-----N----q----N-q--> suggestion3Stream: -N--t-----N----t----N-t-->
如今咱們的一個大的示意圖是這樣的:
refreshClickStream: ----------o--------o----> requestStream: -r--------r--------r----> responseStream: ----R---------R------R--> suggestion1Stream: ----s-----N---s----N-s--> suggestion2Stream: ----q-----N---q----N-q--> suggestion3Stream: ----t-----N---t----N-t-->
N
表明null
做爲一種補充,咱們能夠在一開始的時候就渲染空的推薦內容。這經過把startWith(null)添加到推薦關注的事件流就能夠了:
var suggestion1Stream = responseStream .map(function(listUsers) { // get one random user from the list return listUsers[Math.floor(Math.random()*listUsers.length)]; }) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
結果是這樣的:
refreshClickStream: ----------o---------o----> requestStream: -r--------r---------r----> responseStream: ----R----------R------R--> suggestion1Stream: -N--s-----N----s----N-s--> suggestion2Stream: -N--q-----N----q----N-q--> suggestion3Stream: -N--t-----N----t----N-t-->
There is one feature remaining to implement. Each suggestion should have its own 'x' button for closing it, and loading another in its place. At first thought, you could say it's enough to make a new request when any close button is clicked:
var close1Button = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click'); // and the same for close2Button and close3Button var requestStream = refreshClickStream.startWith('startup click') .merge(close1ClickStream) // we added this .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
That does not work. It will close and reload all suggestions, rather than just only the one we clicked on. There are a couple of different ways of solving this, and to keep it interesting, we will solve it by reusing previous responses. The API's response page size is 100 users while we were using just 3 of those, so there is plenty of fresh data available. No need to request more.
只剩這一個功能沒有實現了,每一個推薦關注的用戶UI會有一個'x'按鈕來關閉本身,而後在當前的用戶數據UI中加載另外一個推薦關注的用戶。最初的想法是:點擊任何關閉按鈕時都須要發起一個新的請求:
var close1Button = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click'); // and the same for close2Button and close3Button var requestStream = refreshClickStream.startWith('startup click') .merge(close1ClickStream) // we added this .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; });
這樣沒什麼效果,這樣會關閉和從新加載所有的推薦關注用戶,而不只僅是處理咱們點擊的那一個。這裏有幾種方式來解決這個問題,而且讓它變得有趣,咱們將重用以前的請求數據來解決這個問題。這個API響應的每頁數據大小是100個用戶數據,而咱們只使用了其中三個,因此還有一大堆未使用的數據能夠拿來用,不用去請求更多數據了。
Again, let's think in streams. When a 'close1' click event happens, we want to use the most recently emitted response onresponseStream
to get one random user from the list in the response. As such:
requestStream: --r---------------> responseStream: ------R-----------> close1ClickStream: ------------c-----> suggestion1Stream: ------s-----s----->
In Rx* there is a combinator function called combineLatest
that seems to do what we need. It takes two streams A and B as inputs, and whenever either stream emits a value, combineLatest
joins the two most recently emitted values a
and b
from both streams and outputs a value c = f(x,y)
, where f
is a function you define. It is better explained with a diagram:
stream A: --a-----------e--------i--------> stream B: -----b----c--------d-------q----> vvvvvvvv combineLatest(f) vvvvvvv ----AB---AC--EC---ED--ID--IQ----> where f is the uppercase function
ok,再來,咱們繼續用事件流的方式來思考。當'close1'點擊事件發生時,咱們想要使用最近發出的響應數據,並執行responseStream
函數來從響應列表裏隨機的抽出一個用戶數據來,就像下面這樣:
requestStream: --r---------------> responseStream: ------R-----------> close1ClickStream: ------------c-----> suggestion1Stream: ------s-----s----->
在Rx中一個組合函數叫作combineLatest
,應該是咱們須要的。這個函數會把數據流A和數據流B做爲輸入,而且不管哪個數據流發出一個值了,combineLatest
函數就會將從兩個數據流最近發出的值a
和b
做爲f
函數的輸入,計算後返回一個輸出值(c = f(x,y)
),下面的圖表會讓這個函數的過程看起來會更加清晰:
stream A: --a-----------e--------i--------> stream B: -----b----c--------d-------q----> vvvvvvvv combineLatest(f) vvvvvvv ----AB---AC--EC---ED--ID--IQ----> f是轉換成大寫的函數
We can apply combineLatest() on close1ClickStream
and responseStream
, so that whenever the close 1 button is clicked, we get the latest response emitted and produce a new value on suggestion1Stream
. On the other hand, combineLatest() is symmetric: whenever a new response is emitted on responseStream
, it will combine with the latest 'close 1' click to produce a new suggestion. That is interesting, because it allows us to simplify our previous code for suggestion1Stream
, like this:
var suggestion1Stream = close1ClickStream .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
One piece is still missing in the puzzle. The combineLatest() uses the most recent of the two sources, but if one of those sources hasn't emitted anything yet, combineLatest() cannot produce a data event on the output stream. If you look at the ASCII diagram above, you will see that the output has nothing when the first stream emitted value a
. Only when the second stream emitted valueb
could it produce an output value.
There are different ways of solving this, and we will stay with the simplest one, which is simulating a click to the 'close 1' button on startup:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this .combineLatest(responseStream, function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
這樣,咱們就能夠把combineLatest()
函數用在close1ClickStream
和 responseStream
上了,只要關閉按鈕被點擊,咱們就能夠得到最近的響應數據,並在suggestion1Stream
上產生出一個新值。另外一方面,combineLatest()
函數也是相對的:每當在responseStream
上發出一個新的響應,它將會結合一次新的點擊關閉按鈕事件
來產生一個新的推薦關注的用戶數據,這很是有趣,由於它能夠給咱們的suggestion1Stream
簡化代碼:
var suggestion1Stream = close1ClickStream .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
One piece is still missing in the puzzle. The combineLatest() uses the most recent of the two sources, but if one of those sources hasn't emitted anything yet, combineLatest() cannot produce a data event on the output stream. If you look at the ASCII diagram above, you will see that the output has nothing when the first stream emitted value a
. Only when the second stream emitted valueb
could it produce an output value.
There are different ways of solving this, and we will stay with the simplest one, which is simulating a click to the 'close 1' button on startup:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this .combineLatest(responseStream, function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
如今,咱們的拼圖還缺一小塊地方。combineLatest()
函數使用了最近的兩個數據源,可是若是某一個數據源尚未發出任何東西,combineLatest()
函數就不能在輸出流上產生一個數據事件。若是你看了上面的ASCII圖表(文章中第一個圖表),你會明白當第一個數據流發出一個值a
時並無任何的輸出,只有當第二個數據流發出一個值b
的時候纔會產生一個輸出值。
這裏有不少種方法來解決這個問題,咱們使用最簡單的一種,也就是在啓動的時候模擬'close 1'的點擊事件:
var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this .combineLatest(responseStream, function(click, listUsers) {l return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null);
And we're done. The complete code for all this was:
var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); var closeButton1 = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click'); // and the same logic for close2 and close3 var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var responseStream = requestStream .flatMap(function (requestUrl) { return Rx.Observable.fromPromise($.ajax({url: requestUrl})); }); var suggestion1Stream = close1ClickStream.startWith('startup click') .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null); // and the same logic for suggestion2Stream and suggestion3Stream suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // hide the first suggestion DOM element } else { // show the first suggestion DOM element // and render the data } });
咱們完成了,下面是封裝好的完整示例代碼:
var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); var closeButton1 = document.querySelector('.close1'); var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click'); // and the same logic for close2 and close3 var requestStream = refreshClickStream.startWith('startup click') .map(function() { var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; }); var responseStream = requestStream .flatMap(function (requestUrl) { return Rx.Observable.fromPromise($.ajax({url: requestUrl})); }); var suggestion1Stream = close1ClickStream.startWith('startup click') .combineLatest(responseStream, function(click, listUsers) { return listUsers[Math.floor(Math.random()*listUsers.length)]; } ) .merge( refreshClickStream.map(function(){ return null; }) ) .startWith(null); // and the same logic for suggestion2Stream and suggestion3Stream suggestion1Stream.subscribe(function(suggestion) { if (suggestion === null) { // hide the first suggestion DOM element } else { // show the first suggestion DOM element // and render the data } });
You can see this working example at http://jsfiddle.net/staltz/8jFJH/48/
That piece of code is small but dense: it features management of multiple events with proper separation of concerns, and even caching of responses. The functional style made the code look more declarative than imperative: we are not giving a sequence of instructions to execute, we are just telling what something is by defining relationships between streams. For instance, with Rx we told the computer that suggestion1Stream
is the 'close 1' stream combined with one user from the latest response, besides being null
when a refresh happens or program startup happened.
Notice also the impressive absence of control flow elements such as if
, for
, while
, and the typical callback-based control flow that you expect from a JavaScript application. You can even get rid of the if
and else
in the subscribe()
above by using filter()
if you want (I'll leave the implementation details to you as an exercise). In Rx, we have stream functions such asmap
, filter
, scan
, merge
, combineLatest
, startWith
, and many more to control the flow of an event-driven program. This toolset of functions gives you more power in less code.
你能夠在這裏看到可演示的示例工程
以上的代碼片斷雖小但作到不少事:它適當的使用關注分離(separation of concerns)原則的實現了對多個事件流的管理,甚至作到了響應數據的緩存。這種函數式的風格使得代碼看起來更像是聲明式編程而非命令式編程:咱們並非在給一組指令去執行,只是定義了事件流之間關係來告訴它這是什麼。例如,咱們用Rx來告訴計算機suggestion1Stream
是'close 1'事件結合從最新的響應數據中拿到的一個用戶數據的數據流,除此以外,當刷新事件發生時和程序啓動時,它就是null
。
留意一下代碼中並未出現例如if
, for
, while
等流程控制語句,或者像JavaScript那樣典型的基於回調(callback-based)的流程控制。若是能夠的話(稍候會給你留一些實現細節來做爲練習),你甚至能夠在subscribe()
上使用 filter()
函數來擺脫if
和else
。在Rx裏,咱們有例如: map
, filter
, scan
, merge
, combineLatest
, startWith
等數據流的函數,還有不少函數能夠用來控制事件驅動編程(event-driven program)的流程。這些函數的集合可讓你使用更少的代碼實現更強大的功能。
If you think Rx* will be your preferred library for Reactive Programming, take a while to get acquainted with the big list of functionsfor transforming, combining, and creating Observables. If you want to understand those functions in diagrams of streams, take a look at RxJava's very useful documentation with marble diagrams. Whenever you get stuck trying to do something, draw those diagrams, think on them, look at the long list of functions, and think more. This workflow has been effective in my experience.
Once you start getting the hang of programming with Rx, it is absolutely required to understand the concept of Cold vs Hot Observables. If you ignore this, it will come back and bite you brutally. You have been warned. Sharpen your skills further by learning real functional programming, and getting acquainted with issues such as side effects that affect Rx.
But Reactive Programming is not just Rx. There is Bacon.js which is intuitive to work with, without the quirks you sometimes encounter in Rx. The Elm Language lives in its own category: it's a Functional Reactive Programming language that compiles to JavaScript + HTML + CSS, and features a time travelling debugger. Pretty awesome.
Rx works great for event-heavy frontends and apps. But it is not just a client-side thing, it works great also in the backend and close to databases. In fact, RxJava is a key component for enabling server-side concurrency in Netflix's API. Rx is not a framework restricted to one specific type of application or language. It really is a paradigm that you can use when programming any event-driven software.
If this tutorial helped you, tweet it forward.
若是你認爲Rx將會成爲你首選的響應式編程庫,接下來就須要花一些時間來熟悉一大批的函數用來變形、聯合和建立被觀察者。若是你想在事件流的圖表當中熟悉這些函數,那就來看一下這個:RxJava's very useful documentation with marble diagrams。請記住,不管什麼時候你遇到問題,能夠畫一下這些圖,思考一下,看一看這一大串函數,而後繼續思考。以我我的經驗,這樣效果頗有效。
一旦你開始使用了Rx編程,請記住,理解Cold vs Hot Observables的概念是很是必要的,若是你忽視了這一點,它就會反彈回來並殘忍的反咬你一口。我這裏已經警告你了,學習函數式編程能夠提升你的技能,熟悉一些常見問題,例如Rx會帶來的反作用
可是響應式編程庫並不只僅是Rx,還有相對容易理解的,沒有Rx那些怪癖的Bacon.js。Elm Language則以它本身的方式支持響應式編程:它是一門會編譯成Javascript + HTML + CSS的響應式編程語言,並有一個time travelling debugger功能,很棒吧。
而Rx對於像前端和App這樣須要處理大量的編程效果是很是棒的。可是它不僅是能夠用在客戶端,還能夠用在後端或者接近數據庫的地方。事實上,RxJava就是Netflix服務端API用來處理並行的組件。Rx並非侷限於某種應用程序或者編程語言的框架,它真的是你編寫任何事件驅動程序,能夠遵循的一個很是棒的編程範式。
若是這篇教程對你有幫助, 那麼就請來轉發一下吧(tweet it forward).