我所瞭解的RxJS

簡介

RxJS 是使用 Observables的響應式編程的庫,它使編寫異步或基於回調的代碼更容易,是ReactiveX編程理念的JavaScript版本。RxJS的強大之處正是它使用純函數來產生值的能力。這意味着你的代碼更不容易出錯。javascript

安裝

官方安裝

npm install rxjs
/// 導入整個核心功能集:
import Rx from 'rxjs/Rx';
Rx.Observable.of(1,2,3)
複製代碼

推薦安裝

根據官方安裝發現rxjs不能徹底加載,須要依賴rxjs-compat包,推薦使用如下安裝css

npm i -s Rxjs@6 rxjs-compat@6
import * as Rx from 'rxjs/Rx'
複製代碼

RxJS核心概念

Observable簡介

Observable舉例說明

Rx.Observable.of('1', '2', '3').map(x=>x*10).filter(x=>x>5).subscribe(x=>console.log(x))
複製代碼
  • 建立過程:Rx.Observable.of('1', '2', '3') 建立一個依次發送一、二、3的observable
  • 邏輯過程:*.map().filter()*每一個值乘以10,而後去過濾出大於5的值。若是先寫filter操做符,而後再map,則得不到數據
  • 訂閱過程:*subscribe()*相似回調函數。這個過程會獲得一個對象subscription。
  • 執行過程:x=>console.log(x) 默認狀況下爲執行next回調
  • 清理過程:示例以下
const subscription = Rx.Observable.of('1','2','3').map(x=>x*10).filter(x=>x>5).delay(1000).subscribe(x=>console.log(x));
subscription.unsubscribe()
複製代碼

Subject簡介

什麼是 Subject? - RxJS Subject 是一種特殊類型的 Observable,它容許將值多播給多個觀察者,因此 Subject 是多播的,而普通的 Observables 是單播的(每一個已訂閱的觀察者都擁有 Observable 的獨立執行)。 每一個 Subject 都是 Observable 。 - 對於 Subject,你能夠提供一個觀察者並使用 subscribe 方法,就能夠開始正常接收值。從觀察者的角度而言,它沒法判斷 Observable 執行是來自普通的 Observable 仍是 Subject 。 在 Subject 的內部,subscribe 不會調用發送值的新執行。它只是將給定的觀察者註冊到觀察者列表中,相似於其餘庫或語言中的 addListener 的工做方式。 每一個 Subject 都是觀察者。 - Subject 是一個有以下方法的對象: next(v)、error(e) 和 complete() 。要給 Subject 提供新值,只要調用 next(theValue),它會將值多播給已註冊監聽該 Subject 的觀察者們。 Subject 像是 Observable,可是能夠多播給多個觀察者。Subject 還像是 EventEmitters,維護着多個監聽器的註冊表。 根據官網,咱們大概能夠如下理解: Observable相似單車道單行線,逆行或者多輛車同時開都是不容許的 Subject相似沒有監控的雙行線,隨你往哪裏開,怎麼開,多少車開都沒有問題 因此能夠理解Subject是一類特殊的Observable,它能夠向多個Observer多路推送數值。普通的Observable並不具有多路推送的能力(每個Observer都有本身獨立的執行環境),而Subject能夠共享一個執行環境html

Subject舉例說明

const test = Observable.interval(1000).take(3);
const observerA = {
  v => console.log(`a:${v}`) 
}
const observerB = {
  v => console.log(`b:${v}`)
}                                                                              ///定義好observable
test .subscribe(observerA)
setTimeout(() => {test .subscribe(observerB) }, 2000)    
///由於observable是單播的,因此會輸出 a:0、a:一、b:0、a:二、b:一、b:2
const subject = new Subject()
subject.subscribe(observerA)
test.subscribe(subject)
setTimeout(() => {subject.subscribe(observerB)}, 2000)
///由於Subject是多播的,共享一個執行,因此輸出爲:a:0、a:一、a:二、b:2
複製代碼

Subject多態

因爲subject的特殊性,衍生出多種subject的變體,具體就不闡述了,他們的對好比下圖前端

Rxjs 是否存儲數據 是否須要初始值 什麼時候向訂閱者發佈數據
Subject 及時發佈,有新數據就發佈
BehaviorSubject 是,存儲最後一條數據或者初始值 及時發佈,有新數據就發佈
ReplaySubject 是,存儲全部數據 及時發佈,有新數據就發佈
AsyncSubject 是,存儲最後一條數據 延時發佈,只有當數據源完成時纔會發佈

Scheduler簡介

什麼是Scheduler? - Scheduler控制着什麼時候啓動 subscription 和什麼時候發送通知。它由三部分組成java

調度器是一種數據結構。它知道如何根據優先級或其餘標準來存儲任務和將任務進行排序。 調度器是執行上下文。 它表示在什麼時候何地執行任務(舉例來講,當即的,或另外一種回調函數機制(好比 setTimeout 或 process.nextTick),或動畫幀)。 調度器有一個(虛擬的)時鐘。 調度器功能經過它的 getter 方法 now() 提供了「時間」的概念。在具體調度器上安排的任務將嚴格遵循該時鐘所表示的時間。 調度器可讓你規定 Observable 在什麼樣的執行上下文中發送通知給它的觀察者。程序員

操做符概括

RxJS提供了各類API來建立數據流:npm

單值:of, empty, never 多值:from 定時:interval, timer 從事件建立:fromEvent 從Promise建立:fromPromise 自定義建立:create編程

建立出來的數據流是一種可觀察的序列,能夠被訂閱,也能夠被用來作一些轉換操做,好比:數組

改變數據形態:map, mapTo, pluck 過濾一些值:filter, skip, first, last, take 時間軸上的操做:delay, timeout, throttle, debounce, audit, bufferTime 累加:reduce, scan 異常處理:throw, catch, finally, retry, 條件執行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn 轉接:switch前端工程師

也能夠對若干個數據流進行組合:

concat,保持原來的序列順序鏈接兩個數據流 merge,合併序列 race,預設條件爲其中一個數據流完成 forkJoin,預設條件爲全部數據流都完成 zip,取各來源數據流最後一個值合併爲對象 combineLatest,取各來源數據流最後一個值合併爲數組

RxJS 難點

RxJS 處理異步邏輯,數據流,事件很是擅長。使用Rxjs前處理數據通常是處於一種'上帝'視角來對數據可視化的調試,Rxjs大大縮短了代碼量的同時可以更好的達到數據的處理(純淨性)。正是因爲其強大的特性,因此學習Rxjs有如下難點(我的認爲) 一、抽象程度比較高,須要開發人員具有比較強的概括總結能力 二、操做符多並且雜,須要花大力氣記住而且合理使用各個操做符

測試題

  • 一、鼠標點擊後console相隔2秒輸出5的倍數
  • 二、現有3個異步操做a、b、c,請提供讓三個異步並行完成後同時輸出值的方法
  • 三、’人和將來大數據‘ ===》 取最後4個字(多種方法)
  • 四、模擬一個程序員,工資不漲,天天賺相同的錢n,錢足夠了(100n)就買房,買了房而後把房子租給別人,每月收取房租m(5n),而後收入變成n+m,而後錢足夠了繼續買房,而後繼續租給訪客,收入變成n+2m

參考答案

//////題目1
const timer = Rx.Observable.interval(2000);
const event = Rx.Observable.fromEvent(document, 'click')
event.switchMap(() => timer)
 .map(x => x * 5)
 .subscribe(x => console.log('第1題:' + x));
複製代碼
/////題目2
const fa = (cb) => {
  setTimeout(() => cb('a'), 1000);
}
const fb = (cb) => {
  setTimeout(() => cb('b'), 2000);
}
const fc = (cb) => {
  setTimeout(() => cb('c'), 4000);
}
const oa = Rx.Observable.bindCallback(fa);
const ob = Rx.Observable.bindCallback(fb);
const oc = Rx.Observable.bindCallback(fc);

Rx.Observable.combineLatest(oa(),ob(),oc())
  .subscribe(x => console.log('第2題:' + x));
  /////同時還能夠用forkJoin,zip
複製代碼
//////題目3
const str = "人和將來大數據";
const param = str.split('');
Rx.Observable.from(param)
  .takeLast(4)
  .subscribe(x => console.log('第3題:' + x));
///////////////////////////////////////////////////////
Rx.Observable.from(param).subscribe(new ReplaySubject(3))
複製代碼
///////題目4
const house$ = new Subject()  ///房子
const houseCount$ = house$.scan((acc, num) => acc + num, 0).startWith(0) ///房子數

// 工資始終不漲
const salary$ = Observable.interval(100).mapTo(1) //程序員工資n
const rent$ = Observable.interval(3000)
  .withLatestFrom(houseCount$)
  .map(arr => arr[1] * 5)

// 一買了房,就沒現金了……
const income$ = Observable.merge(salary$, rent$)
const cash$ = income$
  .scan((acc, num) => {
    const newSum = acc + num
    const newHouse = Math.floor(newSum / 100)
    if (newHouse > 0) {
      house$.next(newHouse)
    }
    return newSum % 100
  }, 0)
houseCount$.subscribe(num => console.log(`houseCount: ${num}`))
cash$.subscribe(num => console.log(`cash: ${num}`))
複製代碼

原文連接:tech.gtxlab.com/sth-about-r…


做者簡介: 張栓,人和將來大數據前端工程師,專一於html/css/js的學習與開發。

相關文章
相關標籤/搜索