響應式編程 · RxJS

寫在前面

作相關整理時,RxJS還停留在5.0,現在RxJS已升級到6.0,建議你們在學習相關內容時直接選擇最新版本:RxJS中文文檔react

概念

Rx(Reactive Extension,響應式擴展)確切講是一種編程思想,最先是微軟的開源類庫,以後隨着RxJava的出現才廣受追捧起來。顯然,RxJS是Rx的JavaScript的實現;

那麼響應式擴展,或者說響應式編程,究竟是怎樣的?先引一下wiki:
git

在計算領域,響應式編程是一種面向數據流和變化傳播的編程範式。這意味着能夠在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值經過數據流進行傳播。

Emm..這都說的什麼鬼?好吧,概念老是抽象的,那麼咱們看實例:github

  1. 傳統編程:
var a = 1;
var b = 2;
var c = a + b;
a = 2;
b = 3;
console.log(c);
// 3
// 沒什麼好說的,a和b的變化不會影響到c;
  1. 響應式編程
var a$ = Rx.Observable.from([1,2]);
var b$ = Rx.Observable.from([2,3]);
var c$ = Rx.Observable.zip(a$,b$,(a,b)=>{
    return b + c;
})
c$.subscribe(c => console.log('c: ' + c));
// c: 3
// c: 5
// 可見:隨着a,b值的變化,c的值也跟着發生變化

如今再回過頭來看wiki,是否是能理解一些了?

借用一條經典結論:傳統編程基於的是離散的點;而響應式編程基於的是連續的線編程

應用

講完概念,接下來該講講RxJS具體怎麼去用了。

操做符是RxJS重要成員,所以咱們先從操做符提及:
編程語言

Tips:如下代碼能夠直接在 https://jsbin.com/ 上運行
運行前需在body裏插入 <script src=" https://unpkg.com/@reactivex/...;></script>
  • 建立類操做符學習

    • from
    var array = [10, 20, 30];
    var result$ = Rx.Observable.from(array);
    result$.subscribe(x => console.log(x));
    • fromEvent
    var input = document.getElementById('input');
    var input$ = Rx.Observable.fromEvent(input, 'keyup');
    input$.subscribe(x => console.log(x.target.value));
    • fromEventPattern
    function addClickHandler(handler) {
      document.addEventListener('click', handler);
    }
    
    function removeClickHandler(handler) {
      document.removeEventListener('click', handler);
    }
    
    var click$ = Rx.Observable.fromEventPattern(
      addClickHandler,
      removeClickHandler
    );
    click$.subscribe(x => console.log(x));
    • interval
    var resource$ = Rx.Observable.interval(500).take(3)
    resource$.subcribe(x => console.log("Next: " + x), err => console.log("Error: " + err), ()=> console.log('finished'));
    • timer
    var resource$ = Rx.Observable.timer(2000);
    resource$.subcribe(x => console.log("Next: " + x), err => console.log("Error: " + err), ()=> console.log('finished'));
    
    var resource2$ = Rx.Observable.timer(2000, 1000);
    resource2$.subcribe(x => console.log("Next: " + x), err => console.log("Error: " + err), ()=> console.log('finished'));
  • 合併類操做符code

    • combineLatest
    var click = document.getElementById('click');
    var input = document.getElementById('input');
    var click$ = Rx.Observable.fromEvent(click,'click').mapTo('clicked');
    var input$ = Rx.Observable.fromEvent(input,'input').map(x => x.target.value);
    var result$ = Rx.Observable.combineLatest(click$,input$,(ev,input) => {
        return {ev: ev, input: input}
    })
    result$.subscribe(x => console.log(x));
    • zip
    var click = document.getElementById('click');
    var input = document.getElementById('input');
    var click$ = Rx.Observable.fromEvent(click,'click').mapTo('clicked');
    var input$ = Rx.Observable.fromEvent(input,'input').map(x => x.target.value);
    var result$ = Rx.Observable.zip(click$,input$,(ev,input) => {
        return {ev: ev, input: input}
    })
    result$.subscribe(x => console.log(x));
    // 試比較與combineLatest的不一樣
    
    var a$ = Rx.Observable.from(['hello','world','hello','rxjs']);
    var b$ = Rx.Observable.interval(2000);
    var result$ = a$.zip(b$,(item,index) => {return {item: item, index: index}})
    result$.subscribe(res => console.log('Index: ' + res.index + ', Item: ' + res.item))
    // "Index: 0, Item: hello"
    // "Index: 1, Item: world"
    // "Index: 2, Item: hello"
    // "Index: 3, Item: rxjs"
    • merge
    var a$ = Rx.Observable.interval(2000).map(x => -x*10).take(5);
    var b$ = Rx.Observable.timer(0,2000).map(x => x*10).take(5);
    var result$ = Rx.Observable.merge(a$,b$)
    result$.subscribe(x => console.log(x));
    • concat
    var a$ = Rx.Observable.from([1,3,5,7,9]);
    var b$ = Rx.Observable.from([2,4,6,8,10]);
    var result$ = Rx.Observable.concat(a$,b$);
    result$.subscribe(x => console.log(x));
  • 過濾類操做符server

    • filter
    var a$ = Rx.Observable.range(0,10);
    var result$ = a$.filter(x => x%5 == 0 );
    result$.subscribe(x => console.log(x))
    // 0
    // 5
    • take (略,前面有演示了)
    • debounce
    var input = document.getElementById('input');
    var input$ = Rx.Observable.fromEvent(input,'keyup').pluck('target','value');
    input$.debounceTime(500).subscribe(x => console.log(x));
  • 其餘(上面例子中都用到了,本身感覺下)對象

    • pluck
    • range
    • mapTo
    • map

說完運算符,再補充RxJS中另外一個重要成員:Subject

Subject是一類特殊的Observable對象,它既是觀察者(Observer),又是可觀察對象(Observable),所以:
rxjs

  • 做爲觀察者,它訂閱者能夠發推送
  • 做爲可觀察對象,它能夠被訂閱
let subject = new Rx.Subject();
subject.subscribe(value => console.log('observableA: ' + value));
subject.subscribe(value => console.log('observableB: ' + value));
subject.next(1);
subject.next(2);
// observableA: 1
// observableB: 1
// observableA: 2
// observableB: 2
  • Subject一樣是由next,error,complete這些方法構成的對象
  • Subject是一個Observable對象,所以能夠做爲訂閱普通Observable對象的參數
let subject = new Rx.Subject();
subject.subscribe(value => console.log('observableA: ' + value));
subject.subscribe(value => console.log('observableB: ' + value));
let observable = new Rx.Observable.from([1,2]);
observable.subscribe(subject);
// observableA: 1
// observableB: 1
// observableA: 2
// observableB: 2

Subject還有兩個經常使用的衍生類

  • BehaviorSubject
let subject = new Rx.BehaviorSubject(0);
// 此時,0做爲初始值
subject.subscribe(value => console.log('observableA: ' + value));
subject.next(1);
subject.next(2);
subject.subscribe(value => console.log('observableB: ' + value));
subject.next(3);
// observableA: 0
// observableA: 1
// observableA: 2
// observableB: 2
// observableA: 3
// observableB: 3
  • ReplaySubject
let subject = new Rx.ReplaySubject(3);
// 此時,3做爲參數,表示保留最近的三個值
subject.subscribe(value => console.log('observableA: ' + value));
subject.next(1);
subject.next(2);
subject.subscribe(value => console.log('observableB: ' + value));
subject.next(3);
// observableA: 1
// observableA: 2
// observableB: 1
// observableB: 2
// observableA: 3
// observableB: 3
相關文章
相關標籤/搜索