將RxJS融入React項目

前言

最近準備畢設,技術選型的時候由於功能的一些需求準備將RxJs融入到項目中,考慮RxJs的時候由於以前的技術棧還猶豫了一下,查了一些資料以及粗略瀏覽了一些文檔。感受對於畢設項目RxJs的加入是有幫助的,所以打算系統的學習而後摘抄知識點以及實踐一些demo作技術積累。html

RxJS技術積累

RxJs通過社區的努力學習資料仍是不少的,官方中文文檔就已經很不錯,不過咱們先從30 天精通 RxJS初步感覺一下RxJS.而後配合一些中文文檔來補充知識點,最後再根據官方文檔來校驗整個知識體系。前端

RxJS 基本介紹

RxJS是一套由Observable sequences來組合異步行爲和事件基礎程序的Librarygit

RxJS 是Functional Programming Reactive Programming 的結合es6

把每一個運算包成一個個不一樣的function,並用這些function 組合出咱們要的結果,這就是最簡單的Functional Programming

Functional Programming 強調沒有Side Effect,也就是function 要保持純粹,只作運算並返回一個值,沒有其餘額外的行爲。github

Side Effectredux

Side Effect是指一個function作了跟自己運算返回值沒有關係的事,好比說修改某個全域變數,或是修改傳入參數的值,甚至是執行console.log都算是Side Effect。

前端常見的Side Effect:api

  • 發送http request
  • 在畫面輸出值或是log
  • 得到用戶的input
  • Query DOM

Reactive Programming簡單來講就是當變數或資源發生變更時,由變數或資源自動告訴我發生變更了數組

Observable

Observer Pattern(觀察者模式)

Observer Pattern 其實很常遇到,許多API 的設計上都用了Observer Pattern,最簡單的例子就是DOM 物件的事件監聽:前端框架

function clickHandler(event) {
    console.log('user click!');
}

document.body.addEventListener('click', clickHandler)

觀察者模式:咱們能夠對某件事註冊監聽,並在事件發生時,自動執行咱們註冊的監聽者(listener)。併發

Es5版本:

function Producer() {
    
    // 這個 if 只是避免使用者不當心把 Producer 當作函數調用
    if(!(this instanceof Producer)) {
      throw new Error('請用 new Producer()!');
    }
    
    this.listeners = [];
}

// 加入監聽的方法
Producer.prototype.addListener = function(listener) {
    if(typeof listener === 'function') {
        this.listeners.push(listener)
    } else {
        throw new Error('listener 必須是 function')
    }
}

// 移除監聽的方法
Producer.prototype.removeListener = function(listener) {
    this.listeners.splice(this.listeners.indexOf(listener), 1)
}

// 發送通知的方法
Producer.prototype.notify = function(message) {
    this.listeners.forEach(listener => {
        listener(message);
    })
}

es6 版本

class Producer {
    constructor() {
        this.listeners = [];
    }
    addListener(listener) {
        if(typeof listener === 'function') {
            this.listeners.push(listener)
        } else {
            throw new Error('listener 必須是 function')
        }
    }
    removeListener(listener) {
        this.listeners.splice(this.listeners.indexOf(listener), 1)
    }
    notify(message) {
        this.listeners.forEach(listener => {
            listener(message);
        })
    }
}

調用例子:

var egghead = new Producer(); 

function listener1(message) {
    console.log(message + 'from listener1');
}

function listener2(message) {
    console.log(message + 'from listener2');
}

egghead.addListener(listener1);egghead.addListener(listener2);

egghead.notify('A new course!!')

輸出:

a new course!! from listener1
a new course!! from listener2

Iterator Pattern (迭代器模式)

JavaScript 到了ES6 纔有原生的Iterator

在ECMAScript中Iterator最先實際上是要採用相似Python的Iterator規範,就是Iterator在沒有元素以後,執行next會直接拋出錯誤;但後來通過一段時間討論後,決定採更functional的作法,改爲在取得最後一個元素以後執行next永遠都回傳{ done: true, value: undefined }

var arr = [1, 2, 3];

var iterator = arr[Symbol.iterator]();

iterator.next();
// { value: 1, done: false }
iterator.next();
// { value: 2, done: false }
iterator.next();
// { value: 3, done: false }
iterator.next();
// { value: undefined, done: true }

簡單實現:

es5:

function IteratorFromArray(arr) {
    if(!(this instanceof IteratorFromArray)) {
        throw new Error('請用 new IteratorFromArray()!');
    }
    this._array = arr;
    this._cursor = 0;    
}

IteratorFromArray.prototype.next = function() {
    return this._cursor < this._array.length ?
        { value: this._array[this._cursor++], done: false } :
        { done: true };
}

es6:

class IteratorFromArray {
    constructor(arr) {
        this._array = arr;
        this._cursor = 0;
    }
  
    next() {
        return this._cursor < this._array.length ?
        { value: this._array[this._cursor++], done: false } :
        { done: true };
    }
}

優點

  1. Iterator的特性能夠拿來作延遲運算(Lazy evaluation),讓咱們能用它來處理大數組。
  2. 第二由於iterator 自己是序列,因此能夠第調用方法像map, filter... 等!

延遲運算(Lazy evaluation)

function* getNumbers(words) {
        for (let word of words) {
            if (/^[0-9]+$/.test(word)) {
                yield parseInt(word, 10);
            }
        }
    }
    
    const iterator = getNumbers('30 天精通 RxJS (04)');
    
    iterator.next();
    // { value: 3, done: false }
    iterator.next();
    // { value: 0, done: false }
    iterator.next();
    // { value: 0, done: false }
    iterator.next();
    // { value: 4, done: false }
    iterator.next();
    // { value: undefined, done: true }
把一個字串丟進getNumbersh函數時,並無立刻運算出字串中的全部數字,必須等到咱們執行next()時,纔會真的作運算,這就是所謂的延遲運算(evaluation strategy)

Observable簡介

Observer跟Iterator有個共通的特性,就是他們都是漸進式 (progressive)的取得資料,差異只在於Observer是生產者(Producer)推送資料(push ),而Iterator是消費者(Consumer)請求資料(pull)!

Observable其實就是這兩個Pattern思想的結合,Observable具有生產者推送資料的特性,同時能像序列,擁有序列處理資料的方法 (map, filter...)!

RxJS說白了就是一個核心三個重點。

一個核心是Observable 再加上相關的Operators(map, filter...),這個部份是最重要的,其餘三個重點本質上也是圍繞着這個核心在轉,因此咱們會花將近20 天的篇數講這個部份的觀念及使用案例。

另外三個重點分別是

  • Observer
  • Subject
  • Schedulers

Observable 實踐

Observable 同時能夠處理同步與異步的行爲!

同步操做

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Jerry'); 
        observer.next('Anna');
    })
    
// 訂閱 observable    
observable.subscribe(function(value) {
    console.log(value);
})

> Jerry
> Anna

異步操做:

var observable = Rx.Observable
    .create(function(observer) {
        observer.next('Jerry'); // RxJS 4.x 之前的版本用 onNext
        observer.next('Anna');
        
        setTimeout(() => {
            observer.next('RxJS 30 days!');
        }, 30)
    })
    
console.log('start');
observable.subscribe(function(value) {
    console.log(value);
});
console.log('end');

>
start
Jerry
Anna
end
RxJS 30 days!

觀察者Observer

Observable 能夠被訂閱(subscribe),或說能夠被觀察,而訂閱Observable的又稱爲觀察者(Observer)。
觀察者是一個具備三個方法(method)的對象,每當Observable 發生事件時,便會呼叫觀察者相對應的方法。

  • next:每當Observable 發送出新的值,next 方法就會被呼叫。
  • complete:在Observable 沒有其餘的資料能夠取得時,complete 方法就會被呼叫,在complete 被呼叫以後,next 方法就不會再起做用。
  • error:每當Observable 內發生錯誤時,error 方法就會被呼叫。
var observable = Rx.Observable
    .create(function(observer) {
            observer.next('Jerry');
            observer.next('Anna');
            observer.complete();
            observer.next('not work');
    })
    
// 定義一個觀察者
var observer = {
    next: function(value) {
        console.log(value);
    },
    error: function(error) {
        console.log(error)
    },
    complete: function() {
        console.log('complete')
    }
}

//  訂閱 observable    
observable.subscribe(observer)

>
Jerry
Anna
complete


// complete執行後,next就會自動失效,因此沒有印出not work。

捕獲錯誤實例:

var observable = Rx.Observable
  .create(function(observer) {
    try {
      observer.next('Jerry');
      observer.next('Anna');
      throw 'some exception';
    } catch(e) {
      observer.error(e)
    }
  });

var observer = {
    next: function(value) {
        console.log(value);
    },
    error: function(error) {
        console.log('Error: ', error)
    },
    complete: function() {
        console.log('complete')
    }
}

observable    
observable.subscribe(observer)

>
Jerry
Anna
Error:  some exception

觀察者能夠是不完整的,他能夠只具備一個next 方法

訂閱一個Observable 就像是執行一個function

Operator操做符

Operators 就是一個個被附加到Observable 型別的函數,例如像是map, filter, contactAll... 等等

每一個operator都會回傳一個新的observable,而咱們能夠透過create的方法創建各類operator

Observable 有許多建立實例的方法,稱爲creation operator。下面咱們列出RxJS 經常使用的creation operator:

create
of
from
fromEvent
fromPromise
never
empty
throw
interval
timer

當咱們想要同步的傳遞幾個值時,就能夠用of這個operator來簡潔的表達!

var source = Rx.Observable.of('Jerry', 'Anna');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

// Jerry
// Anna
// complete!

用from來接收任何可枚舉的參數(Set, WeakSet, Iterator 等均可)

var arr = ['Jerry', 'Anna', 2016, 2017, '30 days'] 
var source = Rx.Observable.from(arr);

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

// Jerry
// Anna
// 2016
// 2017
// 30 days
// complete!


var source = Rx.Observable.from('123');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});
// 1
// 2
// 3
// complete!


var source = Rx.Observable
  .from(new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Hello RxJS!');
    },3000)
  }))
  
source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
    console.log(error)
    }
});

// Hello RxJS!
// complete!

能夠用Event創建Observable,經過fromEvent的方法

var source = Rx.Observable.fromEvent(document.body, 'click');

source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
});

fromEvent的第一個參數要傳入DOM ,第二個參數傳入要監聽的事件名稱。上面的代碼會針對body 的click 事件作監聽,每當點擊body 就會印出event。

獲取 DOM 的經常使用方法:
document.getElementById()
document.querySelector()
document.getElementsByTagName()
document.getElementsByClassName()

Event來創建Observable實例還有另外一個方法fromEventPattern,這個方法是給類事件使用

所謂的類事件就是指其行爲跟事件相像,同時具備註冊監聽及移除監聽兩種行爲,就像DOM Event有addEventListener及removeEventListener同樣
class Producer {
    constructor() {
        this.listeners = [];
    }
    addListener(listener) {
        if(typeof listener === 'function') {
            this.listeners.push(listener)
        } else {
            throw new Error('listener 必須是 function')
        }
    }
    removeListener(listener) {
        this.listeners.splice(this.listeners.indexOf(listener), 1)
    }
    notify(message) {
        this.listeners.forEach(listener => {
            listener(message);
        })
    }
}

var egghead = new Producer(); 


var source = Rx.Observable
    .fromEventPattern(
        (handler) => egghead.addListener(handler), 
        (handler) => egghead.removeListener(handler)
    );
  
source.subscribe({
    next: function(value) {
        console.log(value)
    },
    complete: function() {
        console.log('complete!');
    },
    error: function(error) {
        console.log(error)
    }
})

egghead.notify('Hello! Can you hear me?');

字數受限,能夠去博客看完整版

Subject簡介

Subject 能夠拿去訂閱Observable(source) 表明他是一個Observer,同時Subject 又能夠被Observer(observerA, observerB) 訂閱,表明他是一個Observable。

Subject 同時是Observable 又是Observer

Subject 會對內部的observers 清單進行組播(multicast)

Subject應用

Subject 在內部管理一份observer 的清單,並在接收到值時遍歷這份清單並送出值
var subject = new Rx.Subject();

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
subject.subscribe(observerB);

subject.next(1);
// "A next: 1"
// "B next: 1"
subject.next(2);
// "A next: 2"
// "B next: 2"

這裏咱們能夠直接用subject 的next 方法傳送值,全部訂閱的observer 就會接收到,又由於Subject 自己是Observable,因此這樣的使用方式很適合用在某些沒法直接使用Observable 的前端框架中,例如在React 想對DOM 的事件作監聽

class MyButton extends React.Component {
    constructor(props) {
        super(props);
        this.state = { count: 0 };
        this.subject = new Rx.Subject();
        
        this.subject
            .mapTo(1)
            .scan((origin, next) => origin + next)
            .subscribe(x => {
                this.setState({ count: x })
            })
    }
    render() {
        return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
    }
}

BehaviorSubject

BehaviorSubject 是用來呈現當前的值,而不是單純的發送事件。BehaviorSubject 會記住最新一次發送的元素,並把該元素看成目前的值,在使用上BehaviorSubject 建構式須要傳入一個參數來表明起始的狀態
// BehaviorSubject 在創建時就須要給定一個狀態,並在以後任何一次訂閱,就會先送出最新的狀態。其實這種行爲就是一種狀態的表達而非單存的事件,就像是年齡跟生日同樣,年齡是一種狀態而生日就是事件;因此當咱們想要用一個stream 來表達年齡時,就應該用BehaviorSubject 。

var subject = new Rx.BehaviorSubject(0); // 0
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
// "A next: 0"
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"

setTimeout(() => {
    subject.subscribe(observerB); 
    // "B next: 3"
},3000)

ReplaySubject

在新訂閱時從新發送最後的幾個元素,這時咱們就能夠用ReplaySubject
var subject = new Rx.ReplaySubject(2); // 重複發送最後倆個元素
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"

setTimeout(() => {
    subject.subscribe(observerB);
    // "B next: 2"
    // "B next: 3"
},3000)

AsyncSubject

在subject結束後送出最後一個值
var subject = new Rx.AsyncSubject();
var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
// "A next: 3"
// "A complete!"

setTimeout(() => {
    subject.subscribe(observerB);
    // "B next: 3"
    // "B complete!"
},3000)

Observable and Subject

multicast

multicast 能夠用來掛載subject 並回傳一個可連結(connectable)的observable
var source = Rx.Observable.interval(1000)
             .take(3)
             .multicast(new Rx.Subject());

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

source.subscribe(observerA); // subject.subscribe(observerA)

source.connect(); // source.subscribe(subject)

setTimeout(() => {
    source.subscribe(observerB); // subject.subscribe(observerB)
}, 1000);

必須真的等到執行connect()後纔會真的用subject訂閱source,並開始送出元素,若是沒有執行connect()observable是不會真正執行的。

var source = Rx.Observable.interval(1000)
             .do(x => console.log('send: ' + x))
             .multicast(new Rx.Subject()); // 無限的 observable 

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA);

var realSubscription = source.connect();

var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);
}, 1000);

setTimeout(() => {
    subscriptionA.unsubscribe();
    subscriptionB.unsubscribe(); 
    // 雖然A,B退訂,可是時間流仍是繼續執行
}, 5000);

setTimeout(() => {
    realSubscription.unsubscribe();
    // 這裏纔會真正的退訂
}, 7000);

refCount

創建一個只要有訂閱就會自動connect 的observable
var source = Rx.Observable.interval(1000)
             .do(x => console.log('send: ' + x))
             .multicast(new Rx.Subject())
             .refCount();

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA); // 當source 一被observerA 訂閱時(訂閱數從0 變成1),就會當即執行併發送元素


var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);

}, 1000);

setTimeout(() => {
    subscriptionA.unsubscribe(); // 訂閱減一    subscriptionB.unsubscribe(); // 訂閱爲0,中止發送
}, 5000);

publish

等價於 multicast(new Rx.Subject())
var source = Rx.Observable.interval(1000)
             .publish() 
             .refCount();
             
// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.Subject()) 
//             .refCount();

var source = Rx.Observable.interval(1000)
             .publishReplay(1) 
             .refCount();
             
// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.ReplaySubject(1)) 
//             .refCount();


var source = Rx.Observable.interval(1000)
             .publishBehavior(0) 
             .refCount();
             
// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.BehaviorSubject(0)) 
//             .refCount();

var source = Rx.Observable.interval(1000)
             .publishLast() 
             .refCount();
             
// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.AsyncSubject(1)) 
//             .refCount();

share

等價於 publish + refCount
var source = Rx.Observable.interval(1000)
             .share();
             
// var source = Rx.Observable.interval(1000)
//             .publish() 
//             .refCount();

// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.Subject()) 
//             .refCount();

Scheduler

Scheduler簡介

Scheduler 控制一個observable 的訂閱何時開始,以及發送元素何時送達,主要由如下三個元素所組成

Scheduler 是一個對象結構。它知道如何根據優先級或其餘標準來儲存並執行任務。
Scheduler 是一個執行環境。它意味着任務什麼時候何地被執行,好比像是當即執行、在回調(callback)中執行、setTimeout 中執行、animation frame 中執行
Scheduler是一個虛擬時鐘。它透過now()這個方法提供了時間的概念,咱們可讓任務在特定的時間點被執行。
// Scheduler 會影響Observable 開始執行及元素送達的時機

var observable = Rx.Observable.create(function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
});

console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 設爲 async
.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
console.log('after subscribe');

項目中的RxJs

在項目中RxJs能夠經過庫的形式引用,也能夠引用結合了框架的組合。

經過以前的學習,對RxJs有了必定的瞭解。對我而言RxJS最好的應用場景就是複雜的UI交互。

並且在學習RxJS的資料中,不少典型的Demo都是:

  • 拖拽交互
  • Auto Complete
  • 等等

利用RxJS能把咱們之前須要寫不少判斷,不少邏輯的UI交互都簡化了,經過它自帶的一套Stream的用法,能夠利用更少的代碼完成之前的複雜的工做量,提供了開發效率。

RxJS同時能應用在組件狀態管理中,能夠參考Reactive 視角審視 React 組件

在React中,內部經過setState管理狀態。狀態的變動能夠依賴RxJS流,在須要的Response中setState便可。

其餘方案能夠自行根據項目需求加入,須要就引入,不須要就不要加,不要爲RxJS而RxJS.

還要注意的是RxJS的操做符很是強大,可是數量極多,所以一開始開發入門的時候先設計好邏輯再去查文檔。

官方的example有不少例子能夠參考應用。

認識一下 redux-observable

redux-observable,則是經過建立epics中間件,爲每個dispatch添加相應的附加效果。相較於thunk中間件,使用redux-observable來處理異步action,有如下兩個優勢:

不須要修改reducer,咱們的reducer能夠依然保持簡單的純函數形態。
epics中間件會將action封裝成Observable對象,可使用RxJs的相應api來控制異步流程。

比起redux-thunk,redux-observable可以強有力的支持更爲複雜的異步邏輯。用更少的代碼來實現需求。

總結

經過幾天的學習,對RxJS有了必定的瞭解,以後就是將其應用到實際項目中。

資料

學習操做符的時候能夠對照彈珠圖

Rx Observables 的交互彈珠圖

Learn RxJS 的中文版

redux-observable中文文檔

相關文章
相關標籤/搜索