【譯】kotlin 協程官方文檔(8)-共享可變狀態和併發性(Shared mutable state and concurrency)

最近一直在瞭解關於kotlin協程的知識,那最好的學習資料天然是官方提供的學習文檔了,看了看後我就萌生了翻譯官方文檔的想法。先後花了要接近一個月時間,一共九篇文章,在這裏也分享出來,但願對讀者有所幫助。我的知識所限,有些翻譯得不是太順暢,也但願讀者能提出意見java

協程官方文檔:coroutines-guidegit

協程官方文檔中文翻譯:coroutines-cn-guidegithub

協程官方文檔中文譯者:leavesC安全

[TOC]bash

可使用多線程調度器(如 Dispatchers.Default)併發執行協程,它呈現了全部常見的併發問題。主要問題是對共享可變狀態的同步訪問。在協程做用域中解決這個問題的一些方法相似於多線程世界中的方法,但有一些其它方法是獨有的數據結構

1、問題(The problem)

讓咱們啓動一百個協程,都作一樣的操做一千次。咱們還將計算它們的完成時間,以便進一步比較:多線程

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}
複製代碼

咱們從一個很是簡單的操做開始,該操做使用多線程調度器 Dispatchers.Default,並增長一個共享的可變變量併發

import kotlinx.coroutines.*
import kotlin.system.*    

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
複製代碼

最後會打印出什麼呢?不太可能打印出 「Counter=100000」,由於100個協程從多個線程併發地遞增 counter 而不進行任何同步。ide

2、Volatiles 是沒有做用的(Volatiles are of no help)

有一種常見的誤解是:將變量標記爲 volatile 能夠解決併發問題。讓咱們試試:函數

import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
複製代碼

這段代碼運行得比較慢,可是咱們在最後仍然沒有獲得「Counter=100000」,由於 volatile 變量保證了可線性化(這是「atomic」的一個技術術語)對相應變量的讀寫,但不提供更大行爲的原子性(在咱們的例子中指遞增操做)

3、線程安全的數據結構(Thread-safe data structures)

對線程和協程都有效的一個解決方案是使用線程安全的(也稱爲同步、可線性化或原子)數據結構,該結構爲須要在共享狀態上執行的相應操做提供全部必要的同步保障。對於一個簡單的計數器,咱們可使用 AtomicInteger 類,該類具備保證原子性的 incrementAndGet 方法

import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
var counter = AtomicInteger()

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
複製代碼

這是解決這個特殊問題的最快方法。它適用於普通計數器、集合、隊列和其餘標準數據結構及其基本操做。可是,它不容易擴展到複雜的狀態或沒有實現好了的線程安全的複雜操做

4、以細粒度限制線程(Thread confinement fine-grained)

線程限制是解決共享可變狀態問題的一種方法,其中對特定共享狀態的全部訪問都限制在一個線程內。它一般用於 UI 應用程序,其中全部的 UI 狀態都限制在「單個事件分派」或「應用程序線程」中。經過使用單線程上下文,能夠很容易地使用協程來實現上述的計數器

import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd      
複製代碼

這段代碼運行得很是緩慢,由於它執行細粒度的線程限制。每一個單獨的增值操做都使用 withContext(counterContext) 從多線程 Dispatchers.Default 上下文切換到單線程上下文

5、以粗粒度限制線程(Thread confinement coarse-grained)

在實踐中,線程限制是在比較大的範圍內執行的,例如,更新狀態的邏輯的範圍被限制在單個線程中。下面的示例就是這樣作的,首先在單線程上下文中運行每一個協程

import kotlinx.coroutines.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
//sampleEnd 
複製代碼

如今這段代碼的運行速度會快得多,併產生了正確的結果

6、互斥(Mutual exclusion)

互斥問題的解決方案是保護共享狀態的全部修改操做,其中的關鍵代碼永遠不會同時執行。在一個阻塞的世界中,一般會使用 synchronizedReentrantLock。協程的替換方案稱爲互斥(Mutex)。它具備 lockunlock 函數以劃定一個關鍵位置。關鍵的區別在於 Mutex.lock() 是一個掛起函數。它不會阻塞線程

還有一個擴展函數 withLock 能夠方便地來實現 mutex.lock(); try {...} finally { mutex.unlock() }

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

//sampleStart
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
//sampleEnd
複製代碼

本例中的鎖是細粒度的,所以它也付出了某些代價(消耗)。可是,在某些狀況下這是一個很好的選擇,好比你必須按期修改某些共享狀態,但不具有修改共享狀態所需的原生線程

7、Actors

actor 是一個實體,由一個協程、被限制並封裝到這個協程中的狀態以及一個與其它協程通訊的通道組成。簡單的 actor 能夠寫成函數,但具備複雜狀態的 actor 更適合類

有一個 actor 協程構造器,它能夠方便地將 actor 的 mailbox channel 合併到其接收的消息的做用域中,並將 send channel 合併到生成的 job 對象中,以即可以將對 actor 的單個引用做爲其句柄引有

使用 actor 的第一步是定義一類 actor 將要處理的消息。kotlin 的密封類很是適合這個目的。在 CounterMsg 密封類中,咱們用 IncCounter 消息來定義遞增計數器,用 GetCounter 消息來獲取其值,後者須要返回值。爲此,這裏使用 CompletableDeferred communication primitive,它表示未來已知(通訊)的單個值

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
複製代碼

而後,咱們定義一個函數,該函數使用 actor 協程構造器來啓動 actor:

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
複製代碼

代碼很簡單:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

//sampleStart
fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
//sampleEnd 
複製代碼

在什麼上下文中執行 actor 自己並不重要(爲了正確)。actor 是一個協程,而且協程是按順序執行的,所以將狀態限制到特定的協程能夠解決共享可變狀態的問題。實際上,actors 能夠修改本身的私有狀態,但只能經過消息相互影響(避免須要任何鎖)

actor 比使用鎖更爲有效,由於在這種狀況下,它老是有工做要作,根本不須要切換到不一樣的上下文

注意,actor 協程構造器是一個雙重的 product 協程構造器 。actor 與它接收消息的通道相關聯,而 producer 與向其發送元素的通道相關聯

相關文章
相關標籤/搜索