最近看到SICP 3.5: Stream,裏面介紹了惰性求值,以及如何經過組合與抽象來操做惰性數據流。讓我感覺最深的幾點:html
接下來,讓咱們用JavaScript來本身實現惰性求值,在這個過程當中我會解釋上面的幾條領悟。python
假設原始的數據是[1, 1000]
的整數:git
Array.from(Array(1000)).map((x,i)=>i+1)
假設咱們如今想找出其中的偶數:github
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0)
假設咱們如今想找出其中的3的倍數:編程
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0) .filter(x=>x%3===0)
假設咱們只須要找出第10個這樣的數:數組
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0) .filter(x=>x%3===0) [9]
這是在js中很是常見的數組管道式處理,借鑑了函數式的思想,將多個處理步驟串聯組合,讓代碼很是清晰簡潔。
可是你應該發現了一個很嚴重的問題:有大量的計算被浪費。咱們只須要第9個偶數,可是前面的處理鏈路實際上會把整個大數組都處理一遍。在現實編程中,數據源更大、數據處理步驟更多,計算浪費的問題會更嚴重。雖然管道式處理可以大大簡化代碼,可是經常有計算浪費的通病。模塊化
爲了避免作多餘計算,咱們每每只能拋棄管道式處理,對於每一個數據,先讓它走完整個處理流程,而後循環,將下一個數據推入一樣的處理流程:函數
function compute() { let count = 0; for (let i = 1; i <= 1000; i++) { if (i % 2 == 0 && i % 3 == 0) { count++; if (count === 10) return i; } } }
這樣確實可以避免計算浪費,可是失去了管道式處理的模塊化和簡潔性。在循環的方案中:循環代碼、處理代碼、判斷結束的代碼糅雜在一塊兒了。而在前面的管道式處理中,每種處理都是界線清晰的,很容易作到模塊化。
假設如今要實現一個UI界面,讓用戶經過鼠標拖拽的方式(而不是編寫代碼)來編排數據處理流程,你必定會選擇管道模型,而不是迭代模型。由於管道模型很是容易作到模塊化,你只要將不一樣的處理功能都實現成模塊,用戶只須要給這些模塊排個順序就行了。而迭代模型,你須要讓用戶可以建立迭代,而後在迭代裏面加入處理邏輯、跳出邏輯……工具
那麼,有沒有方式可以保持管道模型的簡潔性,同時可以避免計算浪費呢?有,那就是惰性數據流。惰性數據流是一種特殊的數據序列(與之相對應的,數組是一種常見的數據序列),它只有在真正須要的時候纔會計算出下一項的值。下面是一個例子,getRangeIterator
能夠返回一個惰性數據流,它的數據就是依次返回[min,max]
的整數。測試
function getRangeIterator(min, max) { return () => { const done = min > max; if (done) return { done: true }; return { done: false, val: min, next: getRangeIterator(min + 1, max) }; }; }
若是你不適應這種函數式的編程,能夠經過顯式維護一個狀態變量來記錄數據流狀態:
function getRangeIterator2(min, max) { let current = min; return function next() { const done = current > max; if (done) return { done: true }; return { done: false, val: current++ }; }; }
下面的文章都會使用函數式的風格,不會使用帶反作用的賦值。你能夠嘗試實現過程式風格的版本,對比哪一種編程風格更加簡潔。
爲了測試咱們的惰性數據流,咱們寫一個工具函數來打印數據流中的全部數據:
function iteratorAll(it, cb) { const { done, val, next } = it(); // 計算下一個數據 if (done) return; cb(val); iteratorAll(next, cb); } iteratorAll(getRangeIterator(20, 30), console.log); // 會依次打印出20~30中的全部數字
使用惰性數據流來進行管道式的數據處理,完整代碼:
// 搭建數據流處理管道 const it = takeItUntil( // 第三道處理(中止邏輯) filterIt( // 第二道處理 filterIt( // 第一道處理 getRangeIterator(1, 1000), // 數據源 x => x % 2 === 0 ), x => x % 3 === 0 ), (x, idx) => idx >= 10 ); // 打印數據流的最後一項 console.log(takeLast(it)); // 60 // 生成順序數字流的方法 function getRangeIterator(min, max) { return () => { const done = min > max; if (done) return { done: true }; return { done: false, val: min, next: getRangeIterator(min + 1, max) }; }; } // 流處理方法,輸入是流,輸出也是流 function filterIt(it, cb) { return () => { const { done, val, next } = it(); if (done) return { done: true }; if (cb(val)) return { done: false, val, next: filterIt(next, cb) }; return filterIt(next, cb)(); }; } function takeItUntil(it, cb, currentIdx = 0) { return () => { const { done, val, next } = it(); if (done || cb(val, currentIdx)) return { done: true }; return { done: false, val, next: takeItUntil(next, cb, currentIdx + 1) }; }; } // 這個不是流處理,而是消費流的方法,由於它輸出的是數字而不是流 function takeLast(it) { const { done, val, next } = it(); if (done) throw new Error("can't takeLast from empty stream"); return helper(next, val); function helper(it, pre) { const { done, val, next } = it(); if (done) return pre; return helper(next, val); } }
下半部分定義的那些方法,所有都是能夠複用的流處理工具。
上半部分定義的流處理管道,實際是結構很是一致的,一層套一層,從內到外的處理管道。
咱們還能夠優化一下流處理管道的組合方式,把嵌套的組合方式改爲鏈式調用的組合方式,更符合人的閱讀習慣:
// 搭建數據流處理管道 const it = chain(getRangeIterator(1, 1000)) // 數據源 .pipe(filter(x => x % 2 === 0)) // 第一道處理 .pipe(filter(x => x % 3 === 0)) // 第二道處理 .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道處理 .unWrap(); // 打印數據流的最後一項 console.log(takeLast(it)); // 60 // 生成順序數字流的方法 function getRangeIterator(min, max) { return () => { const done = min > max; if (done) return { done: true }; return { done: false, val: min, next: getRangeIterator(min + 1, max) }; }; } // 流處理方法配置方法 function filter(cb) { // 返回流處理函數 return function _filter(it) { // 流處理函數輸入一個流,返回一個流 return () => { const { done, val, next } = it(); if (done) return { done: true }; if (cb(val)) return { done: false, val, next: _filter(next) }; // val被filter掉,從流中拿出下一個值 return _filter(next)(); }; }; } function takeUntil(cb) { return it => _takeUntil(it, 0); function _takeUntil(it, currentIdx) { return () => { const { done, val, next } = it(); if (done || cb(val, currentIdx)) return { done: true }; return { done: false, val, next: _takeUntil(next, currentIdx + 1) }; }; } } // takeLast不是流處理,而是消費流的方法(相似於前面的iteratorAll),由於它輸出的不是流 function takeLast(it) { const { done, val, next } = it(); if (done) throw new Error("can't takeLast from empty stream"); return helper(next, val); function helper(it, pre) { const { done, val, next } = it(); if (done) return pre; return helper(next, val); } } function chain(it) { return { pipe: transformer => chain(transformer(it)), unWrap: () => it }; }
能夠看到,鏈式調用的惰性數據流處理,與文章最前面的數組鏈式調用同樣簡潔。
【計算關係的創建】與【實際計算工做】的時機分離,這個是惰性數據流與普通數組處理的最大不一樣:
// 搭建數據流處理管道 const it = chain(getRangeIterator(1, 1000)) // 數據源 .pipe(filter(x => x % 2 === 0)) // 第一道處理 .pipe(filter(x => x % 3 === 0)) // 第二道處理 .pipe(takeUntil((x, idx) => idx >= 10)) // 第三道處理 .unWrap(); // 這個時候實際尚未任何數據拉取、處理,只是創建了計算關係 // 消費下游數據的時候,纔會把上游數據抽出來 console.log(takeLast(it));
創建計算關係的代碼不會消費和處理數據。
而普通的數組處理則作不到這一點,創建計算關係的代碼會馬上完成全部工做:
Array.from(Array(1000)).map((x,i)=>i+1) .filter(x=>x%2===0) // 前面已經完成全部數據的計算 .filter(x=>x%3===0) // 前面已經完成全部數據的計算 [9] // 前面已經完成全部數據的計算
若是你仔細觀察,會發現不少大型計算系統都是先定義好計算關係,而後才真正開始計算的。好比TensorFlow:
model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10) ])
普通的(急切的)數據流是提早準備好全部數據,所以你沒法表示和處理無窮大的數據序列(好比斐波那契數列)。
而惰性數據流則是至關於只保存數列每一項的計算方法,在須要的時候才計算出來。這就是找規律和死記硬背的區別~
舉個例子,斐波那契數據流:
// 生成順序斐波那契流的方法 function getFibStream() { return helper(0, 1); function helper(pre2, pre1) { return () => { const val = pre2 + pre1; return { done: false, val, next: helper(pre1, val) }; }; } }
上游數據流有無窮多個,是常常發生的事情,只要下游轉換流、消費者會中止,程序就會終止。舉個例子,打印1000之內的斐波那契數:
// 搭建數據流處理管道 const it = chain(getFibStream()) .pipe(takeUntil(x => x >= 1000)) .unWrap(); // 打印小於1000的斐波那契數 iteratorAll(it, console.log); // 生成順序斐波那契流的方法 function getFibStream() { return helper(0, 1); function helper(pre2, pre1) { return () => { const val = pre2 + pre1; return { done: false, val, next: helper(pre1, val) }; }; } } function takeUntil(cb) { return it => _takeUntil(it, 0); function _takeUntil(it, currentIdx) { return () => { const { done, val, next } = it(); if (done || cb(val, currentIdx)) return { done: true }; return { done: false, val, next: _takeUntil(next, currentIdx + 1) }; }; } } function iteratorAll(it, cb) { const { done, val, next } = it(); if (done) return; cb(val); iteratorAll(next, cb); } function chain(it) { return { pipe: transformer => chain(transformer(it)), unWrap: () => it }; }