轉載請註明原文地址:juejin.im/editor/post…react
原文標題:Reactive Streams and Kotlin Flowsgit
原文做者:Roman Elizarovgithub
原文發佈於:2019-06-10編程
響應式擴展(簡稱爲 ReactiveX 或者 Rx)這一律念,由 Erik Meijer 首次在 .NET 中提出,並於 2010 公之於衆。它是一個新的異步數據流 API 的構建趨勢,即採用包含發射元素(onNext)、流執行完成(onCompleted)、出現錯誤(onError)的回調,同時引入了像 map 和 filter 等的流式處理操做符,讓流的數據處理變得和使用集合同樣容易。併發
基於觀察者的 Rx 提供了更優的性能比傳統基於迭代的數據處理 API。此外,Rx 主張使用「冷流」這一思想。它在那時是一個十分新穎的思想,由於在當時命令式語言處於業界主流,大多數的數據處理 API 都是「熱的」。在資源管理方面,「熱流」有不少弊端(一旦你打開它,你必須不要忘記關閉它),然而「冷流」提供了一個優雅的解決方案(可參考文章:Cold flows, hot channels)。異步
當下 Rx 不斷普及同時它被移植到了許多編程語言中,其中就包括 Java ,所以它就能夠運用在最大的編程環境 JVM 中了。在 2013 年,Rx 移植到 Java 被稱爲 RxJava。於 2014 年發佈了 1.0 穩定版本。編程語言
期間,在 JVM 平臺還有另外兩個項目 Akka 和 Project Reactor ,它們是致力於基於事件的異步系統,然而它們都面臨一個共同的控制流問題 —— 背壓。這樣就促成了一次合做,爲了提供一個標準的接口集合來解決 JVM 使用響應式數據流伴隨的背壓問題。它被稱爲「響應式流」 新方案,同時 Viktor Klang 發佈了一個重大的會談記錄下了它的歷史進程。響應式流的 1.0 版本於 2015 年發佈。異步編程
響應式流是一個使人欽佩的工程。它爲 JVM 世界帶來了支持背壓的異步事件流,不然它很難支持異步性。它是一個純基於庫的壯舉,引入大量的必須嚴格遵循的契約。本文再也不詳述這些,當你使用一些衆所周知的專家構建的操做符,它就能夠完美的運行,可是若是你本身去寫響應式流的操做符,要遵照全部的這些契約是一個巨大的挑戰。函數
在 2018 年 Kotlin 編程語言發佈了協程 ,做爲一個專門針對異步編程的語言廣泛特徵。在 Kotlin 中,「掛起」(suspension)這個概念,是一個與生俱來的流控制方案。把它與基於觀察者的 Rx 的具備「冷流」思想相結合,就能夠深刻理解 Kotlin Flows 了。post
咱們致力於 Kotlin Flows 的一系列工做就是以實現一個簡單的設計爲目標。這個設計就是,僅僅只須要不多的基本構建單元,就能夠編寫你本身的操做符。好比:想要爲沒一個值都延遲一秒 ?沒問題,使用基本的 flow 構建者,以及 collect 函數,就能實現:
fun <T> Flow<T>.delayASecond() = flow {
collect { value -> // collect from the original flow
delay(1000) // delay 1 second
emit(value) // emit value to the resulting flow
}
}
複製代碼
你並不會看到顯示任何有關處理背壓的代碼,由於它已經自動的在幕後完成了,這一切都歸功於 Kotlin 編譯器提供了對「掛起」(suspension)的支持。
從無到有的設計 Kotlin Flows,使得咱們也有機會來減小以前響應式流有關的一些模板代碼。好比:當訂閱一個響應式流,你最終會持有一個 「訂閱」(Subscription)對象的引用,你必須很當心的管理它,以便你能夠取消這個訂閱,不然你可能有泄漏它的風險。這個問題和併發性結構正在解決的問題是很是相似的,而在 Flow 的設計中你不用擔憂這些,你並不會由於不當心致使泄漏一個訂閱。
Kotlin Flow 沒有任何訂閱的概念。掛起和輕量級的協程前來救援。Flow 的 collect 操做符最像一個訂閱,可是它僅僅是一個掛起的函數,歸功於併發性結構,只要不是濫用它,調用它是很難引起泄漏的。
collect 操做符是基於掛起的設計,所以再也不須要每次都單獨設置 onError 和 onCompleted 回調。(譯者注:能夠回想一下 Kotlin 的 suspend 函數,它的設計初衷就是不用回調,讓異步代碼能夠像同步代碼通常無二的編寫。)想要在流正常完成後執行一些操做 ?那麼就在 collect 正常完成後作就行了:
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
// reemit all values from the original flow
collect { value -> emit(value) }
// this code runs only after the normal completion
action()
}
複製代碼
在咱們設計期間,經過研究現有的響應式流應用代碼,給咱們帶來了極大的好處。例如,咱們看到涉及執行上下文切換都有共同的代碼模式(無處不在的 subcribeOn / observeOn),從而僅僅採用一個簡單的 flowOn 操做符,來完成了一樣的機制(詳情可參考:Execution context of Kotlin Flows)。
爲此,在覈心庫中,咱們還能夠奢侈的不去實現全部能夠想到的操做符。咱們僅僅挑選了最流行和最基本的來實現,由於 Kotlin 支持擴展函數(參見:Extension-oriented design),把它與 Flow 的簡化設計相結合,能夠很容易的建立自定義操做符,就如同使用內置的操做符通常無二。
Kotlin Flows 仍然是響應式流的理念。儘管它們是基於掛起的並且沒有直接實現相關的接口,可是它這樣的設計,可讓它直截了當的與基於響應式流的系統集成起來。咱們提供了開箱便可用的 flow.asPublisher() 擴展函數來把 Flow 轉換爲響應式流的 Publisher 接口,以及 publisher.asFlow() 擴展來實現反向轉換。