ReactiveX流式編程ReactiveX
來自微軟,它是一種針對異步數據流的編程。簡單來講,它將一切數據,包括HTTP請求,DOM事件或者普通數據等包裝成流的形式,而後用強大豐富的操做符對流進行處理,使你能以同步編程的方式處理異步數據,並組合不一樣的操做符來輕鬆優雅的實現你所須要的功能。javascript
xstream的做者也是rxjs的深度用戶,可是做者基於一些實踐中考慮而開發這個庫,做者的解釋:WHY WE BUILT XSTREAM
html
熱
模式流stream
、listener
、producer
三個概念,比較好理解import xs from 'xstream'
// Tick every second incremental numbers,
// only pass even numbers, then map them to their square,
// and stop after 5 seconds has passed
var stream = xs.periodic(1000)
.filter(i => i % 2 === 0)
.map(i => i * i)
.endWhen(xs.periodic(5000).take(1))
// So far, the stream is idle.
// As soon as it gets its first listener, it starts executing.
stream.addListener({
next: i => console.log(i),
error: err => console.error(err),
complete: () => console.log('completed'),
})
複製代碼
流
表明從事件發生、處理、監聽的一條管道,每一個stream都有不少operator相似:map
, filter
, fold
, take
;
每次調用operator都返回一個新的stream;
通常說來stream中的數據是由producer生產的,可是你能夠調用shamefullySend*
系列函數手動發射事件,可是這種方法是反reactive
的,做者強烈不推薦使用;按照個人理解,這些方法在多個stream聯合工做的,用來mock某些流的數據時候會比較有用前端
監聽者
,stream的出口,消費管道最終產物;包含有3個方法java
next
:stream裏每次有管道里產生的數據到流入到這個next方法裏接收error
:stream數據流轉中有異常狀況時調用complete
:生產者調用了stop方法後調用var listener = {
next: (value) => {
console.log('The Stream gave me a value: ', value);
},
error: (err) => {
console.error('The Stream gave me an error: ', err);
},
complete: () => {
console.log('The Stream told me it is done.');
},
}
複製代碼
生產者,stream的入口,用來持續產生流的輸入react
var producer = {
start: function (listener) {
this.id = setInterval(() => listener.next('yo'), 1000)
},
stop: function () {
clearInterval(this.id)
},
id: 0,
}
// This fellow delivers a 'yo' next event every 1 second
var stream = xs.create(producer)
複製代碼
記憶流
普通stream的記憶版:它會記住最後一次發送給listener的next方法的數據,這樣後來addListener添加的監聽者能收到記住的這個數據; 這個特性是頗有用的,可以用來保存應用運行過程當中的一些臨時狀態。typescript
標準的經過producer構造, create(producer)
編程
標準的經過producer構造memorystream, createWithMemory(producer)
promise
從Array|PromiseLike|Observable建立一個streamapp
從字面量建立一個stream,這樣建立的stream會馬上發射全部的參數,並觸發completeddom
從promise建立一個stream
合併兩個stream成爲一個stream,合併的後的數據按照本來的時間線繼續輸出(以下圖)
這個單純用文字不太好解釋,請看下圖(借用的rxjs裏的combineLatest圖,功能是相似的)
另外,rxjs中還有個一個相似的zip操做符(xstream中不存在,本身實現),看下圖仔細體會和xstream的combine的不一樣
圖片借用的rx裏的skip,是同樣的效果
圖片借用的rx裏的scan,是同樣的效果
這個是操做符就有點複雜了,涉及到了分流的狀況,主要功能是將主stream裏返回的支流直接打平,輸出支流裏的數據;整個xstream標準operators(extra下有擴展的)裏只有這個操做符有涉及到分流的處理,彈珠(Marble)圖以下
這裏解釋一下,爲何b輸出以後,主流程走到第二個tick,開始輸出第二個支流,這是第一個支流的後續輸出都會被廢棄;
假如如今須要咱們寫一個簡單的todolist:有一個 input
和一個 button
當我在input輸入內容以後,點擊 button
就往todolist集合裏添加一條數據,每條todo行前面有個 checkbox
用來勾選todo的完成狀態,每條todo行後面有一個 del
按鈕,用來刪除這條todo
ok,讓咱們開始以前先用 流
式的方式思考一下這個問題, 流
式的方式是基於時間線的演進系統動態變化的一個抽象,那麼基於此咱們能夠很簡單抽閒出 3
條時間線:
// 工具函數,方便的建立dom事件流
import fromEvent from 'xstream/extra/fromEvent';
// 從添加按鈕建立的stream
const addTodoBtn$ = fromEvent(addBtnEl, 'click').map(() => inputEl.value).filter(v => v && v !== '');
// 從刪除按鈕觸發的stream
const delTodoBtn$ = fromEvent(document.body, 'click').map((e: Event) => e.target).filter((target: HTMLElement) => target.classList.contains('delTodo')).map((target: HTMLElement) => parseInt(target.dataset.index));
// 從標記完成選項觸發的stream
const toggleTodoInput$ = fromEvent(document.body, 'change').map((e: Event) => e.target).filter((target: HTMLElement) => target.classList.contains('toggleTodo')).map((target: HTMLInputElement) => ({ checked: target.checked, index: parseInt(target.dataset.index) }));
複製代碼
好了,如今咱們有了3條stream:
button
的點擊,並判斷input框裏是否有輸入有效內容,若是有的話就將輸入的內容做爲stream的數據發射出去del
的點擊,並將綁定的data-index數值發射出去checkbox
的點擊,並將當前checkbox的選中狀態和data-index一塊兒發射出去如今咱們有了3條stream,那麼該如何將這些stream與dom的操做對應起來呢?同時還有另一個問題:傳統的開發過程當中,咱們須要有一個外部變量相似state這樣用來保存每次操做後最新的todolist數據集合(反作用); 可是ReactiveX提倡的方式就是要消除反作用,咱們須要一點兒技巧來處理這個情況;
這裏咱們思考一下整個操做分兩部分:增量數據、減量數據、更新數據, 而減量數據和更新數據都是基於增量數據源的基礎上操做的;那麼咱們須要定義一個增量數據源,而且須要能持續保持這個數據源最後的數據狀態。
讓咱們翻翻上面的operators,看看有哪一個操做符是能夠用來持久保存局部變量的? 沒錯,就是 fold
操做符:
// 數據來源
const startUp$ = addTodoBtn$.fold((todos: Todo[], inputValue) => {
const todo:Todo = { text: inputValue, completed: false };
todos.push(todo);
return todos;
}, []);// 初始化空數據列表
複製代碼
因爲這裏的todolist的增量來源只有 button
一個(若是有多個,能夠看看 combine
操做符,這裏不展開);
有了增量數據源,那麼咱們在增量數據源的每一個tick上分出一個減量、更新的 支流
(參看上面的flatten操做符),這樣支流執行的時候拿到的都是數據源的最新數據;
// 監聽數據來源,並觸發刪除的stream
const delTodos$ = startUp$.map(todos => delTodoBtn$.map(index => {
console.log(index);
todos.splice(index, 1);
return todos;
})).flatten();
// 監聽數據來源,並觸發選中的stream
const toggleTodos$ = startUp$.map(todos => toggleTodoInput$.map(({ checked, index }) => {
console.log(checked, index);
if (todos[index]) {
todos[index].completed = checked;
}
return todos;
})).flatten();
複製代碼
ok,如今咱們的數據來源是多個,輸出的都是todolist最新的數據集合狀態,讓咱們把這些stream管道組裝起來:
// 組合起來
const todos$ = xs.merge(startUp$, delTodos$, toggleTodos$);
todos$.addListener({
next: function(todos: Todo[]) {
console.log(todos);
renderTodos(todos);
},
error: function(e) {
console.error(e);
},
complete: function() {
}
})
複製代碼
在定義完清晰的stream後,咱們的實際業務代碼就是這麼"簡單",因爲stream的出口一直都是最新的todolist集合咱們實現了相似react的全量渲染;哈哈,實際上這裏還有個不怎麼簡單的反作用方法:
const inputEl = document.getElementById('input') as HTMLInputElement;
const addBtnEl = document.getElementById('addBtn') as HTMLButtonElement;
const todoListEl = document.getElementById('lists');
const initTodos = [];
const renderTodos = function(todos: Todo[]) {
if (!todos) {
return;
}
todoListEl.innerHTML = '';
const fragement = document.createDocumentFragment();
todos.forEach((todo, index) => {
const liEL = document.createElement('li');
if (todo.completed) {
liEL.className = 'completed';
}
liEL.innerHTML = `<input type='checkbox' class="toggleTodo" name="toggleTodo" data-index="${index}" ${todo.completed ? 'checked' : null} /> <span>${todo.text}</span> <a href='javascript:void(0);' class='delTodo' data-index="${index}">x</a>`;
fragement.appendChild(liEL);
});
todoListEl.appendChild(fragement);
}
複製代碼
經過實際例子能夠看到這裏有一大段不怎麼優雅的反作用方法,用來操做dom元素,因爲咱們基於全量渲染的思想,並無使用傳統的同步增刪改dom的方式,不然反作用代碼會更多; 同時因爲沒有vdom的加持,此段渲染代碼純粹只能用來demo展現一下; 至於更優雅的反作用處理和vdom能力,能夠期待我後續關於cycle.js的介紹😜
FE One