二、Kotlin-協程;

簡介

   Coroutines提供了異步和非阻塞行爲,但又不缺少可讀性。 使用協程執行網絡請求,而不會阻塞線程,也不用使用回調。對於網絡請求庫,Retrofit已經支持協程。git

阻塞請求

  以下是使用RetrofitGithub執行HTTP請求。 它容許請求給定組織下的repo列表,以及每一個repo的貢獻者列表:github

interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    fun getOrgReposCall( @Path("org") org: String ): Call<List<Repo>>

    @GET("repos/{owner}/{repo}/contributors?per_page=100")
    fun getRepoContributorsCall( @Path("owner") owner: String, @Path("repo") repo: String ): Call<List<User>>
}
複製代碼

以下定義的loadContributorsBlocking函數使用此API來獲取給定組織的貢獻者列表:bash

fun loadContributorsBlocking(req: RequestData) : List<User> {
    val service = createGitHubService(req.username, req.password)
    val repos = service
        .getOrgReposCall(req.org)          
        .execute()                         
        .also { logRepos(req, it) }        
        .body() ?: listOf()                

    return repos.flatMap { repo ->
        service
            .getRepoContributorsCall(req.org, repo.name)      
            .execute()                                        
            .also { logUsers(repo, it) }                      
            .bodyList()                                       
    }.aggregate()
}
複製代碼

  首先,得到給定組織下的Repo List,並將其存儲在repos中。 而後,對於每一個repo,而後繼續請求貢獻者list,並將全部這些列表合併爲一個最終的貢獻者List網絡

  getOrgReposCallgetRepoContributorsCall都將Call類的實例返回。 調用Call.execute方法來執行請求。 execute方法將阻塞線程同步調用。併發

  以下是擴展函數bodyList,若是出現錯誤則返回一個空的List<T>:異步

fun <T> Response<List<T>>.bodyList(): List<T> {
    return body() ?: listOf()
}
複製代碼
fun List<User>.aggregate(): List<User> =
    groupBy { it.login }
        .map { (login, group) -> User(login, group.sumBy { it.contributions }) }
        .sortedByDescending { it.contributions }
複製代碼

使用回調

在一個單獨的線程中調用loadContributors:

thread {
    loadContributorsBlocking(req)
}
複製代碼

  以下是使用回調來執行方法:async

fun loadContributorsBackground(req: RequestData, updateResults: (List<User>) -> Unit)
複製代碼

使用Retrofit的回調API

  Retrofit回調API也能夠實現。可使用Call.enqueue函數,該函數啓動HTTP請求並以回調做爲參數。函數

fun loadContributorsCallbacks(req: RequestData, updateResults: (List<User>) -> Unit) {
    val service = createGitHubService(req.username, req.password)
    service.getOrgReposCall(req.org).onResponse { responseRepos ->  
        logRepos(req, responseRepos)
        val repos = responseRepos.bodyList()
        
      val allUsers = Collections.synchronizedList(mutableListOf<User>())
      val numberOfProcessed = AtomicInteger()
      for (repo in repos) {
     service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
        logUsers(repo, responseUsers)
        val users = responseUsers.bodyList()
        allUsers += users
        if (numberOfProcessed.incrementAndGet() == repos.size) {
            updateResults(allUsers.aggregate())
        }
    }
}
    }
}
複製代碼

使用掛起函數

  Retrofit最近添加了對協程的支持,能夠再也不將Call<List<Repo>>返回,而是將API調用定義爲掛起函數:測試

interface GitHubService {
    @GET("orgs/{org}/repos?per_page=100")
    suspend fun getOrgRepos( @Path("org") org: String ): List<Repo>
}
複製代碼

  當使用掛起函數執行請求時,線程不會被阻塞,請注意,如今getOrgRepos會直接返回結果,而不是返回Call若是結果不成功,則引起異常。ui

  Retrofit還容許將返回包裝在Response的結果中。 在這種狀況下,將提供結果主體,而且能夠手動檢查錯誤。

interface GitHubService {
    // getOrgReposCall & getRepoContributorsCall declarations
    
    @GET("orgs/{org}/repos?per_page=100")
    suspend fun getOrgRepos( @Path("org") org: String ): Response<List<Repo>>

    @GET("repos/{owner}/{repo}/contributors?per_page=100")
    suspend fun getRepoContributors( @Path("owner") owner: String, @Path("repo") repo: String ): Response<List<User>>
}
複製代碼
suspend fun loadContributorsSuspend(req: RequestData): List<User> {
    val service = createGitHubService(req.username, req.password)
    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    return repos.flatMap { repo ->
        service.getRepoContributors(req.org, repo.name)
            .also { logUsers(repo, it) }
            .bodyList()
    }.aggregate()
}
複製代碼

  再也不須要調用以前返回了Responseexecute,由於如今API函數直接返回了Response。 但這是特定於Retrofit庫的實現細節。 對於其餘庫,API會有所不一樣,可是概念是相同的: 帶有暫停功能的代碼看起來驚人地相似於阻塞」版本。 它具備可讀性,並準確表達了正在努力實現的目標。

  調用掛起方法:

launch {
    val users = loadContributorsSuspend(req)
    updateResults(users, startTime)
}
複製代碼

  launch將啓動新的計算。 此計算負責加載數據並顯示結果。 這種計算是可掛起的:在執行網絡請求時,它被掛起並釋放線程。 當網絡請求返回結果時,恢復計算。 這種可掛起的計算稱爲協程, 協程是在線程頂部運行的計算,能夠暫停所謂暫停,是指相應的計算能夠暫停,從線程中刪除並存儲在內存中。 同時,線程能夠自由地被其餘活動佔用:

  當準備好繼續計算時,它會返回到線程(但不必定要返回到同一線程)。

併發

  與線程相比,Kotlin協程很是輕量級。 每當要異步啓動新計算時,均可以建立一個新協程。

  要啓動新的協程,可使用主要的協程構建器launchasyncrunBlocking。   async啓動一個新的協程,並返回一個Deferred對象。 Deferred表明其餘名稱(例如FuturePromise)所熟知的概念:它存儲計算,但在得到最終結果時將延遲。 它有望在未來的某個時候產生結果。

  asynclaunch之間的主要區別在於,launch用於啓動預計不會返回特定結果的計算。 launch返回表明協程的Job。經過調用Job.join(),能夠等到完成。

  Deferred是擴展Job的通用類型。異步調用能夠返回Deferred <Int>Deferred <CustomType>,具體取決於lambda返回的內容(lambda中的最後一個表達式是結果)。

  爲了得到協程的結果,能夠在Deferred實例上調用await()。 在等待結果時,調用的協程將暫停:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferred: Deferred<Int> = async {
        loadData()
    }
    println("waiting...")
    println(deferred.await())
}

suspend fun loadData(): Int {
    println("loading...")
    delay(1000L)
    println("loaded!")
    return 42
}
複製代碼

  runBlocking用做常規函數和掛起函數之間阻塞世界和非阻塞世界之間的橋樑。 它用做啓動頂級主協程的適配器。

  若是存在一個延遲對象列表,則能夠調用awaitAll來等待全部對象的結果:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val deferreds: List<Deferred<Int>> = (1..3).map {
        async {
            delay(1000L * it)
            println("Loading $it")
            it
        }
    }
    val sum = deferreds.awaitAll().sum()
    println("$sum")
}
複製代碼

  獲取貢獻者列表能夠改成以下代碼 :

suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
    val service = createGitHubService(req.username, req.password)
    val repos = service
        .getOrgRepos(req.org)
        .also { logRepos(req, it) }
        .bodyList()

    val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
        async {
            service.getRepoContributors(req.org, repo.name)
                .also { logUsers(repo, it) }
                .bodyList()
        }
    }
    deferreds.awaitAll().flatten().aggregate()
複製代碼

  全部協程仍在主UI線程上運行。要想在公共線程池中的不一樣線程上運行協程很是容易。 指定Dispatchers.Default做爲異步函數的上下文參數:

async(Dispatchers.Default) { ... }
複製代碼

  CoroutineDispatcher肯定了相應的協程應在哪一個或哪些線程上運行。 若是不指定,那麼async將使用外部做用域的調度程序。

  Dispatchers.Default表示JVM上的共享線程池。 該池提供了一種並行執行的方法。 它包含與CPU可用內核數量同樣多的線程,可是若是隻有一個內核,它仍然具備兩個線程。

  要僅在主UI線程上運行協程,應指定Dispatchers.Main做爲參數:

launch(Dispatchers.Main) {
    updateResults()
}
複製代碼

  若是在主線程上啓動新協程時主線程正忙,則協程將被掛起並計劃在此線程上執行。 協程僅在線程空閒時恢復。應該從外部範圍使用調度程序,這是一種好習慣:

launch(Dispatchers.Default) {
    val users = loadContributorsConcurrent(service, req)
    withContext(Dispatchers.Main) {
        updateResults(users, startTime)
    }
}
複製代碼

  應該在主UI線程上調用updateResults,所以在Dispatchers.Main的上下文中進行調用。 withContext使用指定的協程上下文調用給定代碼,掛起直到完成,而後返回結果。 另外一種更冗長的表達方式是啓動一個新的協程並顯式等待(經過掛起)直到完成:

launch(context) { ... }.join()
複製代碼

結構化併發

  協程範圍(Coroutine scope)負責不一樣協程之間的結構和父子關係,老是在範圍內開始新的協程。 協程上下文(Coroutine context)存儲用於運行給定協程的其餘信息,例如調度程序指定應在其上調度協程的一個或多個線程。

  使用launchasyncrunBlocking啓動新協程時,它們會自動建立相應的做用域。 全部這些函數都將lambda用做接收器的參數,隱式接收器類型爲CoroutineScope:

launch { /* this: CoroutineScope */
}
複製代碼

  新協程只能在範圍內啓動。 launchasync被聲明爲CoroutineScope的擴展,所以在調用它們時必須始終傳遞隱式或顯式接收器。 由runBlocking啓動的協程是惟一的例外:runBlocking被定義爲頂級函數。 可是由於它阻塞了當前線程,因此它主要用於主要功能並做爲橋接功能進行測試。

  在runBlockinglaunchasync中啓動新的協程時,它將在範圍內自動啓動:

import kotlinx.coroutines.*

fun main() = runBlocking { /* this: CoroutineScope */
    launch { /* ... */ }
    // the same as: 
    this.launch { /* ... */ }
}
複製代碼

  當在runBlocking內部調用launch時,將其稱爲對CoroutineScope類型的隱式接收器的擴展。 或者,能夠顯式編寫this.launch

  能夠說嵌套的協程(launch)是外部協程(runBlocking)的子級。 這種父子關係經過做用域起做用:子協程從與父協程相對應的做用域開始。

  可使用GlobalScope.asyncGlobalScope.launch從全局範圍啓動新的協程。 這將建立一個頂級的獨立協程。

  提供協程結構的機制稱爲結構化併發,結構化併發在全局範圍內的好處以下:

  • scope一般負責子協程,而且子協程的生命週期依附於scope的生命週期。
  • 若是出現問題或用戶只是改變主意並決定撤消操做,scope能夠自動取消子協程操做。
  • scope會自動等待全部子協程的完成。所以,若是scope對應於一個協程,則父協程將不會完成,直到在其範圍內啓動的全部協程都執行完成爲止。

  使用GlobalScope.async時,從全局範圍定義的協程都是獨立的。它們的壽命僅受整個應用程序壽命的限制。 能夠存儲從全局範圍開始的協程的引用,等待其完成或顯式取消它,可是它不會像結構化的那樣自動發生。

使用外部scopeContext

  由coroutineScope或由coroutine構建器建立的新做用域始終從外部做用域繼承上下文。 在這種狀況下,調用suspended loadContributorsConcurrentscope是外部的scope

launch(Dispatchers.Default) {  //外部scope
    val users = loadContributorsConcurrent(service, req)
    // ...
}
複製代碼

  全部嵌套的協程都是從繼承的上下文自動開始的。 dispatcher就是這種狀況的一部分。 這就是爲何async啓動的全部協程都使用默認dispatcher的上下文啓動的緣由:

suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
    // 該scope從外部scope繼承上下文
    // ... 
    async {   // 嵌套協程從繼承的上下文開始
        // ...
    }
    // ...
}
複製代碼

  使用結構化併發,當建立頂級協程時,能夠一次指定主要上下文元素(例如dispatcher)。 全部嵌套的協程都繼承上下文並僅在須要時對其進行修改。

當爲UI應用程序(例如Android)使用協程編寫代碼時,一般的作法是默認使用CoroutineDispatchers.Main做爲頂層協程,而後在須要在不一樣線程上運行代碼時顯式放置其餘dispatcher

Channels

  衆所周知,編寫具備共享可變狀態的代碼很是困難且容易出錯。 經過通訊共享信息而不是使用公共可變狀態共享信息能夠簡化此過程。 協程能夠經過channels相互通訊。

  channels容許在不一樣協程之間傳遞數據。 一個協程能夠向channel發送一些信息,而另外一個協程能夠從該channel接收此信息:

  發送(產生)信息的協程一般被稱爲生產者,而接收(消費)信息的協程被稱爲消費者。 在須要時,許多協程能夠將信息發送到同一channel,許多協程也能夠從該channel接收信息:

當許多協程從同一channel接收信息時,每一個元素僅由一個使用者處理一次;處理意味着從會從channel中刪除此元素。

  能夠認爲channel相似於元素集合(直接的模擬將是一個隊列:將元素添加到一端並從另外一端接收)。 可是,有一個重要的區別:與集合不一樣,即便在同步版本中,通道也能夠暫停sendreceive操做。 當channel爲空或已滿(channel的大小可能受到限制,可能已滿)時,會發生這種狀況。

  Channel經過三個不一樣的接口表示:SendChannelReceiveChannel和繼承前兩個接口的Channel。 一般,建立一個channel並將其做爲SendChannel實例提供給生產者,以便只有他們能夠發送,並做爲ReceiveChannel實例提供給消費者,以便只有他們能夠從中接收。

注意,send和接收receive都聲明爲suspend方法

interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}    

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
複製代碼

  生產者能夠關閉通道以指示再也不有元素。

  Library中定義了幾種類型的Channel。 它們在內部能夠存儲多種元素,只是在send調用是否能夠掛起方面有所不一樣。 對於全部通道類型,receive調用的行爲方式相同:若是通道不爲空,則接收元素,不然將掛起。

  • Unlimited channel

  無限制通道(Unlimited channel)是最接近隊列的模擬:生產者能夠將元素髮送到此通道,而且它將無限增加。 send方法將永遠不會被掛起。 若是沒有更多的內存,則會拋出OutOfMemoryException。 和隊列不一樣的是當使用者嘗試從空通道接收消息並被掛起直到有一些新元素髮送到該通道時繼續使用。

  • Buffered channel

  緩衝通道(Buffered channel)的大小受指定數字的限制。 生產者能夠將元素髮送到此通道,直到達到最大限制。 全部元素都在內部存儲。 通道已滿時,下一個send呼叫將被掛起,直到出現更多可用空間。

  • Rendezvous channel

  "約定"通道(Rendezvous channel)是沒有緩衝區的通道。 這與建立大小爲零的緩衝通道(Buffered channel)相同。 其中一個功能(sendreceive)始終被掛起,直到調用另外一個功能爲止。 若是調用了send函數,但消費者沒有準備好處理該元素則receive會掛起,而且send也會被掛起。 一樣,若是調用了receive函數且通道爲空,換句話說,沒有準備好發送該元素的的send被掛起-receive也會被掛起。

  • Conflated channel

  發送到合併通道( Conflated channel)的新元素將覆蓋先前發送的元素,所以接收方將始終僅能獲取最新元素。 send調用將永遠不會被掛起。

建立通道時,指定其類型或緩衝區大小(若是須要緩衝的通道):

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
複製代碼

  默認狀況下,會建立一個"約定"通道(Rendezvous channel)。

  在如下示例中,將建立一個"約定"通道,兩個生產者協程和一個消費者協程:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}
複製代碼

以上將會打印以下結果:

[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
複製代碼
相關文章
相關標籤/搜索