最全面的Kotlin協程: Coroutine/Channel/Flow 以及實際應用

協程這個概念在1958年就開始出現, 比線程更早, 目前不少語言開始原生支, Java沒有原生協程可是能夠大型公司都本身或者使用第三方庫來支持協程編程, 可是Kotlin原生支持協程.java

我認爲協程的核心就是一個詞: 做用域, 理解什麼是做用域就理解協程了android

什麼是協程git

協程是協做式任務, 線程是搶佔式任務, 本質上二者都屬於併發github

Kotlin協程就是線程庫不是協程? 內部代碼用的線程池?編程

  1. 最知名的協程語言Go內部也是維護了線程, 他也不是協程了?
  2. 協程只是方便開發者處理異步, 線程才能提高性能效率, 二者自己不是替換關係沒有說用了誰就不用另外一個了
  3. 協程是一種概念, 無關乎具體實現方式
  4. kotlin標準庫中的協程不包含線程池代碼, 僅擴展庫才內部處理了線程池

協程設計來源後端

  1. Kotlin的協程完美復刻了谷歌的Go語言的協程設計模式(做用域/channel/select), 將做用域用對象來具化出來; 且能夠更好地控制做用域生命週期;
  2. await模式(JavaScript的異步任務解決方案)
  3. Kotlin參考RxJava響應式框架創造出Flow
  4. 使用協程開始就不須要考慮線程的問題, 只須要在不一樣場景使用不一樣的調度器(調度器會對特定任務進行優化)就好

特性

使用場景設計模式

假設首頁存在七個接口網絡請求(後端人員處理差)的狀況一個個使用串行網絡請求的時間比並髮網絡請求慢了接近七倍緩存

目前計算機都是經過多核CPU提高計算能力, 因此熟練掌握併發編程是將來的趨勢安全

協程優點bash

  1. 併發實現方便
  2. 沒有回調嵌套發生, 代碼結構清晰
  3. 建立協程性能開銷優於建立線程, 一個線程能夠運行多個協程, 單線程便可異步

實驗特性

協程在Kotlin1.3時候放出正式版本, 目前仍然存在不穩定函數(不影響項目開發), 經過註解標識

@FlowPreview 表明可能之後存在Api函數變更

@ExperimentalCoroutinesApi  表明目前可能存在不穩定的因素的函數

@ObsoleteCoroutinesApi 可能存在被廢棄的可能
複製代碼

構成

Kotlin的協程主要構成分爲三部分

  1. CoroutineScope 協程做用域: 每一個協程體都存在一個做用域, 異步仍是同步由該做用域決定
  2. Channel 通道: 數據如同一個通道進行發送和接收, 能夠在協程之間互相傳遞數據或者控制阻塞和繼續
  3. Flow 響應流: 相似RxJava等結構寫法

爲自動化/併發網絡請求我建立一個庫, 我姑且稱爲Android最強的網絡請求庫: Net

1.0+版本爲RxJava實現, 2.0+版本爲Coroutine實現, 同時包含更強的輪循器用於替代RxJava的輪循功能

由於須要拋棄RxJava, 取代RxBus事件分發我使用協程建立出一個更增強大的: Channel

咱們公司項目屬於 MVVM + Kotlin + Coroutine + JetPack, 在國外很經常使用, 主要帶來的優點;

  1. 簡潔, 減小70%左右代碼
  2. 雙向數據綁定
  3. 併發異步任務(網絡)倍增速度
  4. 更健壯的數據保存和恢復

我平時項目開發必備框架

框架 描述
Net Android不是最強網絡請求/異步任務庫
BRV Android不是最強列表
Serialize 建立自動保存和恢復的字段
StateLayout Android不是最強缺省頁
LogCat JSON和長文本日誌打印工具
Tooltip 完善的吐司工具
DebugKit 開發調試窗口工具
StatusBar 一行代碼建立透明狀態欄
Channel 基於協程和JetPack特性的事件分發框架

展望

協程對於後端高併發優點很大, 至於Google的Jetpack基本上都有針對協程擴展, 最明顯的是併發網絡請求速度倍增; 同時代碼更加結構清晰, 本文章後續會根據Kotlin的版本中的協程迭代進行更新

依賴

這裏咱們使用協程擴展庫, kotlin標準庫的協程太過於簡陋不適用於開發者使用

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9"
複製代碼

建立協程

開啓主協程的分爲三種方式

生命週期和App一致, 沒法取消(不存在Job), 不存在線程阻塞

fun main() {
    GlobalScope.launch { // 在後臺啓動一個新的協程並繼續
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // 防止JVM虛擬機退出
}
複製代碼

這裏說的是GlobalScope沒有Job, 可是啓動的launch都是擁有Job的. GlobalScope自己就是一個做用域, launch屬於其子做用域;

不存在線程阻塞, 能夠取消, 能夠經過CoroutineContext控制協程生命週期

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}
複製代碼

線程阻塞, 適用於單元測試, 不須要延遲阻塞防止JVM虛擬機退出. runBlocking屬於全局函數能夠在任意地方調用

通常咱們在項目中是不會使用runBlocking, 由於阻塞主線程沒有開啓的任何意義

fun main() = runBlocking { 
    // 阻塞線程直到協程做用域內部全部協程執行完畢
}
複製代碼

建立做用域

協程內部還可使用函數建立其餘協程做用域, 分爲兩種建立函數:

  1. CoroutineScope的擴展函數, 只有在做用域內部才能建立其餘的做用域
  2. suspend修飾的函數內部
  3. 協程永遠會等待其內部做用域內全部協程都執行完畢後纔會關閉協程

在主協程內還能夠建立子協程做用域, 建立函數分爲兩種

  1. 阻塞做用域(串行): 會阻塞當前做用域

  2. 掛起做用域(併發): 不會阻塞當前做用域

同步做用域函數

都屬於suspend函數

  • withContext 能夠切換調度器, 有返回結果
  • coroutineScope 建立一個協程做用域, 該做用域會阻塞當前所在做用域而且等待其子協程執行完纔會恢復, 有返回結果
  • supervisorScope 使用SupervisorJob的coroutineScope, 異常不會取消父協程
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T
// 返回結果; 能夠和當前協程的父協程存在交互關係, 主要做用爲來回切換調度器

public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T ): T = withContext(this, block)
// withContext工具函數而已

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
複製代碼

異步做用域函數

這兩個函數都不屬於suspend, 只須要CoroutineScope就能夠調用

  • launch: 異步併發, 沒有返回結果
  • async: 異步併發, 有返回結果
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job

public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
複製代碼

併發

同一個協程做用域中的異步任務遵照順序原則開始執行; 適用於串行網絡請求, 在一個異步任務須要上個異步任務的結果時.

協程掛起須要時間, 因此異步協程永遠比同步代碼執行慢

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(Main.kt:34) 後執行")
    }

    System.err.println("(Main.kt:37) 先執行")
}
複製代碼

當在協程做用域中使用async函數時能夠建立併發任務

public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
複製代碼

示例

fun main() = runBlocking<Unit> {
	val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
    delay(2000)
}
複製代碼
  1. 返回對象Deferred; 經過函數await獲取結果值
  2. Deferred集合還可使用awaitAll()等待所有完成
  3. 不執行await任務也會等待執行完協程關閉
  4. 若是Deferred不執行await函數則async內部拋出的異常不會被logCat或tryCatch捕獲, 可是依然會致使做用域取消和異常崩潰; 但當執 行await時異常信息會從新拋出

惰性併發

將async函數中的start設置爲CoroutineStart.LAZY時則只有調用Deferred對象的await時纔會開始執行異步任務(或者執行start函數)

啓動模式

  1. DEFAULT 當即執行
  2. LAZY 直到Job執行start或者join纔開始執行
  3. ATOMIC 在做用域開始執行以前沒法取消
  4. UNDISPATCHED 不執行任何調度器, 直接在當前線程中執行, 可是會根據第一個掛起函數的調度器切換

異常

協程中發生異常, 則父協程取消而且父協程其餘的子協程一樣所有取消

Deferred

繼承自Job

提供一個全局函數用於建立CompletableDeferred對象, 該對象能夠實現自定義Deferred功能

public suspend fun await(): T 
// 結果
public val onAwait: SelectClause1<T>
// 在select中使用

public fun getCompleted(): T
// 若是完成[isCompleted]則返回結果, 不然拋出異常
public fun getCompletionExceptionOrNull(): Throwable?
// 若是完成[isCompleted]則返回結果, 不然拋出異常
複製代碼

示例

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()
    
    launch {
        delay(1000 )
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72) 結果 = ${deferred.await()}")
}
複製代碼

建立CompletableDeferred的頂層函數

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
複製代碼

CompletableDeferred函數

public fun complete(value: T): Boolean
// 結果

public fun completeExceptionally(exception: Throwable): Boolean
// 拋出異常, 異常發生在`await()`時

public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean
// 能夠經過標記來判斷是否成功, 避免異常拋出
複製代碼

CoroutineScope

建立此對象表示建立一個協程做用域

結構化併發

若是你看協程的教程可能會常常看到這個詞, 這就是做用域內部開啓新的協程; 父協程會限制子協程的生命週期, 子協程承接父協程的上下文, 這種層級關係就是結構化併發

在一個協程做用域裏面開啓多個子協程進行併發行爲

CoroutineContext

協程上下文, 我認爲協程上下文能夠看作包含協程基本信息的一個Context(上下文), 其能夠決定協程的名稱或者運行

建立一個新的調度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
複製代碼

建立新的調度器比較消耗資源, 建議複用且當不須要的時候使用close函數釋放

調度器

Dispatchers繼承自CoroutineContext, 該枚舉擁有三個實現; 表示不一樣的線程調度; 當函數不使用調度器時承接當前做用域的調度器

  1. Dispatchers.Unconfined 不指定線程, 若是子協程切換線程那麼接下來的代碼也運行在該線程上
  2. Dispatchers.IO 適用於IO讀寫
  3. Dispatchers.Main 根據平臺不一樣而有所差, Android上爲主線程
  4. Dispatchers.Default 默認調度器, 在線程池中執行協程體, 適用於計算操做

當即執行

Dispatchers.Main.immediate
複製代碼

immediate屬於全部調度器都有的屬性, 該屬性表明着若是當前正處於該調度器中不執行調度器切換直接執行, 能夠理解爲在同一調度器內屬於同步協程做用域

例如launch函數開啓做用域會比後續代碼執行順序低, 可是使用該屬性協程屬於順序執行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 執行順序 1
}

// 執行順序 2

CoroutineScope(Job() + Dispatchers.Main).launch {
		// 執行順序 4
}

// 執行順序 3
複製代碼

協程命名

經過建立一個CoroutineName對象, 在構造函數中指定參數爲協程名稱, CoroutineName繼承自CoroutineContext.

launch(CoroutineName("吳彥祖")){

}
複製代碼

協程上下文名稱用於方便調試使用

協程掛起

yield函數可讓當前協程暫時掛起執行其餘協程體, 若是沒有其餘正在併發的協程體則繼續執行當前協程體(至關於無效調用)

public suspend fun yield(): Unit
複製代碼

看協程中可能常常說起掛起, 掛起能夠理解爲這段代碼(做用域)暫停, 而後執行後續代碼; 掛起函數通常表示suspend關鍵字修飾的函數, suspend要求只容許在suspend修飾的函數內部調用, 可是自己這個關鍵字是沒作任何事的. 只是爲了限制開發者隨意調用

掛起函數調用會在左側行號列顯示箭頭圖標

image-20200106120117080

JOB

在協程中Job一般被稱爲做業, 表示一個協程工做任務, 他一樣繼承自CoroutineContext

val job = launch {

}
複製代碼

Job屬於接口

interface Job : CoroutineContext.Element
複製代碼

函數

public suspend fun join()
// 等待協程執行完畢都阻塞當前線程
public val onJoin: SelectClause0
// 後面說起的選擇器中使用

public fun cancel(cause: CancellationException? = null)
// 取消協程
public suspend fun Job.cancelAndJoin()
// 阻塞而且在協程結束之後取消協程

public fun start(): Boolean
public val children: Sequence<Job>
// 所有子做業

public fun getCancellationException(): CancellationException

public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle
// p1: 當爲true表示cancel不會回調handler
// p2: 當爲true則先執行[handler]而後再返回[DisposableHandle], 爲false則先返回[DisposableHandle]

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 當其做用域完成之後執行, 主協程指定纔有效, 直接給CoroutineScope指定時無效的
// 手動拋出CancellationException一樣會賦值給cause
複製代碼

狀態

經過字段能夠獲取JOB當前處於狀態

public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean
複製代碼

擴展函數

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()
複製代碼

每一個協程做用域都存在coroutineContext. 而協程上下文中都存在Job對象

coroutineContext[Job]
複製代碼

結束協程

若是協程做用域內存在計算任務(一直打日誌也算)則沒法被取消, 若是使用delay函數則能夠被取消;

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true){
      delay(100) // 這行代碼存在則能夠成功取消協程, 不存在則沒法取消
      System.err.println("(Main.kt:30) ")
    }
  }
  
  delay(500)
  job.cancel() 
  System.err.println("(Main.kt:42) 結束")
}
複製代碼

經過使用協程內部isActive屬性來判斷是否應該結束

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // 一旦協程被取消則爲false
            System.err.println("(Main.kt:30) ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42) 結束")
}
複製代碼

釋放資源

協程存在被手動取消的狀況, 可是有些資源須要在協程取消的時候釋放資源, 這個操做能夠在finally中執行

不管如何finally都會被執行

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000){
                System.err.println("(Main.kt:31) it = $it")
                delay(500)
            }
        } finally {
           // 已被取消的協程沒法繼續掛起
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42) ")
}
複製代碼

再次開啓協程

經過withContextNonCancellable能夠在已被取消的協程中繼續掛起協程; 這種用法其實能夠看作建立一個沒法取消的任務

withContext(NonCancellable) {
    println("job: I'm running finally")
    delay(1000L)
    println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
複製代碼

上下文組合

協程做用域能夠接收多個CoroutineContext做爲上下文參數; CoroutineContext自己屬於接口, 不少上下文相關的類都實現與他

配置多個CoroutineContext能夠經過+符號同時指定多個協程上下文, 每一個實現對象可能包含一部分信息能夠存在覆蓋行爲故相加時的順序存在覆蓋行爲

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
複製代碼
launch(Dispatchers.IO + CoroutineName("吳彥祖")){	}
複製代碼

協程局部變量

使用ThreadLocal能夠獲取線程的局部變量, 可是要求使用擴展函數asContextElement轉爲協程上下文做爲參數傳入在建立協程的時候

該局部變量做用於持有該協程上下文的協程做用域內

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)
複製代碼

超時

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超過指定時間timeMillis自動結束協程; 
// 當沒有超時時返回值獲取而且繼續執行協程; 
// 當超時會拋出異常TimeoutCancellationException, 可是不會致使程序結束

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超時不會結束協程而是返回null
複製代碼

沒法手動拋出TimeoutCancellationException, 由於其構造函數私有

全局協程做用域

全局協程做用域屬於單例對象, 整個JVM虛擬機只有一份實例對象; 他的壽命週期也跟隨JVM. 使用全局協程做用域的時候注意避免內存泄漏

public object GlobalScope : CoroutineScope {
    /** * Returns [EmptyCoroutineContext]; */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
複製代碼

全局協程做用域不繼承父協程做用域的上下文, 因此也不會由於父協程被取消而自身被取消

啓動模式

  • DEFAULT 當即執行協程體
  • ATOMIC 當即執行協程體,但在開始執行協程以前沒法取消協程
  • UNDISPATCHED 當即在當前線程執行協程體,第一個掛起函數執行在函數所在線程, 後面執行在函數指定線程
  • LAZY 手動執行startjoin纔會執行協程

協程取消

協程體若是已經執行實際上屬於不可取消的, 在協程體中經過isActive來判斷協程是否處於活躍中

經過取消函數的參數指定異常CancellationException能夠自定義異常對象

不可取消的協程做用域

NonCancellable該單例對象用於withContext函數建立一個沒法被取消的協程做用域

withContext(NonCancellable) {
  delay(2000)
}
複製代碼

示例

fun main() = runBlocking {
  launch {
    delay(1000)
    System.err.println("(Main.kt:19) ")
  }

  launch {
    withContext(NonCancellable) {
      delay(2000)
      System.err.println("(Main.kt:26) ")
    }
  }

  delay(500) // 防止launch還未開啓withContext就被取消
  cancel()
}
複製代碼
  1. 當子做用域內包含沒有終止的任務, 將等待任務完成後纔會取消(delay不存在, Thread.sleep能夠模擬未結束的任務)
  2. 拋出CancellationException視做結束異常, invokeOnCompletion也會執行(其中包含異常對象), 可是其餘異常將不會執行invokeOnCompletion

取消GlobalScope

GlobalScope屬於全局協程, 由他開啓的協程都不擁有Job, 因此沒法取消協程. 可是能夠經過給GlobalScope開啓的協程做用域指定Job而後就可使用Job取消協程

協程異常

經過CoroutineExceptionHandler函數能夠建立一個同名的對象, 該接口繼承自CoroutineContext, 一樣經過制定上下文參數傳遞給全局協程做用域使用, 看成用域拋出異常時會被該對象的回調函數接收到, 而且不會拋出異常

  1. CoroutineExceptionHandler 只有做爲最外層的父協程上下文才有效, 由於異常會層層上拋, 除非配合SupervisorJob監督做業禁止異常上拋, 子做用域的異常處理器才能捕獲到異常

  2. CoroutineExceptionHandler異常處理器並不能阻止協程做用域取消, 只是監聽到協程的異常信息避免JVM拋出異常退出程序而已

  3. 只要發生異常就會致使父協程和其全部子協程都被取消, 這種屬於雙向的異常取消機制, 後面提到的監督做業(SupervisorJob)屬於單向向下傳遞(即不會向上拋出)

  4. CoroutineExceptionHandler會被做用域一直做爲協程上下文向下傳遞給子做用域(除非子做用域單獨指定)

(以下示例)不要嘗試使用try/catch捕捉launch做用域的異常, 沒法被捕捉.

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}
複製代碼

後面專門介紹如何捕獲協程異常避免拋出.

協程取消異常

取消協程的做業(Job)會引起異常, 可是會被默認的異常處理器給忽略, 可是咱們能夠經過捕捉能夠看到異常信息

fun main() = runBlocking<Unit> {
  val job = GlobalScope.launch {

    try {
      delay(1000)
    } catch (e: Exception) {
      e.printStackTrace()
    }
  }

  job.cancel(CancellationException("自定義一個用於取消協程的異常"))
  delay(2000)
}
複製代碼

Job取消函數

public fun cancel(cause: CancellationException? = null)
複製代碼
  • cause: 參數不傳默認爲JobCancellationException

全局協程做用域的異常處理

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
       System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
}

GlobalScope.launch(exceptionHandler) { 

}
複製代碼

子協程設置異常處理器是無效的, 即便設置了錯誤依然會拋到父協程從而而沒有意義. 除非同時使用異常處理器+監督做業(SupervisorJob), 這樣就是讓子協程的錯誤不向上拋(後面詳解監督做業), 從而被其內部的異常處理器來處理.

異常聚合和解包

全局協程做用域也存在嵌套子父級關係, 故異常可能也會依次拋出多個異常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
				// 第三, 這裏的異常是第一個被拋出的異常對象
        println("捕捉的異常: $exception 和被嵌套的異常: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // 當父協程被取消時其全部子協程都被取消, finally被取消以前或者完成任務以後必定會執行
                throw ArithmeticException() // 第二, 再次拋出異常, 異常被聚合
            }
        }
        launch {
            delay(100)
            throw IOException() // 第一, 這裏拋出異常將致使父協程被取消
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // 避免GlobalScope做用域沒有執行完畢JVM虛擬機就退出
}
複製代碼

監督做業

通常狀況子協程發生異常會致使父協程被取消, 同時父協程發生異常會取消全部的子協程; 可是有時候子協程發生異常咱們並不但願父協程也被取消, 而是僅僅全部子協程取消(僅向下傳遞異常), 這個使用就是用SupervisorJob做業

建立監督做業對象

fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
        launch(SupervisorJob(coroutineContext[Job]) + CoroutineExceptionHandler { _, _ ->  }) {
            throw NullPointerException()
        }

        delay(500)
        println("( Process.kt:13 ) ")
    }

    println("( Process.kt:16 ) finish")
}
複製代碼
  1. 必須添加CoroutineExceptionHandler處理異常, 不然異常依然會向上傳遞取消父協程

  2. 直接建立 SupervisorJob() 對象傳入做用域中會致使該做用域和父協程生命週期不統一的問題, 即父協程取消之後該子協程依然處於活躍狀態, 故須要指定參數爲coroutineContext[Job]即傳入父協程的做業對象

  3. SupervisorJob僅能捕捉內部協程做用域的異常, 沒法直接捕捉內部協程

    supervisorScope {
        // throw NoSuchFieldException() 拋出崩潰
        
        launch {
             throw NoSuchFieldException() // 不會拋出
        }
    }
    複製代碼

監督做業在withContextasync中添加無效

直接建立一個異常向下傳遞監督做業的做用域

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
複製代碼
  • 該函數屬於阻塞
  • 具有返回值
  • supervisorScope函數使用的依然是當前做用域的Job, 因此跟隨當前做用域生命週期, 能夠被取消
fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
      // 在該做用域內只要設置CoroutineExceptionHandler都僅會向下傳遞
        supervisorScope {
            launch(CoroutineExceptionHandler { _, _ ->  }) {
                throw NullPointerException()
            }

            launch {
                delay(1000) // 即便上面的launch拋出異常也會繼續執行這裏
                println("( Process.kt:18 ) ")
            }
        }
    }

    println("( Process.kt:16 ) finish")
}
複製代碼

捕獲異常

在做用域中的異常捕獲和通常的異常捕獲有所區別

  • CoroutineExceptionHandler能夠捕獲全部子做用域內異常
  • async可使用監督做業能夠捕獲內部發生的異常, 可是其await要求trycatch
  • launch要求監督做業配合異常處理器同時使用, 缺一不可
  • withContext/supervisorScope/coroutineScope/select能夠trycatch捕獲異常

原始協程

函數 回調字段 描述
suspendCoroutine Continuation Result
suspendCancellableCoroutine CancellableContinuation 可取消
suspendAtomicCancellableCoroutine CancellableContinuation 可取消

[Continuation]

public val context: CoroutineContext public fun resumeWith(result: Result<T>) 複製代碼

[CancellableContinuation] -| Continuation

public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean

public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
public fun tryResume(value: T, idempotent: Any? = null): Any?
public fun tryResumeWithException(exception: Throwable): Any?
public fun completeResume(token: Any)

public fun cancel(cause: Throwable? = null): Boolean

public fun invokeOnCancellation(handler: CompletionHandler)
public fun CoroutineDispatcher.resumeUndispatched(value: T)
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
複製代碼

線程不安全

解決線程不安全問題

  1. 互斥鎖
  2. 切換線程實現單線程
  3. Channel

互斥

至關於Java中的Lock替代品: Mutex

建立互斥對象

public fun Mutex(locked: Boolean = false): Mutex
// p: 設置初始狀態, 是否當即上鎖
複製代碼

使用擴展函數能夠自動加鎖和解鎖

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner: 鑰匙
複製代碼

函數

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
public fun holdsLock(owner: Any): Boolean
// owner是否被用於鎖
public fun tryLock(owner: Any? = null): Boolean
// 使用owner來上鎖, 若是owner已上鎖則返回false
複製代碼

Channel

  1. 多個做用域能夠經過一個Channel對象來進行數據的發送和接收
  2. Channel設計參考Go語言的chan設計, 可用於控制做用域的阻塞和繼續(經過配合select)

Channel屬於接口沒法直接建立, 咱們須要經過函數Channel()來建立其實現類

源碼

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
  when (capacity) {
    RENDEZVOUS -> RendezvousChannel() // 無緩存
    UNLIMITED -> LinkedListChannel() // 無限制
    CONFLATED -> ConflatedChannel()  // 合併
    BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
    else -> ArrayChannel(capacity) // 指定緩存大小
  }
複製代碼
  • capacity

    緩衝大小, 默認0
    當Channel發送一條數據時就會掛起通道(不繼續執行發送後續代碼), 只有在接收這條數據時纔會解除掛起繼續執行; 可是咱們能夠設置緩存大小
    複製代碼

通道容許被遍歷獲取當前發送數據

val channel = Channel<Int>()

for (c in channel){

}
複製代碼
public suspend fun yield(): Unit
複製代碼

Channel

Channel接口同時實現發送渠道(SendChannel)和接收渠道(ReceiveChannel)兩個接口, 因此既能發送又能接收數據

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 
複製代碼

SendChannel

public val isClosedForSend: Boolean
// 是否關閉
public fun close(cause: Throwable? = null): Boolean
// 關閉發送通道

public fun offer(element: E): Boolean
// 推薦使用send函數
public suspend fun send(element: E)
// 發送消息

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
// 當通道關閉時執行回調

public val onSend: SelectClause2<E, SendChannel<E>>
// 當即發送數據(若是容許), 在select中使用
複製代碼
  • 發送通道關閉後不能繼續使用ReceiveChannel接收數據, 會致使ClosedReceiveChannelException拋出

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已經關閉通道, 若是關閉通道之後還存在緩存則會接收完緩存以後返回false

public val isEmpty: Boolean // 通道是否爲空

public fun poll(): E? // 推薦使用receive函數
public suspend fun receive(): E
// 接受通道事件

public val onReceive: SelectClause1<E> // 若是通道關閉, 拋出異常
public val onReceiveOrNull: SelectClause1<E?> // 廢棄函數, 若是通道關閉返回null
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 在select中使用的監聽器, 推薦使用第三個函數

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`對象能夠判斷通道是否已關閉

public fun cancel(cause: CancellationException? = null)
// 關閉通道
複製代碼
  1. 通道的發送和接收都會致使做用域被阻塞, 可是發送消息能夠經過設置緩存讓他不阻塞, 或者取消通道可讓阻塞繼續
  2. 通道只容許在掛起函數中發送和接收, 可是建立通道不限制
  3. 關閉通道會致使receive拋出異常
  4. SendChannel執行close函數後不容許再發送或者接收數據, 不然拋出異常
  5. Channel的send | receive函數所在做用域被取消cancel不會致使通道結束(isClosedForReceive返回false)
  6. receive接收而不是遍歷則會致使卡住做用域

consume

ReceiveChannel不只能夠經過迭代器來接收事件, 還可使用consume系列函數來接收事件; 本質上consume和迭代沒有任何區別只是consume會在發生異常時自動取消通道(經過cancel函數);

源碼

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    var cause: Throwable? = null
    try {
        return block() // 直接返回
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        cancelConsumed(cause) // 若是發生異常取消通道
    }
}
複製代碼

consumeEach函數僅是迭代接收事件且異常自動取消; 通常建議使用consume函數來接收事件

BroadcastChannel

這個通道和通常的通道區別在於他的每一個數據能夠被每一個做用域所有接收到; 默認的通道一個數據被接收後其餘的協程是沒法再接收到數據的

廣播通道經過全局函數建立對象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
複製代碼

自己廣播通道繼承自SendChannel, 只能發送數據, 經過函數能夠拿到接收通道

public fun openSubscription(): ReceiveChannel<E>
複製代碼

取消通道

public fun cancel(cause: CancellationException? = null)
複製代碼

將Channel轉成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY ): BroadcastChannel<E>
複製代碼

經過擴展函數在協程做用域中快速建立一個廣播發送通道

public fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): BroadcastChannel<E> 
複製代碼

迭代通道

接收通道實現操做符重載可使用迭代

public operator fun iterator(): ChannelIterator<E>
複製代碼

示例

for (i in produce){
	// 收到每一個髮型的消息
}
複製代碼

當多個協程接收同一個渠道數據會依次輪流接收到數據, 渠道對於多個協程是公平的

Produce

上面介紹的屬於建立Channel對象來發送和接收數據, 可是還能夠經過擴展函數快速建立並返回一個具有發送數據的ReceiveChannel對象

public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
複製代碼
  • context: 能夠經過協程上下文決定調度器等信息
  • capacity: 初始化通道空間

ProducerScope 該接口繼承自SendChannel以及CoroutineScope, 具有發送通道數據以及協程做用域做用;

當produce做用域執行完成會關閉通道, 前面已經說起關閉通道沒法繼續接收數據

等待取消

該函數會在通道被取消時回調其函數參數, 前面說起協程取消時能夠經過finally來釋放內存等操做, 可是通道取消沒法使用finally只能使用該函數

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 
// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
複製代碼

Actor

能夠經過actor函數建立一個具有通道做用的協程做用域

public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit ): SendChannel<E>
複製代碼
  • context: 協程上下文
  • capacity: 通道緩存空間
  • start: 協程啓動模式
  • onCompletion: 完成回調
  • block: 回調函數中能夠進行發送數據

該函數和produce函數類似,

  1. produce返回ReceiveChannel, 外部進行數據接收; actor返回的SendChannel, 外部進行數據發送
  2. actor的回調函數擁有屬性channel:Channel, 既能夠發送數據又能夠接收數據, produce的屬性channel屬於SendChannel
  3. 不管是produce或者actor他們的通道都屬於Channel, 既能夠發送又能夠接收數據, 只須要類型強轉便可;
  4. 自己Channel能夠進行雙向數據通訊, 可是設計produce和actor屬於設計思想中的生產者和消費者模式
  5. 他們都屬於協程做用域和數據通道的結合

輪循器

不管是RxJava仍是協程都支持輪循器的功能, 在個人網絡請求庫中還賦予了輪循器暫停|繼續|多個觀察者|重置等功能

這裏的協程輪循器就比較簡陋

public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
複製代碼

該通道返回的數據是Unit

默認狀況下能夠理解爲通道會在指定間隔時間後一直髮送Unit數據

fun main() = runBlocking<Unit> {
  
    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

  // 每秒打印
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}
複製代碼

可是若是下游不是在發送數據之後當即接收數據, 而是延遲使用receive函數來接收通道數據

TickerMode該枚舉擁有兩個字段

  • FIXED_PERIOD 默認值, 動態調節通道發送數據的時間間隔, 時間間隔能夠看作是上游發送數據的
  • FIXED_DELAY 只有當接收數據後纔會開始計算間隔時間, 時間間隔能夠看作是下游接收數據的

這個輪循器不支持多訂閱|暫停|繼續|重置|完成, 可是個人Net庫中Interval對象已實現全部功能

Select

select函數回調中監聽多個Deferred/Channel的結果, 且只會執行最快接收數據的通道或者結果回調.

動做

在前面的函數介紹中能夠看到一系列on{動做}變量, 他們的值所有是SelectClause{數字}接口對象;

[SelectBuilder]

public interface SelectBuilder<in R> {
    public operator fun SelectClause0.invoke(block: suspend () -> R)
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
複製代碼

根據這定義的擴展函數就能夠直接使用動做

對象 使用的函數
SelectClause0 onJoin
SelectClause1 OnReceive
SelectClause2 onSend

示例

@ObsoleteCoroutinesApi
@UseExperimental(InternalCoroutinesApi::class)
suspend fun selectMulti(a: Channel<Int>, b: Channel<Int>): String = select<String> {

    b.onReceive {
        "b $it" // 優先執行第一個, 不是函數緣由, 而是順序
    }

    b.onReceiveOrClosed {
        "b $it"
    }
    
    a.onSend(23) {
        "發送 23"
    }
}

fun main() = runBlocking<Unit> {
    val a = Channel<Int>(1) // 緩衝數量, 避免發送數據時阻塞
    val b = Channel<Int>(1)
    
    launch {
        b.send(24)
        val s = selectMulti(a, b)
        println("結果 = $s")
    }
}
複製代碼
  • onReceive 在關閉通道時會致使拋出異常, 若是不想拋出異常應當使用onReceiveOrClosed來替換
  • onSend 該函數等效於Channel.send, 就是發送一個值, 假設註冊多個onSend確定是第一個先回調返回結果
  • 即便已經有成員被選中(select)也不會致使其餘的成員協程做用域結束

[ValueOrClosed]

public val isClosed: Boolean // 通道是否已關閉

public val value: T
public val valueOrNull: T?
// 二者都是獲取通道內的值, 可是第2個若是通道關閉不會拋出異常而是返回NULL
複製代碼
  1. 當在select中一個通道同時存在發送和接收監聽時, 若是二者都執行到(即select沒有被打斷都執行到)會致使異常拋出
  2. 若是通道重複監聽(多個動做), 優先執行第一個
  3. 關閉通道一樣會收到數據, onReceive拋出異常, onReceiveOrClose數據爲null

Flow

Flow類似於RxJava一樣分爲三個部分:

  1. 上游
  2. 操做符
  3. 下游

下游接收事件要求在協程做用域內執行(suspend函數)

建立Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
複製代碼

示例

fun shoot() = flow {
    for (i in 1.;3) {
        delay(1000) // 僞裝咱們在這裏作了一些有用的事情
        emit(i) // 發送下一個值
    }
}
複製代碼
  • 集合或者Sequence均可以經過asFlow函數轉成Flow對象

  • 也能夠像建立集合同樣經過fowOf直接建立Flow對象

  • Channel通道轉成Flow

    public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
    複製代碼
  • 甚至掛起函數也能夠轉成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    複製代碼

collect和flow的回調函數自己屬於suspend函數能夠開啓協程做用域

建立Flow的函數

函數 描述
flow 普通Flow
channelFlow 建立通道, 其支持緩衝通道, 容許不一樣的CorotineContext發送事件
callbackFlow 與channelFlow函數除了不使用awaitClose會報錯之外沒有區別
emptyFlow 空的Flow
flowOf 直接發送數據

flow的發射函數emit不是線程安全的不容許其餘線程調用, 若是須要線程安全請使用channelFlow而不是flow

channelFlow使用send函數發送數據

發射數據示例

flow<Int> {
  emit(23)
}

channelFlow<Int> {
  send(23) // offer(23)
}
複製代碼
  1. offer能夠在非suspend函數中使用, send必須在suspend函數中使用
  2. offer存在一個返回值, 假設沒有元素空間則會直接返回false, send則會掛起阻塞等待新的元素空間.

Flow在取消做用域時釋放資源可使用callbackFlow. 這裏演示註冊和取消一個廣播AppWidgetProvider

callbackFlow<Int> {
  val appWidgetProvider = AppWidgetProvider()
  registerReceiver(appWidgetProvider, IntentFilter()) // 註冊
  awaitClose {  // 該回調會在協程做用域被取消時回調
    unregisterReceiver(appWidgetProvider) // 註銷
  }
}.collect { 

}
複製代碼

收集

收集數據

Flow是冷數據, 要求調用函數collect收集數據時纔會進行數據的發射; 該系列函數也成爲末端操做符;

flow {
  emit(23)
}.collect {
	System.err.println("(Demo.kt:9) it = $it")
}
複製代碼

查看源碼會發現這個emit實際上就是執行collect的參數函數

collect函數表示接收上游發送的數據

public suspend fun Flow<*>.collect() 
// 不作任何處理的收集器, 僅僅爲了觸發發射數據

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit
// 收集

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
// 和上個函數的區別是: 若是在下游沒有處理完狀況下上游繼續下個發射會致使上次的下游被取消

public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit ): Unit
// 具有索引和值的收集器

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 將Flow運行在指定的協程做用域內
複製代碼

[FlowCollector] 發射器

public suspend fun emit(value: T)
// 發送一個數據

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
// 發射另外一個flow對象
複製代碼

調度器

調度器

Flow默認使用的是其所在的當前線程或者協程上下文, Flow不容許在內部使用withContext來切換調度器, 而是應該使用flowOn函數

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
複製代碼

該函數改變的是Flow函數內部發射時的線程, 而在collect收集數據時會自動切回建立Flow時的線程

緩存

不須要等待收集執行就當即執行發射數據, 只是數據暫時被緩存而已, 提升性能

默認切換調度器時會自動緩存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
複製代碼

合併函數, 這個函數實際上就是buffer, 當下遊沒法及時處理上游的數據時會丟棄掉該數據

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
複製代碼

合併

將多個事件合併後發送給下游

zip

將兩個Flow在回調函數中進行處理返回一個新的值 R

當兩個flow的長度不等時只發送最短長度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
複製代碼

示例

val nums = (1.;3).asFlow().onEach { delay(300) } // 發射數字 1.;3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使用「zip」組合單個字符串
    .collect { value -> // 收集並打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
複製代碼

combine

public fun <T1, T2, R> Flow<T1>.combine( flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R ): Flow<R>
// 組合兩個流,在通過第一次發射之後,任意方有新數據來的時候就能夠發射,另外一方有多是已經發射過的數據

public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit ): Flow<R>
複製代碼

集合

Flow直接轉成集合函數

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
複製代碼

疊加

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R ): R
// `acc`爲上次回調函數返回值, 第一次爲初始值, 等同於疊加效果; 該函數和reduce的區別就是支持初始值; reduce累計兩次元素纔會回調函數

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
複製代碼

轉換

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>

public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R> ): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先發送全部的元素, 而後上游每一個元素會致使回調函數中的Flow發送全部元素一次

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同於RxJava的FlatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
// 串行收集數據

public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>
// 併發收集數據

public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R> ): Flow<R> 
// 在每次 emit 新的數據之後,會取消先前的 collect

public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
// 包含元素索引
複製代碼

生命週期

public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit ): Flow<T>
// 開始

public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit ): Flow<T>
// 回調函數中的參數`cause`若是爲null表示正常完成沒有拋出異常, 反之則拋出異常非正常結束, 
// 和catch函數同樣只能監聽到上游發生的異常, 可是沒法避免異常拋出只能在異常拋出以前執行回調函數

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
// 該函數只能捕獲上游異常, 若是異常處於函數調用之下則依然會被拋出
複製代碼

過濾

限制流發送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定數量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丟棄指定數量事件
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回調函數判斷是否丟棄或者接收, 只要丟棄或者接收後面就不會繼續發送事件(結束流)

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public suspend fun <T> Flow<T>.single(): T
// 期待只有一個元素, 不然拋出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不拋出異常, 但若是不是僅有元素則返回null

public suspend fun <T> Flow<T>.first(): T
// 若是不存在一個元素則會拋出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回調函數判斷爲true的第一個條件符合的元素
複製代碼

重試

public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重試次數 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T>

public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
複製代碼

過濾

public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R>
// 轉換函數, 能夠在回調函數中發送新的元素

public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R>
複製代碼

scan和reduce的區別在於

  • reduce是所有疊加計算完成後被收集
  • scan是每次疊加一次後收集一次數據

StateFlow/SharedFlow

類關係

SharedFlow

|- MutableSharedFlow

|- StateFlow

​ |- MutableStateFlow

SharedFlow屬於熱流數據, 既沒有收集(collect)狀況下也會發送, 而後在收集時進行重放(replay). 可使用shareIn將冷流轉成熱流. 也能夠直接使用如下函數建立

public fun <T> MutableSharedFlow( replay: Int = 0, // 重放數量 extraBufferCapacity: Int = 0, // 緩存數量(不包含重放數量) onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T>
複製代碼

使用BufferOverflow

  1. DROP_LATEST 丟棄最新值
  2. DROP_OLDEST 丟失最舊值
  3. SUSPEND 掛起阻塞

StateFlow能夠看作在Flow的基礎上加上了LiveData的特性. 可是沒法不存在生命週期跟隨, 一直均可以收集數據

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}
複製代碼

示例

將flow從冷流轉換成熱流使用函數shareIn

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
)
複製代碼

SharingStarted:

  1. WhileSubscribed 在第一個訂閱者出現後開始共享數據,並使數據流永遠保持活躍狀態
  2. Lazily 存在訂閱者時,將使上游提供方保持活躍狀態
  3. Eagerly 當即啓動提供方

Android

Google發行的Jetpack庫中不少組件都附有KTX擴展依賴, 這種依賴主要是增長kotlin和協程支持

Lifecycle

官方提供生命週期協程做用域的快速建立實現;

  • 指定生命週期運行協程
  • 自動在onDestory取消協程

引入ktx依賴庫

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
複製代碼

當執行到某個生命週期時運行協程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast( minState: Lifecycle.State, block: suspend CoroutineScope.() -> T )
複製代碼

這些函數都屬於LifecycleLifecycleOwner的擴展函數

LiveData

依賴

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
複製代碼

提供開發者使用的只有這兩個函數, 兩個函數功能同樣, 只是每一個參數接收時間單位不一致

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
複製代碼
  • timeout: 若是liveData的沒有處於活躍的觀察者則在指定的時間內(單位毫秒)會取消其做用域[block]
  • block: 該做用域只在活躍狀態纔會觸發, 默認在Dispatchers.Main.immediate調度器

liveData做用域具有發射數據和LiveData的做用

interface LiveDataScope<T> {
    /** * Set's the [LiveData]'s value to the given [value]; If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]; * * @param value The new value for the [LiveData] * * @see emitSource */
    suspend fun emit(value: T)

    /** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]; Calling this * method will remove any source that was yielded before via [emitSource]; * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]; * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /** * References the current value of the [LiveData]; * * If the block never `emit`ed a value, [latestValue] will be `null`; You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]; */
    val latestValue: T?
}
複製代碼
  1. 若是emitSource在emit以前執行則無效
  2. 該做用域會在每次處於活躍狀態時都執行一遍, 若是將應用從後臺切換到前臺則會返回執行該做用域, 可是觀察者只會在活躍時才收到數據
相關文章
相關標籤/搜索