RxJava 學習筆記<十七> 緩存Buffer

原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.mdios

Time-shifted sequences

    Rx的關鍵特性之一是,您不知道何時會發出項。一些observables會當即同步地發射出全部的東西(例如range),一些會按必定的間隔發出,還有一些很難甚至是沒法預測的。例如,鼠標事件和UDP數據包只在到達時到達。咱們須要工具來決定如何處理這些事件,不只基於它們是什麼,並且還基於它們什麼時候到達和以何種頻率發生。git

Buffer

    「buffer」容許您收集值並以批量形式獲取它們,而不是一次收集一個值。它們是緩衝值的幾種不一樣方式。github

Complete, non-overlapping buffering

    首先,咱們將檢查緩衝區的變體,其中每一個值都只緩衝一次,沒有損失,也沒有重複。緩存

buffer by count

    最簡單的重載將固定數量的值分組在一塊兒,並在組準備就緒時發出該組。app

日誌輸出:函數

buffer by time

    下一個重載容許您根據時間進行緩衝。時間被分紅等長的窗口。爲每一個窗口收集值,並在每一個窗口的末尾發出緩衝區。工具

    再下一個例子中,咱們將每隔100ms發出一個值,而緩存時間是250ms。spa

日誌輸出:3d

    這裏集合的大小取決於在那個時間段內發出了多少值,而不是指望的大小。若是在窗口中沒有事件,集合甚至能夠是空的。日誌

buffer by count and time

    能夠同時使用緩衝區大小和Time來緩衝值。若是緩衝區已滿,若是時間到或者換衝區滿,則發出緩衝值。

日誌輸出:

    咱們在這裏看到不少空的,這是由於緩衝區在大小達到2時和時間窗口關閉時都會發出。從前面的示例中能夠看出,這些窗口中只有兩個值。由於緩衝區在大小達到2時被清空,因此當窗口關閉時它是空的。

buffer with signal

    您還能夠用可觀察到的刷新信號通知緩衝區,而不是固定的時間點。每當信號在Next上發出時,將發出緩衝區中的值。若是您想要緩衝值直到您準備好接受它們,那麼使用信號緩衝是很是有用的。

下面的示例與Buffer(250,TimeUnit.MILLISECONDS)相同

日誌輸出:

    上面的方法有一種變體,您能夠經過一個函數提供可觀察到的信號:buffer(() -> Observable.interval(250, TimeUnit.MILLISECONDS))。這裏的不一樣之處在於,建立可觀察到的函數是在訂閱發生時執行的。您能夠用來在訂閱開始時啓動信號。

Overlapping buffers

    咱們已經看到的每種緩衝方法都有容許緩衝區過載或忽略值的替代方法。

buffer by count

    根據所需的緩衝區大小進行緩衝時,還能夠聲明每一個緩衝區的起始位置之間的距離。

  

    正如咱們在圖中看到的,咱們每3個值啓動一個新的緩衝區,可是緩衝區被限制爲2個值。所以,每三個元素中就有一個被遺漏。您還能夠在前一個緩衝區關閉以前啓動新的緩衝區。

  • 當count > skip,緩衝區重疊
  • 當count < skip,元素被排除在外
  • 當count = skip,如前面的例子同樣

日誌輸出:

    如咱們所見,每3個元素啓動一個新的緩衝區,該緩衝區包含接下來的4個元素。

buffer by time

    咱們能夠對對象作很是相似的事情,其中緩衝是基於時間跨度的。您能夠決定打開新緩衝區的頻率以及它們應該持續多長時間。

    一樣,這容許您要麼使緩衝區重疊,要麼省略元素。

  • 當timespan > timeshift,緩衝區重疊
  • 當timespan < timeshift,元素被排除
  • 當 timespan < timeshift,和以前的例子同樣

    在下一個示例中,咱們將每200 ms建立一個新的緩衝區,並將其收集爲350 ms。這意味着緩衝區重疊150毫秒。

日誌輸出:

buffer by signal

    最後一個也是最強大的對象或緩衝區容許您使用可觀察到的信令來定義緩衝區的開始和結束。

    這個函數有兩個參數。第一個參數BufferOpenings是一個可觀察到的參數。每次這個觀測值發出一個值,就會有一個新的緩衝區開始。除了打開一個新的緩衝區以外,它所發出的值也會傳遞給緩衝區ClosingSelector,它是一個函數。該函數使用該值建立一個新的可觀察值,當它發出第一個onNext事件時,它將發出相應緩衝區的結束信號。

日誌輸出:

    咱們已經建立了一個Observable.interval,它每隔250毫秒就會發出一個新的緩衝區打開的信號。由於用Interval建立的可觀察對象不會當即發出值,而第一個緩衝區其實是在250 ms時開始的,而以前的值都丟失了。對於緩衝區的關閉,咱們提供了一個lambda函數,該函數接受緩衝區bufferOpenings發出的每一個值。由區間生成的值是天然級數0,1,2,3.可是咱們實際上沒有使用這個值,由於這樣的例子太複雜了。相反,咱們只是建立了一個等待200 ms,而後發出單個值的可觀察值。這意味着每一個緩衝區的持續時間正好爲200 ms,相似於按時間緩衝。

takeLastBuffer

    咱們已經看到了「takeLast」操做符,它返回最後N個項。在內部,「takeLast」須要緩衝項,並在源序列結束時從新發出它們。takLastBuffer運算符將最後的元素做爲一個緩衝區返回。

By count

    以計數方式獲取LastBuffer將發出列表中的最後N個元素。

日誌輸出:

By time

    按時間獲取LastBuffer將發出在指定的TimeSpan期間接收到的項做爲緩衝區,這是從源序列的末尾開始測量的。

日誌輸出:

By count and time

    這個帶LastBuffer的重載所發出的緩衝區將包含在結束以前在指定的時間跨度內發出的項。若是此窗口包含的項多於指定的項數,則緩衝區將包含最後的N項。

日誌輸出:

    這個帶LastBuffer的重載所發出的緩衝區將包含在結束以前在指定的時間跨度內發出的項。若是此窗口包含的項多於指定的項數,則緩衝區將包含最後的N項。

下節繼續!

有什麼討論的內容,能夠加我公衆號:

相關文章
相關標籤/搜索