【譯】kotlin 協程官方文檔(6)-通道(Channels)

最近一直在瞭解關於kotlin協程的知識,那最好的學習資料天然是官方提供的學習文檔了,看了看後我就萌生了翻譯官方文檔的想法。先後花了要接近一個月時間,一共九篇文章,在這裏也分享出來,但願對讀者有所幫助。我的知識所限,有些翻譯得不是太順暢,也但願讀者能提出意見git

協程官方文檔:coroutines-guidegithub

協程官方文檔中文翻譯:coroutines-cn-guide編程

協程官方文檔中文譯者:leavesC安全

[TOC]併發

Deferred 值提供了在協程之間傳遞單個值的方便方法,而通道(Channels)提供了一種傳輸值流的方法異步

1、通道基礎(Channel basics)

通道在概念上很是相似於 BlockingQueue,它們之間的一個關鍵區別是:通道有一個掛起的 send 函數和一個掛起的 receive 函數,而不是一個阻塞的 put 操做和一個阻塞的 take 操做async

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
複製代碼

輸出結果是:ide

1
4
9
16
25
Done!
複製代碼

2、關閉和迭代通道(Closing and iteration over channels)

與隊列不一樣,通道能夠關閉,以此來代表元素已發送完成。在接收方,使用常規的 for 循環從通道接收元素是比較方便的函數

從概念上講,close 相似於向通道發送一個特殊的 cloase 標記。一旦接收到這個 close 標記,迭代就會中止,所以能夠保證接收到 close 以前發送的全部元素:oop

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
複製代碼

3、構建通道生產者(Building channel producers)

協程生成元素序列(sequence )的模式很是常見。這是能夠常常在併發編程中發現的生產者-消費者模式的一部分。你能夠將這樣一個生產者抽象爲一個以 channel 爲參數的函數,但這與必須從函數返回結果的常識相反

有一個方便的名爲 product 的協程構造器,它使得在 producer 端執行該操做變得很容易;還有一個擴展函數 consumerEach,它替換了consumer 端的 for 循環:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
複製代碼

4、管道(Pipelines)

管道是一種模式,是一個協程正在生成的多是無窮多個元素的值流

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
複製代碼

存在一個或多個協程對值流進行取值,進行一些處理併產生一些其它結果。在下面的示例中,每一個返回值也是入參值(數字)的平方值

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
複製代碼

啓動並鏈接整個管道:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
複製代碼

建立協程的全部函數都被定義爲 CoroutineScope 的擴展,所以咱們能夠依賴結構化併發來確保應用程序中沒有延遲的全局協程

5、使用管道的素數(Prime numbers with pipeline)

讓咱們以一個使用協程管道生成素數的例子,將管道發揮到極致。咱們從一個無限的數字序列開始

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
複製代碼

如下管道過濾傳入的數字流,刪除全部可被給定素數整除的數字:

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
複製代碼

如今,咱們經過從2開始一個數字流,從當前通道獲取一個質數,併爲找到的每一個質數啓動新的管道:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ... 
複製代碼

下面的示例代碼打印了前十個質數,在主線程的上下文中運行整個管道。由於全部的協程都是在主 runBlocking 協程的範圍內啓動的,因此咱們沒必要保留全部已啓動的協程的顯式引用。咱們使用擴展函數 cancelChildren 來取消打印前十個質數後的全部子協程

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd 
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}
複製代碼

運行結果:

2
3
5
7
11
13
17
19
23
29
複製代碼

注意,你可使用標準庫中的 iterator 協程構造器來構建相同的管道。將 product 替換爲 iterator,send 替換爲 yield,receive 替換爲 next,ReceiveChannel 替換爲 iterator,並去掉協程做用域。你也不須要再使用 runBlocking 。可是,使用如上所示的通道的管道的好處是,若是在 Dispatchers.Default 上下文中運行它,它實際上能夠利用多個 CPU 來執行代碼

但不管如何,如上所述的替代方案也是一個很是不切實際的來尋找素數的方法。實際上,管道確實涉及一些其餘掛起調用(如對遠程服務的異步調用),而且這些管道不能使用 sequence/iterator 來構建,由於它們不容許任意掛起,而 product 是徹底異步的

6、扇出(Fan-out)

多個協程能夠從同一個通道接收數據,在它們之間分配任務。讓咱們從一個週期性地生成整數(每秒10個數)的 producer 協程開始:

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}
複製代碼

而後咱們能夠有多個處理器(processor)協程。在本例中,他們只需打印他們的 id 和接收的數字:

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}
複製代碼

如今讓咱們啓動5個處理器,讓它們工做幾乎一秒鐘。看看會發生什麼:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}
複製代碼

儘管接收每一個特定整數的處理器 id 可能不一樣,但運行結果將相似於如下輸出:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
複製代碼

請注意,取消 producer 協程會關閉其通道,從而最終終止 processor 協程正在執行的通道上的迭代

另外,請注意咱們如何使用 for 循環在通道上顯式迭代以在 launchProcessor 代碼中執行 fan-out。與 consumeEach 不一樣,這個 for 循環模式在多個協程中使用是徹底安全的。若是其中一個 processor 協程失敗,則其餘處理器仍將處理通道,而經過 consumeEach 寫入的處理器老是在正常或異常完成時消費(取消)底層通道

7、扇入(Fan-in)

多個協程能夠發送到同一個通道。例如,有一個字符串通道和一個掛起函數,函數以指定的延遲將指定的字符串重複發送到此通道:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
複製代碼

如今,讓咱們看看若是啓動兩個協程來發送字符串會發生什麼狀況(在本例中,咱們將它們做爲主協程的子協程,在主線程的上下文中啓動):

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
複製代碼

運行結果:

foo
foo
BAR!
foo
foo
BAR!
複製代碼

8、帶緩衝的通道(Buffered channels)

到目前爲止顯示的通道都沒有緩衝區。無緩衝通道在發送方和接收方同時調用發送和接收操做時傳輸元素。若是先調用 send,則在調用 receive 以前會將其掛起;若是先調用 receive ,則在調用 send 以前會將其掛起

Channel() 工廠函數和 produce 構建器都採用可選的參數 capacity 來指定緩衝區大小。 緩衝用於容許發送者在掛起以前發送多個元素,相似於具備指定容量的 BlockingQueue,它在緩衝區已滿時才阻塞

查看如下代碼的效果:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd 
}
複製代碼

使用了容量爲4的緩衝通道,因此將打印五次:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
複製代碼

前四個元素被添加到緩衝區內,sender 在嘗試發送第五個元素時掛起

9、通道是公平的(Channels are fair)

對通道的發送和接收操做,對於從多個協程調用它們的順序是公平的。它們按先入先出的順序提供,例如,先調用 receive 的協程先獲取到元素。在下面的示例中,兩個協程 「ping」 和 「pong」 從共享的 「table」 通道接收 「ball」 對象

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
//sampleEnd
複製代碼

「ping」 協程首先開始運行,因此它是第一個接收到 ball 的。即便 「ping」 協程在將 ball 從新送回給 table 後又當即開始進行 receive,但 ball 仍是會被 「pong」 接收到,由於它已經先在等待接收了:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
複製代碼

請注意,有時因爲所使用的執行者的性質,通道可能會產生看起來不公平的執行效果。有關詳細信息,請參閱此 issue

10、計時器通道(Ticker channels)

計時器通道是一種特殊的會合(rendezvous)通道,自該通道的最後一次消耗以來,每次給定的延遲時間結束後都將返回 Unit 值。儘管它看起來是無用處的,但它是一個有用的構建塊,能夠建立複雜的基於時間的 produce 管道和進行窗口化操做以及其它時間相關的處理。計時器通道可用於 select 執行 「on tick」 操做

要建立這樣的通道,請使用工廠方法 ticker。若是不須要通道發送更多元素了,請對其使用 ReceiveChannel.cancel 取消發送

如今讓咱們看看它在實踐中是如何工做的:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
複製代碼

運行結果:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
複製代碼

請注意,ticker 能感知到消費端可能處於暫停狀態,而且在默認的狀況下,若是發生暫停,將會延遲下一個元素的生成,嘗試保持生成元素的固定速率

可選的,ticker 函數的 mode 參數能夠指定爲 TickerMode.FIXED_DELAY,以保證元素之間的固定延遲

相關文章
相關標籤/搜索