本文是 Rxjs 響應式編程-第二章:序列的深刻研究這篇文章的學習筆記。示例代碼託管在:http://www.github.com/dashnowords/blogsjavascript
更多博文:《大史住在大前端》目錄html
文中使用到的一些基本運算符:前端
map
-映射filter
-過濾reduce
-有限列聚合scan
-無限列聚合flatMap
-拉平操做(重點)catch
-捕獲錯誤retry
-序列重試from
-生成可觀測序列range
-生成有限的可觀測序列interval
-每隔指定時間發出一次順序整數distinct
-去除出現過的重複值建議本身動手嘗試一下,記住就能夠了,有過lodash
使用經驗的開發者來講並不難。原文中使用flatMap
轉換序列時有一處應該是手誤:java
原文中在http
請求拿到獲取到數據後,最初使用了forEach
實現了手動流程管理,因而原文提出了優化設想,試圖探究如何依賴響應式編程的特性將手動的數據加工轉換改造爲對流的轉換,好讓最終的消費者可以拿到直接可用的數據,而不是獲得一個響應後手動進行不少後處理。在代碼層面須要解決的問題就是,如何在不使用手動遍歷的前提下將一個有限序列中的數據逐個發給訂閱者,而不是一次性將整個數據集發過去。git
假設咱們如今並不知道有flatMap
這樣一個可使用的方法,那麼先來作一些嘗試:github
var quakes = Rx.Observable.create(function(observer) { //模擬獲得的響應流 var response = { features:[{ earth:1 },{ earth:2 }], test:1 } /* 最初的手動遍歷代碼 var quakes = response.features; quakes.forEach(function(quake) { observer.onNext(quake); });*/ observer.onNext(response); }) //爲了能將features數組中的元素逐個發送給訂閱者,須要構建新的流 .map(dataset){ return Rx.Observable.from(dataset.features) }
當咱們訂閱quakes
這個事件流的時候,每次都會獲得另外一個Observable
,它是由於數據源通過了映射變換,從數據變成了可觀測對象。那麼爲了獲得最終的序列值,就須要再次訂閱這個Observable
,這裏須要注意的是可觀測對象被訂閱前是不啓動的,因此不用擔憂它的時序問題。編程
quakes.subscribe(function(data){ data.subscribe(function(quake){ console.log(quake); }) });
若是將Observable
當作一個盒子,那麼每一層盒子只是實現了流程控制功能性的封裝,爲了取得真正須要使用的數據,最終的訂閱者不得不像剝洋蔥似的經過subscribe
一層層打開盒子拿到最裏面的數據,這樣的封裝性對於數據在流中的傳遞具備很好的隔離性,可是對最終的數據消費者而言,倒是一件很麻煩的事情。segmentfault
這時flatMap
運算符就派上用場了,它能夠將冗餘的包裹除掉,從而在主流被訂閱時直接拿到要使用的數據,從大理石圖來直觀感覺一下flatMap
:數組
乍看之下會以爲它和merge
好像是同樣的,其實仍是有一些區別的。merge
的做用是將多個不一樣的流合併成爲一個流,而上圖中A1,A2,A3這三個流都是當主流A返回數據時新生成的,能夠將他們想象爲A的支流,若是你想在支流裏撈魚,就須要在每一個支流里布網,而flatMap
至關於提供了一張大網,將全部A的支流裏的魚都給撈上來。緩存
因此在使用了flatMap
後,就能夠直接在一級訂閱中拿到須要的數據了:
var quakes = Rx.Observable.create(function(observer) { var response = { features:[{ earth:1 },{ earth:2 }], test:1 } observer.onNext(response); }).flatMap((data)=>{ return Rx.Observable.from(data.features); }); quakes.subscribe(function(quake) { console.log(quake) });
若是本節的基本知識你尚不熟悉,能夠經過javascript基礎修煉(8)——指向FP世界的箭頭函數這篇文章來簡單回顧一下函數式編程的基本知識,而後再繼續後續的部分。
/*map運算符的做用 *對全部容器類而言,它至關於打開容器,進行操做,而後把容器再蓋上。 *Container在這裏只是一個抽象定義,爲了看清楚它對於容器中包含的值意味着什麼。 *你會發現它其實就是Observable的抽象原型。 */ Container.prototype.map = function(f){ return Container.of(f(this.__value)) } //基本的科裏化函數 var curry = function(fn){ args = [].slice.call(arguments, 1); return function(){ [].push.apply(args, arguments); return fn.apply(this, args); } } //map pointfree風格的map運算符 var map = curry(function(f, any_functor_at_all) { return any_functor_at_all.map(f); }); /*compose函數組合方法 *運行後返回一個新函數,這個函數接受一個參數。 *函數科裏化的基本應用,也是函數式編程中運算管道構建的基本方法。 */ var compose = function (f, g) { return function (x) { return f(g(x)); } }; /*IO容器 *一個簡單的Container實現,用來作流程管理 *這裏須要注意,IO實現的做用是函數的緩存,且老是返回新的IO實例 *能夠看作一個簡化的Promise,重點是直觀感覺一下它做爲函數的 *容器是如何被使用的,對於理解Observable有很大幫助 */ var IO = function(f) { this.__value = f; } IO.of = function(x) { return new IO(function() { return x; }); } IO.prototype.map = function(f) { return new IO(compose(f, this.__value)); }
若是上面的基本知識沒有問題,那麼就繼續。
如今來實現這樣一個功能,讀入一個文件的內容,將其中的a
字符所有換成b
字符,接着存入另外一個文件,完成後在控制檯輸出一個消息,爲了更明顯地看到數據容器的做用,咱們使用同步方法並將其包裹在IO
容器中,而後利用函數式編程:
var fs = require('fs'); //讀取文件 var readFile = (filename)=>IO.of(fs.readFileSync(filename,'utf-8')); //轉換字符 var transContent = (content)=>IO.of((content)=>content.replace('a','b')); //寫入字符串 var writeFile = (content)=>IO.of(fs.writeFileSync('dest.txt',content));
當具體的函數被IO
容器包裹起來而實現延遲執行的效果時,就沒法按原來的方式使用compose( )
運算符直接對功能進行組合,由於readFile
函數運行時的輸出結果(一個io
容器實例)和transContent
函數須要的參數類型(字符串)再也不匹配,在不修改原有函數定義的前提下,函數式編程中採用的作法是使用map
操做符來預置一個參數:
/* *map(transContent)是一個高階函數,它的返回函數就能夠接收一個容器實例, *並對容器中的內容執行map操做。 */ var taskStep12 = compose(map(transContent), readFile);
這裏比較晦澀,涉及到不少功能性函數的嵌套,建議手動推導一下taskStep12
這個變量的值,它的結構是這樣一種形式:
io{ __value:io{ __value:someComposedFnExpression } }
若是試圖一次性將全部的步驟組合在一塊兒,就須要採用下面的形式:
var task = compose(map(map(writeFile)),map(transContent),readFile); //組合後的task形式就是 //io{io{io{__value:someComposedFnExpression}}}
問題已經浮出水面了,每多加一個針對容器操做的步驟,書寫時就須要多包裹一層map
,而運行時就須要多進入一層才能觸及組合好的能夠實現真正功能的函數表達式,真的是很麻煩。
提示一:如今來回想一下原示例中的Observable對象,將其看作是一個容器(含有map類方法),那麼若是map方法調用時傳入的參數是一個運行時會生成新的Observable對象的方法時,就會產生Observable嵌套,獲得observable{observable{.....}}這樣的結構,那麼在最終的數據消費者經過
subscribe
方法訂閱數據時,就不得不用不少個subscribe
才能拿到實際須要的數據。提示二:
沒有相關經驗的讀者在使用pointfree風格的
map
操做符時可能會感到很是不適應,若是你以爲它很難理解,也能夠嘗試直接使用IO.prototype.map
這種鏈式調用風格的寫法將上例中的三個步驟組合在一塊兒來查看最後的結果,畢竟在Rxjs
中常使用的也就是Observable
這一個容器類。
當咱們看到問題所在後就不難發現,其實這個問題的解決方法並不複雜,咱們要作的不過就是在必要的時候合併內容的容器,爲此來定義兩個合併運算的方法:
//鏈式調用風格 IO.prototype.join = function(){ return this.isNothing() ? IO.of(null):this.__value; } //pointfree風格運算符 var join = (m)=>m.join();
這裏引入一個新的概念Monad
,它的定義是能夠被展平的容器,也就是說擁有join
和of
方法並遵循必定規則的容器,都是Monad
,在這種設定下,3.1中的示例就能夠被改寫爲下面的形式:
var task = compose(join,map(writeFile),join,map(transContent),readFile);
不難發現map
和join
老是須要成對出現的,那麼再利用函數科裏化的技巧將map
和join
連起來:
var chain = curry(function(f,m){ return m.map(f).join(); })
那麼組合後的函數就變成了下面的形式:
var task = compose(chain(writeFile),chain(transContent),readFile);
這裏的chain
,就是FlatMap
。
最後將上面幾種形式放在一塊兒再來回顧一下:
//原有形式 var task = compose(map(map(writeFile)),map(transContent),readFile); //map-join形式 var task = compose(join,map(writeFile),join,map(transContent),readFile); //chain形式(flatMap) var task = compose(chain(writeFile),chain(transContent),readFile);
若是理解了這幾種形式,就不難理解flatMap
的拉平效應了,所謂flatMap
,說白了其實就是將容器展開的一種操做。
flatMap
所解決問題,是在函數式編程引入了Functor
的概念將邏輯函數包裹在容器中後才產生的,那麼這種容器概念的引入對函數式編程到底有什麼意義,筆者還沒有搞清楚,相關內容留做之後補充。
《javascript函數式編程指南》https://llh911001.gitbooks.io/mostly-adequate-guide-chinese/content/