以前我在知乎上受邀回答過一個關於RxJava背壓(Backpressure)機制的問題,今天我把它整理出來,但願對更多的人能有幫助。html
RxJava的官方文檔中對於背壓(Backpressure)機制比較系統的描述是下面這個:git
但本文的題目既然是要「形象地」描述各個機制,天然會力求表達簡潔,讓人一看就懂。因此,下面我會盡可能拋開一些抽象的描述,主要採用打比方的方式來闡明我對於這些機制的理解。github
首先,從大的方面說,上面這篇文檔的題目,雖然叫「Backpressure」(背壓),但倒是在講述一個更大的話題——「Flow Control」(流控)。Backpressure只是Flow Control的其中一個方案。緩存
在RxJava中,能夠經過對Observable連續調用多個Operator組成一個調用鏈,其中數據從上游向下遊傳遞。當上遊發送數據的速度大於下游處理數據的速度時,就須要進行Flow Control了。網絡
這就像小學作的那道數學題:一個水池,有一個進水管和一個出水管。若是進水管水流更大,過一段時間水池就會滿(溢出)。這就是沒有Flow Control致使的結果。異步
Flow Control有哪些思路呢?大概是有四種:async
下面分別詳細介紹。post
注意:目前RxJava的1.x和2.x兩個版本序列同時並存,2.x相對於1.x在接口上有很大變更,其中也包括Backpressure的部分。可是,這裏要討論的Flow Control機制中的相關概念,卻都是適用的。學習
Backpressure,也稱爲Reactive Pull,就是下游須要多少(具體是經過下游的request請求指定須要多少),上游就發送多少。這有點相似於TCP裏的流量控制,接收方根據本身的接收窗口的狀況來控制接收速率,並經過反向的ACK包來控制發送方的發送速率。
這種方案只對於所謂的cold Observable有效。cold Observable指的是那些容許下降速率的發送源,好比兩臺機器傳一個文件,速率可大可小,即便下降到每秒幾個字節,只要時間足夠長,仍是可以完成的。相反的例子是音視頻直播,數據速率低於某個值整個功能就無法用了(這種就屬於hot Observable了)。
節流(Throttling),說白了就是丟棄。消費不過來,就處理其中一部分,剩下的丟棄。仍是舉音視頻直播的例子,在下游處理不過來的時候,就須要丟棄數據包。
而至於處理哪些和丟棄哪些數據,就有不一樣的策略。主要有三種策略:
從細的方面分別解釋一下。
sample,採樣。類比一下音頻採樣,8kHz的音頻就是每125微秒採一個值。sample能夠配置成,好比每100毫秒採樣一個值,但100毫秒內上游可能過來不少值,選哪一個值呢,就是選最後那個值。因此它也叫throttleLast。
throttleFirst跟sample相似,好比仍是每100毫秒採樣一個值,但選這100毫秒內的第一個值。在Android開發中有時候能夠把throttleFirst用做點擊事件的防抖動處理,就是由於它能夠在指定的一段時間內處理第一個點擊事件(即採樣第一個值),但丟棄後面的點擊事件。
debounce,也叫throttleWithTimeout,名字裏就包含一個例子。好比,一個網絡程序維護一個TCP鏈接,不停地收發數據,但中間沒數據能夠收發的時候,就有間歇。這段間歇的時間,能夠稱爲idle time。當idle time超過一個預設值的時候,就算超時了(time out),這個時候可能就須要把鏈接斷開了。實際上一些作server端的網絡程序就是這麼工做的。每收發一個數據包以後,啓動一個計時器,等待一個idle time。若是計時器到時以前,又有收發數據包的行爲,那麼計時器重置,等待一個新的idle time;而若是計時器時間到了,就超時了(time out),這個鏈接就能夠關閉了。debounce的行爲,跟這個很是相似,能夠用它來找到那些連續的收發事件以後的idle time超時事件。換句話說,debounce能夠把連續發生的事件之間的較大的間歇找出來。
打包就是把上游來的小包裹打成大包裹,分發到下游。這樣下游須要處理的包裹的個數就減小了。RxJava中提供了兩類這樣的機制:buffer和window。
buffer和window的功能基本同樣,只是輸出格式不太同樣:buffer打包後的包裹用一個List表示,而window打包後的包裹又是一個Observable。
這是一種特殊狀況,阻塞住整個調用棧(Callstack blocking)。之因此說這是一種特殊狀況,是由於這種方式只適用於整個調用鏈都在一個線程上同步執行的狀況,這要求中間的各個operator都不能啓動新的線程。在日常使用中這種應該是比較少見的,由於咱們常用subscribeOn或observeOn來切換執行線程,並且有些複雜的operator自己也會在內部啓動新的線程來處理。另外,若是真的出現了徹底同步的調用鏈,前面的另外三種Flow Control思路仍然多是適用的,只不過這種阻塞的方式更簡單,不須要額外的支持。
這裏舉個例子把調用棧阻塞和前面的Backpressure比較一下。「調用棧阻塞」至關於不少車行駛在盤山公路上,而公路只有一條車道。那麼排在最前面的第一輛車就擋住了整條路,後面的車也只能排在後面。而「Backpressure」至關於銀行辦業務時的窗口叫號,窗口主動叫某個號過去(至關於請求),那我的纔過去辦理。
在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable能夠經過一些operator來轉化成支持Backpressure的Observable。這些operator包括:
它們轉化成的Observable分別具備不一樣的Backpressure策略。
而在RxJava 2.x中,Observable再也不支持Backpressure,而是改用Flowable來專門支持Backpressure。上面提到的四種operator的前三種分別對應Flowable的三種Backpressure策略:
onBackpressureBuffer是不丟棄數據的處理方式。把上游收到的所有緩存下來,等下游來請求再發給下游。至關於一個水庫。但上游太快,水庫(buffer)就會溢出。
onBackpressureDrop和onBackpressureLatest比較相似,都會丟棄數據。這兩種策略至關於一種令牌機制(或者配額機制),下游經過request請求產生令牌(配額)給上游,上游接到多少令牌,就給下游發送多少數據。當令牌數消耗到0的時候,上游開始丟棄數據。但這兩種策略在令牌數爲0的時候有一點微妙的區別:onBackpressureDrop直接丟棄數據,不緩存任何數據;而onBackpressureLatest則緩存最新的一條數據,這樣當上遊接到新令牌的時候,它就先把緩存的上一條「最新」數據發送給下游。能夠結合下面兩幅圖來理解。
onBackpressureBlock是看下游有沒有需求,有需求就發給下游,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的狀況了),本身並不緩存。這種策略已經廢棄不用。
本文重點在於以宏觀的角度來描述和對比RxJava中的Flow Control機制和Backpressure的各類機制,不少細節沒有涉及。好比,buffer和window除了能把一段時間內收到的數據打包,還能把固定數量的數據進行打包。再好比,onBackpressureDrop和onBackpressureLatest在一次收到下游多條數據的請求時分別會如何表現,本文沒有詳細說明。你們能夠查閱相應的API Reference來得到答案,也歡迎留言與我一塊兒討論。
(完)
其它精選文章: