rxjs是一個響應式編程的庫,它使異步編程和回調代碼變得更爲簡單。 mpvue是一個小程序的框架。它使用vue的語法,使小程序的開發更加快捷簡單,讓前端開發人員的一套代碼同時用於web端和小程序端變成了現實。php
響應式編程是rxjs的核心概念之一。在主流的三大框架之中都獲得了應用:vue的底層就採用了reactive programming.angular2+ 也全面引用了rxjs,,不論是在 http 仍是 animation 都用了 RxJS 的 Observable.Redux 從3.5版本開始,也加入了對Observable 操做的支持.甚至於主流的編程語言都有rx的Library,,好比RxRuby, RxPy, RxJava...等 RxJS 提供了一套完整的非同步解決方案,讓咱們在面對各類非同步行爲,不論是 Event, AJAX, 仍是 Animation 等,咱們均可以使用相同的 API 作開發。html
在前端開發過程當中,咱們會用到各類的js異步,如callback 或是 Promise 物件甚至是async/await,單隨着應用愈來愈複雜,編寫異步代碼愈來愈困難和繁瑣。異步常見的問題有: * 競態條件 (Race Condition) * 內存泄漏 (Memory Leak) * 複雜的狀態 (Complex State) * 異常處理 (Exception Handling)前端
若是咱們用rxjs來處理,彷佛就變得方便了許多:vue
var handler = (e) => {
console.log(e);
document.body.removeEventListener('click', handler);
}
document.body.addEventListener('click', handler);
`
能夠寫成:
`
Rx.Observable
.fromEvent(document.body, 'click') // 註冊監聽
.take(1) // 只取一次
.subscribe(console.log);
複製代碼
總之,RxJS 是一套藉由 Observable sequences 來組合非同步行爲和事件基礎程序的 Library,能夠把 RxJS 想成處理 非同步行爲 的 Lodash.是Functional Programming 及 Reactive Programming 兩個編程思想的結合.react
RxJS 的基礎就是 Observable,只要弄懂 Observable 就等於學會一半的 RxJS.Observable 就是觀察者模式(Observer) 和 迭代器模式(Iterator) 兩種思想的結合。web
Observer Patternajax
觀察者模式在api設計上獲得普遍應用,常見的一個例子是編程
function clickHandler(event) {
console.log('user click!');
}
document.body.addEventListener('click', clickHandler)
複製代碼
讓咱們本身來實現一個:json
//定義
class Producer {
constructor() {
this.listeners = [];
}
addListener(listener) {
if(typeof listener === 'function') {
this.listeners.push(listener)
} else {
throw new Error('listener 必須是 function')
}
}
removeListener(listener) {
this.listeners.splice(this.listeners.indexOf(listener), 1)
}
notify(message) {
this.listeners.forEach(listener => {
listener(message);
})
}
}
//應用
var egghead = new Producer();
function listener1(message) {
console.log(message + 'from listener1');
}
function listener2(message) {
console.log(message + 'from listener2');
}
egghead.addListener(listener1); // 註冊監聽
egghead.addListener(listener2);
egghead.notify('A new course!!') // 執行
//輸出結果:
a new course!! from listener1
a new course!! from listener2
複製代碼
這個例子很好的說明了 Observer Pattern 如何在event 跟 listeners的應用中作到解耦小程序
Iterator Pattern
下面是一個使用Iterator的例子
var arr = [1, 2, 3];
var iterator = arr[Symbol.iterator]();
iterator.next();
// { value: 1, done: false }
iterator.next();
// { value: 2, done: false }
iterator.next();
// { value: 3, done: false }
iterator.next();
// { value: undefined, done: true }
複製代碼
讓咱們來動手製做一個:
//定義
class IteratorFromArray {
constructor(arr) {
this._array = arr;
this._cursor = 0;
}
next() {
return this._cursor < this._array.length ?
{ value: this._array[this._cursor++], done: false } :
{ done: true };
}
map(callback) {
const iterator = new IteratorFromArray(this._array);
return {
next: () => {
const { done, value } = iterator.next();
return {
done: done,
value: done ? undefined : callback(value)
}
}
}
}
}
//使用
var iterator = new IteratorFromArray([1,2,3]);
var newIterator = iterator.map(value => value + 3);
newIterator.next();
// { value: 4, done: false }
newIterator.next();
// { value: 5, done: false }
newIterator.next();
// { value: 6, done: false }
複製代碼
相似的還有generator的例子:
function* getNumbers(words) {
for (let word of words) {
if (/^[0-9]+$/.test(word)) {
yield parseInt(word, 10);
}
}
}
const iterator = getNumbers('12咱們3學習4');
iterator.next();
// { value: 1, done: false }
iterator.next();
// { value: 2, done: false }
iterator.next();
// { value: 3, done: false }
iterator.next();
// { value: 4, done: false }
iterator.next();
// { value: undefined, done: true }
複製代碼
總之,Observer 與 Iterator 都有共同的特性,就是漸進式的獲取數據信息,差異在於Observer 是生產者push數據,Iterator 是消費者pull數據。而Observable 具有生產者推送數據的特性,同時能像序列,擁有序列處理數據的方法
RxJS 有一個核心和三個重點。核心就是Observable(map, filter...),三個重點分別是:Observer,Subject,Schedulers, 先來說講Observable的用法。
創建 Observable: create
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry');
observer.next('Anna');
})
複製代碼
讓咱們訂閱observable,來接收數據
observable.subscribe(function(value) {
console.log(value);
})
複製代碼
須要注意的是,Observable不只能夠處理異步狀況,同步行爲也是能夠的。此外,觀察者(Observer) 有三個方法:
var observable = Rx.Observable
.create(function(observer) {
observer.next('Jerry');
observer.next('Anna');
observer.complete();
observer.next('not work');
})
var observer = {
next: function(value) {
console.log(value);
},
error: function(error) {
console.log(error)
},
complete: function() {
console.log('complete')
}
}
observable.subscribe(observer)
//輸出
Jerry
Anna
complete
複製代碼
Observable 的經常使用方法包括create,of,from,fromEvent,fromPromise,never,empty,throw,interval,timer等等
of的用法,能夠和上面的create方法作一個對比
var observable = Rx.Observable.of('Jerry', 'Anna');
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
// Jerry
// Anna
// complete!
複製代碼
from的用法,參數是數組,對比of傳入的是一個個參數
var arr = ['Jerry', 'Anna', 2016, 2017, '30 days']
var observable = Rx.Observable.from(arr);
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
// Jerry
// Anna
// 2016
// 2017
// 30 days
// complete!
複製代碼
此外,from的參數還能夠是字符串或者promise
var observable = Rx.Observable
.from(new Promise((resolve, reject) => {
setTimeout(() => {
resolve('Hello RxJS!');
},3000)
}))
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
// Hello RxJS!
// complete!
複製代碼
fromEvent的用法,第一個參數是dom元素,第二個參數是要監聽的事件名,例如對body作點擊事件監聽:
var observable = Rx.Observable.fromEvent(document.body, 'click');
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
複製代碼
fromEventPattern,這個方法是給類事件使用。所謂的類事件就是指其行爲跟事件相像,同時具備註冊監聽及移除監聽兩種行爲,就像 DOM Event 有 addEventListener 及 removeEventListener
class Producer {
constructor() {
this.listeners = [];
}
addListener(listener) {
if(typeof listener === 'function') {
this.listeners.push(listener)
} else {
throw new Error('listener 必須是 function')
}
}
removeListener(listener) {
this.listeners.splice(this.listeners.indexOf(listener), 1)
}
notify(message) {
this.listeners.forEach(listener => {
listener(message);
})
}
}
var egghead = new Producer();
// egghead 同時有 addEventListener 及 removeEventListener方法
var observable = Rx.Observable
.fromEventPattern(
(handler) => egghead.addListener(handler),
(handler) => egghead.removeListener(handler)
);
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
})
egghead.notify('Hello! Can you hear me?');
// Hello! Can you hear me?
複製代碼
empty方法,會返回一個空的observable,當即執行complete
var observable = Rx.Observable.empty();
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
// complete!
複製代碼
never方法,會返回一個無窮的observable,就是一個一直存在,但什麼都不作的observable
var observable = Rx.Observable.never();
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log(error)
}
});
複製代碼
throw方法的做用就是拋出錯誤
var observable = Rx.Observable.throw('Oop!');
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
// Throw Error: Oop!
複製代碼
interval方法,會發送一個從零開始依次遞增的整數,它的參數是間隔時間,單位是毫秒
var observable = Rx.Observable.interval(1000);
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
// 0
// 1
// 2
// ...
複製代碼
timer方法與 interval有所不一樣,它有兩個參數。第一個參數表明發出第一個值的等待時間,第二個參數表明每次發出值的間隔時間
var observable = Rx.Observable.timer(1000, 5000);
observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
//先等一秒
// 0
// 1
// 2 ...
複製代碼
unsubscribe方法: 在訂閱observable後,會返回一個subscription,它有一個能夠釋放資源的unsubscribe方法
var observable = Rx.Observable.timer(1000, 1000);
// 取得 subscription
var subscription = observable.subscribe({
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
setTimeout(() => {
subscription.unsubscribe() // 中止訂閱
}, 5000);
// 0
// 1
// 2
// 3
// 4
複製代碼
map方法:參數是一個回調函數,對數據進行操做以後,再返回新的observable
var observable = Rx.Observable.interval(1000);
var newest = observable.map(x => x + 2);
newest.subscribe(console.log);
// 2
// 3
// 4
// 5..
複製代碼
讓咱們本身動手來實現一下:
function map(callback) {
return Rx.Observable.create((observer) => {
return this.subscribe(
(value) => {
try{
observer.next(callback(value));
} catch(e) {
observer.error(e);
}
},
(err) => { observer.error(err); },
() => { observer.complete() }
)
})
}
Rx.Observable.prototype.map = map;
var people = Rx.Observable.of('Jerry', 'Anna');
var helloPeople = people.map((item) => item + ' Hello~');
helloPeople.subscribe(console.log);
// Jerry Hello~
// Anna Hello~
複製代碼
mapTo方法:把原來的值都改爲一個固定值
var observable = Rx.Observable.interval(1000);
var newest = observable.mapTo(2);
newest.subscribe(console.log);
// 2
// 2
// 2
// 2..
複製代碼
filter方法:類型Array的filter方法,過濾出一些值.
var observable = Rx.Observable.interval(1000);
var newest = observable.filter(x => x % 2 === 0);
newest.subscribe(console.log);
// 0
// 2
// 4
// 6..
複製代碼
take方法:取前多少個元素就結束.
var observable = Rx.Observable.interval(1000);
var example = observable.take(3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete
複製代碼
first方法:至關於take(1),取出第一個元素後就結束
var observable = Rx.Observable.interval(1000);
var example = observable.first();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// complete
複製代碼
takeUntil方法:在某個事件發生時,結束.
var observable = Rx.Observable.interval(1000);
var click = Rx.Observable.fromEvent(document.body, 'click');
var example = observable.takeUntil(click);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// complete //點擊body元素時
複製代碼
concatAll方法:當Observable傳遞的元素仍是observable時,相似於二維數組,咱們經過這個方法把它扁平化爲一維數組(concatAll 會一個一個處理,必定是等前一個 observable 完成(complete)纔會處理下一個 observable)
var observable = Rx.Observable.fromEvent(document.body, 'click');
var source = observable.map(e => Rx.Observable.of(1,2,3));
var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
switch方法:同concatAll相似,扁平化observable。(它會在新的 observable 送出後直接處理新的 observable 無論前一個 observable 是否完成,每當有新的 observable 送出就會直接把舊的 observable 退訂(unsubscribe),永遠只處理最新的 observable!)
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));
var example = source.switch();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
mergeAll:同concatAll,switch相似,扁平化observable。mergeAll 能夠傳入一個數值,這個數值表明他能夠同時處理的 observable 數量(不會像 switch 同樣退訂(unsubscribe)原先的 observable 而是並行處理多個 observable)
var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.interval(1000));
var example = source.mergeAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
//----------------00---11---22---33---(04)4--...
複製代碼
下面來實現一個拖拉功能:頁面上有個元素(#drag),在該元素上按下左鍵(mousedown)時,開始監聽鼠標滑動的位置。當鼠標釋放時(mouseup),結束監聽。當鼠標移動時(mousemove),改變元素的位置
const dragDOM = document.getElementById('drag');
const body = document.body;
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');
mouseDown
.map(event => mouseMove.takeUntil(mouseUp))
.concatAll()
.map(event => ({ x: event.clientX, y: event.clientY }))
.subscribe(pos => {
dragDOM.style.left = pos.x + 'px';
dragDOM.style.top = pos.y + 'px';
})
複製代碼
skip方法:跳過前幾個元素,從後面繼續取值
var observable = Rx.Observable.interval(1000);
var example = observable.skip(3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 3
// 4
// 5...
複製代碼
takeLast方法:從後面取值
var observable = Rx.Observable.interval(1000).take(6);
var example = observable.takeLast(2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 4
// 5
// complete
複製代碼
last方法:用來取到最後的元素
var observable = Rx.Observable.interval(1000).take(6);
var example = observable.last();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 5
// complete
複製代碼
concat方法:把多個observable合併成一個
var source = Rx.Observable.interval(1000).take(3);
var source2 = Rx.Observable.of(3)
var source3 = Rx.Observable.of(4,5,6)
var example = source.concat(source2, source3);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// complete
複製代碼
startWith方法:在observable最開始插入元素
var observable = Rx.Observable.interval(1000);
var example = observable.startWith(0);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 3...
複製代碼
merge方法:合併observable,和concat不一樣的是:merge把多個observable同時處理,而concat處理完一個以後纔會處理接下來的observable
var source = Rx.Observable.interval(500).take(3);
var source2 = Rx.Observable.interval(300).take(6);
var example = source.merge(source2);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 0
// 1
// 2
// 1
// 3
// 2
// 4
// 5
// complete
複製代碼
combineLatest方法:它會取到各個observable的值,通過處理後再輸出
var observale = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = observale.combineLatest(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete
複製代碼
分析:combineLatest會等兩個observable都有傳送值的時候纔會執行callback
zip方法:會取每一個observable相同位置的元素傳入callbackvar observale = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);
var example = observale.zip(newest, (x, y) => x + y);
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete
複製代碼
分析:zip 會等到 observale 跟 newest 都送出了第一個元素,再傳入 callback,下次則等到 observale 跟 newest 都送出了第二個元素再一塊兒傳入 callback
withLatestFrom方法:在主observable送出新值時,纔會執行callbackvar main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);
var example = main.withLatestFrom(some, (x, y) => {
return y === 1 ? x.toUpperCase() : x;
});
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// h
// e
// l
// L
// O
// complete
複製代碼
分析:withLatestFrom 會在 main 送出值的時候執行 callback,但請注意若是 main 送出值時 some 以前沒有送出過任何值 callback 仍然不會執行
用學到的方法實現一個較爲複雜的拖拉功能const video = document.getElementById('video');
const anchor = document.getElementById('anchor');
const scroll = Rx.Observable.fromEvent(document, 'scroll');
scroll.map(e => anchor.getBoundingClientRect().bottom < 0)
.subscribe(bool => {
if(bool) {
video.classList.add('video-fixed');
} else {
video.classList.remove('video-fixed');
}
})
const mouseDown = Rx.Observable.fromEvent(video, 'mousedown')
const mouseUp = Rx.Observable.fromEvent(document, 'mouseup')
const mouseMove = Rx.Observable.fromEvent(document, 'mousemove')
const validValue = (value, max, min) => {
return Math.min(Math.max(value, min), max)
}
mouseDown
.filter(e => video.classList.contains('video-fixed'))
.map(e => mouseMove.takeUntil(mouseUp))
.concatAll()
.withLatestFrom(mouseDown, (move, down) => {
return {
x: validValue(move.clientX - down.offsetX, window.innerWidth - 320, 0),
y: validValue(move.clientY - down.offsetY, window.innerHeight - 180, 0)
}
})
.subscribe(pos => {
video.style.top = pos.y + 'px';
video.style.left = pos.x + 'px';
})
複製代碼
scan方法:相似Array的reduce方法,第一個參數傳入callback,第二個參數傳入初始值(能夠沒有)。返回一個observable 實例
var source = Rx.Observable.from('hello')
.zip(Rx.Observable.interval(600), (x, y) => x);
var observable = source.scan((origin, next) => origin + next, '');
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// h
// he
// hel
// hell
// hello
// complete
複製代碼
buffer方法:它會把本來的 observable (source)送出的元素緩存在數組中,等到傳入的 observable(source2) 送出元素時,就會觸發把緩存的元素送出。
var source = Rx.Observable.interval(300);
var source2 = Rx.Observable.interval(1000);
var observable = source.buffer(source2);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
複製代碼
bufferCount方法:逢n個數緩存在數組中,並輸出
var source = Rx.Observable.interval(300);
var observable = source.bufferCount(3);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// [0,1,2]
// [3,4,5]
// [6,7,8]...
複製代碼
bufferTime方法:以下例子,鼠標在500ms內連續點兩下才輸出
const button = document.getElementById('demo');
const click = Rx.Observable.fromEvent(button, 'click')
const observable = click
.bufferTime(500)
.filter(arr => arr.length >= 2);
observable.subscribe({
next: (value) => { console.log('success'); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
delay方法:延遲多久以後再輸出元素,參數能夠是數字(ms),也能夠是日期格式
var source = Rx.Observable.interval(300).take(5);
var observable = source.delay(500);
// observable = source.delay(new Date(new Date().getTime() + 1000));
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
//延遲500ms以後
// 0
// 1
// 2
// 3
// 4
複製代碼
delayWhen方法:delayWhen 能夠影響每一個元素,並且須要傳一個 callback 並回傳一個 observable
var source = Rx.Observable.interval(300).take(5);
var observable = source
.delayWhen(
x => Rx.Observable.empty().delay(100 * x * x)
);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
用學到的方法實現一個小功能:許多圖片跟着鼠標跑,可是不能跑的同樣快
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>JS Bin</title>
<style>
* {
margin: 0;
padding: 0;
cursor: pointer;
}
img {
width: 50px;
position: absolute;
border-radius: 50%;
border: 3px white solid;
transform: translate3d(0,0,0);
}
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.js"></script>
</head>
<body>
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover6.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover5.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover4.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover3.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover2.jpg" alt="">
<img src="https://res.cloudinary.com/dohtkyi84/image/upload/c_scale,w_50/v1483019072/head-cover1.jpg" alt="">
<script>
var imgList = document.getElementsByTagName('img');
var movePos = Rx.Observable.fromEvent(document, 'mousemove')
.map(e => ({ x: e.clientX, y: e.clientY }))
function followMouse(DOMArr) {
const delayTime = 600;
DOMArr.forEach((item, index) => {
movePos
.delay(delayTime * (Math.pow(0.65, index) + Math.cos(index / 4)) / 2) //時間規則能夠換成其餘的
.subscribe(function (pos){
item.style.transform = 'translate3d(' + (pos.x-25) + 'px, ' + (pos.y-25) + 'px, 0)';
});
});
}
followMouse(Array.from(imgList))
</script>
</body>
</html>
複製代碼
debounce 方法:和防抖函數功能一致,debounce 跟 debounceTime 一個是傳入 observable 另外一個則是傳入毫秒,比較經常使用到的是 debounceTime:會先把元素cache 住並等待一段時間,若是這段時間內已經沒有收到任何元素,則把元素送出;若是這段時間內又收到新的元素,則會把本來cache 住的元素釋放 掉並從新計時,不斷反覆
const searchInput = document.getElementById('searchInput');
const theRequestValue = document.getElementById('theRequestValue');
Rx.Observable.fromEvent(searchInput, 'input')
.debounceTime(300)
.map(e => e.target.value)
.subscribe((value) => {
theRequestValue.textContent = value;
// 在這請求接口
})
複製代碼
throttle 方法:和節流函數功能一致,throttle 跟 throttleTime 一個是傳入 observable 另外一個則是傳入毫秒,比較經常使用到的是throttleTime:會先開放送出元素,等到有元素被送出就會沉默一段時間,等到時間過了又會開放發送元素,throttle 是在控制行爲的最高頻率,更適合用在連續性行爲
var source = Rx.Observable.interval(300).take(5);
var observable = source.throttleTime(1000);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// 0
// 4
// complete
複製代碼
distinct方法:相同的值只留一個
var source = Rx.Observable.from([{ value: 'a'}, { value: 'b' }, { value: 'c' }, { value: 'a' }, { value: 'c' }])
.zip(Rx.Observable.interval(300), (x, y) => x);
var observable = source.distinct((x) => {
return x.value
});
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// {value: "a"}
// {value: "b"}
// {value: "c"}
// complete
複製代碼
distinctUntilChanged方法:跟 distinct 同樣會把相同的元素過濾掉,但 distinctUntilChanged 只會跟最後一次送出的元素比較,不會每一個都比
var source = Rx.Observable.from(['a', 'b', 'c', 'c', 'b'])
.zip(Rx.Observable.interval(300), (x, y) => x);
var observable = source.distinctUntilChanged()
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// a
// b
// c
// b
// complete
複製代碼
catch方法:用來處理錯誤
var source = Rx.Observable.from(['a','b','c',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var observable = source
.map(x => x.toUpperCase())
.catch(error => Rx.Observable.of('h'));
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// A
// B
// C
// h
// complete
複製代碼
retry方法:當某個Observable發生錯誤時,從頭嘗試循環Observable.參數爲循環的次數
var source = Rx.Observable.from(['a','b',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var observable = source
.map(x => x.toUpperCase())
.retry(1);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
// A
// B
// A
// B
// Error: TypeError: x.toUpperCase is not a function
複製代碼
retryWhen方法:當某個Observable發生錯誤時,去作某些處理以後再去循環嘗試
var source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x);
var observable = source
.map(x => x.toUpperCase())
.retryWhen(
errorObs => errorObs.map(err => fetch('...')));
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
repeat方法:和retry相似,是一直重複訂閱的效果,但沒有錯誤發生。參數是循環次數,沒有參數就默認無限循環
var source = Rx.Observable.from(['a','b','c'])
.zip(Rx.Observable.interval(500), (x,y) => x);
var observable = source.repeat(1);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
concatMap方法:等同於 map 加上 concatAll,在前一個observable執行完成後再執行下一個observable
var source = Rx.Observable.fromEvent(document.body, 'click');
var observable = source
.concatMap(
e => Rx.Observable.interval(100).take(3)
);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
此外,concatMap 還有第二個參數是一個 selector callback,這個 callback 會傳入四個參數,分別是 外部 observable 送出的元素、內部 observable 送出的元素、外部 observable 送出元素的 index、 內部 observable 送出元素的 index
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var observable = source.concatMap(
e => Rx.Observable.from(getPostData()),
(e, res, eIndex, resIndex) => res.title); //res就是Rx.Observable.from(getPostData())的observable
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
switchMap方法:等同於 map 加上 switch,會在下一個 observable 被送出後直接退訂前一個未處理完的 observable
function getPostData() {
return fetch('https://jsonplaceholder.typicode.com/posts/1')
.then(res => res.json())
}
var source = Rx.Observable.fromEvent(document.body, 'click');
var observable = source.switchMap(
e => Rx.Observable.from(getPostData()));
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
mergeMap方法:等同於 map 加上 mergeAll ,能夠並行處理多個 observable(也能傳入第二個參數 selector callback,這個 selector callback 跟 concatMap 第二個參數也是徹底同樣的,但 mergeMap 的重點是咱們能夠傳入第三個參數,來限制並行處理的數量)
var source = Rx.Observable.fromEvent(document.body, 'click');
var observable = source
.mergeMap(
e => Rx.Observable.interval(100).take(3)
);
observable.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
複製代碼
總之
* concatMap 用在能夠肯定內部的 observable 結束時間比外部 observable 發送時間來快的情境,而且不但願有任何並行處理行爲,適合少數要一次一次完成到底的的 UI 動畫或特別的 HTTP request 行爲。
* switchMap 用在只要最後一次行爲的結果,適合絕大多數的使用情境。
* mergeMap 用在並行處理多個 observable,適合須要並行處理的行爲,像是多個 I/O 的並行處理。
複製代碼
小范例(製做一個簡易的autocomplete)
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width">
<title>JS Bin</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.js"></script>
<style>
html, body {
height: 100%;
background-color: white;
padding: 0;
margin: 0;
}
.autocomplete {
position: relative;
display: inline-block;
margin: 20px;
}
.input {
width: 200px;
border: none;
border-bottom: 1px solid black;
padding: 0;
line-height: 24px;
font-size: 16px;
}
.input:focus {
outline: none;
border-bottom-color: blue;
}
.suggest {
width: 200px;
list-style: none;
padding: 0;
margin: 0;
-webkit-box-shadow: 0 2px 4px rgba(0,0,0,0.2);
}
.suggest li {
cursor: pointer;
padding: 5px;
}
.suggest li:hover {
background-color: lightblue;
}
</style>
</head>
<body>
<div class="autocomplete">
<input class="input" type="search" id="search" autocomplete="off">
<ul id="suggest-list" class="suggest">
</ul>
</div>
<script>
const url = 'https://zh.wikipedia.org/w/api.php?action=opensearch&format=json&limit=5&origin=*';
const getSuggestList = (keyword) => fetch(url + '&search=' + keyword, { method: 'GET', mode: 'cors' })
.then(res => res.json())
const searchInput = document.getElementById('search');
const suggestList = document.getElementById('suggest-list');
const keyword = Rx.Observable.fromEvent(searchInput, 'input');
const selectItem = Rx.Observable.fromEvent(suggestList, 'click');
const render = (suggestArr = []) => suggestList.innerHTML = suggestArr.map(item => '<li>'+ item +'</li>').join('')
keyword
// .filter(e => e.target.value.length > 2) 使用者打了 2 個字以上在發送 request
.debounceTime(100)
.switchMap(
e => getSuggestList(e.target.value),//.retry(3) 在 API 失敗的時候從新嘗試 3 次
(e, res) => res[1]
)
.subscribe(list => render(list))
selectItem
.filter(e => e.target.matches('li'))
.map(e => e.target.innerText)
.subscribe(text => {
searchInput.value = text;
render();
})
</script>
</body>
</html>
複製代碼
window方法:把拆出來的元素放入observable並送出observable,相似buffer拆分出來的元素放到數組並送出數組
var click = Rx.Observable.fromEvent(document, 'click');
var source = Rx.Observable.interval(1000);
var observable = source.window(click);
observable
.switch()
.subscribe(console.log);
// 0
// 1
// 2
// 3
// 4
// 5 ...
複製代碼
windowToggle方法,傳入兩個參數:第一個是開始的observable,第二個是一個回調函數回傳一個結束的observable
var source = Rx.Observable.interval(1000);
var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');
var observable = source
.windowToggle(mouseDown, () => mouseUp)
.switch();
observable.subscribe(console.log);
複製代碼
groupBy方法:把相同條件元素拆分紅一個observable
var people = [
{name: 'Anna', score: 100, subject: 'English'},
{name: 'Anna', score: 90, subject: 'Math'},
{name: 'Anna', score: 96, subject: 'Chinese' },
{name: 'Jerry', score: 80, subject: 'English'},
{name: 'Jerry', score: 100, subject: 'Math'},
{name: 'Jerry', score: 90, subject: 'Chinese' },
];
var source = Rx.Observable.from(people)
.zip(
Rx.Observable.interval(300),
(x, y) => x);
var observable = source
.groupBy(person => person.name)
.map(group => group.reduce((acc, curr) => ({
name: curr.name,
score: curr.score + acc.score
})))
.mergeAll();
observable.subscribe(console.log);
// { name: "Anna", score: 286 }
// { name: 'Jerry', score: 270 }
複製代碼
Observable 與數組相比,有兩大不一樣:1.延遲運算 2.漸進式取值
1.延遲運算:observable會等到訂閱後纔開始對元素作運算,若是沒有訂閱就不會有運算的行爲
var source = Rx.Observable.from([1,2,3,4,5]);
var example = source.map(x => x + 1);
//上面的代碼就不會去作運算
var source = [1,2,3,4,5];
var example = source.map(x => x + 1);
//數組執行完以後,已經作了運算
複製代碼
2.漸進式取值:observable的每次運算會一算到底,並非數組的運算徹底部的元素以後再返回
var source = Rx.Observable.from([1,2,3]);
var example = source
.filter(x => x % 2 === 0)
.map(x => x + 1)
example.subscribe(console.log);
//1到filter被過濾掉;2到filter再到mapb變成3返回並打印;3到filter被過濾掉
var source = [1,2,3];
var example = source
.filter(x => x % 2 === 0)
.map(x => x + 1)
//數組執行到filter會返回完整的數組[2],再到map返回完整的數組[3]
複製代碼
Subject 同時是 Observable 又是 Observer,Subject 會對內部的 observers 清單進行組播(multicast)。是 Observer Pattern 的實例而且繼承自 Observable。
動手實現一個Subject
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subject = {
observers: [],
subscribe: function(observer) {
this.observers.push(observer)
},
next: function(value) {
this.observers.forEach(o => o.next(value))
},
error: function(error){
this.observers.forEach(o => o.error(error))
},
complete: function() {
this.observers.forEach(o => o.complete())
}
}
subject.subscribe(observerA)
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
複製代碼
對比一下真正的subject:能夠看出,Subject 能夠拿去訂閱 Observable(source) 表明他是一個 Observer,同時 Subject 又能夠被 Observer(observerA, observerB) 訂閱,表明他是一個 Observable。
var source = Rx.Observable.interval(1000).take(3);
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subject = new Rx.Subject()
subject.subscribe(observerA)
source.subscribe(subject);
setTimeout(() => {
subject.subscribe(observerB);
}, 1000);
// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"
複製代碼
在某些沒法直接使用Observable的前端框架中,咱們能夠用subject
class MyButton extends React.Component {
constructor(props) {
super(props);
this.state = { count: 0 };
this.subject = new Rx.Subject();
this.subject
.mapTo(1)
.scan((origin, next) => origin + next)
.subscribe(x => {
this.setState({ count: x })
})
}
render() {
return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
}
}
複製代碼
BehaviorSubject: 但願 Subject 能表明當下的狀態,而不是簡單的事件發送
var subject = new Rx.BehaviorSubject(0); // 0 爲起始值
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
// "A next: 0" 若是是普通的subject則不會輸出此行
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"
setTimeout(() => {
subject.subscribe(observerB);
// "B next: 3" 若是是普通的subject則不會輸出此行
},3000)
複製代碼
ReplaySubject:在某些時候咱們會但願 Subject 表明事件,但又能在新訂閱時從新發送最後的幾個元素
var subject = new Rx.ReplaySubject(2); // 重複發送最後兩個元素
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
subject.next(1);
// "A next: 1"
subject.next(2);
// "A next: 2"
subject.next(3);
// "A next: 3"
setTimeout(() => {
subject.subscribe(observerB);
// "B next: 2"
// "B next: 3"
},3000) //ReplaySubject 只是事件的重放而已。
複製代碼
AsyncSubject: 會在subject結束後送出最後一個值,行爲和promise很像
var subject = new Rx.AsyncSubject();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
// "A next: 3"
// "A complete!"
setTimeout(() => {
subject.subscribe(observerB);
// "B next: 3"
// "B complete!"
},3000)
複製代碼
multicast:能夠用來掛載 subject 並回傳一個可連結(connectable)的 observable
var source = Rx.Observable.interval(1000)
.take(3)
.multicast(new Rx.Subject());
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
source.subscribe(observerA); // subject.subscribe(observerA)
source.connect(); // source.subscribe(subject)
setTimeout(() => {
source.subscribe(observerB); // subject.subscribe(observerB)
}, 1000);
複製代碼
refCount:需和 multicast一塊兒使用,它能夠創建一個只須要訂閱就自動connect的observable
var source = Rx.Observable.interval(1000)
.do(x => console.log('send: ' + x))
.multicast(new Rx.Subject())
.refCount();
var observerA = {
next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!')
}
var observerB = {
next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!')
}
var subscriptionA = source.subscribe(observerA);
// 訂閱數 0 => 1
var subscriptionB;
setTimeout(() => {
subscriptionB = source.subscribe(observerB);
// 訂閱數 0 => 2
}, 1000);
複製代碼
publish:是multicast(new Rx.Subject())的簡化寫法
var source = Rx.Observable.interval(1000)
.publish()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();
還有另外三種等價關係:
publishReplay:
var source = Rx.Observable.interval(1000)
.publishReplay(1)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.ReplaySubject(1))
// .refCount();
publishBehavior:
var source = Rx.Observable.interval(1000)
.publishBehavior(0)
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.BehaviorSubject(0))
// .refCount();
publishLast:
var source = Rx.Observable.interval(1000)
.publishLast()
.refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.AsyncSubject(1))
// .refCount();
複製代碼
share:是publish + refCount的簡化
var source = Rx.Observable.interval(1000)
.share();
// var source = Rx.Observable.interval(1000)
// .publish()
// .refCount();
// var source = Rx.Observable.interval(1000)
// .multicast(new Rx.Subject())
// .refCount();
複製代碼
Subject 能夠簡單理解爲爲了在多個訂閱中共用執行結果而存在的
Subject 可讓咱們用命令的方式發送值到一個observable串流中。若是由於框架限制,咱們沒法直接建立observable,如React的Event中就可使用Subject
Subject是 Observer Design Pattern的實例。當observer訂閱subject時,subject會把訂閱者push進一個訂閱者清單中,在元素髮送時就去遍歷這份清單把元素一一送出。這跟observable像一個function執行是徹底不一樣的。
Subject繼承了Observable,因此纔會有Observable的全部方法。主要的只有next、error、 complete、subscribe 及 unsubscribe 五個方法
Subject同時具備Observer和Observable的特性,跟 Observable 最大的區別就是它是有狀態的,也就是存儲的那份清單。
當一個observable操做過程當中發生了 side-effect,而咱們不但願由於多個subscribe而被觸發屢次,就須要用到Subject
//不用subject,A和B打印的值不一致
var result = Rx.Observable.interval(1000).take(6)
.map(x => Math.random());// side-effect
var subA = result.subscribe(x => console.log('A: ' + x));
var subB = result.subscribe(x => console.log('B: ' + x));
//subject,A和B打印的值一致
var result = Rx.Observable.interval(1000).take(6)
.map(x => Math.random()) // side-effect
.multicast(new Rx.Subject())
.refCount();
var subA = result.subscribe(x => console.log('A: ' + x));
var subB = result.subscribe(x => console.log('B: ' + x));
複製代碼
// 空的 observer
const emptyObserver = {
next: () => {},
error: (err) => { throw err; },
complete: () => {}
}
class Observer {
constructor(destinationOrNext, error, complete) {
switch (arguments.length) {
case 0:
// 空的 observer
this.destination = this.safeObserver(emptyObserver);
break;
case 1:
if (!destinationOrNext) {
// 空的 observer
this.destination = this.safeObserver(emptyObserver);
break;
}
if (typeof destinationOrNext === 'object') {
// 傳入 observer 對象
this.destination = this.safeObserver(destinationOrNext);
break;
}
default:
// 若是上面都不是,表示傳入了一到三個 function
this.destination = this.safeObserver(destinationOrNext, error, complete);
break;
}
}
safeObserver(observerOrNext, error, complete) {
let next;
if (typeof (observerOrNext) === 'function') {
// observerOrNext 是 next function
next = observerOrNext;
} else if (observerOrNext) {
// observerOrNext 是 observer
next = observerOrNext.next || () => {};
error = observerOrNext.error || function(err) {
throw err
};
complete = observerOrNext.complete || () => {};
}
// 返回預期的 observer 對象
return {
next: next,
error: error,
complete: complete
};
}
next(value) {
if (!this.isStopped && this.next) {
// 判斷是否中止過
try {
this.destination.next(value); // 傳送值
} catch (err) {
this.unsubscribe();
throw err;
}
}
}
error(err) {
if (!this.isStopped && this.error) {
// 判斷是否中止過
try {
this.destination.error(err); // 傳送錯誤
} catch (anotherError) {
this.unsubscribe();
throw anotherError;
}
this.unsubscribe();
}
}
complete() {
if (!this.isStopped && this.complete) {
// 判斷是否中止過
try {
this.destination.complete(); // 完成
} catch (err) {
this.unsubscribe();
throw err;
}
this.unsubscribe(); // 退訂
}
}
unsubscribe() {
this.isStopped = true;
}
}
function create(subscriber) {
const observable = {
subscribe: function(observerOrNext, error, complete) {
const realObserver = new Observer(observerOrNext, error, complete)
subscriber(realObserver);
return realObserver;
}
};
return observable;
}
var observable = create(function(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next('not work');
})
var observer = {
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!')
}
}
observable.subscribe(observer);
複製代碼
// 定義空的 observer
const emptyObserver = {
next: () => {},
error: (err) => { throw err; },
complete: () => {}
}
class Observer {
constructor(destinationOrNext, error, complete) {
switch (arguments.length) {
case 0:
// 空的 observer
this.destination = this.safeObserver(emptyObserver);
break;
case 1:
if (!destinationOrNext) {
// 空的 observer
this.destination = this.safeObserver(emptyObserver);
break;
}
// 判斷傳入的 destinationOrNext 是不是 Observer 的實例,若是是就不用執行 `this.safeObserver`
if(destinationOrNext instanceof Observer){
this.destination = destinationOrNext;
break;
}
if (typeof destinationOrNext === 'object') {
// 傳入 observer
this.destination = this.safeObserver(destinationOrNext);
break;
}
default:
// 若是上面都不是,說明傳入了一到三個 function
this.destination = this.safeObserver(destinationOrNext, error, complete);
break;
}
}
safeObserver(observerOrNext, error, complete) {
let next;
if (typeof (observerOrNext) === 'function') {
// observerOrNext 是 next function
next = observerOrNext;
} else if (observerOrNext) {
// observerOrNext 是 observer
next = observerOrNext.next || () => {};
error = observerOrNext.error || function(err) {
throw err
};
complete = observerOrNext.complete || () => {};
}
// 返回預期的observer
return {
next: next,
error: error,
complete: complete
};
}
next(value) {
if (!this.isStopped && this.next) {
// 是否中止
try {
this.destination.next(value); // 發送值
} catch (err) {
this.unsubscribe();
throw err;
}
}
}
error(err) {
if (!this.isStopped && this.error) {
// 是否中止
try {
this.destination.error(err); // 發送值
} catch (anotherError) {
this.unsubscribe();
throw anotherError;
}
this.unsubscribe();
}
}
complete() {
if (!this.isStopped && this.complete) {
// 是否中止
try {
this.destination.complete(); // 完成
} catch (err) {
this.unsubscribe();
throw err;
}
this.unsubscribe(); // 中止訂閱
}
}
unsubscribe() {
this.isStopped = true;
}
}
class MapObserver extends Observer {
constructor(observer, callback) {
// 傳入原來的 observer 和 map 的 callback
super(observer); // 繼承Observer
this.callback = callback; // 保存 callback
this.next = this.next.bind(this); // 確保 next 的 this
}
next(value) {
try {
this.destination.next(this.callback(value));
// this.destination 是父類 Observer 保存的 observer
// 這裏 this.callback(value) 就是 map 的操做
} catch (err) {
this.destination.error(err);
return;
}
}
}
class Observable {
constructor(subscribe) {
this._subscribe = subscribe; // 把 subscribe 存到屬性中
}
subscribe(observerOrNext, error, complete) {
const observer = new Observer(observerOrNext, error, complete);
// 先用 this.operator 判斷當前的 observable 是否有 operator
if(this.operator) {
this.operator.call(observer, this.source)
} else {
// 若是沒有 operator 再直接把 observer 傳入 _subscribe
this._subscribe(observer);
}
return observer;
}
map(callback) {
const observable = new Observable(); // 創建新的 observable
observable.source = this; // 保存當前的 observable
observable.operator = {
call: (observer, source) => {
// 執行 operator
const newObserver = new MapObserver(observer, callback);
// 創建包裹後的 observer
// 訂閱並回傳
return source.subscribe(newObserver);
}
}; // 存儲當前 operator ,並做爲是否有 operator 的依據,
return observable; // 返回新的 observable
}
}
Observable.create = function(subscribe) {
return new Observable(subscribe);
}
Observable.fromArray = function(array) {
if(!Array.isArray(array)) {
throw new Error('params need to be an array');
}
return new Observable(function(observer) {
try{
array.forEach(value => observer.next(value))
observer.complete()
} catch(err) {
observer.error(err)
}
});
}
var observable = Observable.fromArray([1,2,3,4,5])
.map(x => x + 3)
.map(x => x + 1)
var observer = {
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!')
}
}
observable.subscribe(observer);
複製代碼
Scheduler是一個數據結構。它知道如何根據優先級或其餘標準來儲存並隊列任務
Scheduler是一個執行環境。它意味着任務什麼時候何地被執行,好比像是當即執行、在回調(callback)中執行、setTimeout中執行、animation frame中執行
Scheduler是一個虛擬時鐘。它透過now()這個方法提供了時間的概念,咱們可讓任務在特定的時間點被執行。
它有四個scheduler: queue(預設的當即執行,適合用在返回的operator 且有大量資料時使用); asap(非同步的執行,相似setTimeout 0); async(異步,相似setInterval,用在跟時間相關的操做); animationFrame(相似Window.requestAnimationFrame,用在複雜運算且高頻率觸發UI動畫時)
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
console.log('before subscribe');
observable.observeOn(Rx.Scheduler.async) // 設爲 async 異步
.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
console.log('after subscribe');
// "before subscribe"
// "after subscribe"
// 1
// 2
// 3
// "complete"
複製代碼
如何對rxjs進行debug
const source = Rx.Observable.interval(1000).take(3);
const example = source
.do(x => console.log('do log: ' + x))
.map(x => x + 1);
example.subscribe((x) => {
console.log('subscription log: ' + x)
})
// do log: 0
// subscription log: 1
// do log: 1
// subscription log: 2
// do log: 2
// subscription log: 3
複製代碼
Observable.prototype.debug = window.rxDevTool(Observable);
Observable.interval(1000).take(5)
.debug('source1')
.map(x => x + 1)
.debug('source2')
.subscribe(function() {
//...
})
複製代碼
Cold & Hot Observable: 區分不一樣行爲的Observable。Cold Observable 就是指每次訂閱都獨立的執行,而Hot Observable則是共同的訂閱。而這一切的差別來自因而在Observale的內部創建仍是外部創建。
Cold Observable:下面的代碼每次訂閱都是獨立的,它們之間不會互相影響
const source = Rx.Observable.interval(1000).take(5);
source.subscribe(value => console.log('sub1: ' + value))
setTimeout(() => {
source.subscribe(value => console.log('sub2: ' + value))
}, 3500);
// sub1: 0
// sub1: 1
// sub1: 2
// sub1: 3
// sub2: 0
// sub1: 4
// sub2: 1
// sub2: 2
// sub2: 3
// sub2: 4
複製代碼
Hot Observable:每一個訂閱都是共用的,具體是指一個Observable在屢次訂閱時,不會每次都重新開始發送元素
var source = Rx.Observable.interval(1000)
.take(5)
.share(); // 共用
source.subscribe(value => console.log('sub1: ' + value))
setTimeout(() => {
source.subscribe(value => console.log('sub2: ' + value))
}, 3500);
// sub1: 0
// sub1: 1
// sub1: 2
// sub1: 3
// sub2: 3
// sub1: 4
// sub2: 4
複製代碼
mpvue其實就是vue的翻版。前面講到,在某些沒法直接使用Observable的前端框架中,咱們能夠用subject,因此咱們就用subject在mpvue中,實現一個用戶頻繁點擊支付按鈕的功能。
定義一個支付class
import rxwx, { Rx } from 'rxjs-wx/RxWX'
export default class Payment {
constructor () {
this.rxSubject = new Rx.Subject()
this.count = 0
}
get () {
return this.rxSubject
}
clear () {
this.count = 0
}
pay () {
return Rx.Observable.of().multicast(this.rxSubject)
.do(() => {
this.count++
if (this.count > 1) {
wx.showToast({
title: '彆着急,正在完成支付..',
icon: 'none'
})
}
})
.debounceTime(1000)
.switchMap((payParams) => {
console.log(payParams)
return rxwx.requestPayment({
timeStamp: payParams.payParams,
nonceStr: payParams.nonceStr,
package: payParams.package,
signType: payParams.signType || 'MD5',
paySign: payParams.paySign
})
.debounceTime(1000)
})
}
}
複製代碼
頁面僞代碼
<template>
<span @click="payHandle">確認支付</span>
</template>
export default {
methods: {
payHandle () {
this.payment.get().next('xxx') // 調用支付功能
},
initPay () {
this.payment = new Payment()
this.payment.pay()
.catch((e) => {
wx.showToast({
title: '支付失敗',
icon: 'none'
})
})
.subscribe((resp) => {
wx.showToast({
title: '支付成功',
icon: 'none'
})
this.payment.clear() // 重置count
// 支付成功後balabala...
})
}
},
mounted () {
// 初始化
this.initPay()
},
}
複製代碼