若是你瞭解 RxJs,在響應式編程中,Observable 和 Obsever 是 push 模型,與之對應的,還有一個 pull 模型:html
f(): B
):返回一個值。f(x: A): void
):響應式的,當有值產生時,會發出一個事件,並攜帶上這個值。訂閱了該事件的觀察者(Observer)將得到反饋。JavaScript 中的 Math.random()
、window.outerHeight
等都是 pull 模型:git
const height = window.outerHeight();
// 或者是迭代器寫法
function* getWindowHeight() {
while(true) {
yield window.outerHeight;
}
}
var iter = getWindowHeight()
iter.next()
複製代碼
pull 模型包含兩個部分:github
在 pull 模型中,數據是按需索取的。typescript
再經過 RxJs 看一個 push 模型的例子:編程
Rx.Observable
.fromEvent(document, 'click')
.map(event => `Event time: ${event.timeStamp}`)
.subscribe(function observer(val) {
console.log(val);
})
複製代碼
push 模型的組成包含了兩個部分:bash
與 pull 模型不一樣,觀察者不能主動索取數據,而是觀察數據源,當數據源有數據時,纔可消費和使用。框架
push 模型有這麼一些優勢:dom
Cycle.js 的做者 Andre Staltz 長久以來面對一個問題,Cycle.js 及其推薦使用的響應式編程庫 xstream 都是 push 模型的,這讓框架的模型和業務代碼都受益於 push 模型的優勢。可是,實際項目中,咱們仍是有很多 pull 模型下的需求,Andre Staltz 也開了一個 issue ,討論如何更好的使用代碼描述 pull 模型。函數
stalz 看到,咱們的 Observable 和 Observer:優化
interface Observer {
next(x): void;
error(e): void;
complete(): void;
}
interface Observable {
subscribe(observer): Subscription;
unsubsribe(): void;
}
複製代碼
能夠經過函數進行描述:
function observable(msgType, msgPayload) {}
複製代碼
msgType == 0
:payload 是 observer,意味着 observer 向 observable 問好,須要訂閱這個 observerble。(subscribe)msgType == 1
:意味着 observer 將取消對 observable 的訂閱。(unsubscribe)function observer(msgType, msgPayload) {}
複製代碼
當:
msgType == 1
:對應 observer.next(payload)
,即 observable 交付數據給 observer,此時 payload 攜帶了數據。msgType == 2
且 payload 爲 undefined
:對應於 observer.complete()
。msgType == 2
且 payload 含有值:對應於 observer.error(payload)
,此時 payload 描述了錯誤。進一步歸納就是:
Observer:
observer(1, data): void
複製代碼
observer(2, err): void
複製代碼
observer(2): void
複製代碼
Observable:
observable(0, observer): void
複製代碼
observable(2): void
複製代碼
這麼歸納下來,咱們發現,pull 模型也能夠進行相似的歸納:
Consumer:
consumer(0, producer): void
複製代碼
consumer(1, data): void
複製代碼
consumer(2, err): void
複製代碼
consumer(2): void
複製代碼
Producer:
producer(0, consumer): void
複製代碼
producer(1, data): void
複製代碼
producer(2): void
複製代碼
綜上,咱們發現,push 和 pull 模型是同型的(具備同樣的角色和函數簽名),所以,能夠經過一個規範同時定義兩者。
staltz 爲 push 和 pull 模型建立了一個名爲 callbag 的規範,這個規範的內容以下:
(type: number, payload?: any) => void
複製代碼
(type: 0 | 1 | 2, payload?: any) => void
0
爲第一個參數被調用,咱們就說 該 callbag 被問好了
。此時函數執行的操做是: 「向這個 callbag 問好」。1
爲第一個參數被調用,咱們就說 「這個 callbag 正被交付數據」。此時函數執行的操做是:「交付數據給這個 callbag」。2
爲第一個參數被調用,咱們就說 「這個 callbag 被終止了」。此時函數執行的操做是:「終止這個 callbag」。問好(Greets): (type: 0, cb: Callbag) => void
當第一個參數是 0
,而第二個參數是另一個 callbag(即一個函數)的時候,這個 callbag 就被問好了。
握手(Handshake)
當一個 source 被問好,並被做爲 payload 傳遞給了某個 sink,sink 必須使用一個 callbag payload 進行問好,這個 callbag 能夠是他本身,也能夠是另外的 callbag。換言之,問好是相互的。相互間的問好被稱爲握手。
終止(Termination): (type: 2, err?: any) => void
當第一個參數是 0
,而第二個參數要麼是 undefined(因爲成功引發的終止),要麼是任何的真實值(因爲失敗引發的終止),這個 callbag 就被終止了。
在握手以後,source 可能終止掉 sink,sink 也可能會終止掉 source。若是 source 終止了 sink,則 sink 不該當終止 source,反之亦然。換言之,終止行爲不該該是相互的。
數據交付(Data delivery) (type: 1, data: any) => void
交付次數:
有效交付的窗口:
callbag 的組成能夠簡單概括爲:
定義問好過程:在問好階段,能夠知道在和誰對講:
function sink(type, data) {
if (type === 0) {
// sink 收到了來自 source 的問好
// 問好的時候肯定 source 和 sink 的對講方式
const talkback = data;
// 3s 後,sink 終止和 source 的對講
setTimeout(() => talkback(2), 3000);
}
}
複製代碼
定義數據處理過程
function sink(type, data) {
if (type === 0) {
const talkback = data;
setTimeout(() => talkback(2), 3000);
}
if (type === 1) {
console.log(data);
}
}
複製代碼
定義結束過程
let handle;
function sink(type, data) {
if (type === 0) {
const talkback = data;
setTimeout(() => talkback(2), 3000);
}
if (type === 1) {
console.log(data);
}
if (type === 2) {
clearTimeout(handle);
}
}
複製代碼
能夠再用工廠函數讓代碼乾淨一些:
function makeSink() {
let handle;
return function sink(type, data) {
if (type === 0) {
const talkback = data;
handle = setTimeout(() => talkback(2), 3000);
}
if (type === 1) {
console.log(data);
}
if (type === 2) {
clearTimeout(handle);
}
}
}
複製代碼
puller sink 則能夠向 source 主動請求數據:
let handle;
function sink(type, data) {
if (type === 0) {
const talkback = data;
setInterval(() => talkback(1), 1000);
}
if (type === 1) {
console.log(data);
}
if (type === 2) {
clearTimeout(handle);
}
}
複製代碼
定義問好過程:
function source(type, data) {
if (type === 0) {
// 若是 source 收到 sink 的問好,
// 則 payload 即爲 sink,source 能夠向 sink 發送數據了
const sink = data;
setInterval(() => {
sink(1, null);
}, 1000);
}
// 讓 source 也和 sink 問好,完成一次握手
sink(0, /* talkback callbag here */)
}
複製代碼
當 sink 想要中止觀察,須要讓 source 有處理中止的能力,另外,listenable 的 source 不會理會 sink 主動的數據索取。所以,咱們這麼告知 sink 溝通方式:
function source(type, data) {
if (type === 0) {
const sink = data;
let handle = setInterval(() => {
sink(1, null);
}, 1000);
}
const talkback = (type, data) => {
if (type === 2) {
clearInterval(handle);
}
}
sink(0, talkback);
}
複製代碼
優化一下代碼可讀性:
function source(start, sink) {
if (start !== 0) return;
let handle = setInterval(() => {
sink(1, null);
}, 1000);
const talkback = (t, d) => {
if (t === 2) clearInterval(handle);
};
sink(0, talkback);
}
複製代碼
pullable source 中,值時按照 sink 的須要獲取的,所以,只有在 sink 索取值時,source 才須要交付數據:
function source(start, sink) {
if (start !== 0) retrun;
let i = 10;
const talkback = (t, d) => {
if (t == 1) {
if (i <= 20) sink(1, i++);
else sink(2);
}
}
sink(0, talkback)
}
複製代碼
藉助於 operator,可以不斷的構建新的 source,operator 的通常範式爲:
const myOperator = args => inputSource => outputSource
複製代碼
藉助於管道技術,咱們能一步步的聲明新的 source:
pipe(
source,
myOperator(args),
iterate(x => console.log(x))
)
// same as...
pipe(
source,
inputSource => outputSource,
iterate(x => console.log(x))
)
複製代碼
下面咱們建立了一個乘法 operator:
const multiplyBy = factor => inputSource => {
return function outputSource(start, outputSink) {
if (start !== 0) return;
inputSource(start, (type, data) => {
if (type === 1) {
outputSink(1, data * factor);
} else {
outputSink(1, data * factor);
}
})
}
}
複製代碼
使用:
function source(start, sink) {
if (start !== 0) return;
let i = 0;
const handle = setInterval(() => sink(1, i++), 3000);
const talkback = (type, data) => {
if (type === 2) {
clearInterval(handle);
}
}
sink(0, talkback);
}
let timeout;
function sink(type, data) {
if (type === 0) {
const talkback = data;
timetout = setTimeout(() => talback(2), 9000);
}
if (type === 1) {
console.log('data is', data);
}
if (type === 2) {
clearTimeout(handle);
}
}
const newSource = multiplyBy(3)(source);
newSource(0, sink);
複製代碼
經過 callbag ,咱們能夠近乎一致的處理數據源和數據源的消費:
例如,下面是 listenable 數據源,咱們用 forEach
消費:
const {forEach, fromEvent, map, filter, pipe} = require('callbag-basics');
pipe(
fromEvent(document, 'click'),
filter(ev => ev.target.tagName === 'BUTTON'),
map(ev => ({x: ev.clientX, y: ev.clientY})),
forEach(coords => console.log(coords))
);
複製代碼
下面則是 pullable 數據源,咱們仍能夠用 forEach
進行消費:
const {forEach, fromIter, take, map, pipe} = require('callbag-basics');
function* getRandom() {
while(true) {
yield Math.random();
}
}
pipe(
fromIter(getRandom()),
take(5),
forEach(x => console.log(x))
);
複製代碼