rxjs在mpvue中的應用

前言

rxjs是一個響應式編程的庫,它使異步編程和回調代碼變得更爲簡單。 mpvue是一個小程序的框架。它使用vue的語法,使小程序的開發更加快捷簡單,讓前端開發人員的一套代碼同時用於web端和小程序端變成了現實。php

先來聊聊rxjs

響應式編程是rxjs的核心概念之一。在主流的三大框架之中都獲得了應用:vue的底層就採用了reactive programming.angular2+ 也全面引用了rxjs,,不論是在 http 仍是 animation 都用了 RxJS 的 Observable.Redux 從3.5版本開始,也加入了對Observable 操做的支持.甚至於主流的編程語言都有rx的Library,,好比RxRuby, RxPy, RxJava...等 RxJS 提供了一套完整的非同步解決方案,讓咱們在面對各類非同步行爲,不論是 Event, AJAX, 仍是 Animation 等,咱們均可以使用相同的 API 作開發。html

1.網頁的世界是異步的

在前端開發過程當中,咱們會用到各類的js異步,如callback 或是 Promise 物件甚至是async/await,單隨着應用愈來愈複雜,編寫異步代碼愈來愈困難和繁瑣。異步常見的問題有: * 競態條件 (Race Condition) * 內存泄漏 (Memory Leak) * 複雜的狀態 (Complex State) * 異常處理 (Exception Handling)前端

  • 競態條件: 發送了第一個請求以後又發送了第二條請求,這兩條請求的順序會影響到最終接收的不一樣結果.
  • 內存泄露: 在單頁面應用中,若是有對dom註冊監聽事件,而沒有在適當的時機點把監聽的事件移除,就會形成Memory Leak.
  • 複雜的狀態: 好比說咱們有一支付費用戶才能播放的影片,首先可能要先抓取這部影片的資訊,接着咱們要在播放時去驗證使用者是否有權限播放,而使用者也有可能再按下播放後又當即按了取消,而這些都是非同步執行,這時就會各類複雜的狀態須要處理.
  • 異常處理: JavaScript 的try/catch能夠捕捉同步的例外,但非同步的程式就沒這麼容易,尤爲當咱們的非同步行爲很複雜時,這個問題就越發明顯。

若是咱們用rxjs來處理,彷佛就變得方便了許多:vue

var handler = (e) => {
  console.log(e);
  document.body.removeEventListener('click', handler); 
 } 
 document.body.addEventListener('click', handler);
`
能夠寫成:
 `
Rx.Observable
  .fromEvent(document.body, 'click') // 註冊監聽
  .take(1) // 只取一次
  .subscribe(console.log);
複製代碼

總之,RxJS 是一套藉由 Observable sequences 來組合非同步行爲和事件基礎程序的 Library,能夠把 RxJS 想成處理 非同步行爲 的 Lodash.是Functional Programming 及 Reactive Programming 兩個編程思想的結合.react

2.聊聊Observable

RxJS 的基礎就是 Observable,只要弄懂 Observable 就等於學會一半的 RxJS.Observable 就是觀察者模式(Observer) 和 迭代器模式(Iterator) 兩種思想的結合。web

  • Observer Patternajax

    觀察者模式在api設計上獲得普遍應用,常見的一個例子是編程

    function clickHandler(event) {
        console.log('user click!');
    }
    
    document.body.addEventListener('click', clickHandler)
    複製代碼

    讓咱們本身來實現一個:json

    //定義
    class Producer {
    	constructor() {
    		this.listeners = [];
    	}
    	addListener(listener) {
    		if(typeof listener === 'function') {
    			this.listeners.push(listener)
    		} else {
    			throw new Error('listener 必須是 function')
    		}
    	}
    	removeListener(listener) {
    		this.listeners.splice(this.listeners.indexOf(listener), 1)
    	}
    	notify(message) {
    		this.listeners.forEach(listener => {
    			listener(message);
    		})
    	}
    }
    
    //應用
    var egghead = new Producer(); 
    
    function listener1(message) {
    	console.log(message + 'from listener1');
    }
    
    function listener2(message) {
    	console.log(message + 'from listener2');
    }
    
    egghead.addListener(listener1); // 註冊監聽
    egghead.addListener(listener2);
    egghead.notify('A new course!!') // 執行
    
    //輸出結果:
    a new course!! from listener1
    a new course!! from listener2
    複製代碼

    這個例子很好的說明了 Observer Pattern 如何在event 跟 listeners的應用中作到解耦小程序

  • Iterator Pattern

    下面是一個使用Iterator的例子

    var arr = [1, 2, 3];
    
    var iterator = arr[Symbol.iterator]();
    
    iterator.next();
    // { value: 1, done: false }
    iterator.next();
    // { value: 2, done: false }
    iterator.next();
    // { value: 3, done: false }
    iterator.next();
    // { value: undefined, done: true }
    複製代碼

    讓咱們來動手製做一個:

    //定義
    class IteratorFromArray {
    	constructor(arr) {
    		this._array = arr;
    		this._cursor = 0;
    	}
      
    	next() {
    		return this._cursor < this._array.length ?
    		{ value: this._array[this._cursor++], done: false } :
    		{ done: true };
    	}
    	
    	map(callback) {
    		const iterator = new IteratorFromArray(this._array);
    		return {
    			next: () => {
    				const { done, value } = iterator.next();
    				return {
    					done: done,
    					value: done ? undefined : callback(value)
    				}
    			}
    		}
    	}
    }
    
    //使用
    var iterator = new IteratorFromArray([1,2,3]);
    var newIterator = iterator.map(value => value + 3);
    
    newIterator.next();
    // { value: 4, done: false }
    newIterator.next();
    // { value: 5, done: false }
    newIterator.next();
    // { value: 6, done: false }
    複製代碼

    相似的還有generator的例子:

    function* getNumbers(words) {
    	for (let word of words) {
    		if (/^[0-9]+$/.test(word)) {
    		    yield parseInt(word, 10);
    		}
    	}
    }
    
    const iterator = getNumbers('12咱們3學習4');
    
    iterator.next();
    // { value: 1, done: false }
    iterator.next();
    // { value: 2, done: false }
    iterator.next();
    // { value: 3, done: false }
    iterator.next();
    // { value: 4, done: false }
    iterator.next();
    // { value: undefined, done: true }
    複製代碼

總之,Observer 與 Iterator 都有共同的特性,就是漸進式的獲取數據信息,差異在於Observer 是生產者push數據,Iterator 是消費者pull數據。而Observable 具有生產者推送數據的特性,同時能像序列,擁有序列處理數據的方法

3.建立Observable

RxJS 有一個核心和三個重點。核心就是Observable(map, filter...),三個重點分別是:Observer,Subject,Schedulers, 先來說講Observable的用法。

創建 Observable: create

var observable = Rx.Observable
  .create(function(observer) {
  	observer.next('Jerry'); 
  	observer.next('Anna');
  })
複製代碼

讓咱們訂閱observable,來接收數據

observable.subscribe(function(value) {
  console.log(value);
})
複製代碼

須要注意的是,Observable不只能夠處理異步狀況,同步行爲也是能夠的。此外,觀察者(Observer) 有三個方法:

  • next:每當 Observable 發送出新的值,next 方法就會被調用。
  • complete:在 Observable 沒有其餘的數據能夠取得時,complete 方法就會被調用,在 complete 被調用以後,next 方法就不會再起做用。
  • error:每當 Observable 內發生錯誤時,error 方法就會被調用。
var observable = Rx.Observable
 	.create(function(observer) {
 			observer.next('Jerry');
 			observer.next('Anna');
 			observer.complete();
 			observer.next('not work');
 	})
 	
 var observer = {
 	next: function(value) {
 		console.log(value);
 	},
 	error: function(error) {
 		console.log(error)
 	},
 	complete: function() {
 		console.log('complete')
 	}
 }
 
 observable.subscribe(observer)
 
 //輸出
 Jerry
 Anna
 complete
複製代碼

4.Observable的經常使用方法

Observable 的經常使用方法包括create,of,from,fromEvent,fromPromise,never,empty,throw,interval,timer等等

of的用法,能夠和上面的create方法作一個對比

var observable  = Rx.Observable.of('Jerry', 'Anna');
  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });
  
  // Jerry
  // Anna
  // complete!
複製代碼

from的用法,參數是數組,對比of傳入的是一個個參數

var arr = ['Jerry', 'Anna', 2016, 2017, '30 days'] 
  var observable = Rx.Observable.from(arr);
  
  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });
  
  // Jerry
  // Anna
  // 2016
  // 2017
  // 30 days
  // complete!
複製代碼

此外,from的參數還能夠是字符串或者promise

var observable = Rx.Observable
    .from(new Promise((resolve, reject) => {
      setTimeout(() => {
        resolve('Hello RxJS!');
      },3000)
    }))
    
  observable.subscribe({
      next: function(value) {
      	console.log(value)
      },
      complete: function() {
      	console.log('complete!');
      },
      error: function(error) {
      console.log(error)
      }
  });
  
  // Hello RxJS!
  // complete!
複製代碼

fromEvent的用法,第一個參數是dom元素,第二個參數是要監聽的事件名,例如對body作點擊事件監聽:

var observable = Rx.Observable.fromEvent(document.body, 'click');

  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });

複製代碼

fromEventPattern,這個方法是給類事件使用。所謂的類事件就是指其行爲跟事件相像,同時具備註冊監聽及移除監聽兩種行爲,就像 DOM Event 有 addEventListener 及 removeEventListener

class Producer {
  	constructor() {
  		this.listeners = [];
  	}
  	addListener(listener) {
  		if(typeof listener === 'function') {
  			this.listeners.push(listener)
  		} else {
  			throw new Error('listener 必須是 function')
  		}
  	}
  	removeListener(listener) {
  		this.listeners.splice(this.listeners.indexOf(listener), 1)
  	}
  	notify(message) {
  		this.listeners.forEach(listener => {
  			listener(message);
  		})
  	}
  }
  
  var egghead = new Producer(); 
  // egghead 同時有 addEventListener 及 removeEventListener方法
  
  var observable = Rx.Observable
      .fromEventPattern(
          (handler) => egghead.addListener(handler), 
          (handler) => egghead.removeListener(handler)
      );
    
  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  })
  
  egghead.notify('Hello! Can you hear me?');
  // Hello! Can you hear me?

複製代碼

empty方法,會返回一個空的observable,當即執行complete

var observable = Rx.Observable.empty();

  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });
  // complete!

複製代碼

never方法,會返回一個無窮的observable,就是一個一直存在,但什麼都不作的observable

var observable = Rx.Observable.never();

  observable.subscribe({
      next: function(value) {
          console.log(value)
      },
      complete: function() {
          console.log('complete!');
      },
      error: function(error) {
          console.log(error)
      }
  });

複製代碼

throw方法的做用就是拋出錯誤

var observable = Rx.Observable.throw('Oop!');

  observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
          console.log('Throw Error: ' + error)
  	}
  });
  // Throw Error: Oop!

複製代碼

interval方法,會發送一個從零開始依次遞增的整數,它的參數是間隔時間,單位是毫秒

var observable = Rx.Observable.interval(1000);

  observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
          console.log('Throw Error: ' + error)
  	}
  });
  // 0
  // 1
  // 2
  // ...

複製代碼

timer方法與 interval有所不一樣,它有兩個參數。第一個參數表明發出第一個值的等待時間,第二個參數表明每次發出值的間隔時間

var observable = Rx.Observable.timer(1000, 5000);

  observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
          console.log('Throw Error: ' + error)
  	}
  });
  //先等一秒
  // 0  
  // 1
  // 2 ...

複製代碼

unsubscribe方法: 在訂閱observable後,會返回一個subscription,它有一個能夠釋放資源的unsubscribe方法

var observable = Rx.Observable.timer(1000, 1000);

  // 取得 subscription
  var subscription = observable.subscribe({
  	next: function(value) {
  		console.log(value)
  	},
  	complete: function() {
  		console.log('complete!');
  	},
  	error: function(error) {
      console.log('Throw Error: ' + error)
  	}
  });
  
  setTimeout(() => {
      subscription.unsubscribe() // 中止訂閱
  }, 5000);
  // 0
  // 1
  // 2
  // 3
  // 4

複製代碼

map方法:參數是一個回調函數,對數據進行操做以後,再返回新的observable

var observable = Rx.Observable.interval(1000);
  var newest = observable.map(x => x + 2); 
  
  newest.subscribe(console.log);
  // 2
  // 3
  // 4
  // 5..
複製代碼

讓咱們本身動手來實現一下:

function map(callback) {
      return Rx.Observable.create((observer) => {
          return this.subscribe(
              (value) => { 
                  try{
                      observer.next(callback(value));
                  } catch(e) {
                      observer.error(e);
                  }
              },
              (err) => { observer.error(err); },
              () => { observer.complete() }
          )
      })
  }
  Rx.Observable.prototype.map = map;
  var people = Rx.Observable.of('Jerry', 'Anna');
  var helloPeople = people.map((item) => item + ' Hello~');
  
  helloPeople.subscribe(console.log);
  // Jerry Hello~
  // Anna Hello~
複製代碼

mapTo方法:把原來的值都改爲一個固定值

var observable = Rx.Observable.interval(1000);
  var newest = observable.mapTo(2); 
  
  newest.subscribe(console.log);
  // 2
  // 2
  // 2
  // 2..
複製代碼

filter方法:類型Array的filter方法,過濾出一些值.

var observable = Rx.Observable.interval(1000);
  var newest = observable.filter(x => x % 2 === 0); 
  
  newest.subscribe(console.log);
  // 0
  // 2
  // 4
  // 6..
複製代碼

take方法:取前多少個元素就結束.

var observable = Rx.Observable.interval(1000);
  var example = observable.take(3);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // complete
複製代碼

first方法:至關於take(1),取出第一個元素後就結束

var observable = Rx.Observable.interval(1000);
  var example = observable.first();
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  
  // 0
  // complete
複製代碼

takeUntil方法:在某個事件發生時,結束.

var observable = Rx.Observable.interval(1000);
  var click = Rx.Observable.fromEvent(document.body, 'click');
  var example = observable.takeUntil(click);     
     
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // 3
  // complete //點擊body元素時
複製代碼

concatAll方法:當Observable傳遞的元素仍是observable時,相似於二維數組,咱們經過這個方法把它扁平化爲一維數組(concatAll 會一個一個處理,必定是等前一個 observable 完成(complete)纔會處理下一個 observable)

var observable = Rx.Observable.fromEvent(document.body, 'click');
  var source = observable.map(e => Rx.Observable.of(1,2,3));
  
  var example = source.concatAll();
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

switch方法:同concatAll相似,扁平化observable。(它會在新的 observable 送出後直接處理新的 observable 無論前一個 observable 是否完成,每當有新的 observable 送出就會直接把舊的 observable 退訂(unsubscribe),永遠只處理最新的 observable!)

var click = Rx.Observable.fromEvent(document.body, 'click');
  var source = click.map(e => Rx.Observable.interval(1000));
  
  var example = source.switch();
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

mergeAll:同concatAll,switch相似,扁平化observable。mergeAll 能夠傳入一個數值,這個數值表明他能夠同時處理的 observable 數量(不會像 switch 同樣退訂(unsubscribe)原先的 observable 而是並行處理多個 observable)

var click = Rx.Observable.fromEvent(document.body, 'click');
  var source = click.map(e => Rx.Observable.interval(1000));
  
  var example = source.mergeAll();
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  //----------------00---11---22---33---(04)4--...
複製代碼

下面來實現一個拖拉功能:頁面上有個元素(#drag),在該元素上按下左鍵(mousedown)時,開始監聽鼠標滑動的位置。當鼠標釋放時(mouseup),結束監聽。當鼠標移動時(mousemove),改變元素的位置

const dragDOM = document.getElementById('drag');
  const body = document.body;
  
  const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
  const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
  const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');
  
  mouseDown
    .map(event => mouseMove.takeUntil(mouseUp))
    .concatAll()
    .map(event => ({ x: event.clientX, y: event.clientY }))
    .subscribe(pos => {
    	dragDOM.style.left = pos.x + 'px';
      dragDOM.style.top = pos.y + 'px';
    })
複製代碼

skip方法:跳過前幾個元素,從後面繼續取值

var observable = Rx.Observable.interval(1000);
  var example = observable.skip(3);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 3
  // 4
  // 5...
複製代碼

takeLast方法:從後面取值

var observable = Rx.Observable.interval(1000).take(6);
  var example = observable.takeLast(2);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 4
  // 5
  // complete
複製代碼

last方法:用來取到最後的元素

var observable = Rx.Observable.interval(1000).take(6);
  var example = observable.last();
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 5
  // complete
複製代碼

concat方法:把多個observable合併成一個

var source = Rx.Observable.interval(1000).take(3);
  var source2 = Rx.Observable.of(3)
  var source3 = Rx.Observable.of(4,5,6)
  var example = source.concat(source2, source3);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // 3
  // 4
  // 5
  // 6
  // complete
複製代碼

startWith方法:在observable最開始插入元素

var observable = Rx.Observable.interval(1000);
  var example = observable.startWith(0);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 0
  // 1
  // 2
  // 3...
複製代碼

merge方法:合併observable,和concat不一樣的是:merge把多個observable同時處理,而concat處理完一個以後纔會處理接下來的observable

var source = Rx.Observable.interval(500).take(3);
  var source2 = Rx.Observable.interval(300).take(6);
  var example = source.merge(source2);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 0
  // 1
  // 2
  // 1
  // 3
  // 2
  // 4
  // 5
  // complete
複製代碼

combineLatest方法:它會取到各個observable的值,通過處理後再輸出

var observale = Rx.Observable.interval(500).take(3);
  var newest = Rx.Observable.interval(300).take(6);
  
  var example = observale.combineLatest(newest, (x, y) => x + y);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 1
  // 2
  // 3
  // 4
  // 5
  // 6
  // 7
  // complete
複製代碼

分析:combineLatest會等兩個observable都有傳送值的時候纔會執行callback

zip方法:會取每一個observable相同位置的元素傳入callback

var observale = Rx.Observable.interval(500).take(3);
  var newest = Rx.Observable.interval(300).take(6);
  
  var example = observale.zip(newest, (x, y) => x + y);
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 2
  // 4
  // complete
複製代碼

分析:zip 會等到 observale 跟 newest 都送出了第一個元素,再傳入 callback,下次則等到 observale 跟 newest 都送出了第二個元素再一塊兒傳入 callback

withLatestFrom方法:在主observable送出新值時,纔會執行callback

var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
  var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);
  
  var example = main.withLatestFrom(some, (x, y) => {
      return y === 1 ? x.toUpperCase() : x;
  });
  
  example.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // h
  // e
  // l
  // L
  // O
  // complete
複製代碼

分析:withLatestFrom 會在 main 送出值的時候執行 callback,但請注意若是 main 送出值時 some 以前沒有送出過任何值 callback 仍然不會執行

用學到的方法實現一個較爲複雜的拖拉功能

const video = document.getElementById('video');
   const anchor = document.getElementById('anchor');
   const scroll = Rx.Observable.fromEvent(document, 'scroll');
   scroll.map(e => anchor.getBoundingClientRect().bottom < 0)
   .subscribe(bool => {
       if(bool) {
           video.classList.add('video-fixed');
       } else {
           video.classList.remove('video-fixed');
       }
   })
   
   const mouseDown = Rx.Observable.fromEvent(video, 'mousedown')
   const mouseUp = Rx.Observable.fromEvent(document, 'mouseup')
   const mouseMove = Rx.Observable.fromEvent(document, 'mousemove')
   const validValue = (value, max, min) => {
       return Math.min(Math.max(value, min), max)
   }
   mouseDown
       .filter(e => video.classList.contains('video-fixed'))
       .map(e => mouseMove.takeUntil(mouseUp))
       .concatAll()
       .withLatestFrom(mouseDown, (move, down) => {
           return {
               x: validValue(move.clientX - down.offsetX, window.innerWidth - 320, 0),
               y: validValue(move.clientY - down.offsetY, window.innerHeight - 180, 0)
           }
       })
       .subscribe(pos => {
           video.style.top = pos.y + 'px';
           video.style.left = pos.x + 'px';
       })

複製代碼

scan方法:相似Array的reduce方法,第一個參數傳入callback,第二個參數傳入初始值(能夠沒有)。返回一個observable 實例

var source = Rx.Observable.from('hello')
           .zip(Rx.Observable.interval(600), (x, y) => x);

  var observable = source.scan((origin, next) => origin + next, '');
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // h
  // he
  // hel
  // hell
  // hello
  // complete
複製代碼

buffer方法:它會把本來的 observable (source)送出的元素緩存在數組中,等到傳入的 observable(source2) 送出元素時,就會觸發把緩存的元素送出。

var source = Rx.Observable.interval(300);
  var source2 = Rx.Observable.interval(1000);
  var observable = source.buffer(source2);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // [0,1,2]
  // [3,4,5]
  // [6,7,8]...
複製代碼

bufferCount方法:逢n個數緩存在數組中,並輸出

var source = Rx.Observable.interval(300);
  var observable = source.bufferCount(3);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // [0,1,2]
  // [3,4,5]
  // [6,7,8]...
複製代碼

bufferTime方法:以下例子,鼠標在500ms內連續點兩下才輸出

const button = document.getElementById('demo');
  const click = Rx.Observable.fromEvent(button, 'click')
  const observable = click
                  .bufferTime(500)
                  .filter(arr => arr.length >= 2);
  
  observable.subscribe({
      next: (value) => { console.log('success'); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

delay方法:延遲多久以後再輸出元素,參數能夠是數字(ms),也能夠是日期格式

var source = Rx.Observable.interval(300).take(5);
  var observable = source.delay(500);
  // observable = source.delay(new Date(new Date().getTime() + 1000));
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  //延遲500ms以後
  // 0
  // 1
  // 2
  // 3
  // 4
複製代碼

delayWhen方法:delayWhen 能夠影響每一個元素,並且須要傳一個 callback 並回傳一個 observable

var source = Rx.Observable.interval(300).take(5);
  var observable = source
                .delayWhen(
                    x => Rx.Observable.empty().delay(100 * x * x)
                );
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

用學到的方法實現一個小功能:許多圖片跟着鼠標跑,可是不能跑的同樣快

<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width">
    <title>JS Bin</title>
    <style>
        * {
            margin: 0;
            padding: 0;
            cursor: pointer;
        }
        img {
          width: 50px;
          position: absolute;
          border-radius: 50%;
          border: 3px white solid;
          transform: translate3d(0,0,0);        
        }
    </style>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.js"></script>
  </head>
  <body>
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover6.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover5.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover4.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover3.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover2.jpg" alt="">
    <img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover1.jpg" alt="">
    <script>
          var imgList = document.getElementsByTagName('img');
          var movePos = Rx.Observable.fromEvent(document, 'mousemove')
          .map(e => ({ x: e.clientX, y: e.clientY }))
          
          function followMouse(DOMArr) {
            const delayTime = 600;
            DOMArr.forEach((item, index) => {
              movePos
                .delay(delayTime * (Math.pow(0.65, index) + Math.cos(index / 4)) / 2) //時間規則能夠換成其餘的
                .subscribe(function (pos){
                  item.style.transform = 'translate3d(' + (pos.x-25) + 'px, ' + (pos.y-25) + 'px, 0)';
                });
            });
          }
          followMouse(Array.from(imgList))
      </script>
  </body>
</html>
複製代碼

debounce 方法:和防抖函數功能一致,debounce 跟 debounceTime 一個是傳入 observable 另外一個則是傳入毫秒,比較經常使用到的是 debounceTime:會先把元素cache 住並等待一段時間,若是這段時間內已經沒有收到任何元素,則把元素送出;若是這段時間內又收到新的元素,則會把本來cache 住的元素釋放 掉並從新計時,不斷反覆

const searchInput = document.getElementById('searchInput');
  const theRequestValue = document.getElementById('theRequestValue');
  
  Rx.Observable.fromEvent(searchInput, 'input')
    .debounceTime(300)
    .map(e => e.target.value)
    .subscribe((value) => {
      theRequestValue.textContent = value;
      // 在這請求接口
    })
複製代碼

throttle 方法:和節流函數功能一致,throttle 跟 throttleTime 一個是傳入 observable 另外一個則是傳入毫秒,比較經常使用到的是throttleTime:會先開放送出元素,等到有元素被送出就會沉默一段時間,等到時間過了又會開放發送元素,throttle 是在控制行爲的最高頻率,更適合用在連續性行爲

var source = Rx.Observable.interval(300).take(5);
  var observable = source.throttleTime(1000);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // 0
  // 4
  // complete
複製代碼

distinct方法:相同的值只留一個

var source = Rx.Observable.from([{ value: 'a'}, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }])
              .zip(Rx.Observable.interval(300), (x, y) => x);
  var observable = source.distinct((x) => {
      return x.value
  });
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // {value: "a"}
  // {value: "b"}
  // {value: "c"}
  // complete
複製代碼

distinctUntilChanged方法:跟 distinct 同樣會把相同的元素過濾掉,但 distinctUntilChanged 只會跟最後一次送出的元素比較,不會每一個都比

var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b'])
          .zip(Rx.Observable.interval(300), (x, y) => x);
  var observable = source.distinctUntilChanged()
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  // a
  // b
  // c
  // b
  // complete
複製代碼

catch方法:用來處理錯誤

var source = Rx.Observable.from(['a','b','c',2])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source
                  .map(x => x.toUpperCase())
                  .catch(error => Rx.Observable.of('h'));
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });    
  // A
  // B
  // C
  // h
  // complete
複製代碼

retry方法:當某個Observable發生錯誤時,從頭嘗試循環Observable.參數爲循環的次數

var source = Rx.Observable.from(['a','b',2])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source
                  .map(x => x.toUpperCase())
                  .retry(1);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  }); 
  // A
  // B
  // A
  // B
  // Error: TypeError: x.toUpperCase is not a function
複製代碼

retryWhen方法:當某個Observable發生錯誤時,去作某些處理以後再去循環嘗試

var source = Rx.Observable.from(['a','b','c','d',2])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source
                  .map(x => x.toUpperCase())
                  .retryWhen(
                  errorObs => errorObs.map(err => fetch('...')));
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  }); 
複製代碼

repeat方法:和retry相似,是一直重複訂閱的效果,但沒有錯誤發生。參數是循環次數,沒有參數就默認無限循環

var source = Rx.Observable.from(['a','b','c'])
          .zip(Rx.Observable.interval(500), (x,y) => x);

  var observable = source.repeat(1);
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

concatMap方法:等同於 map 加上 concatAll,在前一個observable執行完成後再執行下一個observable

var source = Rx.Observable.fromEvent(document.body, 'click');
  var observable = source
                  .concatMap(
                      e => Rx.Observable.interval(100).take(3)
                  );
                  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

此外,concatMap 還有第二個參數是一個 selector callback,這個 callback 會傳入四個參數,分別是 外部 observable 送出的元素、內部 observable 送出的元素、外部 observable 送出元素的 index、 內部 observable 送出元素的 index

function getPostData() {
      return fetch('https://jsonplaceholder.typicode.com/posts/1')
      .then(res => res.json())
  }
  var source = Rx.Observable.fromEvent(document.body, 'click');
  
  var observable = source.concatMap(
                  e => Rx.Observable.from(getPostData()), 
                  (e, res, eIndex, resIndex) => res.title); //res就是Rx.Observable.from(getPostData())的observable
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

switchMap方法:等同於 map 加上 switch,會在下一個 observable 被送出後直接退訂前一個未處理完的 observable

function getPostData() {
      return fetch('https://jsonplaceholder.typicode.com/posts/1')
      .then(res => res.json())
  }
  var source = Rx.Observable.fromEvent(document.body, 'click');
  
  var observable = source.switchMap(
                      e => Rx.Observable.from(getPostData()));
  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

mergeMap方法:等同於 map 加上 mergeAll ,能夠並行處理多個 observable(也能傳入第二個參數 selector callback,這個 selector callback 跟 concatMap 第二個參數也是徹底同樣的,但 mergeMap 的重點是咱們能夠傳入第三個參數,來限制並行處理的數量)

var source = Rx.Observable.fromEvent(document.body, 'click');

  var observable = source
                  .mergeMap(
                      e => Rx.Observable.interval(100).take(3)
                  );
                  
  observable.subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
複製代碼

總之

*  concatMap 用在能夠肯定內部的 observable 結束時間比外部 observable 發送時間來快的情境,而且不但願有任何並行處理行爲,適合少數要一次一次完成到底的的 UI 動畫或特別的 HTTP request 行爲。
*  switchMap 用在只要最後一次行爲的結果,適合絕大多數的使用情境。
*  mergeMap 用在並行處理多個 observable,適合須要並行處理的行爲,像是多個 I/O 的並行處理。
複製代碼

小范例(製做一個簡易的autocomplete)

<!DOCTYPE html>
  <html>
  <head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width">
    <title>JS Bin</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.js"></script>
    <style>
          html, body {
              height: 100%;
              background-color: white;
              padding: 0;
              margin: 0;
          }
          .autocomplete {
              position: relative;
              display: inline-block;
              margin: 20px;
          }
          .input {
              width: 200px;
              border: none;
              border-bottom: 1px solid black;
              padding: 0;
              line-height: 24px;
              font-size: 16px;
          }
          .input:focus {
              outline: none;
              border-bottom-color: blue;
          }
          .suggest {
              width: 200px;
              list-style: none;
              padding: 0;
              margin: 0;
              -webkit-box-shadow: 0 2px 4px rgba(0,0,0,0.2);
              
          }
          .suggest li {
                  cursor: pointer;
                  padding: 5px;
          }
          .suggest li:hover {
              background-color: lightblue;
          }
    </style>
  </head>
  <body>
    <div class="autocomplete">
      <input class="input" type="search" id="search" autocomplete="off">
      <ul id="suggest-list" class="suggest">
      </ul>
    </div>
    <script>
          const url = 'https://zh.wikipedia.org/w/api.php?action=opensearch&format=json&limit=5&origin=*';
  
          const getSuggestList = (keyword) => fetch(url + '&search=' + keyword, { method: 'GET', mode: 'cors' })
                                              .then(res => res.json())
          const searchInput = document.getElementById('search');
          const suggestList = document.getElementById('suggest-list');
  
          const keyword = Rx.Observable.fromEvent(searchInput, 'input');
          const selectItem = Rx.Observable.fromEvent(suggestList, 'click');
          const render = (suggestArr = []) => suggestList.innerHTML = suggestArr.map(item => '<li>'+ item +'</li>').join('')
  
          keyword
          // .filter(e => e.target.value.length > 2) 使用者打了 2 個字以上在發送 request
          .debounceTime(100)
          .switchMap(
              e => getSuggestList(e.target.value),//.retry(3) 在 API 失敗的時候從新嘗試 3 次
              (e, res) => res[1]
          )
          .subscribe(list => render(list))
  
          selectItem
          .filter(e => e.target.matches('li'))
          .map(e => e.target.innerText)
          .subscribe(text => { 
              searchInput.value = text;
              render();
          })
    </script>
  </body>
  </html>
複製代碼

window方法:把拆出來的元素放入observable並送出observable,相似buffer拆分出來的元素放到數組並送出數組

var click = Rx.Observable.fromEvent(document, 'click');
  var source = Rx.Observable.interval(1000);
  var observable = source.window(click);
  
  observable
    .switch()
    .subscribe(console.log);
  // 0
  // 1
  // 2
  // 3
  // 4
  // 5 ...
複製代碼

windowToggle方法,傳入兩個參數:第一個是開始的observable,第二個是一個回調函數回傳一個結束的observable

var source = Rx.Observable.interval(1000);
  var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
  var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');
  
  var observable = source
    .windowToggle(mouseDown, () => mouseUp)
    .switch();
    
  observable.subscribe(console.log);
複製代碼

groupBy方法:把相同條件元素拆分紅一個observable

var people = [
  {name: 'Anna', score: 100, subject: 'English'},
  {name: 'Anna', score: 90, subject: 'Math'},
  {name: 'Anna', score: 96, subject: 'Chinese' }, 
  {name: 'Jerry', score: 80, subject: 'English'},
  {name: 'Jerry', score: 100, subject: 'Math'},
  {name: 'Jerry', score: 90, subject: 'Chinese' }, 
  ];
  var source = Rx.Observable.from(people)
      .zip(
       Rx.Observable.interval(300), 
       (x, y) => x);
  
  var observable = source
    .groupBy(person => person.name)
    .map(group => group.reduce((acc, curr) => ({ 
  	    name: curr.name,
  	    score: curr.score + acc.score 
  	})))
  	.mergeAll();
  	
  observable.subscribe(console.log);
  // { name: "Anna", score: 286 }
  // { name: 'Jerry', score: 270 }
複製代碼

5.Observable的特性

Observable 與數組相比,有兩大不一樣:1.延遲運算 2.漸進式取值

1.延遲運算:observable會等到訂閱後纔開始對元素作運算,若是沒有訂閱就不會有運算的行爲

var source = Rx.Observable.from([1,2,3,4,5]);
  var example = source.map(x => x + 1);
  //上面的代碼就不會去作運算
  
  var source = [1,2,3,4,5];
  var example = source.map(x => x + 1);
  //數組執行完以後,已經作了運算
複製代碼

2.漸進式取值:observable的每次運算會一算到底,並非數組的運算徹底部的元素以後再返回

var source = Rx.Observable.from([1,2,3]);
  var example = source
                .filter(x => x % 2 === 0)
                .map(x => x + 1)
  
  example.subscribe(console.log);
  //1到filter被過濾掉;2到filter再到mapb變成3返回並打印;3到filter被過濾掉
  
  var source = [1,2,3];
  var example = source
                .filter(x => x % 2 === 0) 
                .map(x => x + 1)
  //數組執行到filter會返回完整的數組[2],再到map返回完整的數組[3]
複製代碼

6.Subject是什麼

Subject 同時是 Observable 又是 Observer,Subject 會對內部的 observers 清單進行組播(multicast)。是 Observer Pattern 的實例而且繼承自 Observable。

動手實現一個Subject

var source = Rx.Observable.interval(1000).take(3);

  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  var subject = {
      observers: [],
      subscribe: function(observer) {
          this.observers.push(observer)
      },
      next: function(value) {
          this.observers.forEach(o => o.next(value))    
      },
      error: function(error){
          this.observers.forEach(o => o.error(error))
      },
      complete: function() {
          this.observers.forEach(o => o.complete())
      }
  }
  
  subject.subscribe(observerA)
  
  source.subscribe(subject);
  
  setTimeout(() => {
      subject.subscribe(observerB);
  }, 1000);
  
  // "A next: 0"
  // "A next: 1"
  // "B next: 1"
  // "A next: 2"
  // "B next: 2"
  // "A complete!"
  // "B complete!"
複製代碼

對比一下真正的subject:能夠看出,Subject 能夠拿去訂閱 Observable(source) 表明他是一個 Observer,同時 Subject 又能夠被 Observer(observerA, observerB) 訂閱,表明他是一個 Observable。

var source = Rx.Observable.interval(1000).take(3);

  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  var subject = new Rx.Subject()
  
  subject.subscribe(observerA)
  
  source.subscribe(subject);
  
  setTimeout(() => {
      subject.subscribe(observerB);
  }, 1000);
  
  // "A next: 0"
  // "A next: 1"
  // "B next: 1"
  // "A next: 2"
  // "B next: 2"
  // "A complete!"
  // "B complete!"
複製代碼

在某些沒法直接使用Observable的前端框架中,咱們能夠用subject

class MyButton extends React.Component {
      constructor(props) {
          super(props);
          this.state = { count: 0 };
          this.subject = new Rx.Subject();
          
          this.subject
              .mapTo(1)
              .scan((origin, next) => origin + next)
              .subscribe(x => {
                  this.setState({ count: x })
              })
      }
      render() {
          return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
      }
  }
複製代碼

BehaviorSubject: 但願 Subject 能表明當下的狀態,而不是簡單的事件發送

var subject = new Rx.BehaviorSubject(0); // 0 爲起始值
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  subject.subscribe(observerA);
  // "A next: 0"  若是是普通的subject則不會輸出此行
  subject.next(1);
  // "A next: 1"
  subject.next(2);
  // "A next: 2"
  subject.next(3);
  // "A next: 3"
  
  setTimeout(() => {
      subject.subscribe(observerB); 
      // "B next: 3"   若是是普通的subject則不會輸出此行
  },3000)
複製代碼

ReplaySubject:在某些時候咱們會但願 Subject 表明事件,但又能在新訂閱時從新發送最後的幾個元素

var subject = new Rx.ReplaySubject(2); // 重複發送最後兩個元素
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  subject.subscribe(observerA);
  subject.next(1);
  // "A next: 1"
  subject.next(2);
  // "A next: 2"
  subject.next(3);
  // "A next: 3"
  
  setTimeout(() => {
      subject.subscribe(observerB);
      // "B next: 2"
      // "B next: 3"
  },3000) //ReplaySubject 只是事件的重放而已。
複製代碼

AsyncSubject: 會在subject結束後送出最後一個值,行爲和promise很像

var subject = new Rx.AsyncSubject();
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  subject.subscribe(observerA);
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.complete();
  // "A next: 3"
  // "A complete!"
  
  setTimeout(() => {
      subject.subscribe(observerB);
      // "B next: 3"
      // "B complete!"
  },3000)
複製代碼

multicast:能夠用來掛載 subject 並回傳一個可連結(connectable)的 observable

var source = Rx.Observable.interval(1000)
           .take(3)
           .multicast(new Rx.Subject());

  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  source.subscribe(observerA); // subject.subscribe(observerA)
  
  source.connect(); // source.subscribe(subject)
  
  setTimeout(() => {
      source.subscribe(observerB); // subject.subscribe(observerB)
  }, 1000);
複製代碼

refCount:需和 multicast一塊兒使用,它能夠創建一個只須要訂閱就自動connect的observable

var source = Rx.Observable.interval(1000)
               .do(x => console.log('send: ' + x))
               .multicast(new Rx.Subject())
               .refCount();
  
  var observerA = {
      next: value => console.log('A next: ' + value),
      error: error => console.log('A error: ' + error),
      complete: () => console.log('A complete!')
  }
  
  var observerB = {
      next: value => console.log('B next: ' + value),
      error: error => console.log('B error: ' + error),
      complete: () => console.log('B complete!')
  }
  
  var subscriptionA = source.subscribe(observerA);
  // 訂閱數 0 => 1
  
  var subscriptionB;
  setTimeout(() => {
      subscriptionB = source.subscribe(observerB);
      // 訂閱數 0 => 2
  }, 1000);
複製代碼

publish:是multicast(new Rx.Subject())的簡化寫法

var source = Rx.Observable.interval(1000)
           .publish() 
           .refCount();
           
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.Subject()) 
  //             .refCount();
  
  還有另外三種等價關係:
  
  publishReplay:
  var source = Rx.Observable.interval(1000)
               .publishReplay(1) 
               .refCount();
               
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.ReplaySubject(1)) 
  //             .refCount();
  
  publishBehavior:
  var source = Rx.Observable.interval(1000)
               .publishBehavior(0) 
               .refCount();
               
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.BehaviorSubject(0)) 
  //             .refCount();
  
  publishLast:
  var source = Rx.Observable.interval(1000)
               .publishLast() 
               .refCount();
               
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.AsyncSubject(1)) 
  //             .refCount();
複製代碼

share:是publish + refCount的簡化

var source = Rx.Observable.interval(1000)
               .share();
               
  // var source = Rx.Observable.interval(1000)
  //             .publish() 
  //             .refCount();
  
  // var source = Rx.Observable.interval(1000)
  //             .multicast(new Rx.Subject()) 
  //             .refCount();
複製代碼

7.Subject與Observable的區別

Subject 能夠簡單理解爲爲了在多個訂閱中共用執行結果而存在的

Subject 可讓咱們用命令的方式發送值到一個observable串流中。若是由於框架限制,咱們沒法直接建立observable,如React的Event中就可使用Subject

Subject是 Observer Design Pattern的實例。當observer訂閱subject時,subject會把訂閱者push進一個訂閱者清單中,在元素髮送時就去遍歷這份清單把元素一一送出。這跟observable像一個function執行是徹底不一樣的。

Subject繼承了Observable,因此纔會有Observable的全部方法。主要的只有next、error、 complete、subscribe 及 unsubscribe 五個方法

Subject同時具備Observer和Observable的特性,跟 Observable 最大的區別就是它是有狀態的,也就是存儲的那份清單。

當一個observable操做過程當中發生了 side-effect,而咱們不但願由於多個subscribe而被觸發屢次,就須要用到Subject

//不用subject,A和B打印的值不一致
 var result = Rx.Observable.interval(1000).take(6)
               .map(x => Math.random());// side-effect
  var subA = result.subscribe(x => console.log('A: ' + x));
  var subB = result.subscribe(x => console.log('B: ' + x));
  
  //subject,A和B打印的值一致
  var result = Rx.Observable.interval(1000).take(6)
               .map(x => Math.random()) // side-effect
               .multicast(new Rx.Subject())
               .refCount();
  
  var subA = result.subscribe(x => console.log('A: ' + x));
  var subB = result.subscribe(x => console.log('B: ' + x));
複製代碼

8.簡易版 Observable 的實現

// 空的 observer 
  const emptyObserver = {
    next: () => {},
    error: (err) => { throw err; },
    complete: () => {}
  }
  
  class Observer {
    constructor(destinationOrNext, error, complete) {
      switch (arguments.length) {
        case 0:
          // 空的 observer
          this.destination = this.safeObserver(emptyObserver);
          break;
        case 1:
          if (!destinationOrNext) {
            // 空的 observer
            this.destination = this.safeObserver(emptyObserver);
            break;
          }
          if (typeof destinationOrNext === 'object') {
            // 傳入 observer 對象
            this.destination = this.safeObserver(destinationOrNext);
            break;
          }
        default:
          // 若是上面都不是,表示傳入了一到三個 function
          this.destination = this.safeObserver(destinationOrNext, error, complete);
          break;
      }
    }
    safeObserver(observerOrNext, error, complete) {
      let next;
  
      if (typeof (observerOrNext) === 'function') {
        // observerOrNext 是 next function
        next = observerOrNext;
      } else if (observerOrNext) {
        // observerOrNext 是 observer 
        next = observerOrNext.next || () => {};
        error = observerOrNext.error || function(err) { 
          throw err 
        };
        complete = observerOrNext.complete || () => {};
      }
      // 返回預期的 observer 對象
      return {
        next: next,
        error: error,
        complete: complete
      };
    }
    
    next(value) {
      if (!this.isStopped && this.next) {
        // 判斷是否中止過
        try {
          this.destination.next(value); // 傳送值
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
      }
    }
    
    error(err) {
      if (!this.isStopped && this.error) {
        // 判斷是否中止過
        try {
          this.destination.error(err); // 傳送錯誤
        } catch (anotherError) {
          this.unsubscribe();
          throw anotherError;
        }
        this.unsubscribe();
      }
    }
  
    complete() {
      if (!this.isStopped && this.complete) {
        // 判斷是否中止過
        try {
          this.destination.complete(); // 完成
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
        this.unsubscribe(); // 退訂
      }
    }
    
    unsubscribe() {
      this.isStopped = true;
    }
  }
  
  function create(subscriber) {
      const observable = {
          subscribe: function(observerOrNext, error, complete) {
              const realObserver = new Observer(observerOrNext, error, complete)
              subscriber(realObserver);
              return realObserver;
          }       
      };
      return observable;
  }
  
  var observable = create(function(observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    observer.next('not work');
  })
  
  var observer = {
    next: function(value) {
      console.log(value)
    },
    complete: function() {
        console.log('complete!')
    }
  }
  
  observable.subscribe(observer);
複製代碼

9.複雜版 Observable 的實現

// 定義空的 observer 
  const emptyObserver = {
    next: () => {},
    error: (err) => { throw err; },
    complete: () => {}
  }
  
  class Observer {
    constructor(destinationOrNext, error, complete) {
      switch (arguments.length) {
        case 0:
          // 空的 observer
          this.destination = this.safeObserver(emptyObserver);
          break;
        case 1:
          if (!destinationOrNext) {
            // 空的 observer
            this.destination = this.safeObserver(emptyObserver);
            break;
          }
          // 判斷傳入的 destinationOrNext 是不是 Observer 的實例,若是是就不用執行 `this.safeObserver`
          if(destinationOrNext instanceof Observer){
            this.destination = destinationOrNext;
            break;
          }
          if (typeof destinationOrNext === 'object') {
            // 傳入 observer 
            this.destination = this.safeObserver(destinationOrNext);
            break;
          }
        default:
          // 若是上面都不是,說明傳入了一到三個 function
          this.destination = this.safeObserver(destinationOrNext, error, complete);
          break;
      }
    }
    safeObserver(observerOrNext, error, complete) {
      let next;
  
      if (typeof (observerOrNext) === 'function') {
        // observerOrNext 是 next function
        next = observerOrNext;
      } else if (observerOrNext) {
        // observerOrNext 是 observer 
        next = observerOrNext.next || () => {};
        error = observerOrNext.error || function(err) { 
          throw err 
        };
        complete = observerOrNext.complete || () => {};
      }
      // 返回預期的observer
      return {
        next: next,
        error: error,
        complete: complete
      };
    }
    
    next(value) {
      if (!this.isStopped && this.next) {
        // 是否中止
        try {
          this.destination.next(value); // 發送值
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
      }
    }
    
    error(err) {
      if (!this.isStopped && this.error) {
        // 是否中止
        try {
          this.destination.error(err); // 發送值
        } catch (anotherError) {
          this.unsubscribe();
          throw anotherError;
        }
        this.unsubscribe();
      }
    }
  
    complete() {
      if (!this.isStopped && this.complete) {
        // 是否中止
        try {
          this.destination.complete(); // 完成
        } catch (err) {
          this.unsubscribe();
          throw err;
        }
        this.unsubscribe(); // 中止訂閱
      }
    }
    
    unsubscribe() {
      this.isStopped = true;
    }
  }
  
  class MapObserver extends Observer {
    constructor(observer, callback) {
      // 傳入原來的 observer 和 map 的 callback
      super(observer); // 繼承Observer
      this.callback = callback; // 保存 callback
      this.next = this.next.bind(this); // 確保 next 的 this
    }
    next(value) {
      try {
        this.destination.next(this.callback(value)); 
        // this.destination 是父類 Observer 保存的 observer 
        // 這裏 this.callback(value) 就是 map 的操做
      } catch (err) {
        this.destination.error(err);
        return;
      }
    }
  }
  
  class Observable {
    constructor(subscribe) {
      this._subscribe = subscribe; // 把 subscribe 存到屬性中
    }
    subscribe(observerOrNext, error, complete) {
      const observer = new Observer(observerOrNext, error, complete);
      // 先用 this.operator 判斷當前的 observable 是否有 operator 
      if(this.operator) {
        this.operator.call(observer, this.source)
      } else {
        // 若是沒有 operator 再直接把 observer 傳入 _subscribe
        this._subscribe(observer);
      }
      return observer;
    }
    map(callback) {
      const observable = new Observable(); // 創建新的 observable
      
      observable.source = this; // 保存當前的 observable
      
      observable.operator = {
          call: (observer, source) => { 
              // 執行 operator 
              const newObserver = new MapObserver(observer, callback);
              // 創建包裹後的 observer
              // 訂閱並回傳
              return source.subscribe(newObserver);
          }
      }; // 存儲當前 operator ,並做爲是否有 operator 的依據,
      
      return observable; // 返回新的 observable
    }
  }
  
  Observable.create = function(subscribe) {
      return new Observable(subscribe);
  }
  
  Observable.fromArray = function(array) {
      if(!Array.isArray(array)) {
          throw new Error('params need to be an array');
      }
      return new Observable(function(observer) {
          try{
              array.forEach(value => observer.next(value))
              observer.complete()
          } catch(err) {
              observer.error(err)
          }
      });
  }
  
  var observable = Observable.fromArray([1,2,3,4,5])
                    .map(x => x + 3)
                    .map(x => x + 1)
  
  var observer = {
    next: function(value) {
      console.log(value)
    },
    complete: function() {
        console.log('complete!')
    }
  }
  
  observable.subscribe(observer);
複製代碼

10.Scheduler的基本概念

Scheduler是一個數據結構。它知道如何根據優先級或其餘標準來儲存並隊列任務

Scheduler是一個執行環境。它意味着任務什麼時候何地被執行,好比像是當即執行、在回調(callback)中執行、setTimeout中執行、animation frame中執行

Scheduler是一個虛擬時鐘。它透過now()這個方法提供了時間的概念,咱們可讓任務在特定的時間點被執行。

它有四個scheduler: queue(預設的當即執行,適合用在返回的operator 且有大量資料時使用); asap(非同步的執行,相似setTimeout 0); async(異步,相似setInterval,用在跟時間相關的操做); animationFrame(相似Window.requestAnimationFrame,用在複雜運算且高頻率觸發UI動畫時)

var observable = Rx.Observable.create(function (observer) {
      observer.next(1);
      observer.next(2);
      observer.next(3);
      observer.complete();
  });
  
  console.log('before subscribe');
  observable.observeOn(Rx.Scheduler.async) // 設爲 async 異步
  .subscribe({
      next: (value) => { console.log(value); },
      error: (err) => { console.log('Error: ' + err); },
      complete: () => { console.log('complete'); }
  });
  console.log('after subscribe');
  
  // "before subscribe"
  // "after subscribe"
  // 1
  // 2
  // 3
  // "complete"
複製代碼

11.rxjs收尾

如何對rxjs進行debug

  • 用do這個operator.它不會對元素產生任何影響。
const source = Rx.Observable.interval(1000).take(3);
    
    const example = source
                    .do(x => console.log('do log: ' + x))
                    .map(x => x + 1);
    
    example.subscribe((x) => {
        console.log('subscription log: ' + x)
    })
    
    // do log: 0
    // subscription log: 1
    // do log: 1
    // subscription log: 2
    // do log: 2
    // subscription log: 3
複製代碼
  • 畫Marble Diagram(大理石)圖
  • 用RxJS Devtools這個谷歌開發工具
Observable.prototype.debug = window.rxDevTool(Observable);
    
    Observable.interval(1000).take(5)
    .debug('source1')
    .map(x => x + 1)
    .debug('source2')
    .subscribe(function() {
        //...
    })
複製代碼

Cold & Hot Observable: 區分不一樣行爲的Observable。Cold Observable 就是指每次訂閱都獨立的執行,而Hot Observable則是共同的訂閱。而這一切的差別來自因而在Observale的內部創建仍是外部創建。

Cold Observable:下面的代碼每次訂閱都是獨立的,它們之間不會互相影響

const source = Rx.Observable.interval(1000).take(5);
    
    source.subscribe(value => console.log('sub1: ' + value))
    
    setTimeout(() => {
        source.subscribe(value => console.log('sub2: ' + value))    
    }, 3500);
    
    // sub1: 0
    // sub1: 1
    // sub1: 2
    // sub1: 3
    // sub2: 0
    // sub1: 4
    // sub2: 1
    // sub2: 2
    // sub2: 3
    // sub2: 4
複製代碼

Hot Observable:每一個訂閱都是共用的,具體是指一個Observable在屢次訂閱時,不會每次都重新開始發送元素

var source = Rx.Observable.interval(1000)
                .take(5)
                .share(); // 共用
    
    source.subscribe(value => console.log('sub1: ' + value))
    
    setTimeout(() => {
        source.subscribe(value => console.log('sub2: ' + value))    
    }, 3500);
    
    // sub1: 0
    // sub1: 1
    // sub1: 2
    // sub1: 3
    // sub2: 3
    // sub1: 4
    // sub2: 4
複製代碼

mpvue中使用rxjs

mpvue其實就是vue的翻版。前面講到,在某些沒法直接使用Observable的前端框架中,咱們能夠用subject,因此咱們就用subject在mpvue中,實現一個用戶頻繁點擊支付按鈕的功能。

定義一個支付class

import rxwx, { Rx } from 'rxjs-wx/RxWX'

export default class Payment {
  constructor () {
    this.rxSubject = new Rx.Subject()
    this.count = 0
  }
  get () {
    return this.rxSubject
  }
  clear () {
    this.count = 0
  }
  pay () {
    return Rx.Observable.of().multicast(this.rxSubject)
      .do(() => {
        this.count++
        if (this.count > 1) {
          wx.showToast({
            title: '彆着急,正在完成支付..',
            icon: 'none'
          })
        }
      })
      .debounceTime(1000)
      .switchMap((payParams) => {
        console.log(payParams)
        return rxwx.requestPayment({
            timeStamp: payParams.payParams,
            nonceStr: payParams.nonceStr,
            package: payParams.package,
            signType: payParams.signType || 'MD5',
            paySign: payParams.paySign
            
        })
         .debounceTime(1000)
      })
  }
}
複製代碼

頁面僞代碼

<template>
    <span @click="payHandle">確認支付</span>
</template>
export default {
    methods: {
        payHandle () {
          this.payment.get().next('xxx') // 調用支付功能
        },
        initPay () {
          this.payment = new Payment()
          this.payment.pay()
            .catch((e) => {
              wx.showToast({
                title: '支付失敗',
                icon: 'none'
              })
            })
            .subscribe((resp) => {
              wx.showToast({
                title: '支付成功',
                icon: 'none'
              })
              this.payment.clear() // 重置count
              // 支付成功後balabala...
            })
        }
    },
    mounted () {
        // 初始化
        this.initPay()
      },
}
複製代碼
相關文章
相關標籤/搜索