Kotlin協程嚮導

2019年5月5日發現官方有更新,我也更新一下。html

基於版本:java

<properties>
    <kotlin.version>1.3.30</kotlin.version>
</properties>
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>1.2.1</version>
</dependency>

2018.02.23才寫完,竟然用了三個月,簡直差勁,並且還有不少機器翻譯的東西。我還會慢慢磨礪這篇文章。git

-------------------------------------------如下原文-------------------------------------------------------github

本文基於官方文檔翻譯所得,才疏學淺,請多指正。文章較長,但願我能堅持寫完,放在這裏也是對本身的一種鞭策。express

-------------------------------------------正片開始-------------------------------------------------------編程

基於例子的Kotlin協程嚮導

本文經過一系列例子簡單介紹了kotlinx.coroutines的核心功能小程序

介紹和設置

Kotlin做爲一種編程語言,只提供最小的底層API的標準庫來支持其餘庫使用協程。和其餘擁有相似功能(指協程)的語言不同,async 和 await 不是語言的關鍵字,甚至還不是標準庫的一部分。後端

kotlinx.coroutines 就是一個這樣功能豐富的庫,它包含了一些高級的關鍵字對協程的支持,包括async 和 await,你須要添加kotlinx-coroutines-core這個jar在你的項目中開啓對協程的支持。api

內容列表

一、協程基礎知識

本節涵蓋了協程的基本概念。安全

a、第一個協程

fun main(args: Array<String>) {
    GlobalScope.launch{ // 開啓一個協程
        delay(1000L) // 延遲一秒,非阻塞,和主線程並行的(默認時間單位是毫秒)
        println("World!") // 延遲以後打印
    }
    println("Hello,") // 主程序繼續執行,由於上面協程是不阻塞的,因此這裏會當即執行
    Thread.sleep(2000L) // 讓主程序休眠2秒,保持虛擬機,給協程執行完畢留出時間
}

執行結果以下:

Hello,
World!

實際上、協程是一種輕量的線程,經過launch關鍵字啓動。在這裏,咱們將在GlobalScope中啓動一個新的協同程序,這意味着新協程的生命週期僅受整個應用程序的生命週期的限制。

你能夠用Thread{...}替換GlobalScope.launch{...},用Thread.sleep{...}替換delay{...}來實現相同的功能。若是你直接替換GlobalScope.launch爲Thread,系統會報錯以下:

Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function

由於delay是一種特殊的暫停方法,它不會阻塞線程,可是會暫停協程,且只能在協程中使用。

b、橋接阻塞和非阻塞環境

第一個例子在主進程中混合了非阻塞的delay(...)和阻塞的Thread.sleep(...),可能讓人混亂。如今讓咱們用runBlocking協程生成器來明確阻塞。

fun main(args: Array<String>) { 
    GlobalScope.launch { //在後臺開啓一個新的協程,並執行
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主線程會在這裏當即執行
    runBlocking {     // 可是這個表達式會阻塞主線程
        delay(2000L)  // 延遲2秒,保證虛擬機可以等到子協程跑完
    } 
}

運行結果是相同的,不過這裏只使用了非阻塞方法delay

調用runblocking的主線程會被阻塞,直到runblocking內部的協程完成。

這個例子還有一種慣用的寫法,用runBlocking包裝執行主程序。

fun main(args: Array<String>) = runBlocking<Unit> { // 開啓主協程
    GlobalScope.launch { // 啓動一個新的協程
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主協程當即執行到這裏
    delay(2000L) // 延遲2秒,保證虛擬機可以等到子協程跑完
}

這裏runBlocking<Unit> { ... }做爲一種適配器,開啓一個頂級協程。咱們明確地指定了它的返回類型Unit,由於kotlin中一個格式良好的主函數必須返回Unit。

這是也能夠對暫停程序進行單元測試。

class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // 這裏你能夠用你喜歡的斷言方式,來測試暫停程序
    }
}

c、等待一個做業(Job)

經過延遲一小段時間來等待後臺的協程結束,並非一個好的方法。(指delay以保證虛擬機沒有死掉)。讓咱們以一種非阻塞的方式明確的等待一個後臺協程結束。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = GlobalScope.launch { // 開啓一個協程,並賦給它的一個引用
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // 等待直到子協程結束
}

結果是同樣的,但主協程的代碼不以任何方式與後臺做業的持續時間相關聯,好多了!

d、結構化併發

實際使用協程仍然須要一些東西,當咱們使用 GlobalScope.launch時, 咱們建立了一個頂級的協程。即便它是輕量的,可是仍然會消耗內存資源。

d、方法重構

讓咱們提取launch{...}裏的代碼塊放進一個單獨的方法裏,這時候你須要一個使用suspend修飾的新方法,這是你的第一個暫停方法。暫停方法能夠和其餘方法同樣在協程中使用,可是不一樣之處在於,暫停方法能夠調用其餘的暫停方法,好比下面例子中的delay,來暫停一個協程。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch { doWorld() }
    println("Hello,")
    job.join()
}

// 這是你的第一個暫停方法
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

e、協程是輕量的

執行下面的代碼:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // 開啓大量協程,並對她們賦值到每個Job中
        launch {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } //等待全部的Job結束
}

它會啓動10萬個協程,一秒後每個協程都打印一個點。你能夠嘗試用Thread來重寫這段代碼,會發生什麼呢?(極可能會觸發內存溢出異常,固然和你的電腦配置有關,反正個人是崩了=。=)

f、協程很像後臺線程

下面的代碼會啓動一段長時間運行的協程,每隔兩秒打印一句「我在睡覺」,而後在一段延遲以後從主程序退出。

fun main(args: Array<String>) = runBlocking<Unit> {
    launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延遲以後退出
}

運行以後能夠看到,結果是執行了三行打印就退出了。

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

因此說,激活的協程,不會保持進程存活,她們更像後臺線程。

二、取消和超時

這一節講協程的取消和超時。

a、取消協程執行

在一個小程序裏,在main方法裏return看上去是一種隱式關閉協程的好方法。在一個大的,長期執行的程序裏,你須要更細粒度的控制。launch方法返回一個Job能夠控制取消正在執行的協程:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延遲一下
    println("main: I'm tired of waiting!")
    job.cancel() // 取消Job
    job.join() // 等待Job完成
    println("main: Now I can quit.")
}

輸出爲:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

當主協程執行cancel,咱們再也不看到子協程的打印,由於它被取消了。還有一個方法cancelAndJoin方法,包含了cancel和join這兩個操做。

b、取消是須要配合的

協程的取消是須要配合的,協程代碼必須配合才能被取消!kotlinx.coroutines中的全部暫停函數都是能夠取消的。她們檢查協程的取消操做,並在取消的時候拋出CancellationException異常。可是,若是一個協程正在執行計算工做,而且沒有檢查取消,那麼它不能被取消,以下面的例子所示:

fun main(args: Array<String>) = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // 循環計算,只是浪費CPU
            // 每秒打印2次信息
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

運行它,你會發現即便在取消以後它仍繼續打印「我正在睡覺」,直到五次迭代完成後才自行完成。

c、讓計算代碼可取消

有兩種方法讓計算代碼可取消。第一個是按期調用一個檢查取消的掛起方法。有一個yield函數是一個很好的選擇。另外一個是明確檢查取消狀態。讓咱們嘗試後面這種方法。

用isActive替換前面的例子中的 i < 5 並從新運行。

fun main(args: Array<String>) = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

正如你所看到的,如今這個循環被取消了。 isActive是一個CoroutineScope對象在協程代碼中的屬性。

d、在finally中關閉資源

可取消的暫停函數在取消時拋出CancellationException,能夠利用這一點,在取消協程的時候,處理一些資源問題。例如,try{...}finally {...},Kotlin的use函數,當協程被取消時,會正常執行它們的finally操做。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

無論是cancel仍是cancelAndJoin都會等待finally裏的代碼執行完畢。因此上面的例子運行結果是:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.

--果果--

e、運行不可取消代碼

任何試圖在前面的例子的finally塊中使用暫停函數的操做都會致使CancellationException,由於運行這個代碼的協程被取消。一般,這不是問題,由於全部關閉操做(關閉文件,取消做業或關閉任何類型的通訊通道)一般都是非阻塞的,不涉及任何掛起功能。可是,在極少數狀況下,當您須要在取消的協程中暫停時,您可使用run函數和NonCancellable上下文來運行相應的代碼(NonCancellable){...},以下例所示:

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

f、超時

在實踐中取消協同執行的最顯著的緣由是超時。雖然您能夠對相應做業的引用手動跟蹤,並啓動一個單獨的協程在延遲以後取消所跟蹤的協程,可是可使用Timeout功能來執行此操做。看下面的例子:

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

它產生如下輸出:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS

withTimeout引起的TimeoutCancellationException是CancellationException的子類,咱們以前沒有看到它在控制檯上打印的堆棧跟蹤。這是由於在協程的取消中,CancellationException被認爲是協程完成的正常緣由。然而,在這個例子中,咱們已經在主函數內部使用了Timeout。

因爲取消只是一個例外,全部的資源將以一般的方式關閉。你能夠在try {...} catch(e:TimeoutCancellationException){...}中使用timeout來封裝代碼,若是你須要在任何類型的超時

內作一些額外的操做,或者使用與withTimeout相似的TimeoutOrNull函數,但在超時時返回null,而不是拋出異常:

fun main(args: Array<String>) = runBlocking<Unit> {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // 在產生這個結果以前會被取消
    }
    println("Result is $result")
}

運行此代碼時不會觸發異常:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

三、編寫掛起程序

這一節介紹各類編寫掛起程序的方法

a、順序執行

假設咱們在別處定義了兩個掛起函數,它們執行某種相似遠程服務調用或計算的有用操做。咱們只是僞裝他們是有用的,但實際上每一個只是爲了這個例子的目的而拖延一秒鐘:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

若是須要依次調用它們,咱們該怎麼辦?首先執行doSomethingUsefulOne,而後執行doSomethingUsefulTwo,而後計算他們結果的總和?在實踐中,若是咱們使用第一個函數的結果來決定是否須要調用第二個函數或決定如何調用它,那麼咱們會這樣作。咱們只是使用正常的順序調用,由於協程中的代碼與常規代碼同樣,默認狀況下是連續的。如下示例經過測量執行兩個掛起功能所需的總時間來演示它:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

結果像這樣:

The answer is 42
Completed in 2017 ms

b、異步併發

若是doSomethingUsefulOne和doSomethingUsefulTwo的調用之間沒有依賴關係,而且咱們但願經過同時執行這兩個方法來更快地獲得答案?使用async

從概念上講,async就像launch。它啓動一個單獨的協程,它是一個與全部其餘協程同時工做的輕量級線程。不一樣之處在於啓動會返回一個Job,而不會帶來任何結果值,而異步返回Deferred  - 一個輕量級的非阻塞對象,表示稍後提供結果。你可使用延遲值的.await()來獲得它的最終結果,可是Deferred也是一個Job,因此你能夠根據須要取消它。

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

運行結果以下:

The answer is 42
Completed in 1017 ms

這是兩倍的速度,由於咱們同時執行兩個協程。請注意,協程的併發老是顯式的。

c、懶啓動異步

有一個惰性選項,使用可選的啓動參數啓動異步,值爲CoroutineStart.LAZY。它只有在須要某個結果的時候才啓動協程,或者啓動了一個start()或者await()函數。對應的例子以下:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

結果以下:

The answer is 42
Completed in 2017 ms

因此,咱們又回到順序執行,由於咱們先開始await()一個,而後開始await()第二個。這不是懶加載的預期用例。在計算值涉及暫停功能的狀況下,它被設計爲替代標準的懶加載函數。

d、異步風格函數

咱們能夠定義使用異步協程生成器異步調用doSomethingUsefulOne和doSomethingUsefulTwo的異步風格函數。使用「Async」後綴或「async」前綴來命名這些函數是一種很好的風格,以突出顯示這樣的事實,即它們只啓動異步計算,而且須要使用獲得的延遲值來得到結果。

// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async {
    doSomethingUsefulTwo()
}

請注意,這些asyncXXX函數不是暫停函數。他們能夠從任何地方使用。然而,它們的使用老是意味着它們的行爲與調用代碼的異步(這裏意味着併發)執行。

如下示例顯示了它們在協程以外的用法:

//注意,在這個例子中咱們沒有在`main`的右邊有'runBlocking`
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        //咱們能夠在協程以外啓動異步操做
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        //但等待結果必須涉及暫停或掛起。
        //這裏咱們使用`runBlocking {...}`在等待結果的同時阻塞主線程
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

四、協程的上下文和調度

協程老是在一些上下文中執行,它由Kotlin標準庫中定義的CoroutineContext類型的值表示。

協程的上下文是一組元素。主要元素是咱們之前見過的協程的Job,以及本節中介紹的調度程序

a、調度和線程

協程上下文包括協程調度程序(參見CoroutineDispatcher),該協程肯定相應協程執行的一個線程或多個線程。協程調度程序能夠將協程執行限制在一個特定的線程中,調度它到一個線程池中,或者讓它無限制的運行。

全部協程構建器,如launchasync 接受一個可選的CoroutineContext參數,可用於顯式指定調度程序的新協程和其餘上下文元素

試試下面的例子:

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { //無限制 和主線程一塊兒運行
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { //父級的上下文,runBlocking協程
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { //將被分派到ForkJoinPool.commonPool(或等同的)
        println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { //將得到本身的新線程
        println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

它會產生如下輸出(可能順序不一樣):

'Unconfined': I'm working in thread main
      'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
          'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main

咱們在前面部分中使用的默認調度程序由DefaultDispatcher表示,這與當前實現中的CommonPool相同。所以,launch {...}與launch(DefaultDispatcher){...}以及launch(CommonPool){...}相同。

父coroutineContext和Unconfined上下文之間的區別將在稍後顯示。

請注意,newSingleThreadContext建立一個新的線程,這是很耗費資源的。在真正的應用程序中,它必須被釋放,再也不須要時,使用close函數,或者存儲在頂層變量中,並在整個應用程序中重用。

b、無限制 vs 限制調度器

Unconfined協程調度程序在調用者線程中啓動協程,但僅在第一個暫停點以前。暫停後,它將在被調用的暫停功能徹底肯定的線程中恢復。協程不消耗CPU時間,也不更新限於特定線程的任何共享數據(如UI)時,無限制的分派器是合適的。

另外一方面,經過CoroutineScope接口在任何協程的塊內可用的coroutineContext屬性是對此特定協程的上下文的引用。這樣,能夠繼承父上下文。 runBlocking協同程序的默認調度程序特別限於調用程序線程,所以繼承它的做用是經過可預測的FIFO調度將執行限制在該線程中。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

執行結果:

'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
      'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main

所以,繼承了runBlocking {...}的coroutineContext的協程在主線程中繼續執行,而非限制的協程在延遲函數使用的默認執行程序線程中恢復。

c、調試協程和線程

協程能夠在一個線程上掛起,並在帶有Unconfined dispatcher的另外一個線程上或使用默認的多線程調度程序恢復。即便使用單線程調度程序,也很難弄清楚協程在什麼地方作什麼,在什麼地方,何時作什麼。使用線程調試應用程序的經常使用方法是在每一個日誌語句的日誌文件中打印線程名稱。日誌框架一般支持此功能。在使用協程時,單獨的線程名稱不會提供不少上下文,所以kotlinx.coroutines包含調試工具以使其更容易

使用-Dkotlinx.coroutines.debug JVM選項運行如下代碼:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(coroutineContext) {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(coroutineContext) {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
}

這裏有三個協程,主協程(#1),runBlocking的這個,還有兩個計算延遲值a(#2)和b(#3)的協程,它們都在runBlocking的上下文中執行,而且被限制在主線程中。執行結果以下:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

log函數在方括號中打印線程的名稱,您能夠看到它是主線程,後面跟着當前正在執行的協程的標識符。打開調試模式時,此標識符將連續分配給全部建立的協程。

你能夠在newCoroutineContext函數的文檔中閱讀更多關於調試工具的信息。

d、協程間跳轉

使用-Dkotlinx.coroutines.debug JVM選項運行如下代碼:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                run(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

它演示了幾種新技術。一個是使用帶有明確指定的上下文的runBlocking,另外一個是使用run函數來改變協程的上下文,而仍然保持在同一個協程中,你能夠在下面的輸出中看到:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

請注意,該示例也使用Kotlin標準庫中的use函數來釋放在再也不須要的狀況下使用newSingleThreadContext建立的線程。

e、上下文中的做業(Job)

協程的工做(Job)是其上下文的一部分。協程可使用coroutineContext [Job]表達式從它本身的上下文中檢索它:

fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}

它在調試模式下運行時會產生如下相似的結果:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

所以,在 CoroutineScope中的isActive只是一個coroutineContext [Job] !! isActive的快捷方式。

f、協程的子類

當協程的coroutineContext被用來啓動另外一個協程時,新協程的Job就成爲了父協程的子Job。當父協程被取消時,它的全部子協程也被遞歸地取消

fun main(args: Array<String>) = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs, one with its separate context
        val job1 = launch {
            println("job1: I have my own context and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        val job2 = launch(coroutineContext) {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
        // request completes when both its sub-jobs complete:
        job1.join()
        job2.join()
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

輸出以下:

job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

g、組合上下文

協程的上下文可使用+運算符來組合。右側的上下文替換左側上下文的相關條目。例如,父協程的Job能夠被繼承,而調度器被替換:

fun main(args: Array<String>) = runBlocking<Unit> {
    // start a coroutine to process some kind of incoming request
    val request = launch(coroutineContext) { // use the context of `runBlocking`
        // spawns CPU-intensive child job in CommonPool !!! 
        val job = launch(coroutineContext + CommonPool) {
            println("job: I am a child of the request coroutine, but with a different dispatcher")
            delay(1000)
            println("job: I will not execute this line if my parent request is cancelled")
        }
        job.join() // request completes when its sub-job completes
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

這個代碼的預期結果是:

job: I am a child of the request coroutine, but with a different dispatcher
main: Who has survived request cancellation?

h、父級責任

父協程老是等待全部的子協程完成。 父協程沒必要顯式地跟蹤它啓動的全部子節點,而且沒必要使用Job.join等待它們到最後:

fun main(args: Array<String>) = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        repeat(3) { i -> // launch a few children jobs
            launch(coroutineContext)  {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // wait for completion of the request, including all its children
    println("Now processing of the request is complete")
}

結果以下:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

i、命名協程進行調試

當你只關心來自同一個協程的日誌記錄,自動分配的ID是很好的。可是,當協程與特定請求的處理或執行一些特定的後臺任務相關聯時,爲了調試目的,最好明確地命名它。 CoroutineName上下文元素提供與線程名稱相同的功能。在打開調試模式時,它將顯示在執行此協程的線程名稱中。

下面的例子演示了這個概念:

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

它使用-Dkotlinx.coroutines.debug JVM選項生成的輸出相似於:

[main @main#1] Started main coroutine
[ForkJoinPool.commonPool-worker-1 @v1coroutine#2] Computing v1
[ForkJoinPool.commonPool-worker-2 @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

j、取消明確的Job

讓咱們把關於上下文,子協程和Job的知識放在一塊兒。假設咱們的應用程序有一個生命週期對象,可是這個對象不是一個協程。例如,咱們正在編寫一個Android應用程序,並在Android活動的上下文中啓動各類協同程序,以執行異步操做以獲取和更新數據,執行動畫等。全部這些協程必須在活動被銷燬時被取消,以免內存泄漏。

咱們能夠經過建立與咱們活動的生命週期相關的Job實例來管理協同程序的生命週期。Job實例是使用Job()工廠函數建立的,如如下示例所示。爲了方便起見,咱們能夠編寫launch(coroutineContext,parent = job)(貌似在最新的1.2版本上,這個方法有更新,這樣用會報錯,譯者注),而不是使用launch(coroutineContext+job)表達式來明確父Job正在被使用。

如今,一個Job.cancel的調用取消了咱們啓動的全部子項。並且,Job.join等待全部這些完成,因此咱們也能夠在這個例子中使用cancelAndJoin 。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = Job() // create a job object to manage our lifecycle
    // now launch ten coroutines for a demo, each working for a different time
    val coroutines = List(10) { i ->
        // they are all children of our job object
        launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
            delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
            println("Coroutine $i is done")
        }
    }
    println("Launched ${coroutines.size} coroutines")
    delay(500L) // delay for half a second
    println("Cancelling the job!")
    job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}

這個例子的輸出是:

Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!

正如你所看到的,只有前三個協程已經打印了一條消息,而其餘的則被一個job.cancelAndJoin()調用取消了。因此咱們在咱們假設的Android應用程序中須要作的是在建立活動時建立父做業對象,將其用於子協程,並在活動被銷燬時將其取消。咱們不能在Android生命週期的狀況下加入它們,由於它是同步的,可是當構建後端服務以確保有限的資源使用時,這種加入能力是有用的 。

五、通道

延遲值提供了在協程之間傳遞單個值的簡便方法。通道提供了一種方式來傳遞數據流。

a、通道基礎

通道在概念上與阻塞隊列很是類似。一個關鍵的區別是,用send替換了隊列的put,用receive替換了隊列的take。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

結果以下:

1
4
9
16
25
Done!

b、關閉和遍歷通道

不像一個隊列,一個通道能夠關閉,表示沒有更多的元素來了。在接收端,使用常規的for循環來接收來自通道的元素是很方便的。

從概念上講, close就像發送一個特殊的令牌給該通道。一旦接收到這個關閉標記,迭代就會中止,這樣就保證了在關閉以前能收到全部先前發送的元素。

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

c、建立通道生產者

協程產生一系列元素的模式是經常使用的,這是一般在併發代碼中的生產者 - 消費者模式的一部分。你能夠把這樣一個生產者抽象成一個以通道爲參數的函數,可是必須從函數返回結果,這與常識是相反的。

有一個命名爲produce的便利的協程生成器,能夠很容易地在生產者端作到這一點,而且有一個擴展方法consumeEach,能夠取代消費者端的for循環:

fun produceSquares() = produce<Int> {
    for (x in 1..5) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

d、管道

管道是協程的一種生成模式,多是無線的數據流:

fun produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

另外一個協程或多個協程則消費這個數據流,作一些處理,併產生一些其餘的結果。在下面的例子中,計算了數字的平方:

fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}

主程序啓動並鏈接整個管道:

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}

本例中咱們不須要cancel這些協程,由於前文提到的《協程像是一種守護線程》,可是在一個更大的應用程序中,若是咱們再也不須要它,咱們須要中止咱們的管道。或者,咱們能夠將管道協程做爲《主協程的子程序》運行,如如下示例所示:

e、素數與管道(?)

讓咱們經過一個使用一系列協程來生成素數的例子,將管道推向極致。咱們從無限的數字序列開始。此次咱們引入一個明確的上下文參數並傳遞給生成器,以便調用者能夠控制咱們的協程運行的位置

fun numbersFrom(context: CoroutineContext, start: Int) = produce<Int>(context) {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

如下管道階段過濾輸入的數字流,移除可由給定質數整除的全部數字:

fun filter(context: CoroutineContext, numbers: ReceiveChannel<Int>, prime: Int) = produce<Int>(context) { for (x in numbers) if (x % prime != 0) send(x) }

如今咱們經過從2開始編號的流來創建咱們的管道,從當前通道取一個素數,而且爲每一個找到的素數啓動新的管道階段

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

如下示例打印前十個素數,在主線程的上下文中運行整個管道。因爲全部的協程都是在其coroutineContext中做爲主要runBlocking協程的子進程啓動的,因此咱們沒必要保留咱們開始的全部協程的明確列表。咱們使用cancelChildren擴展函數來取消全部的子協同程序:

fun main(args: Array<String>) = runBlocking<Unit> { var cur = numbersFrom(coroutineContext, 2) for (i in 1..10) { val prime = cur.receive() println(prime) cur = filter(coroutineContext, cur, prime) } coroutineContext.cancelChildren() // cancel all children to let main finish }

輸出以下:

2
3
5
7
11
13
17
19
23
29

請注意,您可使用標準庫中的buildIterator協同構建器來構建相同的管道。用 buildIterator 替換 produce ,用yield替換send,用next代替reveive,使用Iterator的ReceiveChannel,擺脫上下文。你也不須要runBlocking。可是,如上所示使用通道的管道的好處是,若是在CommonPool上下文中運行它,它實際上可使用多個CPU內核。

不管如何,這是一個很是不切實際的方式來找到素數。實際上,管道確實涉及到一些其餘暫停調用(如異步調用遠程服務),而且這些管道沒法使用buildSeqeunce / buildIterator進行構建,由於它們不容許任意暫停,與異步的 produce 是不一樣的。

f、 扇出(?)

多個協程能夠從同一個通道接收,在他們之間分配工做。讓咱們從一個按期生成整數的生產者協程開始(每秒十個數字):

fun produceNumbers() = produce<Int> { var x = 1 // start from 1 while (true) { send(x++) // produce next delay(100) // wait 0.1s } }

那麼咱們能夠有幾個處理器協同程序。在這個例子中,他們只是打印他們的ID和收到的號碼:

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { channel.consumeEach { println("Processor #$id received $it") }
}

如今讓咱們啓動五個處理程序,讓他們執行一秒鐘。看看結果:

fun main(args: Array<String>) = runBlocking<Unit> { val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() // cancel producer coroutine and thus kill them all }

輸出將相似於下面,儘管接收每一個特定整數的處理器ID可能不一樣:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

請注意,取消生產者協同程序會關閉其通道,從而最終終止對處理器協程正在進行的通道的迭代。

g、扇入

多個協程可能會發送到同一個通道。例如,咱們有一個字符串的通道,以及一個暫停方法,該方法以指定的延遲向該通道重複發送指定的字符串:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } }

如今,讓咱們看看若是咱們啓動一些發送字符串的協程,會發生什麼(在這個例子中,咱們將它們做爲主協程的子節點在主線程的上下文中啓動):

fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<String>() launch(coroutineContext) { sendString(channel, "foo", 200L) } launch(coroutineContext) { sendString(channel, "BAR!", 500L) } repeat(6) { // receive first six println(channel.receive()) } coroutineContext.cancelChildren() // cancel all children to let main finish }

輸出以下:

foo
foo
BAR!
foo
foo
BAR!

h、緩衝頻道

目前的頻道沒有緩衝區。當發送者和接收者彼此相遇(又名會合)時,無緩衝的信道傳送元素。若是發送先被調用,那麼它被掛起直到接收被調用,若是接收被首先調用,它被掛起直到發送被調用。

Channel()工廠函數和產生構建器都使用一個可選的容量參數來指定緩衝區大小。緩衝區容許發送者在掛起以前發送多個元素,相似於具備指定容量的阻塞隊列,當緩衝區已滿時阻塞隊列被阻塞。

看看下面的代碼的行爲:

fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>(4) // create buffered channel val sender = launch(coroutineContext) { // launch sender coroutine repeat(10) { println("Sending $it") // print before sending each element channel.send(it) // will suspend when buffer is full } } // don't receive anything... just wait.... delay(1000) sender.cancel() // cancel sender coroutine }

它使用四個容量的緩衝通道打印「發送」五次:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

前四個元素被添加到緩衝區,而且發送者在嘗試發送第五個元素時掛起。

i、平等的頻道

對頻道的發送和接收操做對於從多個協同程序調用的順序是公平的。它們以先入先出的順序被服務,例如,調用接收的第一個協程獲取元素。在如下示例中,兩個協程「ping」和「pong」正從共享的「表格」通道接收「球」對象:

data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> { val table = Channel<Ball>() // a shared table launch(coroutineContext) { player("ping", table) } launch(coroutineContext) { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them }

suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ println("$name $ball") delay(300) // wait a bit table.send(ball) // send the ball back } }

「ping」協程首先啓動,因此它是第一個接收球。即便「ping」協同程序在將球發回桌面後當即開始接收球,球被「pong」協程接收,由於它已經在等待它了:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

請注意,因爲正在使用的執行者的性質,有時渠道可能會產生看起來不公平的處決。this issue查看細節:

六、共享可變狀態和併發

協同程序可使用多線程調度程序(如CommonPool)同時執行。它提出了全部常見的併發問題。主要的問題是同步訪問共享可變狀態。在協程的領域中,這個問題的一些解決方案與多線程世界中的解決方案相似,可是其餘解決方案是獨特的:

a、問題

讓咱們啓動一千次協程,所有作一樣的動做千次(共執行一百萬次)。咱們還會測量他們的完成時間,以便進一步比較:

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) { val n = 1000 // number of coroutines to launch val k = 1000 // times an action is repeated by each coroutine val time = measureTimeMillis { val jobs = List(n) { launch(context) { repeat(k) { action() } } } jobs.forEach { it.join() } } println("Completed ${n * k} actions in $time ms")
}

咱們從一個很是簡單的動做開始,使用多線程的CommonPool上下文來增長一個共享的可變變量:

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { counter++ } println("Counter = $counter") }

最後打印什麼?打印「counter = 1000000」的可能性很是小,由於千位協同程序從多個線程同時遞增計數器而沒有任何同步。

注意:若是你有一箇舊的系統有2個或更少的cpu,那麼你將一直看到1000000,由於CommonPool在這種狀況下只在一個線程中運行。要重現問題,您須要進行如下更改:

val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(mtContext) { // use it instead of CommonPool in this sample and below counter++ } println("Counter = $counter") }

b、Volatiles是沒用的

有一個常見的誤解認爲,使用volatile變量能夠解決併發問題。讓咱們嘗試一下:

@Volatile // in Kotlin `volatile` is an annotation var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { counter++ } println("Counter = $counter") }

這段代碼的工做速度較慢,但​​最終仍是沒有獲得「counter = 1000000」,由於volatile變量保證可線性化(這是「原子」的技術術語)讀寫相應的變量,但不提供原子性更大的行動(在咱們的狀況下增長)

c、線程安全的數據結構

對於線程和協程都適用的通用解決方案是使用線程安全(又名同步,線性或原子)數據結構,爲須要在共享狀態上執行的相應操做提供全部必需的同步。在簡單計數器的狀況下,咱們可使用原子增量和原子操做的原子整數類:

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { counter.incrementAndGet() } println("Counter = ${counter.get()}") }

這是解決這個問題的最快解決方案。它適用於普通計數器,集合,隊列和其餘標準數據結構以及它們的基本操做。然而,它不容易擴展到複雜的狀態或複雜的操做,沒有現成的線程安全實現。

d、細粒度的線程約束

線程約束是一種解決共享可變狀態問題的方法,其中對特定共享狀態的全部訪問都侷限於單個線程。它一般用於UI應用程序中,其中全部UI狀態都侷限於單個事件派發/應用程序線程。經過使用一個與協程一塊兒使用單線程的上下文:

val counterContext = newSingleThreadContext("CounterContext") var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { // run each coroutine in CommonPool withContext(counterContext) { // but confine each increment to the single-threaded context counter++ } } println("Counter = $counter") }

這段代碼的工做速度很是緩慢,由於它會執行細粒度的線程約束。每一個單獨的增量使用withcontext塊從多線程共用上下文切換到單線程上下文。

e、粗粒度的線程約束

實際上,粗粒度的線程約束是以大塊(例如,大部分狀態更新業務邏輯都侷限於單線程。下面的例子就是這樣作的,以單線程上下文中運行每一個協程開始:

val counterContext = newSingleThreadContext("CounterContext") var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(counterContext) { // run each coroutine in the single-threaded context counter++ } println("Counter = $counter") }

這如今工做得更快,併產生正確的結果。

f、相互排斥

互斥問題的解決方案是用一個永遠不會同時執行的關鍵部分來保護共享狀態的全部修改。在一個阻塞的世界中,你一般使用synchronized或reentrantlock。協程的替代方法稱爲互斥體。它具備鎖定和解鎖功能來界定關鍵部分。關鍵的區別是,mutex.lock是一個暫停功能。它不會阻塞線程。

還有一個方便表示mutex.lock()的鎖定擴展函數。try{...}finally{mutex.unlock()}模式

val mutex = Mutex() var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> { massiveRun(CommonPool) { mutex.withLock { counter++
} } println("Counter = $counter") }

這個例子中的鎖定是細粒度的。然而,對於一些絕對必須按期修改某些共享狀態的狀況,這是一個很好的選擇,可是沒有天然線程將此狀態限制爲。

g、Actors(一種併發模型)

一個actors是一個協同程序的組合,這個協程被封裝在這個協程中,而且是一個與其餘協程通訊的通道。一個簡單的actors能夠寫成一個方法,可是具備複雜狀態的actors更適合一個類。

有一個actors協同創做者能夠方便地將actors的消息頻道合併到其做用域中,以便接收來自發送頻道的信息並將其組合到做業對象中,以便對actors的單個引用能夠隨其句柄一塊兒傳送。

使用actors的第一步是定義actors將要處理的一類消息。kotlin的密封類很是適合這一目的。咱們用inccounter消息定義countermsg密封類來增長計數器和getcounter消息以得到它的值。後者須要發送回覆。這裏使用了一個可補充的可用通訊原語,它表示將來將被知道(傳遞)的單個值,這裏用於此目的

// Message types for counterActor sealed class CounterMsg object IncCounter : CounterMsg() // one-way message to increment counter class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

那麼咱們定義一個使用actor協程構建器啓動actor的函數:

// This function launches a new counter actor fun counterActor() = actor<CounterMsg> { var counter = 0 // actor state for (msg in channel) { // iterate over incoming messages when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } }

主要代碼很簡單:

fun main(args: Array<String>) = runBlocking<Unit> { val counter = counterActor() // create the actor massiveRun(CommonPool) { counter.send(IncCounter) } // send a message to get a counter value from an actor val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() // shutdown the actor }

(對於正確性)執行者自己執行的是什麼上下文可有可無。一個actor是一個協程而且一個協程是按順序執行的,所以將該狀態限制到特定的協程能夠解決共享可變狀態的問題

actor在負載下比鎖定更高效,由於在這種狀況下,它老是有工做要作,並且根本不須要切換到不一樣的上下文。

請注意,actor協同程序生成器是生成協程生成器的雙重對象。一個actor與它接收消息的頻道相關聯,而一個製做者與它發送元素的頻道相關聯。

七、選擇表達式

選擇表達式能夠同時等待多個暫停功能,並選擇第一個可用的暫停功能

a、從頻道中選擇

讓咱們有兩個字符串生產者:fizz和buzz。fizz每300毫秒產生一個「fizz」字符串:

fun fizz(context: CoroutineContext) = produce<String>(context) { while (true) { // sends "Fizz" every 300 ms delay(300) send("Fizz") } }

buzz產生「buzz!」每500毫秒一個字符串:

fun buzz(context: CoroutineContext) = produce<String>(context) { while (true) { // sends "Buzz!" every 500 ms delay(500) send("Buzz!") } }

使用接收暫停功能,咱們能夠從一個頻道或另外一個頻道接收。但選擇表達式容許咱們同時使用它的接受子句接收:

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) { select<Unit> { // <Unit> means that this select expression does not produce any result fizz.onReceive { value -> // this is the first select clause println("fizz -> '$value'") } buzz.onReceive { value -> // this is the second select clause println("buzz -> '$value'") } } }

讓咱們所有運行七次:

fun main(args: Array<String>) = runBlocking<Unit> { val fizz = fizz(coroutineContext) val buzz = buzz(coroutineContext) repeat(7) { selectFizzBuzz(fizz, buzz) } coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}

結果以下:

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'

b、在關閉時選擇

當通道關閉時,select中的onreceive子句失敗,而且相應的select引起異常。咱們可使用onreceiveornull子句在通道關閉時執行特定的操做。如下示例還顯示select是一個返回其所選子句結果的表達式:

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> { a.onReceiveOrNull { value -> if (value == null) "Channel 'a' is closed" else "a -> '$value'" } b.onReceiveOrNull { value -> if (value == null) "Channel 'b' is closed" else
"b -> '$value'" } }

讓咱們用四次產生「hello」字符串的通道a和四次產生「world」的通道b使用它:

fun main(args: Array<String>) = runBlocking<Unit> { // we are using the context of the main thread in this example for predictability ... val a = produce<String>(coroutineContext) { repeat(4) { send("Hello $it") } } val b = produce<String>(coroutineContext) { repeat(4) { send("World $it") } } repeat(8) { // print first eight results println(selectAorB(a, b)) } coroutineContext.cancelChildren()
}

這段代碼的結果很是有趣,因此咱們會詳細分析它的模式:

a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed

有幾個觀察結果能夠從中得出:

首先,選擇偏向於第一個條款。當同時選擇幾個子句時,其中的第一個被選中。在這裏,兩個頻道都在不斷地產生字符串,因此一個頻道成爲選擇的第一個條目,贏了。可是,由於咱們使用的是無緩衝的頻道,因此a會不時暫停發送調用,而且也給b發送一個機會

第二個觀察結果是,當通道已經關閉時,onreceiveornull當即被選中

c、選擇發送

選擇表達式有onsend子句,能夠用於一個很棒的好處與偏見的選擇性質相結合。

讓咱們編寫一個整數生成器的例子,當其主通道的用戶不能跟上它時,它將其值發送到輔助通道:

fun produceNumbers(context: CoroutineContext, side: SendChannel<Int>) = produce<Int>(context) { for (num in 1..10) { // produce 10 numbers from 1 to 10 delay(100) // every 100 ms select<Unit> { onSend(num) {} // Send to the primary channel side.onSend(num) {} // or to the side channel
} } }

消費者將會很是緩慢,須要250毫秒來處理每一個號碼

fun main(args: Array<String>) = runBlocking<Unit> { val side = Channel<Int>() // allocate side channel launch(coroutineContext) { // this is a very fast consumer for the side channel side.consumeEach { println("Side channel has $it") } } produceNumbers(coroutineContext, side).consumeEach { println("Consuming $it") delay(250) // let us digest the consumed number properly, do not hurry } println("Done consuming") coroutineContext.cancelChildren()
}

讓咱們看看發生了什麼:

Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming

d、選擇延期值

可使用onawait子句選擇延遲值。讓咱們從一個異步函數開始,它在隨機延遲以後返回一個延遲字符串值:

fun asyncString(time: Int) = async { delay(time.toLong()) "Waited for $time ms" }

讓咱們以隨機延遲啓動其中的十幾個

fun asyncStringsList(): List<Deferred<String>> { val random = Random(3) return List(12) { asyncString(random.nextInt(1000)) } }

如今主函數正在等待第一個完成並計算仍然活動的延遲值的數量。請注意,咱們在這裏使用了select expression是kotlin dsl的事實,因此咱們可使用任意代碼爲它提供子句。在這種狀況下,咱們遍歷延遲值列表,爲每一個延遲值提供onawait子句

fun main(args: Array<String>) = runBlocking<Unit> { val list = asyncStringsList() val result = select<String> { list.withIndex().forEach { (index, deferred) -> deferred.onAwait { answer -> "Deferred $index produced answer '$answer'" } } } println(result) val countActive = list.count { it.isActive } println("$countActive coroutines are still active") }

輸出以下:

Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active

e、切換延遲值的頻道

讓咱們編寫一個使用延遲字符串值通道的通道生成器函數,等待每一個接收到的延遲值,但直到下一個延遲值結束或通道關閉。這個例子將onreceiveornull和onawait子句放在同一個select中

fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select<Deferred<String>?> { // return next deferred value from this select or null input.onReceiveOrNull { update -> update // replaces next value to wait } current.onAwait { value ->
send(value) // send value that current deferred has produced input.receiveOrNull() // and use the next deferred from the input channel } } if (next == null) { println("Channel was closed") break // out of loop } else { current = next } } }

爲了測試它,咱們將使用一個簡單的異步函數,在指定的時間後解析爲指定的字符串

fun asyncString(str: String, time: Long) = async { delay(time) str }

主函數只是啓動一個協程來打印switchmapdeferreds的結果併發送一些測試數據給它

fun main(args: Array<String>) = runBlocking<Unit> { val chan = Channel<Deferred<String>>() // the channel for test launch(coroutineContext) { // launch printing coroutine for (s in switchMapDeferreds(chan)) println(s) // print each received string } chan.send(asyncString("BEGIN", 100)) delay(200) // enough time for "BEGIN" to be produced chan.send(asyncString("Slow", 500)) delay(100) // not enough time to produce slow chan.send(asyncString("Replace", 100)) delay(500) // give it time before the last one chan.send(asyncString("END", 500)) delay(1000) // give it time to process chan.close() // close the channel ... delay(500) // and wait some time to let it finish }

結果以下:

BEGIN
Replace
END
Channel was closed
相關文章
相關標籤/搜索