原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part 4 - Concurrency/4. Backpressure.mdjava
Rx將事件從管道的一端引導到另外一端,在每一端發生的行動可能很是不一樣。當生產者和消費者須要不一樣的時間來處理值時會發生什麼?在同步模型中,這個問題不是問題。請考慮如下示例:react
輸出:ios
在這裏,生產者已經準備好了它的值,而且能夠絕不拖延地發出它們。相比之下,消費者很是緩慢,但這不會引發問題,由於上述代碼的同步性質會自動調節生產者和消費者的比率。當o.onNext(1)被調用,生產者的執行被阻止,直到整個Rx鏈完成。只有當該表達式返回時,執行才能進入o.onNext(2)。git
這相似於僅用於同步執行。生產者和消費者一般是異步的。那麼,當生產者和消費者以不一樣的速度異步操做時會發生什麼?github
咱們首先考慮傳統的基於拉的模型,例如迭代器iterator。在基於拉取的模型中,消費者請求值。若是生產者較慢,消費者將根據請求阻止並在下一個值到達時恢復。若是生產者更快,那麼生產者將有空閒時間等待消費者請求下一個值。緩存
Rx基於推送,而不是基於拉。在Rx中,生產者在值準備好時將值推送給消費者。若是生產者較慢,那麼消費者將有空閒時間等待下一個值到達。若是生產者更快,沒有任何規定,它將保持強制餵養數據給消費者,而不知道消費者的困難。安全
輸出:服務器
在這裏,MissingBackpressureException讓咱們知道生產者太快了,咱們連接在一塊兒的運算符沒法處理它。異步
咱們在前幾章中看到的一些operators能夠幫助消費者減輕因輸入過多而帶來的壓力。測試
(/Part 3 - Taming the sequence/5. Time-shifted sequences.md#sample) 運算符天然容許您指定最大輸入速率,而不會留下任何多餘的數據。
demo:
輸出:
相似的操做能夠服務於同一目的。
您可使用buffer和window在消費者忙碌時收集溢出的數據,而不是對數據進行採樣。若是批量處理項目更快,這將很是有用。或者,您能夠檢查緩衝區以手動肯定要處理的緩衝項的數量。
在咱們以前看到的示例中,消費者以幾乎相同的速度處理單個項目和批量。在這裏,咱們放慢了生產者的速度,使批次符合一條線,但原則保持不變。
輸出:
上述補救措施是解決問題的合法方法。然而,它們並不老是處理過分生產的可觀察事件的最佳方式。有時,問題能夠在生產者方面獲得更好的處理。背壓是管道抵抗值排放的一種方式。
背壓是指與諸如管道的受限位置中的所需流體流相對的壓力。它一般是因爲其移動的限制容器中的障礙物或彎曲彎曲引發的,例如管道或通風口。
- Wikipedia
RxJava已經爲subscriber實施了一種調節可觀察率的方法。訂閱者有一個request(n)方法,經過它能夠通知觀察者它已準備好接受更多的值。經過在訂閱服務器的onStart方法上調用請求,能夠創建reactive pull backpressure。這不是pull模型意義上的拉:它不會返回任何值,而且若是值未準備就不會阻止。相反,它只是通知可觀察訂閱者準備接受多少值並保留其他值。對請求的後續調用將容許更多值。
這是一個一次獲取一個值的訂閱者:
onStart中的request(1)創建背壓並通知可觀察到它應該只發出第一個值。在處理onNext中的值以後,咱們請求發送下一個項目,若是它可用的話。呼叫請求(Long.MAX_VALUE)禁用背壓。
回到咱們討論反作用的doOn_ 運算符時,咱們省略了doOnRequested。
當subscriber請求更多項目時,會發生doOnRequested元事件。提供給操做的值是請求的項目數。
咱們在這裏作了一個例外,由於它使咱們可以窺視穩定的背壓功能,不然這些功能是隱藏的。讓咱們看看在一個簡單的observable中發生了什麼:
輸出:
咱們看到subscribe從頭開始請求最大數量的items。這意味着訂閱根本不抵制values。若是咱們提供實施背壓的訂戶,訂閱將僅使用背壓。如下是訂戶的完整示例,它容許咱們從外部控制背壓。
除非咱們使用requestMore手動執行此操做,不然此簡單實現不會請求值。
輸出:
在內部使用隊列和緩衝區的Rx運算符應使用背壓來避免存儲無限量的值。大規模緩衝應留給明確用於此目的的運算符,例如緩存,緩衝區等。須要緩衝項的運算符的示例是zip:第一個observable可能在第二個observable發出以前發出兩個或更多值下一個價值。即便假設兩個序列具備相同的頻率,也指望這種小的不對稱性。須要緩衝一些項目不該該致使操做員失敗。出於這個緣由,zip有一個128項的小緩衝區。
輸出:
zip操做符首先請求足夠的項來填充其緩衝區,並在消耗它們時請求更多。zip請求的項目數量的詳細信息並不有趣。讀者應該帶走的是,不管開發人員是否請求,Rx中都存在一些緩衝和背壓。這爲Rx管道提供了一些靈活性,在這種狀況下你可能沒有。它可能會讓你認爲你的代碼是可靠的,經過默默地保存小測試失敗,可是在你明確聲明關於背壓的行爲以前你就不安全了。
許多Rx操做在內部使用背壓來避免過分填充其內部隊列。這樣,慢速消費者的問題在運算符鏈中向後傳播:若是運算符中止接受值,則前一個運算符將填充其緩衝區,直到它也中止接受值,依此類推。背壓不會使問題消失。它只是將它移動到能夠更好地處理的地方。咱們仍然須要決定如何處理過分生產的可觀察量的值。
有些Rx運算符聲明瞭如何處理訂閱者沒法接受正在發出的值的狀況。
onBackpressureBuffer運算符會將沒法使用的每一個值都被存儲,直到觀察者可使用它。
您能夠擁有無限大小的緩衝區或具備最大容量的緩衝區。若是緩衝區溢出,則observable將失敗。
輸出:
這裏發生的是生產者比消費者快100倍。咱們嘗試經過緩衝多達1000個項目來解決這個問題。很容易計算出,當消費者消費第11個項目時,生產者已經生產了1100個項目,遠遠超過了咱們的緩衝區容量。而後序列失敗,由於它沒法處理背壓。
若是沒法接收項目,onBackpressureDrop運算符將丟棄這些項目。
咱們在這裏看到的是前128個項目正常消耗,但後來咱們跳了起來。介於二者之間的項目被onBackPressureDrop刪除。即便咱們沒有請求它,前128個項目仍然是緩衝的,由於observeOn在切換線程之間使用一個小緩衝區。
原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part 4 - Concurrency/4. Backpressure.md
未完待續!
git:https://github.com/woshiyexinjie/rxjava-leaner