Rxjs 核心概念

又一年要過去了,回顧2017,rxjs始終是我在項目裏使用最頻繁的庫,在我看來,它是一個很是優秀的數據處理工具。年初的時候就計劃寫點什麼,礙於目前公司的項目實在抽不出時間,這一拖就到了年末。
臨近新年,總算忙裏偷閒,但又不知道從何寫起,因而乎偷了個懶拿起了官方文檔開始翻譯。如下的文字僅供各位參考,仍是強烈建議去看官方文檔javascript

Rxjs概述

簡介


rxjs是一個使用觀察者模式來整合異步操做和事件系統的js庫,經過一系列可觀測的流(observable)將它們串聯起來。Observable是這個庫的核心類型,此外還包括諸如Observer,Schedulers,Subjects等類型。還包括一些和數組方法相似或通過演化的操做符,用來協助處理數據。java

  • <font face="仿宋">_能夠把 rxjs 想像成一個能夠發射事件的 lodash 庫。_</font>

響應式設計結合了觀察者模式,迭代模式和基於集合的函數式編程風格,從而提供了一種處理事件序列的理想方式。react

在rxjs中用來處理異步事件的核心概念包括:正則表達式

  • observable: 表明了將來可能會產生的一系列的值或事件的集合;
  • observer: 回調函數的集合,它知道如何去處理observable上產生的值或者事件,固然也包括異常。
  • subscription: 表明當前正在訂閱的observable,主要做用是用來取消訂閱行爲。
  • operators: 純函數,以函數式的風格經過各類各樣的操做符相互配合對observable上產生的數據集合進處理。
  • subject: 至關於一個事件發射器,容許將相同的值傳遞給多個訂閱者。
  • schedulers: 協調observable上數據的發射方式。

示例


在javascript中一般使用如下方式來註冊事件:typescript

var button = document.querySelector('button');

button.addEventListener('click', () => console.log('Clicked'));

使用rxjs的方式實現以下:編程

var button = document.querySelector('button');

Rx.Observable.fromEvent(button, 'click')
    .subscribe(() => console.log('Clicked'));

純度


使用不純的函數時,函數可能會改變外部數據的狀態,好比:數組

var count = 0;

var button = document.querySelector('button');

button.addEventListener('click', () => console.log(`Clicked ${++count} times`));

當使用rxjs時,必須把這些狀態隔離出去,好比:promise

var button = document.querySelect('button');

Rx.Observable.fromEvent(button, 'click')
    .scan(count => count + 1, 0)
    .subscribe(count => console.log(`Clicked ${count} times`));

這裏的scan操做符的行爲和數組的reduce方法的行爲很是相似,它提供一個初始值給迭代函數,迭代函數運行後的值將做爲下一次迭代的初始值。安全


rxjs提供了一套很是豐富的操做符來幫助用戶控制數據或事件如何在observable中進行流動。假如咱們須要控制用戶在一個按鈕上每秒最多隻能點擊一次數據結構

使用普通的javascript代碼:

var count = 0;

var rate = 1000;

var lastClick = Date.now() - rate;

var button = document.querySelector('button');

button.addEventListener('click', () => {
    if(Date.now() - lastClick >= rate) {
        console.log(`Clicked ${++count} timers`);
        lastClick = Date.now();
    }
});

使用rxjs:

var button = button.querySelector('button');

Rx.Observable.fromEvent(button, 'click')
    .throttleTime(1000)
    .scan(count => count + 1, 0)
    .subscribe(count => console.log(`Click ${count} times`));

諸如此類對流進行控制的操做符還有:filter, delay, debounceTime, take, takeUntil, distinct, distinctUntilChanged等等。


你能夠對流中的數據進行轉換,當咱們須要獲得每一次鼠標點擊時的x軸座標時,

使用普通的javascript代碼:

var count = 0;

var rate = 1000;

var lastClick = Date.now() - rate;

var button = document.querySelector('button');

button.addEventListener('click', (event) => {
    if(Date.now() - lastClick >= rate) {
        count += event.clientX;
        console.log(count);
        lastClick = Date.now();
    }
});

使用rxjs:

var button = button.querySelector('button');

Rx.Observable.fromEvent(button, 'click')
    .throttleTime(1000)
    .map(event => event.ClientX)
    .scan((count,clientX) => count + clientX, 0)
    .subscribe(count => console.log(count));

諸如此類能夠產生新值的操做符還有:pluck,pairwise,sample等等。

可觀測序列-Observable


可觀測序列能夠推送多個值,而且它的推送方式是‘懶’的,它知足下面表格中列出的條件

single multiple
pull function iterator
push promise observable

下面這個例子中的observable在被訂閱時能夠當即推送出1,2,3這個三個值,1秒之後推送第四的值4,而後當即結束。

var observable = Rx.Observable.create(function(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    },1000);
});

爲了獲取到這個observable上的值, 咱們須要對它進行訂閱:

var observable = Rx.Observable.create(function(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
    },1000);
});

console.log('just before subscribe');

observable.subscribe({
    next: x => console.log('got value ' + x),
    error: err => console.error('something wrong occurred: ' + err),
    complete: () => console.log('done'),
});

console.log('just after subscribe);

這段代碼執行後將會獲得以下結果:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

pull vs push


這裏使用 pull 和 push 來描述值的生產者和消費者之間是如何發生聯繫的,它們是兩種徹底不一樣的協議。

在pull的系統中,值的消費決定什麼時間從生產者上獲取數據,生產者自己並不關心數據什麼時間分發給消費者。

每個javascript函數均可以看做一個 pull 類型的系統。函數能夠產生值,但這是經過調用者主動調用函數,函數運行產生出值返回給調用者的方式進行的,因此能夠理解爲調用者主動去函數上拉取了值。

ES2015中介紹另外兩種 pull 類型的系統,generator函數 和 iterator。對於它們來說,遍歷器對象的 next 方法能夠視做值的消費者,經過iterator.next()能夠獲取到多個值。

Producer Consumer
pull 被動:當被調用時產生值 主動:決定什麼時候發起調用
push 主動:按自身的設置產生值 被動:響應接收到的值

在push的系統中,生產者決定何時發送值給消費者,消費者並知道何時能夠接收到值。

ES6中的 promise 就是一個很是典型的 push 系統,promise 將 resolve 的結果傳遞給註冊在它內部的回調函數,這與普通的函數有很大的不一樣,回調函數什麼時候能夠接收到數據徹底取決於 promise 什麼時間向它傳遞數據。

rxjs的 Observable 也是一種 push 類型的系統,一個 Observable 能夠產生多個值,而後推送給它的訂閱者。

  • Function 是一個 ‘懶’ 的求值過程,只有在被調用時它纔會同步的返回一個值給調用者。
  • generator 也是一個 ’懶‘ 的求值過種,在遍歷的過程當中同步的返回0個或多個值給調用者。
  • Promise 通過運算後可能產生一個值,固然也可能產生一個錯誤。
  • Observable 也是一個‘懶’的求值過程,當它被訂閱後能夠同步或者異步的產生出0個或者無限多個值給調用者,這個過程將一直持續到訂閱被取消或者流結束。

Observable 概述


不少人認爲Observable就是一個事件發射器或能夠產生多個值的promise,實際上這種觀點是不正確的。在某些狀況下Observable的行爲可能與事件發射器的行爲相似,例如使用Subject來發射值時,但一般狀況下它們的行爲與事件發射器有很大的不一樣。

  • <font face="仿宋">_能夠把 Observable 想像成一個不接受參數的函數,可是它卻能夠產生出不少的值。_</font>

請思考如下代碼:

function foo() {
    console.log('hello');
    return 42;
}

var x = foo.call();

console.log(x);

var y = foo.call();

console.log(y);

運行後的輸出:

"hello"
42
"hello"
42

使用rxjs的方式:

var foo = Rx.Observable.create(function(observer) {
    console.log('hello');
    observable.next(42);
});

foo.subscribe(function (x) {
    console.log(x);
});

foo.subscribe(function (y) {
    console.log(y);
});

運行後的輸出依然是:

"hello"
42
"hello"
42

首先,函數和Observable的求值過程都是‘懶’的,若是你不調用foo函數,console.log('hello')將不會執行,這對於Observable也是同樣的,只要不訂閱,console.log('hello')一樣也不會執行。另外,每一次的訂閱(subscribe)和調用(call)都是互不干擾的,兩次函數調用會觸發兩次執行過程,產生兩次反作用,一樣,兩次訂閱也會觸發兩次訂閱過程。對於事件發射器來講卻不一樣,它產生的反作用會傳遞給每個事件接收者,它並不理會有沒有接收者接收事件,都會把事件發射出去,這與Observable有很大的不一樣。

  • <font face="仿宋">_訂閱一個Observable與調用一個函數的過程相似。_</font>

你可能會認爲Observable都是異步的,事實上並非這樣,咱們看一下函數的調用過程:

console.log('before');
console.log('foo.call()');
console.log('after');

執行後的輸出:

"before"
"hello"
42
"after"

而後以一樣的順序,可是使用Observable:

console.log('before');
foo.subscribe(function(x) {
    console.log(x);
});
console.log('after');

你會發現結果是相同的:

"before"
"hello"
42
"after"

這個結果證實了foo這個Observable徹底是同步執行的,和調用一個函數沒有什麼兩樣。

  • <font face="仿宋">_Observable傳遞值的方式能夠是同步也能夠是異步的。_</font>

那麼Observable與函數之間有什麼樣的區別?Observable被訂閱後能夠返回多個值,這是普通的函數所沒法作到的。例如,你沒法像下面這樣去實現一個函數:

function foo() {
    console.log('hello');
    return 42;
    return 100; // 這句永遠不會執行
}

由於函數只能返回一個值,return 100; 會被解釋器忽略,若是是在typescript中,編譯器會告訴你這裏有一個錯誤。然而這對於Observable來講卻不是問題:

var foo = Rx.Observable.create(function (observer) {
    console.log('hello');
    observer.next(42);
    observer.next(100); // '返回‘另一個值
    observer.next(200); // 同上
});

console.log('before');
foo.subscribe(function(x) {
    console.log(x);
});
console.log('after');

上面的代碼執行後會以同步的方式輸出:

"before"
"Hello"
100
42
200
"after"

固然, 你也可讓它以異步的方式來返回值:

var foo = Rx.Observable.create(function (observer) {
    console.log('hello');
    observer.next(42);
    observer.next(100); // '返回‘另一個值
    observer.next(200); // 同上
    setTimeout(() => {
        observer.next(300);
    },1000);
});

console.log('before');
foo.subscribe(function(x) {
    console.log(x);
});
console.log('after');

執行後的輸出:

"before"
"Hello"
100
42
200
"after"
300

總結:

  • func.call() 表示以同步的方式返回一個值。
  • observable.subscribe() 表示以同步或者異步的方式返回一些值。

Observable詳解


咱們可使用Rx.Observable.create方法或者那些能夠建立 Observable 的操做符來建立 Observable,而後使用一個觀察者來觀察 Observable,當 Observable 被訂閱時它就會經過 next/error/complete/ 方法來通知觀察者,當觀察者放棄觀察行爲時如何處理 Observable 運行產生的數據。這四個動做都被編碼在一個可觀察的實例中,但其中一些動做與其它動做是相關的,如觀察和訂閱。

使用Observable時要關注的核心問題:

  • 如何建立
  • 被誰訂閱
  • 怎麼去執行
  • 廢棄時如何處理

建立Observable


Rx.Observable.create方法實際是 Observable 類的構造函數的別名,它接受一個參數——訂閱函數。下面這個示例會建立一個observable,它會每秒發一個 string 類型的值‘hi’ 給它的觀察者。

var observable = Rx.Observable.create(function subscribe(observer) {
    var id = setInterval(() => {
        observer.next('hi');
    },1000);
});
  • <font face="仿宋">_能夠經過 create 方法來建立 Observable,但實際狀況下咱們使用更多的是那些能夠建立出 Observable 的操做符,例如:from, interval 等。_</font>

上面代碼中的 subscribe 函數對於解釋 Observable 來講是很是重要的一部分,接下來解釋如何去理解它。

訂閱Observable


上面例子中的observable能夠這樣被訂閱:

observable.subscribe(x => console.log(x));

observable變量上有 subscribe 方法,一樣 Observable.create(function subscribe(observer) { ... }) 也有subscribe 方法,這並非一個巧合。在庫的實現中,這兩個subscribe是不同的,但在實際使用中你能夠認爲它們在概念上是相等的。

這個示例很好的展現了爲何訂閱行爲在多個訂閱者之間是不會共享的。當咱們使用一個觀察者來訂閱 observable 時,傳入 create 方法的 subscribe 函數就會爲這個觀察者運行一次。每次調用 observable.subscribe 都會爲給定的觀察者觸發它本身的獨立設置。

  • <font face="仿宋">_訂閱Observable和執行一個函數很相似,均可以提供一個回調函數來接收將要產生的值。_</font>

這個過程與諸如 addEventListener/removeEventListener 等事件處理API有很大的區別。在 observable.subscribe 的過程當中,給定的觀察者並無註冊成爲 Observable 上的監聽者。Observable 甚至不須要維護一個觀察者名單。

經過 subscribe 這種簡單的方式就可使一個 Observable 開始執行,而且將產生的數據或事件傳遞給當前 Observable 環境上的觀察者。

執行Observable


Observable.create(function subscribe(observer) { ... })內部的代碼表明瞭 Observable 的執行,一個‘懶’執行過程——僅在有觀察者訂閱它的時候執行,當它被執行後就能夠以同步或異步的方式不斷的生產值。

Observable 執行後能夠產生三種類型的值:

  • ’Next‘:發送一個正常的值,好比String, Number 或者 Object 等。
  • ‘Error’:發送一個javascript的錯誤或者拋出異常。
  • ’Complete‘:中止發送值。

Next類型的通知是最重要也是最經常使用的一種,它表明了將要傳遞給觀察者的數據。Error 和 Complete 類型的通知在整個Observable的執行過程當中只能夠發生一次,而且它們是互斥的,只有一個能夠發生。

以正則表達式來表述三者之間的關係的話,應該是:

next*(error|complete)?
  • <font face="仿宋">_在Observable執行的過程當中,能夠發生0個或無限多個Next類型的通知,可是當 Error 或 Complete 類型的通知中有一個發生後,Observable將不會再傳遞任何數據。_</font>

下面的代碼展現了一個能夠產生三個 Next 類型通知的 Observable,最後發送一個 Complete 通知:

var observable = Rx.Observable.create(function subscribe(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
});

Observable 將嚴格遵照上述規則,因此試圖在Observable完成後繼續傳遞值的行爲是無效的。

var observable = Rx.Observable.create(function subscribe(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    observer.next(4); // 不會傳遞成功
});

若是Observable在執行過程當中發生異常,咱們能夠在subscribe函數內部使用 try/catch 塊來捕獲它:

var observable = Rx.Observable.create(function subscribe(observer) {
    try {
        observer.next(1);
        observer.next(2);
        observer.next(3);
        observer.complete();
    } catch(error) {
        observer.error(error); // 傳遞錯誤
    }
});

處理Observable


因爲 Observable 可能產生出無限多個值,但觀察者可能在某時間點想中止觀察行爲,這就須要有一種方法來取消 Observable 的執行。因爲每一個執行只對應一個觀察者,一旦觀察者完成接收值,它就須要知道如何中止執行,以免浪費計算或內存資源。

當咱們調用 observable.subscribe 時,觀察者會被附加在新建立的可觀察上下文中,同時此次調用會返回一個對象,它就是 Subscription 。

var subscription = observable.subscribe(x => console.log(x));

Subscription 表明了正在執行的上下文,它擁有一個能夠取消 Observable 執行的方法——unsubscribe。調用這個方法 subscription.unsubscribe() 就能夠取消執行。

var observable = Rx.Observable.from([10,20,30]);
var subscription = observable.subscribe(x => console.log(x));
// 一段時間之後
subscription.unsubscribe();
  • <font face="仿宋">_當你對一個 Observable 進行訂閱時,就會得到一個表明當前執行的 Subscription ,只須要調用它的 unsubscribe 方法就能夠取消訂閱。_</font>

當咱們使用 Observable.create 方法來建立 Observable 時,必須定義當取消訂閱時如何去釋放執行時的資源,此時能夠返回一個自定義的 unsubscribe 函數。

咱們能夠給以前的例子添加上它的 unsubscribe 方法:

var observable  Rx.Observable.create(function subscribe(observer) {
    var intervalID = setInterval(() => {
        observer.next('hi');
    },1000);

    return function unsubscribe() {
        clearInterval(intervalID);
    }
});

與 observable.unsubscribe 同樣,咱們可經過調用 unsubscribe 方法對這個流取消訂閱,這兩個 unsubscribe 在概念上是徹底相同的。

事實上在這個例子中,假如咱們把響應式的外殼剝離的話,它徹底就是一個普普統統的javascript函數:

function subscribe(observer) {
    var intervalID = setInterval(() => {
        observer.next('hi');
    },1000);

    return function unsubscribe() {
        clearInterval(intervalID);
    }
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

儘管如此,咱們依然有理由去使用Rx這種響應式的編程,經過 Observable,Observer,Subscription,配合各類操做符來實現更高效安全的數據處理。

觀察者——Observer


觀察者實際就是可觀察序列——Observable的消費者。Observer 會設置一些回調函數來對應 Observable 的各個通知類型(next/error/complete),從這些通知中接收值,而後對它們進行處理。

這是一個很典型的 Observer:

var observer = {
    next: x => console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
    complete: () => console.log('Observer got a complete notification')
};

咱們可使用它來觀察一個 Observable:

observable.subscribe(observer);
  • <font face="仿宋">_觀察者就是一個簡單對象,擁有三個方法,每個方法對應處理可觀察序列上的相應類型的通知。_</font>

Rxjs中的觀察者容許只實現這三個方法中的某幾個,這對 Observable 的執行過程不會產生影響,僅僅是被忽略方法的對應通知沒法獲得處理,由於觀察者想要處理相應通知上的數據時必須實現對應的方法。

下面這個示例的 Observer 就沒有實現處理 complete 方法:

var observer = {
    next: x => console.log('Observer got a next value: ' + x),
    error: err => console.error('Observer got an error: ' + err),
};

當須要訂閱一個 Observable 時,能夠只傳入一個回調函數做爲參數,而沒必要傳入觀察者對象:

observable.subscribe(x => console.log('Observer got a next value: ' + x));

實際上在庫的內部實現中會根據傳入的參數爲咱們建立出一個 Observer 對象,此時,傳入的第一個函數做爲next方法的回調,第二個是error回調,第三個是complete回調。也就是說咱們能夠這樣去訂閱一個 Observable :

observable.subscribe(
    x => console.log('Observer got a next value: ' + x),
    err => console.error('Observer got an error: ' + err),
    () => console.log('Observer got a complete notification')
);

Subscription


Subscription是一個表明着當前正在執行的 Observable 的對象。它有一個很是重要的方法 unsubscribe ,這個方法不接受任何參數,僅僅是用來釋放 Observable 執行時的資源。在舊版本的Rxjs中,它也叫作 Disposable 。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 執行一段時間後

// 下面的操做將會取消上面已經訂閱的 Observable 的執行。
subscription.unsubscribe();
  • <font face="仿宋">_Subscription本質上能夠只有一個 unsubscribe 方法,用來取消已經訂閱的 Observable 的執行。_</font>

Subscription 能夠被組合在一塊兒,這樣只要調用一次 unsubscribe 方法就能夠將多個 Observable 的執行取消掉。咱們能夠經過 subscription 的 add 方法將它們添加在一塊兒。

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));

var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
    // 取消上面的兩次訂閱
    subscription.unsubscribe();
},1000);

執行後將輸出:

second:0
first:0
second:1
first:1
second:2

固然,能夠添加,就能夠移除,Subscription一樣擁有一個remove方法,用來從一組subscription 中移除指定的子 subscription。

Subject


Subject 是 Observable 的一種特殊類型,它容許把值同時傳播給多個 Observer,也就是說它是多播的。相比之下,普通的Observable 是單播的,每個 Observer 在訂閱 Observable 時都有其獨立的執行上下文。

  • <font face="仿宋">_Subject 相似於 Observable ,但它能夠把值同時廣播給多個觀察者。它也相似於一個事件發射器,所以它會維護一個屬於本身的監聽者列表。_</font>

每個 Subject 都是一個 Observable。你能夠提供一個訂閱者來訂閱 Subject,從而在它上面取值。從觀察者的角度來看,並不能區分出數據是從 Subject 上獲取的仍是從 Observable 上獲取的。

在 Subject 的內部實現中,並不會產生新的執行上下文來傳遞數據,它僅僅是簡單的將 Observer 註冊在本身的監聽者列表中,這與其它的庫或語言中添加事件的機制是相似的。

每個 Subject 都是一個 Observer。每個 Subject 上都有本身的 next,error,complete方法。當須要給 Subject 上添加一個值時,只要調用它的next方法,接下來它就會把這個值廣播給註冊在監聽者列表中的多個監聽者。

下面的例子中,咱們給 Subject 添加了兩個觀察者,而後給它添加一些值:

var subject = new Rx.Subject();

subject.subscribe({
    next: v => console.log('observerA: ' + v)
});

subject.subscribe({
    next: v => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

執行後的輸出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

因爲 Subject 也是一個 Observer ,因此你能夠把他傳遞給一個 Observable 的subscribe方法:

var subject = new Rx.Subject();

subject.subscribe({
    next: v => console.log('observerA: ' + v)
});

subject.subscribe({
    next: v => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1,2,3]);

observable.subscribe(subject); // 經過 Subject 來訂閱這條 Observable

執行後輸出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

經過上面的方法,咱們基本上就藉助 Subject 把一個單播的 Observable 轉換成了多播的。這個示例僅僅是演示了一種將一個執行上下文分享給多個觀察者的方法。

在 Subject 類型下,還有一些比較特殊的 Subject 類型:BehaviorSubject,ReplaySubject,AsyncSubject。

多播的Observable


一個‘多播’的 Observable 能夠藉助於 Subject 實現將數據通知給多個觀察者,然而單播的 Observable 僅僅只能把通知發送給一個觀察者。

  • <font face="仿宋">_多播的 Observable 會使用 Subject 做爲鉤子來實現將同一個 Observable 的執行傳遞給多個觀察者。_</font>

這就是那些多播的操做符實現時的真實狀況:觀察者訂閱底層的一個 Subject,使用這個 Subject 來訂閱產生源數據的 Observable。

下面這個例子和以前那個使用 subject 來訂閱 Observable 的例子很是相似:

var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 使用 subject.subscribe({...}) 實現
multicasted.subscribe({

next: v => console.log('observerA: ' + v)

});
multicasted.subscribe({

next: v => console.log('observerB: ' + v)

});

//使用 source.subscribe(subject) 實現

multicasted.connect();

上面代碼中的multicast方法返回的 Observable 看起來像一個普通的 Observable,可是當它被訂閱時,執行的方式卻和 Subject 同樣是多播的,同時這個 Observable 還有一個connect方法,能夠將多個流串聯在一塊兒。

什麼時候調用這個connect方法很是重要,由於它決定了這條能夠共享的流什麼時間開始執行。因爲connect方法在內部執行了 source.subscribe(subject) ,所以它會返回一個 Subscription ,能夠經過它來取消這個共享的 Observable 的執行。

引用計數


經過手動調用connect方法來獲得 Subscription 多少顯得有些笨重,一般狀況下,咱們但願當第一個觀察者抵達時能夠自動的調用connect方法,當最後一個觀察者取消訂閱時自動的取消這個共享 Observable 的執行。

請考慮實現如如下列表所概述的訂閱:

  1. 第一個觀察者訂閱多播流。
  2. 多播流被connect。
  3. 0 被傳遞給第一個訂閱者。
  4. 第二個觀察者訂閱這條多播流。
  5. 1 被傳遞給第一個訂閱者。
  6. 1 被傳遞給第二個訂閱者。
  7. 第一個訂閱者取消訂閱多播流。
  8. 2 被 傳遞給第二個訂閱者。
  9. 第二個訂閱者取消訂閱多播流。
  10. 取消這條多播流的訂閱。

爲了達到這個效果,咱們須要顯式的調用connect方法:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
    next: v => console.log('observerA: ' + v)
});

// 在這裏咱們須要顯示的調用connect方法以使第一個訂閱者能夠開始接收值
subscriptionConnect = multicasted.connect();

setTimeout(() => {
    subscription2 = multicasted.subscribe({
        next: v => console.log('observerB: ' + v)
    });
}, 600);

setTimeout(() => {
    subscription1.unsubscribe();
},1200);

// 在這裏咱們須要把多播流的訂閱取消掉,由於今後之後再也沒有訂閱者訂閱它了。
setTimeout(() => {
    subscription2.unsubscribe();
    subscriptionConnect.unsubscribe();
},2000);

咱們能夠經過使用 ConnectableObservable 的refCount方法來避免顯式的調用connect方法,這個方法會返回一個 Observable 用來追蹤當前有多少訂閱者正在訂閱多播流。當訂閱者的數量從0增長到1時,它幫助咱們調用connect方法以開始訂閱,當訂閱者的數量從1變爲0時,它幫助咱們取消訂閱以中止多播流的繼續執行。

  • <font face="仿宋">_refCount使得多播流在第一個訂閱者到達時開始執行,在最後一個訂閱者離開後中止執行。_</font>

請看下面的例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCount = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// 此次調用會執行connect方法
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
    next: v => console.log('observerA: ' + v)
});

setTimeout(() => {
    console.log('observerB subscribed');
    subscription2 = refCounted.subscribe({
        next: v => console.log('observerB: ' + v)
    })
},600);

setTimeout(() => {
    console.log('observerA unsubscribed');
    subscription1.unsubscribe();
}, 1200);

// 這裏會調用多播流的unsubscribe方法
setTimeout(() => {
    console.log('observerB unsubscribed');
    subscription2.unsubscribe();
},2000);

執行後的輸出:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount方法僅存在於 ConnectableObservable 上,它返回的是一個新的 Observable 而不是另外一個 ConnectableObservable 。

BehaviorSubject


BehaviorSubject 是 Subject 的一個變種上,關於它的一個重要概念是’當前值‘。它會把最後一次發送給訂閱者的值保存下來,當另一個訂閱者開始訂閱時,它會把這個值當即發送給新的訂閱者。

  • <font face="仿宋">_BehaviorSubject 對於表示隨時間變化的值是很是有用的。例如,表示生日的事件流能夠用 Subject,可是一我的的年齡的事件流能夠用 BehaviorSubject。_</font>

下面的例子中,BehaviorSubject初始化了一個值0,當第一個訂閱者開始訂閱時,這個值被髮送出去。接下來當第二個訂閱者開始訂閱時,將會接收到值2,即便這個值已經被髮送過了。

var subject = new Rx.BehaviorSubject(0);

subject.subscribe({
    next: v => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
    next: v => console.log('observerB: ' + v)
});

subject.next(3);

執行後的輸出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject


ReplaySubject 和 BehaviorSubject 很相似,它們均可以把過去的值發送給新的訂閱者,不一樣的是它能夠把 Observable 執行時產生的一部分值記錄下來。

  • <font face="仿宋">_ReplaySubject 記錄 Observable 執行時的多個值,並將它們傳遞給新加入的訂閱者。_</font>

在建立 ReplaySubject 時,咱們能夠指定須要重複發送的值的數量:

var subject = new Rx.ReplaySubject(3); // 傳遞3個值給新加入的訂閱者

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

執行後的輸出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

還能夠指定一個以毫秒爲單位的時間,配合記錄的數量共同決定究竟有個多少個值須要被重複發送。

下面的這個例子,使用了一個比較大的數量,可是時間上只限制了只發送500毫秒內的值:

var subject = new ReplaySubject(100, 500/* windowTime */);

subject.subscribe({
    next: v => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
    subject.subscribe({
        next: v => console.log('observerB: ' + v)
    });
},1000);

第二個訂閱者只能接收到最近的500毫秒內發出的值,下面是執行後的輸出,

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject


它也是 Subject 的一個變種,AsyncSubject僅在流執行結束後把最後一個值發送給它的訂閱者。

var subject = new Rx.AsyncSubject();

subject.subscribe({
    next: v => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: v => console.log('observerB: ' + v);
});

subject.next(5);
subject.complete();

執行後的輸出:

observerA: 5
observerB: 5

AsyncSubject 的行爲與last操做符的行爲很是類似,都是在等待完成後再發送一個值。你應該還記得以前提到的當 Observable 發出Complete通知或Error通知後就不能再發送值,AsyncSubject看起來違背了這個原則,其實否則,咱們能夠看一下它的源碼:

constructor() {
    super(...arguments);
    this.value = null;
    this.hasNext = false;
    this.hasCompleted = false;
}
complete() {
    this.hasCompleted = true;
    if (this.hasNext) {
        super.next(this.value);
    }
    super.complete();
}

這裏只摘錄出了AsyncSubject的構造函數和它的complete方法,首先AsyncSubject是繼承自Subject的,因此這裏的super類就是Subject,那麼就很明顯了,在AsyncSubject實例上調用complete方法時並無違背以前提到的原則,依然是先發出了Next通知,最後才發出Complete通知。

操做符


雖然 Observable 是構建rxjs的基石,可是真正精彩的部分應該是操做符。操做符的組合使用使得以聲明式的方式解決複雜的異步場景變得很是簡單,它構成rxjs必不可少的一部分。

什麼是操做符


操做符是 Observable 類上的方法,例如: .map(...),.filter(...),.merge(...)等。當咱們調用它們時,並不會去改變已有 Observable 實例,取而代之的是建立一個全新的 Observable ,它是訂閱邏輯是基於第一個 Observable 的。

  • <font face="仿宋">_操做符是一個純函數,它不會改變已有的 Observable 實例,而是基於當前的Observable 建立一個新的 Observable。_</font>

理解操做符時很是重要的一點是時刻謹記它是一個純函數,它接受一個 Observable 做爲輸入而後生成一個新的 Observable 做爲輸出。當訂閱輸出 Observable 的同時也就訂閱了輸入 Observable 。

下面的例子中,咱們自定義了一個操做函數,它將輸入的每個值都乘以10:

function multiplyByTen(input) {
    var output = Rx.Observable.create(function subscribe(observer) {
        input.subscribe({
            next: v => observer.next(10 * v),
            error: err => observer.error(error),
            complete: () => observer.complete()
        });
    });

    return output;
}

var input = Rx.Observable.from([1,2,3,4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

執行後的輸出:

10
20
30
40

注意:咱們只是訂閱了輸出-——output,但這同時致使input被訂閱,這稱之爲’操做符的鏈式訂閱‘。

實例的操做符 VS 靜態操做符


一般在提到操做符時,咱們指的是 Observable 實例上的操做符。假如上面例子中咱們定義的操做函數 multiplyByTen 是一個操做符的話,它看起來應該是這樣:

Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
    var input = this;
    return Rx.Observable.create(function subscribe(observer) {
        input.subscribe({
            next: v => observer.next(10 * v),
            error: err => observer.error(err),
            complete: () => observer.complete()
        });
    });
}
  • <font face="仿宋">_實例操做符是使用this關鍵字來推斷輸入 Observable 的函數。_</font>

在這裏輸入的 Observable 再也不經過 multiplyByTen 的參數得到,而是經過this關鍵字來獲取,這就容許咱們能夠像下面這樣鏈式的調用:

var observable = Rx.Observable.from([1,2,3,4]).multiplyByTen();

observable.subscribe(x = console.log(x));

除了實例操做符,還有靜態操做符,它們是直接定義在 Observable 類上的靜態方法。靜態操做符不會使用this關鍵字來推斷輸入,它的輸入徹底依賴於輸入的參數。

  • <font face="仿宋">_靜態操做符時附加在 Observable 類上的靜態函數,一般用來從頭建立 Observable。_</font>

常見的靜態操做符都是一些可建立類型的操做符,與一般狀況下操做符接收輸入流,輸出輸出流不一樣,它們大多接收一些非 Observable 類型的參數,例如一個數字,而後建立出一條流。

var observable = Rx.Observable.interval(1000 /* 毫秒數 */);

另外的例子就是create,咱們在以前的例子中已經屢次使用過了。

而後,靜態操做符並不是只是那些簡單建立流的操做符,一些組合類的操做符也能夠有它的靜態類型,例如:merge,combineLatest,concat等。這樣作的意義在於咱們接收多個流做爲輸入而不只僅是一個。

var observable1 = Rx.Observable.interval(1000);
var observable2 = Rx.Observable.interval(400);

var merge = Rx.Observable.merge(observable1, observable2);

彈珠圖


單純靠文字可能還不足解釋的清楚操做符是如何工做的,許多操做符是與時間相關的,它們可能以不一樣的方式來影響值的發射,好比:delay,throttle,sample等。對於這些操做符來講圖表的形式會更加直觀。彈珠圖能夠模擬出操做符是如何工做的,能夠直觀的表現出輸入流,操做符,參數與輸出流之間的聯繫。

下面來詳細解釋彈珠圖各部分的含義:

// 這條從左到右的橫線表明隨時間的推移,輸入流的執行過程。
// 橫線上的值表明從流上發射出的值
// 橫線尾部的豎線表明complete通知執行的時間點,表示這條流已經成功的執行完成。
----------4------6--------------a-------8-------------|---->

            multipleByTen // 使用的操做符

// 這條從左到右的橫線表明通過操做符轉換後的輸出流。
// 橫線尾部的X表明在這個時間點上流發生了錯誤,至此以後不該該再有 Next 通知或 Complete 通知從流上發出。
---------40-----60--------------X--------------------------->

在整個文檔中,咱們都使用這種彈珠圖來解釋操做符是如何工做的。這種彈珠圖在其它場景中也是很是有用的,好比在白板演示或者單元測試中。

如何選擇合適的操做符


若是你明確本身須要解決的問題,可是不清楚應該使用哪個操做符,能夠在官網的 Manual > Operators > Choose an operator 中經過選擇符合問題的描述來找到你所須要的操做符。

操做符類別


爲了解決不一樣的問題,rxjs針對性的設計了各類各樣的操做符,大致上能夠分爲幾類:建立型、轉換型、過濾型、組合型、多播型、錯誤處理型、工具類等等。

關於操做符,將會在專門的文檔中進行解釋。

Scheduler


Scheduler決定了subscription何時開始以及何時開始分發值。它由3個部分組成:

  • 一個Scheduler就是一段數據結構:它知道如何去存儲數據,以及基於優先級或其它標準來排列任務的順序。
  • 一個Scheduler就是一個執行環境:決定什麼時間在什麼樣的環境下去執行任務, 是當即執行仍是在回調中執行,亦或是在定時器到達時,仍是下一次事件循環開始時執行。
  • Scheduler有本身的虛擬時鐘:被調度任務的執行時間只由虛擬時鐘決定。只會它的虛擬時鐘的環境中執行。

Scheduler類型


  • null: 按正常的規則發送值。
  • queue: 按當前事件隊列中的順序來發送值。
  • asap:按更小的單元上的事件隊列中的順序發送值。
  • async: 按異步順序來發送值。

能夠傳入scheduler的操做符:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer
相關文章
相關標籤/搜索