『譯』Kotlin Flows 與 Reactive Streams —— 青出於藍的歷史

轉載請註明原文地址:juejin.im/editor/post…react

原文標題Reactive Streams and Kotlin Flowsgit

原文做者Roman Elizarovgithub

原文發佈於:2019-06-10編程

圖片來自於Darron Birgenheier

響應式擴展(簡稱爲 ReactiveX 或者 Rx)這一律念,由 Erik Meijer 首次在 .NET 中提出,並於 2010 公之於衆。它是一個新的異步數據流 API 的構建趨勢,即採用包含發射元素(onNext)、流執行完成(onCompleted)、出現錯誤(onError)的回調,同時引入了像 map 和 filter 等的流式處理操做符,讓流的數據處理變得和使用集合同樣容易。併發

基於觀察者的 Rx 提供了更優的性能比傳統基於迭代的數據處理 API。此外,Rx 主張使用「冷流」這一思想。它在那時是一個十分新穎的思想,由於在當時命令式語言處於業界主流,大多數的數據處理 API 都是「熱的」。在資源管理方面,「熱流」有不少弊端(一旦你打開它,你必須不要忘記關閉它),然而「冷流」提供了一個優雅的解決方案(可參考文章:Cold flows, hot channels)。異步

當下 Rx 不斷普及同時它被移植到了許多編程語言中,其中就包括 Java ,所以它就能夠運用在最大的編程環境 JVM 中了。在 2013 年,Rx 移植到 Java 被稱爲 RxJava。於 2014 年發佈了 1.0 穩定版本。編程語言

響應式流(Reactive Streams)

期間,在 JVM 平臺還有另外兩個項目 AkkaProject Reactor ,它們是致力於基於事件的異步系統,然而它們都面臨一個共同的控制流問題 —— 背壓。這樣就促成了一次合做,爲了提供一個標準的接口集合來解決 JVM 使用響應式數據流伴隨的背壓問題。它被稱爲「響應式流」 新方案,同時 Viktor Klang 發佈了一個重大的會談記錄下了它的歷史進程。響應式流的 1.0 版本於 2015 年發佈。異步編程

響應式流是一個使人欽佩的工程。它爲 JVM 世界帶來了支持背壓的異步事件流,不然它很難支持異步性。它是一個純基於庫的壯舉,引入大量的必須嚴格遵循的契約。本文再也不詳述這些,當你使用一些衆所周知的專家構建的操做符,它就能夠完美的運行,可是若是你本身去寫響應式流的操做符,要遵照全部的這些契約是一個巨大的挑戰。函數

Kotlin Flows

在 2018 年 Kotlin 編程語言發佈了協程 ,做爲一個專門針對異步編程的語言廣泛特徵。在 Kotlin 中,「掛起」(suspension)這個概念,是一個與生俱來的流控制方案。把它與基於觀察者的 Rx 的具備「冷流」思想相結合,就能夠深刻理解 Kotlin Flows 了。post

咱們致力於 Kotlin Flows 的一系列工做就是以實現一個簡單的設計爲目標。這個設計就是,僅僅只須要不多的基本構建單元,就能夠編寫你本身的操做符。好比:想要爲沒一個值都延遲一秒 ?沒問題,使用基本的 flow 構建者,以及 collect 函數,就能實現:

fun <T> Flow<T>.delayASecond() = flow {

    collect { value -> // collect from the original flow
        delay(1000) // delay 1 second
        emit(value) // emit value to the resulting flow
    }
}
複製代碼

你並不會看到顯示任何有關處理背壓的代碼,由於它已經自動的在幕後完成了,這一切都歸功於 Kotlin 編譯器提供了對「掛起」(suspension)的支持。

併發性結構

從無到有的設計 Kotlin Flows,使得咱們也有機會來減小以前響應式流有關的一些模板代碼。好比:當訂閱一個響應式流,你最終會持有一個 「訂閱」(Subscription)對象的引用,你必須很當心的管理它,以便你能夠取消這個訂閱,不然你可能有泄漏它的風險。這個問題和併發性結構正在解決的問題是很是相似的,而在 Flow 的設計中你不用擔憂這些,你並不會由於不當心致使泄漏一個訂閱。

Kotlin Flow 沒有任何訂閱的概念。掛起和輕量級的協程前來救援。Flow 的 collect 操做符最像一個訂閱,可是它僅僅是一個掛起的函數,歸功於併發性結構,只要不是濫用它,調用它是很難引起泄漏的。

collect 操做符是基於掛起的設計,所以再也不須要每次都單獨設置 onError 和 onCompleted 回調。(譯者注:能夠回想一下 Kotlin 的 suspend 函數,它的設計初衷就是不用回調,讓異步代碼能夠像同步代碼通常無二的編寫。)想要在流正常完成後執行一些操做 ?那麼就在 collect 正常完成後作就行了:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    // reemit all values from the original flow
    collect { value -> emit(value) }
    // this code runs only after the normal completion
    action()
}
複製代碼

前事不忘後事之師

在咱們設計期間,經過研究現有的響應式流應用代碼,給咱們帶來了極大的好處。例如,咱們看到涉及執行上下文切換都有共同的代碼模式(無處不在的 subcribeOn / observeOn),從而僅僅採用一個簡單的 flowOn 操做符,來完成了一樣的機制(詳情可參考:Execution context of Kotlin Flows)。

爲此,在覈心庫中,咱們還能夠奢侈的不去實現全部能夠想到的操做符。咱們僅僅挑選了最流行和最基本的來實現,由於 Kotlin 支持擴展函數(參見:Extension-oriented design),把它與 Flow 的簡化設計相結合,能夠很容易的建立自定義操做符,就如同使用內置的操做符通常無二。

集成

Kotlin Flows 仍然是響應式流的理念。儘管它們是基於掛起的並且沒有直接實現相關的接口,可是它這樣的設計,可讓它直截了當的與基於響應式流的系統集成起來。咱們提供了開箱便可用的 flow.asPublisher() 擴展函數來把 Flow 轉換爲響應式流的 Publisher 接口,以及 publisher.asFlow() 擴展來實現反向轉換。

相關文章
相關標籤/搜索