從 RxJS 到 Flink:如何處理數據流?

簡介: 前端開發的本質是什麼?響應式編程相對於 MVVM 或者 Redux 有什麼優勢?響應式編程的思想是否能夠應用到後端開發中?本文以一個新聞網站爲例,闡述在前端開發中如何使用響應式編程思想;再以計算電商平臺雙11每小時成交額爲例,分享一樣的思想在實時計算中的相同與不一樣之處。

一 、前端開發在開發什麼

你們在前端開發的過程當中,可能會想過這樣一個問題:前端開發到底是在開發什麼?在我看來,前端開發的本質是讓網頁視圖可以正確地響應相關事件。在這句話中有三個關鍵字:"網頁視圖","正確地響應"和"相關事件"。前端

"相關事件"可能包括頁面點擊,鼠標滑動,定時器,服務端請求等等,"正確地響應"意味着咱們要根據相關的事件來修改一些狀態,而"網頁視圖"就是咱們前端開發中最熟悉的部分了。react

按照這樣的觀點咱們能夠給出這樣 視圖 = 響應函數(事件) 的公式:web

View = reactionFn(Event)

在前端開發中,須要被處理事件能夠歸類爲如下三種:算法

  • 用戶執行頁面動做,例如 click, mousemove 等事件。
  • 遠程服務端與本地的數據交互,例如 fetch, websocket。
  • 本地的異步事件,例如 setTimeout, setInterval async_event。

這樣咱們的公式就能夠進一步推導爲:數據庫

View = reactionFn(UserEvent | Timer | Remote API)

二 、應用中的邏輯處理

爲了可以更進一步理解這個公式與前端開發的關係,咱們以新聞網站舉例,該網站有如下三個要求:編程

  • 單擊刷新:單擊 Button 刷新數據。
  • 勾選刷新:勾選 Checkbox 時自動刷新,不然中止自動刷新。
  • 下拉刷新:當用戶從屏幕頂端下拉時刷新數據。

若是從前端的角度分析,這三種需求分別對應着:後端

  • 單擊刷新:click -> fetch
  • 勾選刷新:change -> (setInterval + clearInterval) -> fetch
  • 下拉刷新:(touchstart + touchmove + touchend) -> fetch news_app

一、 MVVMapi

在 MVVM 的模式下,對應上文的響應函數(reactionFn)會在 Model 與 ViewModel 或者 View 與 ViewModel 之間進行被執行,而事件 (Event) 會在 View 與 ViewModel 之間進行處理。瀏覽器

MVVM 能夠很好的抽象視圖層與數據層,可是響應函數(reactionFn)會散落在不一樣的轉換過程當中,這會致使數據的賦值與收集過程難以進行精確追蹤。另外由於事件 (Event) 的處理在該模型中與視圖部分緊密相關,致使 View 與 ViewModel 之間對事件處理的邏輯複用困難。緩存

2 、Redux

在 Redux 最簡單的模型下,若干個事件 (Event) 的組合會對應到一個 Action 上,而 reducer 函數能夠被直接認爲與上文提到的響應函數 (reactionFn) 對應。

可是在 Redux 中:

  • State 只能用於描述中間狀態,而不能描述中間過程。
  • Action 與 Event 的關係並不是一一對應致使 State 難以追蹤實際變化來源。

3 、響應式編程與 RxJS

維基百科中是這樣定義響應式編程:

在計算中,響應式編程或反應式編程(英語:Reactive programming)是一種面向數據流和變化傳播的聲明式編程範式。這意味着能夠在編程語言中很方便地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值經過數據流進行傳播。

以數據流維度從新考慮用戶使用該應用的流程:

  • 點擊按鈕 -> 觸發刷新事件 -> 發送請求 -> 更新視圖
  • 勾選自動刷新
  • 手指觸摸屏幕
  • 自動刷新間隔 -> 觸發刷新事件 -> 發送請求 -> 更新視圖
  • 手指在屏幕上下滑
  • 自動刷新間隔 -> 觸發刷新事件 -> 發送請求 -> 更新視圖
  • 手指在屏幕上中止滑動 -> 觸發下拉刷新事件 -> 發送請求 -> 更新視圖
  • 自動刷新間隔 -> 觸發刷新事件 -> 發送請求 -> 更新視圖
  • 關閉自動刷新

以 Marbles 圖表示:

拆分上圖邏輯,就會獲得使用響應式編程開發當前新聞應用時的三個步驟:

  • 定義源數據流
  • 組合/轉換數據流
  • 消費數據流並更新視圖

咱們分別來進行詳細描述。

定義源數據流

使用 RxJS,咱們能夠很方便的定義出各類 Event 數據流。

1)單擊操做

涉及 click 數據流。

click$ = fromEvent<MouseEvent>(document.querySelector('button'), 'click');

2)勾選操做

涉及 change 數據流。

change$ = fromEvent(document.querySelector('input'), 'change');

3)下拉操做

涉及 touchstart, touchmove 與 touchend 三個數據流。

touchstart$ = fromEvent<TouchEvent>(document, 'touchstart');
touchend$ = fromEvent<TouchEvent>(document, 'touchend');
touchmove$ = fromEvent<TouchEvent>(document, 'touchmove');

4)定時刷新

interval$ = interval(5000);

5)服務端請求

fetch$ = fromFetch('https://randomapi.azurewebsites.net/api/users');

組合/轉換數據流

1)點擊刷新事件流

在點擊刷新時,咱們但願短期內屢次點擊只觸發最後一次,這經過 RxJS 的 debounceTime operator 就能夠實現。

clickRefresh$ = this.click$.pipe(debounceTime(300));

2)自動刷新流

使用 RxJS 的 switchMap 與以前定義好的 interval$ 數據流配合。

autoRefresh$ = change$.pipe(
  switchMap(enabled => (enabled ? interval$ : EMPTY))
);

3)下拉刷新流

結合以前定義好的 touchstart$touchmove$ 與 touchend$ 數據流。

pullRefresh$ = touchstart$.pipe(
  switchMap(touchStartEvent =>
    touchmove$.pipe(
      map(touchMoveEvent => touchMoveEvent.touches[0].pageY - touchStartEvent.touches[0].pageY),
      takeUntil(touchend$)
    )
  ),
  filter(position => position >= 300),
  take(1),
  repeat()
);

最後,咱們經過 merge 函數將定義好的 clickRefresh$autoRefresh$ 與 pullRefresh$ 合併,就獲得了刷新數據流。

refresh$ = merge(clickRefresh$, autoRefresh$, pullRefresh$));

消費數據流並更新視圖

將刷新數據流直接經過 switchMap 打平到在第一步到定義好的 fetch$,咱們就得到了視圖數據流。

能夠經過在 Angular 框架中能夠直接 async pipe 將視圖流直接映射爲視圖:

<div *ngFor="let user of view$ | async">
</div>

在其餘框架中能夠經過 subscribe 得到數據流中的真實數據,再更新視圖。

至此,咱們就使用響應式編程完整的開發完成了當前新聞應用,示例代碼[1]由 Angular 開發,行數不超過 160 行。

咱們總結一下,使用響應式編程思想開發前端應用時經歷的三個過程與第一節中公式的對應關係:

View = reactionFn(UserEvent | Timer | Remote API)

1)描述源數據流

與事件UserEvent | Timer | Remote API 對應,在 RxJS 中對應函數分別是:

  • UserEvent: fromEvent
  • Timer: interval, timer
  • Remote API: fromFetch, webSocket

2)組合轉換數據流

與響應函數(reactionFn)對應,在 RxJS 中對應的部分方法是:

  • COMBINING: merge, combineLatest, zip
  • MAPPING: map
  • FILTERING: filter
  • REDUCING: reduce, max, count, scan
  • TAKING: take, takeWhile
  • SKIPPING: skip, skipWhile, takeLast, last
  • TIME: delay, debounceTime, throttleTime

3)消費數據流更新視圖

與 View 對應,在 RxJS 及 Angular 中可使用:

  • subscribe
  • async pipe

響應式編程相對於 MVVM 或者 Redux 有什麼優勢呢?

  • 描述事件發生的自己,而非計算過程或者中間狀態。
  • 提供了組合和轉換數據流的方法,這也意味着咱們得到了複用持續變化數據的方法。
  • 因爲全部數據流均由層層組合與轉換得到,這也就意味着咱們能夠精確追蹤事件及數據變化的來源。

若是咱們將 RxJS 的 Marbles 圖的時間軸模糊,並在每次視圖更新時增長縱切面,咱們就會發現這樣兩件有趣的事情:

  • Action 是 EventStream 的簡化。
  • State 是 Stream 在某個時刻的對應。

難怪咱們能夠在 Redux 官網中有這樣一句話:若是你已經使用了 RxJS,極可能你再也不須要 Redux 了。

The question is: do you really need Redux if you already use Rx? Maybe not. It's not hard to re-implement Redux in Rx. Some say it's a two-liner using Rx.scan() method. It may very well be!

寫到這裏,咱們對網頁視圖可以正確地響應相關事件這句話是否能夠進行進一步的抽象呢?

全部事件 -- 找到 --> 相關事件 -- 作出 --> 響應

而按時間順序發生的事件,本質上就是數據流,進一步拓展就可變成:

源數據流 -- 轉換 --> 中間數據流 -- 訂閱 --> 消費數據流

這正是響應式編程在前端可以完美工做的基礎思想。可是該思想是否只在前端開發中有所應用呢?

答案是否認的,該思想不只能夠應用於前端開發,在後端開發乃至實時計算中都有着普遍的應用。

3、 打破信息之牆

在先後端開發者之間,一般由一面叫 REST API 的信息之牆隔開,REST API 隔離了先後端開發者的職責,提高了開發效率。但它一樣讓先後端開發者的眼界被這面牆隔開,讓咱們試着來推倒這面信息之牆,一窺一樣的思想在實時計算中的應用。

1 、實時計算 與 Apache Flink

在開始下一部分以前,讓咱們先介紹一下 Flink。Apache Flink 是由 Apache 軟件基金會開發的開源流處理框架,用於在無邊界和有邊界數據流上進行有狀態的計算。它的數據流編程模型在有限和無限數據集上提供單次事件(event-at-a-time)處理能力。

在實際的應用中,Flink 一般用於開發如下三種應用:

  • 事件驅動型應用 事件驅動型應用從一個或多個事件流提取數據,並根據到來的事件觸發計算、狀態更新或其餘外部動做。場景包括基於規則的報警,異常檢測,反欺詐等等。
  • 數據分析應用 數據分析任務須要從原始數據中提取有價值的信息和指標。例如雙十一成交額計算,網絡質量監測等等。
  • 數據管道(ETL)應用 提取-轉換-加載(ETL)是一種在存儲系統之間進行數據轉換和遷移的經常使用方法。ETL 做業一般會週期性地觸發,將數據從事務型數據庫拷貝到分析型數據庫或數據倉庫。

咱們這裏以計算電商平臺雙十一每小時成交額爲例,看下咱們在以前章節獲得方案是否仍然能夠繼續使用。

在這個場景中咱們首先要獲取用戶購物下單數據,隨後計算每小時成交數據,而後將每小時的成交數據轉存到數據庫並被 Redis 緩存,最終經過接口獲取後展現在頁面中。

在這個鏈路中的數據流處理邏輯爲:

用戶下單數據流 -- 轉換 --> 每小時成交數據流 -- 訂閱 --> 寫入數據庫

與以前章節中介紹的:

源數據流 -- 轉換 --> 中間數據流 -- 訂閱 --> 消費數據流

思想徹底一致。

若是咱們用 Marbles 描述這個過程,就會獲得這樣的結果,看起來很簡單,彷佛使用 RxJS 的 window operator 也能夠完成一樣的功能,可是事實真的如此嗎?

2 、被隱藏的複雜度

真實的實時計算比前端中響應式編程的複雜度要高不少,咱們在這裏舉幾個例子:

事件亂序

在前端開發過程當中,咱們也會碰到事件亂序的狀況,最經典的狀況先發起的請求後收到響應,能夠用以下的 Marbles 圖表示。這種狀況在前端有不少種辦法進行處理,咱們在這裏就略過不講。

咱們今天想介紹的是數據處理時面臨的時間亂序狀況。在前端開發中,咱們有一個很重要的前提,這個前提大幅度下降了開發前端應用的複雜度,那就是:前端事件的發生時間和處理時間相同。

想象一下,若是用戶執行頁面動做,例如 click, mousemove 等事件都變成了異步事件,而且響應時間未知,那整個前端的開發複雜度會如何。

可是事件的發生時間與處理時間不一樣,在實時計算領域是一個重要的前提。咱們仍以每小時成交額計算爲例,當原始數據流通過層層傳輸以後,在計算節點的數據的前後順極可能已經亂序了。

若是咱們仍然以數據的到來時間來進行窗口劃分,最後的計算結果就會產生錯誤:

爲了讓 window2 的窗口的計算結果正確,咱們須要等待 late event 到來以後進行計算,可是這樣咱們就面臨了一個兩難問題:

  • 無限等下去:late event 可能在傳輸過程當中丟失,window2 窗口永遠沒有數據產出。
  • 等待時間過短:late event 尚未到來,計算結果錯誤。

Flink 引入了 Watermark 機制來解決這個問題,Watermark 定義了何時再也不等待 late event,本質上提供了實時計算的準確性和實時性的折中方案。

關於 Watermark 有個形象的比喻:上學的時候,老師會將班級的門關上,而後說:「從這個點以後來的同窗都算遲到了,通通罰站「。在 Flink 中,Watermark 充當了老師關門的這個動做。

數據反壓

在瀏覽器中使用 RxJS 時,不知道你們有沒有考慮這樣一種狀況:observable 產生的速度快於 operator 或者 observer 消費的速度時,會產生大量的未消費的數據被緩存在內存中。這種狀況被稱爲反壓,幸運的是,在前端產生數據反壓只會致使瀏覽器內存被大量佔用,除此以外不會有更嚴重的後果。

可是在實時計算中,當數據產生的速度高於中間節點處理能力,或者超過了下游數據的消費能力時,應當如何處理?

對於許多流應用程序來講,數據丟失是不可接受的,爲了保證這一點,Flink 設計了這樣一種機制:

  • 在理想狀況,在一個持久通道中緩衝數據。
  • 當數據產生的速度高於中間節點處理能力,或者超過了下游數據的消費能力時,速度較慢的接收器會在隊列的緩衝做用耗盡後當即下降發送器的速度。更形象的比喻是,在數據流流速變慢時,將整個管道從水槽「回壓」到水源,並對水源進行節流,以便將速度調整到最慢的部分,從而達到穩定狀態。

Checkpoint

實時計算領域,每秒鐘處理的數據可能有數十億條,這些數據的處理不可能由單臺機器獨立完成。事實上,在 Flink 中,operator 運算邏輯會由不一樣的 subtask 在 不一樣的 taskmanager 上執行,這時咱們就面臨了另一個問題,當某臺機器發生問題時,總體的運算邏輯與狀態該如何處理才能保證最後運算結果的正確性?

Flink 中引入了 checkpoint 機制用於保證能夠對做業的狀態和計算位置進行恢復,checkpoint 使 Flink 的狀態具備良好的容錯性。Flink 使用了 Chandy-Lamport algorithm 算法的一種變體,稱爲異步 barrier 快照(asynchronous barrier snapshotting)。

當開始 checkpoint 時,它會讓全部 sources 記錄它們的偏移量,並將編號的 checkpoint barriers 插入到它們的流中。這些 barriers 會通過每一個 operator 時標註每一個 checkpoint 先後的流部分。

當發生錯誤時,Flink 能夠根據 checkpoint 存儲的 state 進行狀態恢復,保證最終結果的正確性。

冰山一角

因爲篇幅的關係,今天介紹的部分只能是冰山一角,不過

源數據流 -- 轉換 --> 中間數據流 -- 訂閱 --> 消費數據流

的模型不管在響應式編程仍是實時計算都是通用的,但願這篇文章可以讓你們對數據流的思想有更多的思考。

做者:開發者小助手_LS
原文連接本文爲阿里雲原創內容,未經容許不得轉載

相關文章
相關標籤/搜索