Kotlin協程: Coroutine/Channel/Flow 以及實際應用

協程這個概念在1958年就開始出現, 目前某些語言開始原生支持(目前主流語言我感受只有Java徹底不支持協程). Java沒有原生協程可是能夠大型公司都本身或者使用第三方庫來支持協程編程, 可是Kotlin原生支持協程.java

Android領域的網絡請求庫通常由Rxjava實現, 包括我本身寫的網絡請求庫一樣也是採用的RxJava. 可是這些RxJava實現的網絡請求庫一樣很難方便的實現併發android

我認爲協程的核心就是一個詞: 做用域, 理解什麼是做用域就理解協程了git

什麼是協程:github

線程和協程的關係屬於一對多關係, 一個線程上容許存在多個協程, 即主線程你也能異步執行代碼. 可是讓某個線程執行太多協程效率過低下, 因此針對不一樣的場景建議使用調度器切換線程, 使用協程開始就不須要考慮線程的問題, 只須要在不一樣場景使用不一樣的調度器(調度器會對特定任務進行優化)就好, 協程英文名是Coroutine.數據庫

特性

使用場景編程

假設首頁存在七個接口網絡請求(後端人員處理差)的狀況一個個使用串行網絡請求的時間比並髮網絡請求慢了接近七倍.後端

目前計算機都是經過多核CPU提高計算能力, 因此熟練掌握併發編程是將來的趨勢緩存

協程優點安全

  1. 併發實現方便
  2. 沒有回調嵌套發生, 代碼結構清晰
  3. 建立協程性能開銷優於建立線程, 一個線程能夠運行多個協程, 單線程便可異步

實驗特性bash

協程在Kotlin1.3時候放出正式版本, 可是目前仍然存在不穩定的函數變更, 不過這個我認爲不影響項目中實際使用

@FlowPreview 表明可能之後存在Api函數變更

@ExperimentalCoroutinesApi  表明目前可能存在不穩定的因素的函數

@ObsoleteCoroutinesApi 可能存在被廢棄的可能
複製代碼

Kotlin的協程主要構成分爲三部分

  1. CoroutineScope 協程做用域: 每一個協程體都存在一個做用域, 異步仍是同步由該做用域決定
  2. Channel 通道: 數據如同一個通道進行發送和接收, 能夠在協程之間互相傳遞數據
  3. Flow 響應流: 相似RxJava等結構寫法

爲方便網絡請求和簡化異步做用域開啓可使用我實現的一個庫: Net

1.0+版本爲RxJava實現, 2.0+版本爲Coroutine實現

本文章後續會根據Kotlin的版本中的協程迭代進行更新

經常使用事件分發框架爲EventBus或者RxBus, 我以前使用RxJava的時候也寫了RxBus來使用, 使用協程後我又用協程實現一個: Channel

展望

協程對於後端高併發優點很大, 相信Spring的Kt版本後續會跟進

至於Google的Jetpack基本上都有針對協程擴展

咱們公司項目屬於 MVVM+Kotlin+Coroutine+JetPack, 最明顯的是併發網絡請求速度翻倍. 同時代碼更加結構清晰

建立協程

開啓主協程的三種方式

生命週期和App一致, 沒法取消(不存在Job), 不存在線程阻塞

fun main() {
    GlobalScope.launch { // 在後臺啓動一個新的協程並繼續
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // 防止JVM虛擬機退出
}
複製代碼

不存在線程阻塞, 能夠取消, 能夠經過CoroutineContext控制協程生命週期

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}
複製代碼

線程阻塞, 適用於單元測試, 不須要延遲阻塞防止JVM虛擬機退出. runBlocking屬於全局函數能夠在任意地方調用

通常咱們在項目中是不會使用runBlocking, 由於阻塞主線程沒有開啓的任何意義

fun main() = runBlocking { 
    // 阻塞線程直到協程做用域內部全部協程執行完畢
}
複製代碼

建立做用域

協程內部還可使用函數建立其餘協程做用域, 分爲兩種建立函數:

  1. CoroutineScope的擴展函數, 只有在做用域內部才能建立其餘的做用域
  2. suspend修飾的函數內部
  3. 協程永遠會等待其內部做用域內全部協程都執行完畢後纔會關閉協程

在主協程內還能夠建立子協程做用域, 建立函數分爲兩種

  1. 阻塞做用域(串行): 會阻塞當前做用域

  2. 掛起做用域(併發): 不會阻塞當前做用域

同步做用域函數

都屬於suspend函數

  • withContext: 能夠切換調度器, 有返回結果
  • coroutineScope: 建立一個協程做用域, 該做用域會阻塞當前所在做用域而且等待其子協程執行完纔會恢復, 有返回結果
  • supervisorScope: 使用SupervisorJob的coroutineScope, 異常不會取消父協程
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T
): T
// 返回結果. 能夠和當前協程的父協程存在交互關係, 主要做用爲來回切換調度器

public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函數而已

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
複製代碼

異步做用域函數

這兩個函數都不屬於suspend, 只須要CoroutineScope就能夠調用

  • launch: 異步併發, 沒有返回結果
  • async: 異步併發, 有返回結果
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit
): Job


public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>

複製代碼

併發

同一個協程做用域中的異步任務遵照順序原則開始執行. 適用於串行網絡請求, 在一個異步任務須要上個異步任務的結果時.

協程掛起須要時間, 因此異步協程永遠比同步代碼執行慢

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(Main.kt:34) 後執行")
    }

    System.err.println("(Main.kt:37) 先執行")
}
複製代碼

當在協程做用域中使用async函數時能夠建立併發任務

public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>
複製代碼

示例

fun main() = runBlocking<Unit> {
    val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
    delay(2000)
}
複製代碼
  1. 返回對象Deferred. 經過函數await獲取結果值.
  2. Deferred集合還可使用awaitAll()等待所有完成.
  3. 不執行await任務也會等待執行完協程關閉
  4. 若是Deferred不執行await則async內部拋出的異常不會被logCat和tryCatch捕獲, 可是依然會致使做用域取消和異常崩潰. 但當執行await時異常信息會從新拋出.

惰性併發

將async函數中的start設置爲CoroutineStart.LAZY時則只有調用Deferred對象的await時纔會開始執行異步任務(或者執行start函數).

啓動模式

  1. DEFAULT 當即執行
  2. LAZY 直到Job執行start或者join纔開始執行
  3. ATOMIC 在做用域開始執行以前沒法取消
  4. UNDISPATCHED 不執行任何調度器, 直接在當前線程中執行, 可是會根據第一個掛起函數的調度器切換

異常

協程中發生異常, 則父協程取消而且父協程其餘的子協程一樣所有取消

Deferred

繼承自Job

提供一個全局函數用於建立CompletableDeferred對象, 該對象能夠實現自定義Deferred功能

示例

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()
    
    launch {
        delay(1000 )
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72) 結果 = ${deferred.await()}")
}
複製代碼

建立CompletableDeferred的全局函數

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>

public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
複製代碼

CompletableDeferred函數

public fun complete(value: T): Boolean
// 結果

public fun completeExceptionally(exception: Throwable): Boolean
// 拋出異常, 異常發生在`await()`時
複製代碼

CoroutineScope

建立此對象表示建立一個協程做用域

結構化併發

若是你看協程的教程可能會常常看到這個詞, 這就是做用域內部開啓新的協程. 父協程會限制子協程的生命週期, 子協程承接父協程的上下文, 這種層級關係就是結構化併發.

在一個協程做用域裏面開啓多個子協程進行併發行爲

CoroutineContext

協程上下文, 我認爲協程上下文能夠看作包含協程基本信息的一個Context(上下文). 其能夠決定協程的名稱或者運行

建立一個新的調度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher

fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
複製代碼

建立新的調度器比較消耗資源, 建議複用且當不須要的時候使用close函數釋放

調度器

Dispatchers繼承自CoroutineContext, 該枚舉擁有三個實現. 表示不一樣的線程調度. 當函數不使用調度器時承接當前做用域的調度器

  1. Dispatchers.Unconfined 不指定線程,

    若是子協程切換線程那麼接下來的代碼也運行在該線程上
    複製代碼
  2. Dispatchers.IO

    適用於IO讀寫
    複製代碼
  3. Dispatchers.Main

    根據平臺不一樣而有所差, Android上爲主線程
    複製代碼
  4. Dispatchers.Default 默認調度器

    在線程池中執行協程體, 適用於計算操做
    複製代碼

當即執行

Dispatchers.Main.immediate
複製代碼

immediate屬於全部調度器都有的屬性, 該屬性表明着若是當前正處於該調度器中不執行調度器切換直接執行, 能夠理解爲在同一調度器內屬於同步協程做用域

例如launch函數開啓做用域會比後續代碼執行順序低, 可是使用該屬性協程屬於順序執行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 執行順序 1
}

// 執行順序 2

CoroutineScope(Job() + Dispatchers.Main).launch {
		// 執行順序 4
}

// 執行順序 3
複製代碼

協程命名

經過建立一個CoroutineName對象, 在構造函數中指定參數爲協程名稱, CoroutineName繼承自CoroutineContext.

launch(CoroutineName("吳彥祖")){

}
複製代碼

協程上下文名稱用於方便調試使用

協程掛起

yield函數可讓當前協程暫時掛起執行其餘協程體, 若是沒有其餘正在併發的協程體則繼續執行當前協程體(至關於無效調用).

public suspend fun yield(): Unit
複製代碼

JOB

在協程中Job一般被稱爲做業, 表示一個協程工做任務, 他一樣繼承自CoroutineContext

val job = launch {

}
複製代碼

Job屬於接口

interface Job : CoroutineContext.Element
複製代碼

函數

public suspend fun join()
// 等待協程執行完畢都阻塞當前線程

public fun cancel(cause: CancellationException? = null)
// 取消協程

public suspend fun Job.cancelAndJoin()
// 阻塞而且在協程結束之後取消協程

public fun start(): Boolean

public val children: Sequence<Job>
// 所有子做業

public fun invokeOnCompletion( onCancelling: Boolean = false, // true則取消做用域不會執行 invokeImmediately: Boolean = true, //  handler: CompletionHandler): DisposableHandle

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 當其做用域完成之後執行, 主協程指定纔有效, 直接給CoroutineScope指定時無效的
// 手動拋出CancellationException一樣會賦值給cause

public fun getCancellationException(): CancellationException

public val onJoin: SelectClause0
public val children: Sequence<Job>
// 後面說起的選擇器中使用
複製代碼

狀態

經過字段能夠獲取JOB當前處於狀態

public val isActive: Boolean

public val isCancelled: Boolean

public val isCompleted: Boolean
複製代碼

擴展函數

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()
複製代碼

每一個協程做用域都存在coroutineContext. 而協程上下文中都存在Job對象

coroutineContext[Job]
複製代碼

結束協程

若是協程做用域內存在計算任務(一直打日誌也算)則沒法被取消, 若是使用delay函數則能夠被取消.

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true){
      delay(100) // 這行代碼存在則能夠成功取消協程, 不存在則沒法取消
      System.err.println("(Main.kt:30) ")
    }
  }
  
  delay(500)
  job.cancel() 
  System.err.println("(Main.kt:42) 結束")
}
複製代碼

經過使用協程內部isActive屬性來判斷是否應該結束

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // 一旦協程被取消則爲false
            System.err.println("(Main.kt:30) ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42) 結束")
}
複製代碼

釋放資源

協程存在被手動取消的狀況, 可是有些資源須要在協程取消的時候釋放資源, 這個操做能夠在finally中執行.

不管如何finally都會被執行

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000){
                System.err.println("(Main.kt:31) it = $it")
                delay(500)
            }
        } finally {
           // 已被取消的協程沒法繼續掛起
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42) ")
}
複製代碼

再次開啓協程

經過withContextNonCancellable能夠在已被取消的協程中繼續掛起協程. 這種用法其實能夠看作建立一個沒法取消的任務

withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
複製代碼

上下文組合

協程做用域能夠接收多個CoroutineContext做爲上下文參數. CoroutineContext自己屬於接口, 不少上下文相關的類都實現與他.

配置多個CoroutineContext能夠經過+符號同時指定多個協程上下文, 每一個實現對象可能包含一部分信息能夠存在覆蓋行爲故相加時的順序存在覆蓋行爲.

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
複製代碼
launch(Dispatchers.IO + CoroutineName("吳彥祖")){}
複製代碼

協程局部變量

使用ThreadLocal能夠獲取線程的局部變量, 可是要求使用擴展函數asContextElement轉爲協程上下文做爲參數傳入在建立協程的時候.

該局部變量做用於持有該協程上下文的協程做用域內

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)
複製代碼

超時

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超過指定時間timeMillis自動結束協程. 
// 當沒有超時時返回值獲取而且繼續執行協程. 
// 當超時會拋出異常TimeoutCancellationException, 可是不會致使程序結束

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超時不會結束協程而是返回null
複製代碼

沒法手動拋出TimeoutCancellationException, 由於其構造函數私有

全局協程做用域

全局協程做用域屬於單例對象, 整個JVM虛擬機只有一份實例對象. 他的壽命週期也跟隨JVM. 使用全局協程做用域的時候注意避免內存泄漏

public object GlobalScope : CoroutineScope {
    /** * Returns [EmptyCoroutineContext]. */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
複製代碼

全局協程做用域不繼承父協程做用域的上下文, 因此也不會由於父協程被取消而自身被取消.

啓動模式

  • DEFAULT 當即執行協程體
  • ATOMIC 當即執行協程體,但在開始執行協程以前沒法取消協程
  • UNDISPATCHED 當即在當前線程執行協程體,第一個掛起函數執行在函數所在線程, 後面執行在函數指定線程
  • LAZY 手動執行startjoin纔會執行協程

協程取消

協程體若是已經執行實際上屬於不可取消的, 在協程體中經過isActive來判斷協程是否處於活躍中

經過取消函數的參數指定異常CancellationException能夠自定義異常對象

不可取消的協程做用域

NonCancellable該單例對象用於withContext函數建立一個沒法被取消的協程做用域

withContext(NonCancellable) {
  delay(2000)
}
複製代碼

示例

fun main() = runBlocking {
    launch {
        delay(1000)
        System.err.println("(Main.kt:19) ")
    }

    launch {
        withContext(NonCancellable) {
            delay(2000)
            System.err.println("(Main.kt:26) ")
        }
    }

    delay(500) // 防止launch還未開啓withContext就被取消
    cancel()
}
複製代碼

GlobalScope取消

GlobalScope屬於全局協程, 由他開啓的協程都不擁有Job, 因此沒法取消協程. 可是能夠經過給GlobalScope開啓的協程做用域指定Job而後就可使用Job取消協程.

協程異常

經過CoroutineExceptionHandler函數能夠建立一個同名的對象, 該接口繼承自CoroutineContext. 一樣經過制定上下文參數傳遞給全局協程做用域使用, 看成用域拋出異常時會被該對象的回調函數接收到, 而且不會拋出異常.

只要發生異常就會致使父協程和其全部子協程都被取消, 這種屬於雙向的異常取消機制, 後面提到的監督做業屬於發生異常只會取消全部子協程, 屬於單向.

CoroutineExceptionHandler異常處理器並不能阻止協程取消, 只是監聽到協程的異常信息避免JVM拋出異常退出程序而已

不要嘗試直接使用try/catch捕捉協程做用域的異常

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}
複製代碼

錯誤示例, 沒法捕捉到異常

協程取消異常

取消協程的做業(Job)會引起異常, 可是會被默認的異常處理器給忽略, 可是咱們能夠經過捕捉能夠看到異常信息

fun main() = runBlocking<Unit> {
    val job = GlobalScope.launch {

        try {
            delay(1000)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    job.cancel(CancellationException("自定義一個用於取消協程的異常"))

    delay(2000)
}
複製代碼

Job取消函數

public fun cancel(cause: CancellationException? = null)
複製代碼
  • cause: 參數不傳默認爲JobCancellationException

全局協程做用域的異常處理

CoroutineExceptionHandler沒法捕捉async|produce拋出的異常

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
                                                  System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
                                                 }

GlobalScope.launch(exceptionHandler) { 

}
複製代碼

子協程沒法設置異常處理器, 即便設置了也會被父協程覆蓋而沒有意義. 除非使用異常處理器+Job對象, 可是第一個子協程launch容許設置

異常聚合和解包

全局協程做用域也存在嵌套子父級關係, 故異常可能也會依次拋出多個異常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
				// 這裏的異常是第一個被拋出的異常對象
        println("捕捉的異常: $exception 和被嵌套的異常: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // 當父協程被取消時其全部子協程都被取消, finally被取消以前或者完成任務以後必定會執行
                throw ArithmeticException() // 再次拋出異常, 異常被聚合
            }
        }
        launch {
            delay(100)
            throw IOException() // 這裏拋出異常將致使父協程被取消
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // 避免GlobalScope做用域沒有執行完畢JVM虛擬機就退出
}
複製代碼

監督做業

通常狀況子協程發生異常會致使父協程被取消, 同時父協程發生異常會取消全部的子協程. 可是有時候子協程發生異常咱們並不但願父協程也被取消, 而是僅僅全部子協程取消, 這個使用就是用SupervisorJob做業.

建立監督做業對象

val job = SupervisorJob()

launch(job) {  }
複製代碼

監督做業在withContextasync中添加無效

直接建立 SupervisorJob() 對象傳入做用域中會致使該做用域和父協程生命週期不統一的問題, 即父協程取消之後該子協程依然處於活躍狀態.

該函數能夠解決我上面提到的生命週期不統一問題, 直接建立向下傳播異常的協程做用域

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
複製代碼
  • 該函數屬於阻塞
  • supervisorScope函數使用的依然是當前做用域的Job, 因此跟隨當前做用域生命週期, 能夠被取消.
  • 返回值
  • 該做用域內若是不trycatch異常否仍是會致使做用域被取消(在設置CoroutineExceptionHandler狀況下)

下面我寫個即便拋出異常也不會致使協程取消的

supervisorScope {
  try {
    throw NullPointerException()
  } catch (e: Exception) {
    e.printStackTrace()
  }
}
複製代碼

線程不安全

解決線程不安全問題

  1. 單協程上下文操做
  2. 互斥鎖
  3. 切換線程實現單線程
  4. Channel

互斥

至關於Java中的Lock替代品: Mutex

建立互斥對象

public fun Mutex(locked: Boolean = false): Mutex
// `locked`是否當即上鎖
複製代碼

使用擴展函數能夠自動加鎖和解鎖

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner 鑰匙
複製代碼

Channel

經過在不一樣的協程內部使用Channel實例能夠數據通信. 通道能夠連續順序傳輸N個元素

多個協程容許發送和接收同一個通道數據

Channel屬於接口沒法直接建立, 咱們須要經過函數Channel()來建立其實現類

源碼

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel() // 無緩存
        UNLIMITED -> LinkedListChannel() // 無限制
        CONFLATED -> ConflatedChannel()  // 合併
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
        else -> ArrayChannel(capacity) // 指定緩存大小
    }
複製代碼
  • capacity

    緩衝大小, 默認0
    當Channel發送一條數據時就會掛起通道(不繼續執行發送後續代碼), 只有在接收這條數據時纔會解除掛起繼續執行. 可是咱們能夠設置緩存大小
    複製代碼

通道容許被遍歷獲取當前發送數據

val channel = Channel<Int>()

for (c in channel){

}
複製代碼
public suspend fun yield(): Unit
複製代碼

Channel

Channel接口同時實現發送渠道(SendChannel)和接收渠道(ReceiveChannel)兩個接口

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 
複製代碼

SendChannel

public val isClosedForSend: Boolean

public suspend fun send(element: E)
// 發送消息

public fun offer(element: E): Boolean

public fun close(cause: Throwable? = null): Boolean
// 關閉發送通道

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

public val onSend: SelectClause2<E, SendChannel<E>>
複製代碼
  • 發送通道關閉後不能繼續使用ReceiveChannel接收數據, 會致使ClosedReceiveChannelException拋出

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已經關閉通道, 若是關閉通道之後還存在緩存則會接收完緩存以後返回false

public val isEmpty: Boolean

public suspend fun receive(): E
// 接受當前被髮送的消息

public val onReceive: SelectClause1<E>
// 監聽事件發送
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 會拋出異常, 不推薦使用

public val onReceiveOrNull: SelectClause1<E?>
// Null表示通道已關閉

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`對象能夠判斷通道是否已關閉

public fun poll(): E?

public fun cancel(cause: CancellationException? = null)
// 取消
複製代碼
  1. 通道的發送和接收都會致使做用域被阻塞, 可是發送消息能夠經過設置緩存讓他不阻塞, 或者取消通道可讓阻塞繼續
  2. 通道只容許在掛起函數中發送和接收, 可是建立通道不限制
  3. 若是不關閉通道, 其所在做用域不會被結束
  4. 通道執行close函數後不容許再發送或者接收數據, 不然拋出異常
  5. 通道的send|receive函數所在做用域被取消cancel不會致使通道結束(isClosedForReceive返回false)

BroadcastChannel

這個通道和通常的通道區別在於他的每一個數據能夠被每一個做用域所有接收到. 默認的通道一個數據被接收後其餘的協程是沒法再接收到數據的

廣播通道經過全局函數建立對象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
複製代碼

自己廣播通道繼承自SendChannel, 只能發送數據, 經過函數能夠拿到接收通道

public fun openSubscription(): ReceiveChannel<E>
複製代碼

取消通道

public fun cancel(cause: CancellationException? = null)
複製代碼

將Channel轉成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>
複製代碼

經過擴展函數在協程做用域中快速建立一個廣播發送通道

public fun <E> CoroutineScope.broadcast(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> 
複製代碼

迭代通道

接收通道實現操做符重載可使用迭代

public operator fun iterator(): ChannelIterator<E>
複製代碼

示例

for (i in produce){
	// 收到每一個髮型的消息
}
複製代碼

當多個協程接收同一個渠道數據會依次輪流接收到數據, 渠道對於多個協程是公平的

Produce

上面介紹的屬於建立Channel對象來發送和接收數據, 可是還能夠經過擴展函數快速建立並返回一個具有發送數據的ReceiveChannel對象

public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
複製代碼
  • context: 能夠經過協程上下文決定調度器等信息
  • capacity: 初始化通道空間

ProducerScope 該接口繼承自SendChannel以及CoroutineScope, 具有發送通道數據以及協程做用域做用.

當produce做用域執行完成會關閉通道, 前面已經說起關閉通道沒法繼續接收數據

等待取消

該函數會在通道被取消時回調其函數參數. 前面說起協程取消時能夠經過finally來釋放內存等操做, 可是通道取消沒法使用finally只能使用該函數.

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 

// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
複製代碼

Actor

能夠經過actor函數建立一個具有渠道做用的協程做用域

public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
複製代碼
  • context: 協程上下文
  • capacity: 通道緩存空間
  • start: 協程啓動模式
  • onCompletion: 完成回調
  • block: 回調函數中能夠進行發送數據

該函數和produce函數類似,

  1. produce返回ReceiveChannel, 外部協程只能進行數據接收
  2. actor返回的SendChannel, 外部協程只能進行數據發送
  3. actor的回調函數擁有屬性channel:Channel, 既能夠發送數據又能夠接收數據, produce的屬性channel屬於SendChannel
  4. 不管是produce或者actor他們的通道都屬於Channel, 既能夠發送又能夠接收數據, 只須要類型強轉便可.
  5. 自己Channel能夠進行雙向數據通訊, 可是設計produce和actor屬於設計思想中的生產者和消費者模式
  6. 他們都屬於協程做用域和數據通道的結合

輪循器

不管是RxJava仍是協程都支持輪循器的功能, 在個人網絡請求庫中還賦予了輪循器暫停|繼續|多個觀察者|重置等功能

這裏的協程輪循器就比較簡陋

public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
複製代碼

該通道返回的數據是Unit

默認狀況下能夠理解爲通道會在指定間隔時間後一直髮送Unit數據

fun main() = runBlocking<Unit> {
  
    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

  // 每秒打印
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}
複製代碼

可是若是下游不是在發送數據之後當即接收數據, 而是延遲使用receive函數來接收通道數據

TickerMode該枚舉擁有兩個字段

  • FIXED_PERIOD 默認值, 動態調節通道發送數據的時間間隔, 時間間隔能夠看作是上游發送數據的
  • FIXED_DELAY 只有當接收數據後纔會開始計算間隔時間, 時間間隔能夠看作是下游接收數據的

這個輪循器不支持多訂閱|暫停|繼續|重置|完成, 可是個人Net庫中Interval對象已實現該功能.

Select

select閉包中能夠建立多個協程做用域或者通道, 且只會執行最快接收數據的通道或者結果.

通道

@InternalCoroutinesApi
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> {
    a.onReceiveOrNull {
        if (it == null) "Channel 'a' is closed"
        else "a -> '$it.valueOrNull'"
    }

    b.onReceiveOrNull {
        if (it == null) "Channel 'b' is closed"
        else "b -> '$it.valueOrNull'"
    }
}

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {

    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }

    val b = produce<String> {
        repeat(4) { send("World $it") }
    }

    repeat(8) {
        // 打印最先的八個結果
        println(selectAorB(a, b))
    }

    coroutineContext.cancelChildren()
}
複製代碼

Flow

Flow不屬於掛起函數能夠在任意位置建立. 默認執行在當前協程上下文中.

Flow和RxJava很類似, 分爲上游和下游及中間操做符, 可是Flow內部屬於協程做用域, 其調度器依靠掛起函數切換異步.

JetBrains公司也認可Flow屬於參考Reactive Stream等框架的產物, 這樣我相信不少人就能理解Flow的存在乎義了. 這種上下游觀察者模式在ROOM官方數據庫中一樣支持.

Flow的操做符不如RxJava豐富, 可是Flow的開發時間還很短還未正式完成. 後面能夠跟進

建立Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
複製代碼

示例

fun shoot() = flow {
    for (i in 1..3) {
        delay(1000) // 僞裝咱們在這裏作了一些有用的事情
        emit(i) // 發送下一個值
    }
}
複製代碼
  • 集合或者Sequence均可以經過asFlow函數轉成Flow對象

  • 也能夠像建立集合同樣經過fowOf直接建立Flow對象

  • Channel通道轉成Flow

    public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow
    複製代碼
  • 甚至掛起函數也能夠轉成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    複製代碼

Flow的完成和異常不須要經過發射器Emitter去發送, RxJava須要.

和RxJava同樣理解基本用法和思想以後就僅須要熟悉不一樣的操做符了. 基本的操做符每一個ReactiveStream框架都是雷同

下面介紹Flow的函數(操做符).

Collect

收集數據

Flow是冷數據, 要求調用函數collect收集數據時纔會進行數據的發射. 該系列函數也成爲末端操做符.

shoot().collect {
	System.err.println("(Demo.kt:9) it = $it")
}
複製代碼

函數

public suspend fun Flow<*>.collect() 

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)

public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit
): Unit

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 將Flow運行在指定的協程做用域內

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
複製代碼

Context

調度器

Flow默認使用的是其所在的當前線程或者協程上下文, Flow不容許在內部使用withContext來切換調度器, 而是應該使用flowOn函數

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
複製代碼

該函數改變的是Flow函數內部發射時的線程, 而在collect收集數據時會自動切回建立Flow時的線程.

public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>

public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T>
複製代碼

緩存

不須要等待收集執行就當即執行發射數據. 只是數據暫時被緩存而已. 提升性能.

默認切換調度器時會自動緩存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
複製代碼

合併函數, 這個函數實際上就是buffer

當下遊沒法及時處理上游的數據時會丟棄掉該數據

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
複製代碼

Zip

將多個事件合併後發送給下游

zip

將兩個Flow在回調函數中進行處理返回一個新的值 R

當兩個flow的長度不等時只發送最短長度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
複製代碼

示例

val nums = (1..3).asFlow().onEach { delay(300) } // 發射數字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使用「zip」組合單個字符串
    .collect { value -> // 收集並打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
複製代碼

combine

public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R>

public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
複製代碼

Collection

Flow直接轉成集合函數

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
複製代碼

Reduce

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`爲上次回調函數返回值, 第一次爲初始值, 等同於疊加效果. 該函數和reduce的區別就是支持初始值. reduce累計兩次元素纔會回調函數

public suspend fun <T> Flow<T>.single(): T
// 期待只有一個元素, 不然拋出`IllegalStateException`

public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不拋出異常, 但若是不是僅有元素則返回null

public suspend fun <T> Flow<T>.first(): T
// 若是不存在一個元素則會拋出`NoSuchElementException`

public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回調函數判斷爲true的第一個條件符合的元素
複製代碼

Merge

public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先發送全部的元素, 而後上游每一個元素會致使回調函數中的Flow發送全部元素一次

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同於RxJava的FlatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>

public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>

public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> 

public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
複製代碼

Emitter

public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 轉換函數, 能夠在回調函數中發送新的元素

public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 開始

public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回調函數中的參數`cause`若是爲null表示正常完成沒有拋出異常, 反之則拋出異常非正常結束
// 和catch函數同樣只能監聽到上游發生的異常, 可是沒法避免異常拋出只能在異常拋出以前執行回調函數
複製代碼

Limit

限制流發送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定數量事件

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丟棄指定數量事件

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回調函數判斷是否丟棄或者接收, 只要丟棄或者接收後面就不會繼續發送事件(結束流)
複製代碼

Error

捕捉異常

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
複製代碼

該函數只能捕獲上游異常, 若是異常處於函數調用之下則依然會被拋出

重試

public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重試次數 predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
複製代碼

Transform

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>

public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>

public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>

public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>

public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>


public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
複製代碼

Android

Google發行的Jetpack庫中不少組件都附有KTX擴展依賴, 這種依賴主要是增長kotlin和協程支持

Lifecycle

官方提供生命週期協程做用域的快速建立實現.

  • 指定生命週期運行協程
  • 自動在onDestory取消協程

引入ktx依賴庫

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
複製代碼

當執行到某個生命週期時運行協程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast(
    minState: Lifecycle.State,
    block: suspend CoroutineScope.() -> T
)
複製代碼

這些函數都屬於LifecycleLifecycleOwner的擴展函數

LiveData

依賴

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
複製代碼

提供開發者使用的只有這兩個函數, 兩個函數功能同樣, 只是每一個參數接收時間單位不一致

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
複製代碼
  • timeout: 若是liveData的沒有處於活躍的觀察者則在指定的時間內(單位毫秒)會取消其做用域[block]
  • block: 該做用域只在活躍狀態纔會觸發, 默認在Dispatchers.Main.immediate調度器

liveData做用域具有發射數據和LiveData的做用

interface LiveDataScope<T> {
    /** * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]. * * @param value The new value for the [LiveData] * * @see emitSource */
    suspend fun emit(value: T)

    /** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this * method will remove any source that was yielded before via [emitSource]. * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]. * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /** * References the current value of the [LiveData]. * * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]. */
    val latestValue: T?
}
複製代碼
  1. 若是emitSource在emit以前執行則無效
  2. 該做用域會在每次處於活躍狀態時都執行一遍, 若是將應用從後臺切換到前臺則會返回執行該做用域, 可是觀察者只會在活躍時才收到數據
相關文章
相關標籤/搜索