協程這個概念在1958年就開始出現, 比線程更早, 目前不少語言開始原生支, Java沒有原生協程可是能夠大型公司都本身或者使用第三方庫來支持協程編程, 可是Kotlin原生支持協程.java
我認爲協程的核心就是一個詞: 做用域, 理解什麼是做用域就理解協程了android
什麼是協程git
協程是協做式任務, 線程是搶佔式任務, 本質上二者都屬於併發github
Kotlin協程就是線程庫不是協程? 內部代碼用的線程池?編程
協程設計來源後端
做用域/channel/select
), 將做用域用對象來具化出來; 且能夠更好地控制做用域生命週期;await
模式(JavaScript的異步任務解決方案)Flow
使用場景設計模式
假設首頁存在七個接口網絡請求(後端人員處理差)的狀況一個個使用串行網絡請求的時間比並髮網絡請求慢了接近七倍緩存
目前計算機都是經過多核CPU提高計算能力, 因此熟練掌握併發編程是將來的趨勢安全
協程優點bash
實驗特性
協程在Kotlin1.3時候放出正式版本, 目前仍然存在不穩定函數(不影響項目開發), 經過註解標識
@FlowPreview 表明可能之後存在Api函數變更
@ExperimentalCoroutinesApi 表明目前可能存在不穩定的因素的函數
@ObsoleteCoroutinesApi 可能存在被廢棄的可能
複製代碼
構成
Kotlin的協程主要構成分爲三部分
CoroutineScope
協程做用域: 每一個協程體都存在一個做用域, 異步仍是同步由該做用域決定Channel
通道: 數據如同一個通道進行發送和接收, 能夠在協程之間互相傳遞數據或者控制阻塞和繼續Flow
響應流: 相似RxJava等結構寫法爲自動化/併發網絡請求我建立一個庫, 我姑且稱爲Android最強的網絡請求庫: Net
1.0+版本爲RxJava實現, 2.0+版本爲Coroutine實現, 同時包含更強的輪循器用於替代RxJava的輪循功能
由於須要拋棄RxJava, 取代RxBus事件分發我使用協程建立出一個更增強大的: Channel
咱們公司項目屬於 MVVM + Kotlin + Coroutine + JetPack
, 在國外很經常使用, 主要帶來的優點;
我平時項目開發必備框架
框架 | 描述 |
---|---|
Net | Android不是最強網絡請求/異步任務庫 |
BRV | Android不是最強列表 |
Serialize | 建立自動保存和恢復的字段 |
StateLayout | Android不是最強缺省頁 |
LogCat | JSON和長文本日誌打印工具 |
Tooltip | 完善的吐司工具 |
DebugKit | 開發調試窗口工具 |
StatusBar | 一行代碼建立透明狀態欄 |
Channel | 基於協程和JetPack特性的事件分發框架 |
展望
協程對於後端高併發優點很大, 至於Google的Jetpack基本上都有針對協程擴展, 最明顯的是併發網絡請求速度倍增; 同時代碼更加結構清晰, 本文章後續會根據Kotlin的版本中的協程迭代進行更新
這裏咱們使用協程擴展庫, kotlin標準庫的協程太過於簡陋不適用於開發者使用
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9"
複製代碼
開啓主協程的分爲三種方式
生命週期和App一致, 沒法取消(不存在Job), 不存在線程阻塞
fun main() {
GlobalScope.launch { // 在後臺啓動一個新的協程並繼續
delay(1000L)
println("World!")
}
Thread.sleep(2000) // 防止JVM虛擬機退出
}
複製代碼
這裏說的是GlobalScope沒有Job, 可是啓動的launch都是擁有Job的. GlobalScope自己就是一個做用域, launch屬於其子做用域;
不存在線程阻塞, 能夠取消, 能夠經過CoroutineContext控制協程生命週期
fun main() {
CoroutineScope(Dispatchers.IO).launch {
}
Thread.sleep(1000)
}
複製代碼
線程阻塞, 適用於單元測試, 不須要延遲阻塞防止JVM虛擬機退出. runBlocking屬於全局函數能夠在任意地方調用
通常咱們在項目中是不會使用runBlocking, 由於阻塞主線程沒有開啓的任何意義
fun main() = runBlocking {
// 阻塞線程直到協程做用域內部全部協程執行完畢
}
複製代碼
協程內部還可使用函數建立其餘協程做用域, 分爲兩種建立函數:
CoroutineScope
的擴展函數, 只有在做用域內部才能建立其餘的做用域suspend
修飾的函數內部在主協程內還能夠建立子協程做用域, 建立函數分爲兩種
阻塞做用域(串行): 會阻塞當前做用域
掛起做用域(併發): 不會阻塞當前做用域
同步做用域函數
都屬於suspend函數
withContext
能夠切換調度器, 有返回結果coroutineScope
建立一個協程做用域, 該做用域會阻塞當前所在做用域而且等待其子協程執行完纔會恢復, 有返回結果supervisorScope
使用SupervisorJob的coroutineScope, 異常不會取消父協程public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T
// 返回結果; 能夠和當前協程的父協程存在交互關係, 主要做用爲來回切換調度器
public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T ): T = withContext(this, block)
// withContext工具函數而已
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
複製代碼
異步做用域函數
這兩個函數都不屬於suspend, 只須要CoroutineScope就能夠調用
launch
: 異步併發, 沒有返回結果async
: 異步併發, 有返回結果public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
複製代碼
同一個協程做用域中的異步任務遵照順序原則開始執行; 適用於串行網絡請求, 在一個異步任務須要上個異步任務的結果時.
協程掛起須要時間, 因此異步協程永遠比同步代碼執行慢
fun main() = runBlocking<Unit> {
launch {
System.err.println("(Main.kt:34) 後執行")
}
System.err.println("(Main.kt:37) 先執行")
}
複製代碼
當在協程做用域中使用async
函數時能夠建立併發任務
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
複製代碼
示例
fun main() = runBlocking<Unit> {
val name = async { getName() }
val title = async { getTitle() }
System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
delay(2000)
}
複製代碼
Deferred
; 經過函數await
獲取結果值awaitAll()
等待所有完成await
任務也會等待執行完協程關閉惰性併發
將async函數中的start
設置爲CoroutineStart.LAZY
時則只有調用Deferred對象的await
時纔會開始執行異步任務(或者執行start
函數)
啓動模式
DEFAULT
當即執行LAZY
直到Job執行start或者join纔開始執行ATOMIC
在做用域開始執行以前沒法取消UNDISPATCHED
不執行任何調度器, 直接在當前線程中執行, 可是會根據第一個掛起函數的調度器切換異常
協程中發生異常, 則父協程取消而且父協程其餘的子協程一樣所有取消
繼承自Job
提供一個全局函數用於建立CompletableDeferred對象, 該對象能夠實現自定義Deferred功能
public suspend fun await(): T
// 結果
public val onAwait: SelectClause1<T>
// 在select中使用
public fun getCompleted(): T
// 若是完成[isCompleted]則返回結果, 不然拋出異常
public fun getCompletionExceptionOrNull(): Throwable?
// 若是完成[isCompleted]則返回結果, 不然拋出異常
複製代碼
示例
fun main() = runBlocking<Unit> {
val deferred = CompletableDeferred<Int>()
launch {
delay(1000 )
deferred.complete(23)
}
System.err.println("(Demo.kt:72) 結果 = ${deferred.await()}")
}
複製代碼
建立CompletableDeferred
的頂層函數
public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
複製代碼
CompletableDeferred函數
public fun complete(value: T): Boolean
// 結果
public fun completeExceptionally(exception: Throwable): Boolean
// 拋出異常, 異常發生在`await()`時
public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean
// 能夠經過標記來判斷是否成功, 避免異常拋出
複製代碼
建立此對象表示建立一個協程做用域
結構化併發
若是你看協程的教程可能會常常看到這個詞, 這就是做用域內部開啓新的協程; 父協程會限制子協程的生命週期, 子協程承接父協程的上下文, 這種層級關係就是結構化併發
在一個協程做用域裏面開啓多個子協程進行併發行爲
協程上下文, 我認爲協程上下文能夠看作包含協程基本信息的一個Context(上下文), 其能夠決定協程的名稱或者運行
建立一個新的調度器
fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
複製代碼
建立新的調度器比較消耗資源, 建議複用且當不須要的時候使用close
函數釋放
Dispatchers
繼承自CoroutineContext, 該枚舉擁有三個實現; 表示不一樣的線程調度; 當函數不使用調度器時承接當前做用域的調度器
Dispatchers.Unconfined
不指定線程, 若是子協程切換線程那麼接下來的代碼也運行在該線程上Dispatchers.IO
適用於IO讀寫Dispatchers.Main
根據平臺不一樣而有所差, Android上爲主線程Dispatchers.Default
默認調度器, 在線程池中執行協程體, 適用於計算操做當即執行
Dispatchers.Main.immediate
複製代碼
immediate
屬於全部調度器都有的屬性, 該屬性表明着若是當前正處於該調度器中不執行調度器切換直接執行, 能夠理解爲在同一調度器內屬於同步協程做用域
例如launch
函數開啓做用域會比後續代碼執行順序低, 可是使用該屬性協程屬於順序執行
示例
CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
// 執行順序 1
}
// 執行順序 2
CoroutineScope(Job() + Dispatchers.Main).launch {
// 執行順序 4
}
// 執行順序 3
複製代碼
經過建立一個CoroutineName
對象, 在構造函數中指定參數爲協程名稱, CoroutineName
繼承自CoroutineContext.
launch(CoroutineName("吳彥祖")){
}
複製代碼
協程上下文名稱用於方便調試使用
yield函數可讓當前協程暫時掛起執行其餘協程體, 若是沒有其餘正在併發的協程體則繼續執行當前協程體(至關於無效調用)
public suspend fun yield(): Unit
複製代碼
看協程中可能常常說起掛起, 掛起能夠理解爲這段代碼(做用域)暫停, 而後執行後續代碼; 掛起函數通常表示suspend關鍵字修飾的函數, suspend要求只容許在suspend修飾的函數內部調用, 可是自己這個關鍵字是沒作任何事的. 只是爲了限制開發者隨意調用
掛起函數調用會在左側行號列顯示箭頭圖標
在協程中Job一般被稱爲做業, 表示一個協程工做任務, 他一樣繼承自CoroutineContext
val job = launch {
}
複製代碼
Job屬於接口
interface Job : CoroutineContext.Element
複製代碼
函數
public suspend fun join()
// 等待協程執行完畢都阻塞當前線程
public val onJoin: SelectClause0
// 後面說起的選擇器中使用
public fun cancel(cause: CancellationException? = null)
// 取消協程
public suspend fun Job.cancelAndJoin()
// 阻塞而且在協程結束之後取消協程
public fun start(): Boolean
public val children: Sequence<Job>
// 所有子做業
public fun getCancellationException(): CancellationException
public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle
// p1: 當爲true表示cancel不會回調handler
// p2: 當爲true則先執行[handler]而後再返回[DisposableHandle], 爲false則先返回[DisposableHandle]
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 當其做用域完成之後執行, 主協程指定纔有效, 直接給CoroutineScope指定時無效的
// 手動拋出CancellationException一樣會賦值給cause
複製代碼
狀態
經過字段能夠獲取JOB當前處於狀態
public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean
複製代碼
擴展函數
public fun Job.cancelChildren(cause: CancellationException? = null)
public suspend fun Job.cancelAndJoin()
複製代碼
每一個協程做用域都存在coroutineContext. 而協程上下文中都存在Job對象
coroutineContext[Job]
複製代碼
若是協程做用域內存在計算任務(一直打日誌也算)則沒法被取消, 若是使用delay函數則能夠被取消;
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (true){
delay(100) // 這行代碼存在則能夠成功取消協程, 不存在則沒法取消
System.err.println("(Main.kt:30) ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42) 結束")
}
複製代碼
經過使用協程內部isActive
屬性來判斷是否應該結束
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (isActive) { // 一旦協程被取消則爲false
System.err.println("(Main.kt:30) ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42) 結束")
}
複製代碼
協程存在被手動取消的狀況, 可是有些資源須要在協程取消的時候釋放資源, 這個操做能夠在finally
中執行
不管如何finally都會被執行
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
try {
repeat(1000){
System.err.println("(Main.kt:31) it = $it")
delay(500)
}
} finally {
// 已被取消的協程沒法繼續掛起
}
}
delay(1500)
job.cancel()
System.err.println("(Main.kt:42) ")
}
複製代碼
再次開啓協程
經過withContext
和NonCancellable
能夠在已被取消的協程中繼續掛起協程; 這種用法其實能夠看作建立一個沒法取消的任務
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
複製代碼
協程做用域能夠接收多個CoroutineContext做爲上下文參數; CoroutineContext自己屬於接口, 不少上下文相關的類都實現與他
配置多個CoroutineContext能夠經過+
符號同時指定多個協程上下文, 每一個實現對象可能包含一部分信息能夠存在覆蓋行爲故相加時的順序存在覆蓋行爲
val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
delay(1000)
System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
複製代碼
launch(Dispatchers.IO + CoroutineName("吳彥祖")){ }
複製代碼
協程局部變量
使用ThreadLocal
能夠獲取線程的局部變量, 可是要求使用擴展函數asContextElement
轉爲協程上下文
做爲參數傳入在建立協程的時候
該局部變量做用於持有該協程上下文的協程做用域內
public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
ThreadLocalElement(value, this)
複製代碼
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超過指定時間timeMillis自動結束協程;
// 當沒有超時時返回值獲取而且繼續執行協程;
// 當超時會拋出異常TimeoutCancellationException, 可是不會致使程序結束
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超時不會結束協程而是返回null
複製代碼
沒法手動拋出TimeoutCancellationException, 由於其構造函數私有
全局協程做用域
全局協程做用域屬於單例對象, 整個JVM虛擬機只有一份實例對象; 他的壽命週期也跟隨JVM. 使用全局協程做用域的時候注意避免內存泄漏
public object GlobalScope : CoroutineScope {
/** * Returns [EmptyCoroutineContext]; */
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
複製代碼
全局協程做用域不繼承父協程做用域的上下文, 因此也不會由於父協程被取消而自身被取消
啓動模式
DEFAULT
當即執行協程體ATOMIC
當即執行協程體,但在開始執行協程以前沒法取消協程UNDISPATCHED
當即在當前線程執行協程體,第一個掛起函數執行在函數所在線程, 後面執行在函數指定線程LAZY
手動執行start
或join
纔會執行協程協程體若是已經執行實際上屬於不可取消的, 在協程體中經過isActive
來判斷協程是否處於活躍中
經過取消函數的參數指定異常CancellationException能夠自定義異常對象
不可取消的協程做用域
NonCancellable該單例對象用於withContext
函數建立一個沒法被取消的協程做用域
withContext(NonCancellable) {
delay(2000)
}
複製代碼
示例
fun main() = runBlocking {
launch {
delay(1000)
System.err.println("(Main.kt:19) ")
}
launch {
withContext(NonCancellable) {
delay(2000)
System.err.println("(Main.kt:26) ")
}
}
delay(500) // 防止launch還未開啓withContext就被取消
cancel()
}
複製代碼
delay
不存在, Thread.sleep
能夠模擬未結束的任務)CancellationException
視做結束異常, invokeOnCompletion
也會執行(其中包含異常對象), 可是其餘異常將不會執行invokeOnCompletion取消GlobalScope
GlobalScope屬於全局協程, 由他開啓的協程都不擁有Job, 因此沒法取消協程. 可是能夠經過給GlobalScope開啓的協程做用域指定Job而後就可使用Job取消協程
經過CoroutineExceptionHandler
函數能夠建立一個同名的對象, 該接口繼承自CoroutineContext,
一樣經過制定上下文參數傳遞給全局協程做用域使用, 看成用域拋出異常時會被該對象的回調函數接收到, 而且不會拋出異常
CoroutineExceptionHandler 只有做爲最外層的父協程上下文才有效, 由於異常會層層上拋, 除非配合SupervisorJob監督做業禁止異常上拋, 子做用域的異常處理器才能捕獲到異常
CoroutineExceptionHandler異常處理器並不能阻止協程做用域取消, 只是監聽到協程的異常信息避免JVM拋出異常退出程序而已
只要發生異常就會致使父協程和其全部子協程都被取消, 這種屬於雙向的異常取消機制
, 後面提到的監督做業
(SupervisorJob)屬於單向向下傳遞(即不會向上拋出)
CoroutineExceptionHandler會被做用域一直做爲協程上下文向下傳遞給子做用域(除非子做用域單獨指定)
(以下示例)不要嘗試使用try/catch
捕捉launch做用域的異常, 沒法被捕捉.
try {
launch {
throw NullPointerException()
}
} catch (e: Exception) {
e.printStackTrace()
}
複製代碼
後面專門介紹如何捕獲協程異常避免拋出.
協程取消異常
取消協程的做業(Job)會引起異常, 可是會被默認的異常處理器給忽略, 可是咱們能夠經過捕捉能夠看到異常信息
fun main() = runBlocking<Unit> {
val job = GlobalScope.launch {
try {
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
}
}
job.cancel(CancellationException("自定義一個用於取消協程的異常"))
delay(2000)
}
複製代碼
Job取消函數
public fun cancel(cause: CancellationException? = null)
複製代碼
全局協程做用域的異常處理
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
}
GlobalScope.launch(exceptionHandler) {
}
複製代碼
子協程設置異常處理器是無效的, 即便設置了錯誤依然會拋到父協程從而而沒有意義. 除非同時使用異常處理器+監督做業(SupervisorJob), 這樣就是讓子協程的錯誤不向上拋(後面詳解監督做業), 從而被其內部的異常處理器來處理.
異常聚合和解包
全局協程做用域也存在嵌套子父級關係, 故異常可能也會依次拋出多個異常
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
// 第三, 這裏的異常是第一個被拋出的異常對象
println("捕捉的異常: $exception 和被嵌套的異常: ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally { // 當父協程被取消時其全部子協程都被取消, finally被取消以前或者完成任務以後必定會執行
throw ArithmeticException() // 第二, 再次拋出異常, 異常被聚合
}
}
launch {
delay(100)
throw IOException() // 第一, 這裏拋出異常將致使父協程被取消
}
delay(Long.MAX_VALUE)
}
job.join() // 避免GlobalScope做用域沒有執行完畢JVM虛擬機就退出
}
複製代碼
通常狀況子協程發生異常會致使父協程被取消, 同時父協程發生異常會取消全部的子協程; 可是有時候子協程發生異常咱們並不但願父協程也被取消, 而是僅僅全部子協程取消(僅向下傳遞異常), 這個使用就是用SupervisorJob
做業
建立監督做業對象
fun main() = runBlocking<Unit> {
CoroutineScope(coroutineContext).launch {
launch(SupervisorJob(coroutineContext[Job]) + CoroutineExceptionHandler { _, _ -> }) {
throw NullPointerException()
}
delay(500)
println("( Process.kt:13 ) ")
}
println("( Process.kt:16 ) finish")
}
複製代碼
必須添加CoroutineExceptionHandler
處理異常, 不然異常依然會向上傳遞取消父協程
直接建立 SupervisorJob()
對象傳入做用域中會致使該做用域和父協程生命週期不統一的問題, 即父協程取消之後該子協程依然處於活躍狀態, 故須要指定參數爲coroutineContext[Job]
即傳入父協程的做業對象
SupervisorJob
僅能捕捉內部協程做用域的異常, 沒法直接捕捉內部協程
supervisorScope {
// throw NoSuchFieldException() 拋出崩潰
launch {
throw NoSuchFieldException() // 不會拋出
}
}
複製代碼
監督做業在withContext
和async
中添加無效
直接建立一個異常向下傳遞監督做業的做用域
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
複製代碼
supervisorScope
函數使用的依然是當前做用域的Job, 因此跟隨當前做用域生命週期, 能夠被取消fun main() = runBlocking<Unit> {
CoroutineScope(coroutineContext).launch {
// 在該做用域內只要設置CoroutineExceptionHandler都僅會向下傳遞
supervisorScope {
launch(CoroutineExceptionHandler { _, _ -> }) {
throw NullPointerException()
}
launch {
delay(1000) // 即便上面的launch拋出異常也會繼續執行這裏
println("( Process.kt:18 ) ")
}
}
}
println("( Process.kt:16 ) finish")
}
複製代碼
在做用域中的異常捕獲和通常的異常捕獲有所區別
CoroutineExceptionHandler
能夠捕獲全部子做用域內異常async
可使用監督做業能夠捕獲內部發生的異常, 可是其await
要求trycatchlaunch
要求監督做業配合異常處理器同時使用, 缺一不可withContext/supervisorScope/coroutineScope/select
能夠trycatch捕獲異常函數 | 回調字段 | 描述 |
---|---|---|
suspendCoroutine | Continuation | Result |
suspendCancellableCoroutine | CancellableContinuation | 可取消 |
suspendAtomicCancellableCoroutine | CancellableContinuation | 可取消 |
[Continuation]
public val context: CoroutineContext public fun resumeWith(result: Result<T>) 複製代碼
[CancellableContinuation] -| Continuation
public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean
public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
public fun tryResume(value: T, idempotent: Any? = null): Any?
public fun tryResumeWithException(exception: Throwable): Any?
public fun completeResume(token: Any)
public fun cancel(cause: Throwable? = null): Boolean
public fun invokeOnCancellation(handler: CompletionHandler)
public fun CoroutineDispatcher.resumeUndispatched(value: T)
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
複製代碼
解決線程不安全問題
至關於Java中的Lock替代品: Mutex
建立互斥對象
public fun Mutex(locked: Boolean = false): Mutex
// p: 設置初始狀態, 是否當即上鎖
複製代碼
使用擴展函數能夠自動加鎖和解鎖
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner: 鑰匙
複製代碼
函數
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
public fun holdsLock(owner: Any): Boolean
// owner是否被用於鎖
public fun tryLock(owner: Any? = null): Boolean
// 使用owner來上鎖, 若是owner已上鎖則返回false
複製代碼
chan
設計, 可用於控制做用域的阻塞和繼續(經過配合select
)Channel屬於接口沒法直接建立, 咱們須要經過函數Channel()
來建立其實現類
源碼
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel() // 無緩存
UNLIMITED -> LinkedListChannel() // 無限制
CONFLATED -> ConflatedChannel() // 合併
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
else -> ArrayChannel(capacity) // 指定緩存大小
}
複製代碼
capacity
緩衝大小, 默認0
當Channel發送一條數據時就會掛起通道(不繼續執行發送後續代碼), 只有在接收這條數據時纔會解除掛起繼續執行; 可是咱們能夠設置緩存大小
複製代碼
通道容許被遍歷獲取當前發送數據
val channel = Channel<Int>()
for (c in channel){
}
複製代碼
public suspend fun yield(): Unit
複製代碼
Channel
Channel接口同時實現發送渠道(SendChannel)和接收渠道(ReceiveChannel)兩個接口, 因此既能發送又能接收數據
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
複製代碼
public val isClosedForSend: Boolean
// 是否關閉
public fun close(cause: Throwable? = null): Boolean
// 關閉發送通道
public fun offer(element: E): Boolean
// 推薦使用send函數
public suspend fun send(element: E)
// 發送消息
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
// 當通道關閉時執行回調
public val onSend: SelectClause2<E, SendChannel<E>>
// 當即發送數據(若是容許), 在select中使用
複製代碼
ClosedReceiveChannelException
拋出public val isClosedForReceive: Boolean
// SendChannel是否已經關閉通道, 若是關閉通道之後還存在緩存則會接收完緩存以後返回false
public val isEmpty: Boolean // 通道是否爲空
public fun poll(): E? // 推薦使用receive函數
public suspend fun receive(): E
// 接受通道事件
public val onReceive: SelectClause1<E> // 若是通道關閉, 拋出異常
public val onReceiveOrNull: SelectClause1<E?> // 廢棄函數, 若是通道關閉返回null
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 在select中使用的監聽器, 推薦使用第三個函數
public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`對象能夠判斷通道是否已關閉
public fun cancel(cause: CancellationException? = null)
// 關閉通道
複製代碼
receive
拋出異常close
函數後不容許再發送或者接收數據, 不然拋出異常send | receive
函數所在做用域被取消cancel
不會致使通道結束(isClosedForReceive返回false)consume
ReceiveChannel不只能夠經過迭代器來接收事件, 還可使用consume
系列函數來接收事件; 本質上consume和迭代沒有任何區別只是consume會在發生異常時自動取消通道(經過cancel函數);
源碼
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
var cause: Throwable? = null
try {
return block() // 直接返回
} catch (e: Throwable) {
cause = e
throw e
} finally {
cancelConsumed(cause) // 若是發生異常取消通道
}
}
複製代碼
consumeEach
函數僅是迭代接收事件且異常自動取消; 通常建議使用consume函數來接收事件
這個通道和通常的通道區別在於他的每一個數據能夠被每一個做用域所有接收到; 默認的通道一個數據被接收後其餘的協程是沒法再接收到數據的
廣播通道經過全局函數建立對象
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
複製代碼
自己廣播通道繼承自SendChannel, 只能發送數據, 經過函數能夠拿到接收通道
public fun openSubscription(): ReceiveChannel<E>
複製代碼
取消通道
public fun cancel(cause: CancellationException? = null)
複製代碼
將Channel轉成BroadcastChannel
fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY ): BroadcastChannel<E>
複製代碼
經過擴展函數在協程做用域中快速建立一個廣播發送通道
public fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): BroadcastChannel<E>
複製代碼
迭代通道
接收通道實現操做符重載可使用迭代
public operator fun iterator(): ChannelIterator<E>
複製代碼
示例
for (i in produce){
// 收到每一個髮型的消息
}
複製代碼
當多個協程接收同一個渠道數據會依次輪流接收到數據, 渠道對於多個協程是公平的
上面介紹的屬於建立Channel對象來發送和接收數據, 可是還能夠經過擴展函數快速建立並返回一個具有發送數據的ReceiveChannel
對象
public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
複製代碼
ProducerScope 該接口繼承自SendChannel以及CoroutineScope, 具有發送通道數據以及協程做用域做用;
當produce做用域執行完成會關閉通道, 前面已經說起關閉通道沒法繼續接收數據
等待取消
該函數會在通道被取消時回調其函數參數, 前面說起協程取消時能夠經過finally
來釋放內存等操做, 可是通道取消沒法使用finally只能使用該函數
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {})
// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
複製代碼
能夠經過actor
函數建立一個具有通道做用的協程做用域
public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit ): SendChannel<E>
複製代碼
該函數和produce
函數類似,
ReceiveChannel
, 外部進行數據接收; actor返回的SendChannel
, 外部進行數據發送channel:Channel
, 既能夠發送數據又能夠接收數據, produce的屬性channel屬於SendChannelproduce
或者actor
他們的通道都屬於Channel, 既能夠發送又能夠接收數據, 只須要類型強轉便可;不管是RxJava仍是協程都支持輪循器的功能, 在個人網絡請求庫中還賦予了輪循器暫停|繼續|多個觀察者|重置等功能
這裏的協程輪循器就比較簡陋
public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
複製代碼
該通道返回的數據是Unit
默認狀況下能夠理解爲通道會在指定間隔時間後一直髮送Unit
數據
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
// 每秒打印
for (unit in tickerChannel) {
System.err.println("unit = $unit")
}
}
複製代碼
可是若是下游不是在發送數據之後當即接收數據, 而是延遲使用receive函數來接收通道數據
TickerMode
該枚舉擁有兩個字段
FIXED_PERIOD
默認值, 動態調節通道發送數據的時間間隔, 時間間隔能夠看作是上游發送數據的FIXED_DELAY
只有當接收數據後纔會開始計算間隔時間, 時間間隔能夠看作是下游接收數據的這個輪循器不支持多訂閱|暫停|繼續|重置|完成, 可是個人Net庫中Interval
對象已實現全部功能
在select
函數回調中監聽多個Deferred/Channel的結果, 且只會執行最快接收數據的通道或者結果回調.
動做
在前面的函數介紹中能夠看到一系列on{動做}
變量, 他們的值所有是SelectClause{數字}
接口對象;
[SelectBuilder]
public interface SelectBuilder<in R> {
public operator fun SelectClause0.invoke(block: suspend () -> R)
public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
@ExperimentalCoroutinesApi
public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
複製代碼
根據這定義的擴展函數就能夠直接使用動做
對象 | 使用的函數 |
---|---|
SelectClause0 | onJoin |
SelectClause1 | OnReceive |
SelectClause2 | onSend |
示例
@ObsoleteCoroutinesApi
@UseExperimental(InternalCoroutinesApi::class)
suspend fun selectMulti(a: Channel<Int>, b: Channel<Int>): String = select<String> {
b.onReceive {
"b $it" // 優先執行第一個, 不是函數緣由, 而是順序
}
b.onReceiveOrClosed {
"b $it"
}
a.onSend(23) {
"發送 23"
}
}
fun main() = runBlocking<Unit> {
val a = Channel<Int>(1) // 緩衝數量, 避免發送數據時阻塞
val b = Channel<Int>(1)
launch {
b.send(24)
val s = selectMulti(a, b)
println("結果 = $s")
}
}
複製代碼
onReceive
在關閉通道時會致使拋出異常, 若是不想拋出異常應當使用onReceiveOrClosed
來替換onSend
該函數等效於Channel.send
, 就是發送一個值, 假設註冊多個onSend確定是第一個先回調返回結果select
)也不會致使其餘的成員協程做用域結束[ValueOrClosed]
public val isClosed: Boolean // 通道是否已關閉
public val value: T
public val valueOrNull: T?
// 二者都是獲取通道內的值, 可是第2個若是通道關閉不會拋出異常而是返回NULL
複製代碼
Flow類似於RxJava一樣分爲三個部分:
下游接收事件要求在協程做用域內執行(suspend函數)
建立Flow
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
複製代碼
示例
fun shoot() = flow {
for (i in 1.;3) {
delay(1000) // 僞裝咱們在這裏作了一些有用的事情
emit(i) // 發送下一個值
}
}
複製代碼
集合或者Sequence均可以經過asFlow
函數轉成Flow對象
也能夠像建立集合同樣經過fowOf
直接建立Flow對象
Channel通道轉成Flow
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>
複製代碼
甚至掛起函數也能夠轉成Flow
public fun <T> (suspend () -> T).asFlow(): Flow<T>
複製代碼
collect和flow的回調函數自己屬於suspend
函數能夠開啓協程做用域
建立Flow的函數
函數 | 描述 |
---|---|
flow | 普通Flow |
channelFlow | 建立通道, 其支持緩衝通道, 容許不一樣的CorotineContext發送事件 |
callbackFlow | 與channelFlow函數除了不使用awaitClose 會報錯之外沒有區別 |
emptyFlow | 空的Flow |
flowOf | 直接發送數據 |
flow的發射函數
emit
不是線程安全的不容許其餘線程調用, 若是須要線程安全請使用channelFlow
而不是flow
channelFlow使用
send
函數發送數據
發射數據示例
flow<Int> {
emit(23)
}
channelFlow<Int> {
send(23) // offer(23)
}
複製代碼
Flow在取消做用域時釋放資源可使用callbackFlow
. 這裏演示註冊和取消一個廣播AppWidgetProvider
callbackFlow<Int> {
val appWidgetProvider = AppWidgetProvider()
registerReceiver(appWidgetProvider, IntentFilter()) // 註冊
awaitClose { // 該回調會在協程做用域被取消時回調
unregisterReceiver(appWidgetProvider) // 註銷
}
}.collect {
}
複製代碼
收集數據
Flow是冷數據, 要求調用函數collect
收集數據時纔會進行數據的發射; 該系列函數也成爲末端操做符;
flow {
emit(23)
}.collect {
System.err.println("(Demo.kt:9) it = $it")
}
複製代碼
查看源碼會發現這個emit實際上就是執行collect的參數函數
collect函數表示接收上游發送的數據
public suspend fun Flow<*>.collect()
// 不作任何處理的收集器, 僅僅爲了觸發發射數據
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit
// 收集
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
// 和上個函數的區別是: 若是在下游沒有處理完狀況下上游繼續下個發射會致使上次的下游被取消
public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit ): Unit
// 具有索引和值的收集器
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 將Flow運行在指定的協程做用域內
複製代碼
[FlowCollector] 發射器
public suspend fun emit(value: T)
// 發送一個數據
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
// 發射另外一個flow對象
複製代碼
調度器
Flow默認使用的是其所在的當前線程或者協程上下文, Flow不容許在內部使用withContext
來切換調度器, 而是應該使用flowOn
函數
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
複製代碼
該函數改變的是Flow函數內部發射時的線程, 而在collect
收集數據時會自動切回建立Flow時的線程
緩存
不須要等待收集執行就當即執行發射數據, 只是數據暫時被緩存而已, 提升性能
默認切換調度器時會自動緩存
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
複製代碼
合併函數, 這個函數實際上就是buffer
, 當下遊沒法及時處理上游的數據時會丟棄掉該數據
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
複製代碼
將多個事件合併後發送給下游
zip
將兩個Flow在回調函數中進行處理返回一個新的值 R
當兩個flow的長度不等時只發送最短長度的事件
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
複製代碼
示例
val nums = (1.;3).asFlow().onEach { delay(300) } // 發射數字 1.;3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使用「zip」組合單個字符串
.collect { value -> // 收集並打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
複製代碼
combine
public fun <T1, T2, R> Flow<T1>.combine( flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R ): Flow<R>
// 組合兩個流,在通過第一次發射之後,任意方有新數據來的時候就能夠發射,另外一方有多是已經發射過的數據
public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit ): Flow<R>
複製代碼
Flow直接轉成集合函數
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
複製代碼
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R ): R
// `acc`爲上次回調函數返回值, 第一次爲初始值, 等同於疊加效果; 該函數和reduce的區別就是支持初始值; reduce累計兩次元素纔會回調函數
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
複製代碼
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>
public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R> ): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先發送全部的元素, 而後上游每一個元素會致使回調函數中的Flow發送全部元素一次
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同於RxJava的FlatMap
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
// 串行收集數據
public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>
// 併發收集數據
public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R> ): Flow<R>
// 在每次 emit 新的數據之後,會取消先前的 collect
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
// 包含元素索引
複製代碼
public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit ): Flow<T>
// 開始
public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit ): Flow<T>
// 回調函數中的參數`cause`若是爲null表示正常完成沒有拋出異常, 反之則拋出異常非正常結束,
// 和catch函數同樣只能監聽到上游發生的異常, 可是沒法避免異常拋出只能在異常拋出以前執行回調函數
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
// 該函數只能捕獲上游異常, 若是異常處於函數調用之下則依然會被拋出
複製代碼
限制流發送
public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定數量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丟棄指定數量事件
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回調函數判斷是否丟棄或者接收, 只要丟棄或者接收後面就不會繼續發送事件(結束流)
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>
public suspend fun <T> Flow<T>.single(): T
// 期待只有一個元素, 不然拋出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不拋出異常, 但若是不是僅有元素則返回null
public suspend fun <T> Flow<T>.first(): T
// 若是不存在一個元素則會拋出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回調函數判斷爲true的第一個條件符合的元素
複製代碼
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重試次數 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T>
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
複製代碼
public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R>
// 轉換函數, 能夠在回調函數中發送新的元素
public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R>
複製代碼
scan和reduce的區別在於
reduce
是所有疊加計算完成後被收集scan
是每次疊加一次後收集一次數據類關係
SharedFlow
|- MutableSharedFlow
|- StateFlow
|- MutableStateFlow
SharedFlow屬於熱流數據, 既沒有收集(collect)狀況下也會發送, 而後在收集時進行重放(replay). 可使用shareIn
將冷流轉成熱流. 也能夠直接使用如下函數建立
public fun <T> MutableSharedFlow( replay: Int = 0, // 重放數量 extraBufferCapacity: Int = 0, // 緩存數量(不包含重放數量) onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T>
複製代碼
使用BufferOverflow
StateFlow能夠看作在Flow的基礎上加上了LiveData的特性. 可是沒法不存在生命週期跟隨, 一直均可以收集數據
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
複製代碼
示例
將flow從冷流轉換成熱流使用函數shareIn
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
)
複製代碼
SharingStarted:
Google發行的Jetpack庫中不少組件都附有KTX擴展依賴, 這種依賴主要是增長kotlin和協程支持
官方提供生命週期協程做用域的快速建立實現;
onDestory
取消協程引入ktx依賴庫
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
複製代碼
當執行到某個生命週期時運行協程
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job
suspend fun <T> Lifecycle.whenStateAtLeast( minState: Lifecycle.State, block: suspend CoroutineScope.() -> T )
複製代碼
這些函數都屬於Lifecycle
和LifecycleOwner
的擴展函數
依賴
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
複製代碼
提供開發者使用的只有這兩個函數, 兩個函數功能同樣, 只是每一個參數接收時間單位不一致
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
複製代碼
timeout
: 若是liveData的沒有處於活躍的觀察者則在指定的時間內(單位毫秒)會取消其做用域[block]block
: 該做用域只在活躍狀態纔會觸發, 默認在Dispatchers.Main.immediate
調度器liveData做用域具有發射數據和LiveData的做用
interface LiveDataScope<T> {
/** * Set's the [LiveData]'s value to the given [value]; If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]; * * @param value The new value for the [LiveData] * * @see emitSource */
suspend fun emit(value: T)
/** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]; Calling this * method will remove any source that was yielded before via [emitSource]; * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]; * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
suspend fun emitSource(source: LiveData<T>): DisposableHandle
/** * References the current value of the [LiveData]; * * If the block never `emit`ed a value, [latestValue] will be `null`; You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]; */
val latestValue: T?
}
複製代碼