RxJS速成 (上)

What is RxJS?

RxJS是ReactiveX編程理念的JavaScript版本。ReactiveX是一種針對異步數據流的編程。簡單來講,它將一切數據,包括HTTP請求,DOM事件或者普通數據等包裝成流的形式,而後用強大豐富的操做符對流進行處理,使你能以同步編程的方式處理異步數據,並組合不一樣的操做符來輕鬆優雅的實現你所須要的功能node

下面廢話不說, 直接切入正題.typescript

 

準備項目

我使用typescript來介紹rxjs. 由於我主要是在angular項目裏面用ts.數據庫

全局安裝typescript:npm

npm install -g typescript

全局安裝ts-node:編程

npm install -g ts-node

創建一個文件夾learn-rxjs, 進入並執行:數組

npm init

安裝rxjs:異步

npm install rxjs --save

 

RxJS的主要成員

  • Observable: 一系列值的生產者
  • Observer: 它是observable值的消費者
  • Subscriber: 鏈接observer和observable
  • Operator: 能夠在數據流的途中對值進行轉換的操做符
  • Subject: 既包括Observable也包括Observer

Observable, Observer, Subscriber的角色關係:

工廠生產雜誌, 郵遞員去送雜誌, 就至關因而Observable, 郵遞員給你帶來了啥? 帶來了雜誌, 而後(next)雜誌, next雜誌.....函數式編程

把雜誌帶給了誰? 看看這對夫婦, 多是丈夫來付帳單訂雜誌, 他就是Subscriber. 而這本女性雜誌確定不是丈夫來看(若是他是正經丈夫的話), 而妻子沒有直接去訂閱雜誌, 可是她看這本雜誌有用(知道怎麼去用它).函數

因此能夠這樣理解, 丈夫(Subscriber)把Observable和Observer聯繫到了一塊兒, 就是Subscriber爲Observable提供了一個Observer(丈夫訂雜誌, 告訴快遞員把貨給他媳婦就行).google

Observable能夠在Observer上調用三種方法(快遞員跟他妻子可能會有三種狀況...好像這麼說不太恰當), 當Observable把數據(雜誌)傳遞過來的時候, 這三種狀況是:

  • next(), 這期雜誌送完了, 等待下一期吧
  • error(), 送雜誌的時候出現問題了, 沒送到.
  • complete(), 訂的雜誌都處理完了, 之後不送了.

下面這個圖講的就是從Observable訂閱消息, 而且在Observer裏面處理它們:

Observable容許:

  • 訂閱/取消訂閱它的數據流
  • 發送下一個值給Observer
  • 告訴Observer發生了錯誤以及錯誤的信息
  • 告訴Observer整個流結束了.

Observer能夠提供:

  • 一個能夠處理流(stream)上的next的值的function
  • 處理錯誤的function
  • 處理流結束的function

 

建立Observable

  • Observable.from(), 把數組或iterable對象轉換成Observable
  • Observable.create(), 返回一個能夠在Observer上調用方法的Observable.
  • Observable.fromEvent(), 把event轉換成Observable.
  • Observable.fromPromise(), 把Promise轉換成Observable.
  • Observable.range(), 在指定範圍內返回一串數.

Observable.from()

observable_from.ts:

import { Observable } from "rxjs/Observable"; // 這裏沒有使用Rx對象而是直接使用其下面的Observable對象, 由於Rx裏面不少的功能都用不上.
import 'rxjs/add/observable/from'; // 這裏我須要使用from 操縱符(operator)

let persons = [
    { name: 'Dave', age: 34, salary: 2000 },
    { name: 'Nick', age: 37, salary: 32000 },
    { name: 'Howie', age: 40, salary: 26000 },
    { name: 'Brian', age: 40, salary: 30000 },
    { name: 'Kevin', age: 47, salary: 24000 },
];

let index = 1;
Observable.from(persons)
    .subscribe( person => { console.log(index++, person); }, err => console.log(err), () => console.log("Streaming is over.") );

subscribe裏面有3個function, 這3個function就是Observer.

第一個function是指當前這個person到來的時候須要作什麼;

第二個是錯誤發生的時候作什麼;

第三個function就是流都走完的時候作什麼.

注意, 是當執行到.subscribe()的時候, Observable纔開始推送數據.

運行這個例子須要執行下面的命令:

ts-node observable_from.ts

 

Observable.create()

Observable.create是Observable構造函數的一個別名而已. 它只有一個參數就是subscribe function. 

observable_creates.ts:

import { Observable } from "rxjs/Observable";

function getData() {

    let persons = [
        { name: 'Dave', age: 34, salary: 2000 },
        { name: 'Nick', age: 37, salary: 32000 },
        { name: 'Howie', age: 40, salary: 26000 },
        { name: 'Brian', age: 40, salary: 30000 },
        { name: 'Kevin', age: 47, salary: 24000 },
    ];

    return Observable.create(
        observer => { // 這部分就是subscribe function
            persons.forEach(p => observer.next(p)); observer.complete(); }
    );
}

getData()
    .subscribe(
        person => console.log(person.name),
        err => console.error(err),
        () => console.log("Streaming is over.")
    );

create裏面的部分是subscribe function. 這部分能夠理解爲, 每當有人訂閱這個Observable的時候, Observable會爲他提供一個Observer.

在這裏面, observer使用next方法對person進行推送. 當循環結束的時候, 使用complete()方法通知Observable流結束了.

儘管getDate裏面create了Observable, 可是整個數據流動並非在這時就開始的. 在這個地方, 這只不過是個聲明而已.

只有當有人去訂閱這個Observable的時候, 整個數據流纔會流動.

運行該文件:

 

RxJS Operator(操做符)

 

Operator是一個function, 它有一個輸入, 還有一個輸出. 這個function輸入是Observable輸出也是Observable.

在function裏面, 能夠作一些轉換的動做

下面是幾個例子:

observablePersons.filter(p => p.age > 40);

這個filter function和數組的filter相似, 它接受另外一個function(也能夠叫predicate)做爲參數, 這個function提供了某種標準, 經過這個標準能夠斷定是否當前的元素能夠被送到訂閱者那裏.

p => p.age > 40

 

這個function, 是pure function, 在functional programming(函數式編程)裏面, pure function是這樣定義的: 若是參數是同樣的, 不管外界環境怎麼變化, 它的結果確定是同樣的.

pure function不與外界打交道, 不保存到數據庫, 不會存儲文件, 不依賴於時間....

而這個filter function呢, 在函數式編程裏面是一個high order function.

什麼是High order function

若是一個function的參數能夠是另外一個function, 或者它能夠返回另外一個function, 那麼它就是High Order function.

 

Marble 圖

首先記住這個網址: http://rxmarbles.com/

有時候您能夠經過文檔查看operator的功能, 有時候文檔不是很好理解, 這時你能夠參考一下marble 圖.

例如 map:

能夠看到map接受一個function做爲參數, 經過該function能夠把每一個元素按照function的邏輯進行轉換.

 

例如 filter:

 

filter就是按條件過濾, 只讓合格的元素經過.

 

例 debounceTime (恢復時間):

若是該元素後10毫秒內, 沒有出現其它元素, 那麼該元素就能夠經過.

 

例 reduce:

這個也和數組的reduce是一個意思.

 

例子

import { Observable } from "rxjs/Observable"; import 'rxjs/add/observable/from'; import 'rxjs/add/operator/map'; import 'rxjs/add/operator/reduce';


let persons = [
    { name: 'Dave', age: 34, salary: 2000 },
    { name: 'Nick', age: 37, salary: 32000 },
    { name: 'Howie', age: 40, salary: 26000 },
    { name: 'Brian', age: 40, salary: 30000 },
    { name: 'Kevin', age: 47, salary: 24000 },
];

Observable.from(persons)
    .map(p => p.salary)
    .reduce((total, current) => total+ current, 0)
    .subscribe(
        totalSalary => console.log(`Total salary is ${totalSalary}`),
        err => console.log(err)
    );

 

這個例子很是的簡單, 典型的map-reduce, 就不講了.

結果以下:

 

用現實世界中鍊鋼生產流程的例子來解釋使用Operator來進行Reactive數據流處理的過程:

原料(礦石)整個過程當中會通過不少個工做站, 這裏每一個工做站均可以看做是RxJS的operator, 原料通過這些operator以後, 成品就被生產了出來.

每一個工做站(operator)都是能夠被組合使用的, 因此能夠再加幾個工做站也行.

 

錯誤處理

Observable是會發生錯誤的, 若是錯誤被髮送到了Observer的話, 整個流就結束了.

可是作Reactive編程的話, 有一個原則: Reactive的程序應該頗有彈性/韌性.

也就是說, 即便錯誤發生了, 程序也應該繼續運行.

可是若是error function在Observer被調用了的話, 那就太晚了, 這樣流就中止了.

那麼如何在error到達Observer以前對其進行攔截, 以便流能夠繼續走下去或者說這個流中止了,而後另一個流替它繼續走下去?

錯誤處理的Operators:

  • error() 被Observable在Observer上調用
  • catch() 在subscriber裏而且在oserver獲得它(錯誤)以前攔截錯誤,
  • retry(n) 當即重試最多n次
  • retryWhen(fn) 按照參數function的預約邏輯進行重試

使用catch()進行錯誤處理:

observable_catch.ts:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/catch';
import 'rxjs/add/operator/map';

function getFromGoogle(): Observable<any> {
    return Observable.create(function subscribe(observer) {
        observer.next('https://google.com');
        observer.error({
            message: 'Google can\'t be reached.',
            status: 404,
        });
        observer.complete();
    });
}

function getFromBing(): Observable<any> {
    return Observable.create(function subscribe(observer) {
        observer.next('https://global.bing.com');
        observer.complete();
    });
}


function getFromBaidu(): Observable<any> {
    return Observable.create(function subscribe(observer) {
        observer.next('https://www.baidu.com');
        observer.complete();
    });
}

getFromGoogle()
    .catch(err => {
        console.error(`Error: ${err.status}: ${err.message}`);
        if(err.status === 404) {
            return getFromBaidu();
        } else {
            return getFromBing();
        }
    })
    .map(x => `The site is : ${x}`)
    .subscribe(
        x => console.log('Subscriber got: ' + x),
        err => console.error(err),
        () => console.log('The stream is over.')
    );

 

在subscribe以前調用catch, catch裏能夠進行流的替換動做.

運行結果以下:

至關於:

 

Hot 和 Cold Observable

Cold: Observable能夠爲每一個Subscriber建立新的數據生產者

Hot: 每一個Subscriber從訂閱的時候開始在同一個數據生產者那裏共享其他的數據.

從原理來講是這樣的: Cold內部會建立一個新的數據生產者, 而Hot則會一直使用外部的數據生產者.

舉個例子:

Cold: 就至關於我在騰訊視頻買體育視頻會員, 能夠從頭看裏面的足球比賽.

Hot: 就至關於看足球比賽的現場直播, 若是來晚了, 那麼前面就看不到了.

 

Share Operator

share() 操做符容許多個訂閱者共享同一個Observable. 也就是把Cold變成Hot.

例子 observable_share.ts:

import { Observable } from "rxjs/Observable";
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/take';
import 'rxjs/add/operator/share';

const numbers = Observable
    .interval(1000)
    .take(5)
    .share();

function subscribeToNumbers(name) {
    numbers.subscribe(
        x => console.log(`${name}: ${x}`)
    );
}

subscribeToNumbers('Dave');

const anotherSubscription = () => subscribeToNumbers('Nick');

setTimeout(anotherSubscription, 2500);

 

這裏interval是每隔1秒產生一個數據, take(5)表示取5個數, 也就是1,2,3,4,5.

而後share()就把這個observable從cold變成了hot的.

後邊Dave進行了訂閱.

2.5秒之後, Nick進行了訂閱.

最後結果是:

相關文章
相關標籤/搜索