callbag,一個有趣的規範

push 和 pull 模型

若是你瞭解 RxJs,在響應式編程中,Observable 和 Obsever 是 push 模型,與之對應的,還有一個 pull 模型:html

  • Pull(f(): B:返回一個值。
  • Push(f(x: A): void:響應式的,當有值產生時,會發出一個事件,並攜帶上這個值。訂閱了該事件的觀察者(Observer)將得到反饋。

JavaScript 中的 Math.random()window.outerHeight 等都是 pull 模型:git

const height = window.outerHeight();
// 或者是迭代器寫法
function* getWindowHeight() {
    while(true) {
        yield window.outerHeight;
    }
}
var iter = getWindowHeight()
iter.next()
複製代碼

pull 模型包含兩個部分:github

  • 生產者:負責生產數據,是數據源
  • 消費者:負責消費數據,是數據的使用方

在 pull 模型中,數據是按需索取的。typescript

再經過 RxJs 看一個 push 模型的例子:編程

Rx.Observable
    .fromEvent(document, 'click')
	.map(event => `Event time: ${event.timeStamp}`)
    .subscribe(function observer(val) {
    	console.log(val);
	})
複製代碼

push 模型的組成包含了兩個部分:bash

  • 可觀察(可監聽)對象:是數據來源
  • 觀察者(監聽者):是數據的使用方

與 pull 模型不一樣,觀察者不能主動索取數據,而是觀察數據源,當數據源有數據時,纔可消費和使用。框架

push 模型有這麼一些優勢:dom

  • 高度複用的可觀察對象:經過對源可觀察對象使用不一樣的運算子,可構建出新的可觀察對象。
  • 延遲執行:可觀察對象只有被觀察者訂閱,纔會派發數據。
  • 聲明式、描述將來的代碼:咱們只用聲明數據源和數據消費方式,而不用關心數據交付時的細節。

Cycle.js 的做者 Andre Staltz 長久以來面對一個問題,Cycle.js 及其推薦使用的響應式編程庫 xstream 都是 push 模型的,這讓框架的模型和業務代碼都受益於 push 模型的優勢。可是,實際項目中,咱們仍是有很多 pull 模型下的需求,Andre Staltz 也開了一個 issue ,討論如何更好的使用代碼描述 pull 模型。函數

push 與 pull 能夠是同型的

stalz 看到,咱們的 Observable 和 Observer:優化

interface Observer {
  next(x): void;
  error(e): void;
  complete(): void;
}

interface Observable {
  subscribe(observer): Subscription;
  unsubsribe(): void;
}
複製代碼

能夠經過函數進行描述:

function observable(msgType, msgPayload) {}
複製代碼
  • msgType == 0:payload 是 observer,意味着 observer 向 observable 問好,須要訂閱這個 observerble。(subscribe)
  • msgType == 1:意味着 observer 將取消對 observable 的訂閱。(unsubscribe)
function observer(msgType, msgPayload) {}
複製代碼

當:

  • msgType == 1:對應 observer.next(payload),即 observable 交付數據給 observer,此時 payload 攜帶了數據。
  • msgType == 2 且 payload 爲 undefined:對應於 observer.complete()
  • msgType == 2 且 payload 含有值:對應於 observer.error(payload),此時 payload 描述了錯誤。

進一步歸納就是:

Observer:

  • observer(1, data): void
    複製代碼
    • 數據交付 :observable 將數據交付給 observer
  • observer(2, err): void
    複製代碼
    • 出錯:observable 將錯誤告知 observer
  • observer(2): void
    複製代碼
    • 完成:observable 再也不有數據,告知 observer 任務完成

Observable:

  • observable(0, observer): void
    複製代碼
    • 問好:observer 訂閱了 observable
  • observable(2): void
    複製代碼
    • 結束:observer 取消對 observable 的訂閱

這麼歸納下來,咱們發現,pull 模型也能夠進行相似的歸納:

Consumer

  • consumer(0, producer): void
    複製代碼
    • 問好:在 pull 模型中,producer 須要向 consumer 問好,告訴 consumer 有須要時,從哪裏取值
  • consumer(1, data): void
    複製代碼
    • 數據交付:producer 將數據交付給 consumer
  • consumer(2, err): void
    複製代碼
    • 出錯:producer 將錯誤告知 consumer
  • consumer(2): void
    複製代碼
    • 完成:producer 告知 consumer 任務已完成

Producer

  • producer(0, consumer): void
    複製代碼
    • 問好:consumer 肯定和哪一個 producer 交互
  • producer(1, data): void
    複製代碼
    • 數據交付:在 pull 模型中,consumer 須要主動向 producer 取值
  • producer(2): void
    複製代碼
    • 結束:consumer 結束了和 producer 的交互

綜上,咱們發現,push 和 pull 模型是同型的(具備同樣的角色和函數簽名),所以,能夠經過一個規範同時定義兩者。

callbag

staltz 爲 push 和 pull 模型建立了一個名爲 callbag 的規範,這個規範的內容以下:

(type: number, payload?: any) => void
複製代碼

定義(Defination)

  • Callbag:一個函數,函數簽名爲: (type: 0 | 1 | 2, payload?: any) => void
  • Greet:若是一個 callbag 以 0 爲第一個參數被調用,咱們就說 該 callbag 被問好了。此時函數執行的操做是: 「向這個 callbag 問好」。
  • Deliver:若是一個 callbag 以 1 爲第一個參數被調用,咱們就說 「這個 callbag 正被交付數據」。此時函數執行的操做是:「交付數據給這個 callbag」。
  • Terminate:若是一個 callbag 以 2 爲第一個參數被調用,咱們就說 「這個 callbag 被終止了」。此時函數執行的操做是:「終止這個 callbag」。
  • Source:一個負責交付數據的 callbag。
  • Sink:一個負責接收(消費)數據的 callbag。

協議(Protocal)

問好(Greets): (type: 0, cb: Callbag) => void

當第一個參數是 0,而第二個參數是另一個 callbag(即一個函數)的時候,這個 callbag 就被問好了。

握手(Handshake)

當一個 source 被問好,並被做爲 payload 傳遞給了某個 sink,sink 必須使用一個 callbag payload 進行問好,這個 callbag 能夠是他本身,也能夠是另外的 callbag。換言之,問好是相互的。相互間的問好被稱爲握手

終止(Termination): (type: 2, err?: any) => void

當第一個參數是 0,而第二個參數要麼是 undefined(因爲成功引發的終止),要麼是任何的真實值(因爲失敗引發的終止),這個 callbag 就被終止了。

在握手以後,source 可能終止掉 sink,sink 也可能會終止掉 source。若是 source 終止了 sink,則 sink 不該當終止 source,反之亦然。換言之,終止行爲不該該是相互的。

數據交付(Data delivery) (type: 1, data: any) => void

交付次數:

  • 一個 callbag(source 或者 sink)可能會被一次或屢次交付數據

有效交付的窗口:

  • 一個 callbag 必定不能在被問好以前被交付數據
  • 一個 callbag 必定不能在終止後被交付數據
  • 一個 sink 必定不能在其終止了它的 source 後被交付數據

建立本身的 callbag

callbag 的組成能夠簡單概括爲:

  • handshake:一次握手過程,source 和 sink 如何握手
  • talkback:對講對象,sink 和 source 正在和誰溝通

listener(observer)sink

  • 定義問好過程:在問好階段,能夠知道在和誰對講:

    function sink(type, data) {
      if (type === 0) {
        // sink 收到了來自 source 的問好
        // 問好的時候肯定 source 和 sink 的對講方式
        const talkback = data;
        // 3s 後,sink 終止和 source 的對講
        setTimeout(() => talkback(2), 3000);
      }
    }
    複製代碼
  • 定義數據處理過程

    function sink(type, data) {
        if (type === 0) {
            const talkback = data;
            setTimeout(() => talkback(2), 3000);
        }
        if (type === 1) {
            console.log(data);
        }
    }
    複製代碼
  • 定義結束過程

    let handle;
    function sink(type, data) {
        if (type === 0) {
            const talkback = data;
            setTimeout(() => talkback(2), 3000);
        }
        if (type === 1) {
            console.log(data);
        }
        if (type === 2) {
            clearTimeout(handle);
        }
    }
    複製代碼

    能夠再用工廠函數讓代碼乾淨一些:

    function makeSink() {
      let handle;
      return function sink(type, data) {
        if (type === 0) {
          const talkback = data;
          handle = setTimeout(() => talkback(2), 3000);
        } 
        if (type === 1) {
          console.log(data);
        }
        if (type === 2) {
          clearTimeout(handle);
        }
      }
    }
    複製代碼

puller(consumer)sink

puller sink 則能夠向 source 主動請求數據:

let handle;
function sink(type, data) {
    if (type === 0) {
        const talkback = data;
        setInterval(() => talkback(1), 1000);
    }
    if (type === 1) {
        console.log(data);
    }
    if (type === 2) {
        clearTimeout(handle);
    }
}
複製代碼

listenable(observable)source

  • 定義問好過程:

    function source(type, data) {
      if (type === 0) {
        // 若是 source 收到 sink 的問好,
        // 則 payload 即爲 sink,source 能夠向 sink 發送數據了
        const sink = data;
        setInterval(() => {
          sink(1, null);
        }, 1000);
      }
      // 讓 source 也和 sink 問好,完成一次握手
      sink(0, /* talkback callbag here */)
    }
    複製代碼
  • 當 sink 想要中止觀察,須要讓 source 有處理中止的能力,另外,listenable 的 source 不會理會 sink 主動的數據索取。所以,咱們這麼告知 sink 溝通方式:

    function source(type, data) {
        if (type === 0) {
            const sink = data;
            let handle = setInterval(() => {
                sink(1, null);
            }, 1000);
        }
        const talkback = (type, data) => {
            if (type === 2) {
                clearInterval(handle);
            } 
        }
        sink(0, talkback);
    }
    複製代碼
  • 優化一下代碼可讀性:

    function source(start, sink) {
      if (start !== 0) return;
      let handle = setInterval(() => {
        sink(1, null);
      }, 1000);
      const talkback = (t, d) => {
        if (t === 2) clearInterval(handle);
      };
      sink(0, talkback);
    }
    複製代碼

pullable(iterable)source

pullable source 中,值時按照 sink 的須要獲取的,所以,只有在 sink 索取值時,source 才須要交付數據:

function source(start, sink) {
    if (start !== 0) retrun;
    let i = 10;
    const talkback = (t, d) => {
        if (t == 1) {
            if (i <= 20) sink(1, i++);
            else sink(2);
        }
    }
    sink(0, talkback)
}
複製代碼

建立運算子

藉助於 operator,可以不斷的構建新的 source,operator 的通常範式爲:

const myOperator = args => inputSource => outputSource
複製代碼

藉助於管道技術,咱們能一步步的聲明新的 source:

pipe(
  source,
  myOperator(args),
  iterate(x => console.log(x))
)
// same as...
pipe(
  source,
  inputSource => outputSource,
  iterate(x => console.log(x))
)
複製代碼

下面咱們建立了一個乘法 operator:

const multiplyBy = factor => inputSource => {
    return function outputSource(start, outputSink) {
        if (start !== 0) return;
        inputSource(start, (type, data) => {
            if (type === 1) {
                outputSink(1, data * factor);
            } else {
                outputSink(1, data * factor);
            } 
        })
    }
}
複製代碼

使用:

function source(start, sink) {
    if (start !== 0) return;
    let i = 0;
    const handle = setInterval(() => sink(1, i++), 3000);
    const talkback = (type, data) => {
        if (type === 2) {
            clearInterval(handle);
        }
    }
    sink(0, talkback);
}

let timeout;
function sink(type, data) {
    if (type === 0) {
        const talkback = data;
        timetout = setTimeout(() => talback(2), 9000);
    }
    if (type === 1) {
        console.log('data is', data);
    }
    if (type === 2) {
        clearTimeout(handle);
    }
}

const newSource = multiplyBy(3)(source);
newSource(0, sink);
複製代碼

總結

經過 callbag ,咱們能夠近乎一致的處理數據源和數據源的消費

例如,下面是 listenable 數據源,咱們用 forEach 消費:

const {forEach, fromEvent, map, filter, pipe} = require('callbag-basics');

pipe(
  fromEvent(document, 'click'),
  filter(ev => ev.target.tagName === 'BUTTON'),
  map(ev => ({x: ev.clientX, y: ev.clientY})),
  forEach(coords => console.log(coords))
);
複製代碼

下面則是 pullable 數據源,咱們仍能夠用 forEach 進行消費:

const {forEach, fromIter, take, map, pipe} = require('callbag-basics');

function* getRandom() {
  while(true) {
    yield Math.random();
  }
}

pipe(
  fromIter(getRandom()),
  take(5),
  forEach(x => console.log(x))
);
複製代碼

參考資料

相關文章
相關標籤/搜索