Coroutines
提供了異步和非阻塞行爲,但又不缺少可讀性。 使用協程執行網絡請求,而不會阻塞線程,也不用使用回調。對於網絡請求庫,Retrofit已經支持協程。git
以下是使用Retrofit
對Github
執行HTTP
請求。 它容許請求給定組織下的repo
列表,以及每一個repo
的貢獻者列表:github
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall( @Path("org") org: String ): Call<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall( @Path("owner") owner: String, @Path("repo") repo: String ): Call<List<User>>
}
複製代碼
以下定義的loadContributorsBlocking
函數使用此API
來獲取給定組織的貢獻者列表:bash
fun loadContributorsBlocking(req: RequestData) : List<User> {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgReposCall(req.org)
.execute()
.also { logRepos(req, it) }
.body() ?: listOf()
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name)
.execute()
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
複製代碼
首先,得到給定組織下的Repo List
,並將其存儲在repos
中。 而後,對於每一個repo
,而後繼續請求貢獻者list
,並將全部這些列表合併爲一個最終的貢獻者List
。網絡
getOrgReposCall
和getRepoContributorsCall
都將Call
類的實例返回。 調用Call.execute
方法來執行請求。 execute
方法將阻塞線程同步調用。併發
以下是擴展函數bodyList
,若是出現錯誤則返回一個空的List<T>
:異步
fun <T> Response<List<T>>.bodyList(): List<T> {
return body() ?: listOf()
}
複製代碼
fun List<User>.aggregate(): List<User> =
groupBy { it.login }
.map { (login, group) -> User(login, group.sumBy { it.contributions }) }
.sortedByDescending { it.contributions }
複製代碼
loadContributors
:thread {
loadContributorsBlocking(req)
}
複製代碼
以下是使用回調來執行方法:async
fun loadContributorsBackground(req: RequestData, updateResults: (List<User>) -> Unit)
複製代碼
Retrofit
回調API
也能夠實現。可使用Call.enqueue
函數,該函數啓動HTTP
請求並以回調做爲參數。函數
fun loadContributorsCallbacks(req: RequestData, updateResults: (List<User>) -> Unit) {
val service = createGitHubService(req.username, req.password)
service.getOrgReposCall(req.org).onResponse { responseRepos ->
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (numberOfProcessed.incrementAndGet() == repos.size) {
updateResults(allUsers.aggregate())
}
}
}
}
}
複製代碼
Retrofit
最近添加了對協程的支持,能夠再也不將Call<List<Repo>>
返回,而是將API
調用定義爲掛起函數:測試
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos( @Path("org") org: String ): List<Repo>
}
複製代碼
當使用掛起函數執行請求時,線程不會被阻塞,請注意,如今getOrgRepos
會直接返回結果,而不是返回Call
。 若是結果不成功,則引起異常。ui
Retrofit
還容許將返回包裝在Response
的結果中。 在這種狀況下,將提供結果主體,而且能夠手動檢查錯誤。
interface GitHubService {
// getOrgReposCall & getRepoContributorsCall declarations
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos( @Path("org") org: String ): Response<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors( @Path("owner") owner: String, @Path("repo") repo: String ): Response<List<User>>
}
複製代碼
suspend fun loadContributorsSuspend(req: RequestData): List<User> {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
return repos.flatMap { repo ->
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
複製代碼
再也不須要調用以前返回了Response
的execute
,由於如今API
函數直接返回了Response
。 但這是特定於Retrofit
庫的實現細節。 對於其餘庫,API
會有所不一樣,可是概念是相同的: 帶有暫停功能的代碼看起來驚人地相似於阻塞
」版本。 它具備可讀性,並準確表達了正在努力實現的目標。
調用掛起方法:
launch {
val users = loadContributorsSuspend(req)
updateResults(users, startTime)
}
複製代碼
launch
將啓動新的計算。 此計算負責加載數據並顯示結果。 這種計算是可掛起的:在執行網絡請求時,它被掛起並釋放線程。 當網絡請求返回結果時,恢復計算。 這種可掛起的計算稱爲協程, 協程是在線程頂部運行的計算,能夠暫停。 所謂暫停,是指相應的計算能夠暫停,從線程中刪除並存儲在內存中。 同時,線程能夠自由地被其餘活動佔用:
當準備好繼續計算時,它會返回到線程(但不必定要返回到同一線程)。
與線程相比,Kotlin
協程很是輕量級。 每當要異步啓動新計算時,均可以建立一個新協程。
要啓動新的協程,可使用主要的協程構建器:launch
,async
或runBlocking
。 async
啓動一個新的協程,並返回一個Deferred
對象。 Deferred
表明其餘名稱(例如Future
或Promise
)所熟知的概念:它存儲計算,但在得到最終結果時將延遲。 它有望在未來的某個時候產生結果。
async
與launch
之間的主要區別在於,launch
用於啓動預計不會返回特定結果的計算。 launch
返回表明協程的Job
。經過調用Job.join()
,能夠等到完成。
Deferred
是擴展Job
的通用類型。異步調用能夠返回Deferred <Int>
或Deferred <CustomType>
,具體取決於lambda
返回的內容(lambda
中的最後一個表達式是結果)。
爲了得到協程的結果,能夠在Deferred
實例上調用await()
。 在等待結果時,調用的協程將暫停:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async {
loadData()
}
println("waiting...")
println(deferred.await())
}
suspend fun loadData(): Int {
println("loading...")
delay(1000L)
println("loaded!")
return 42
}
複製代碼
runBlocking
用做常規函數和掛起函數之間阻塞世界和非阻塞世界之間的橋樑。 它用做啓動頂級主協程的適配器。
若是存在一個延遲對象列表,則能夠調用awaitAll
來等待全部對象的結果:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
複製代碼
獲取貢獻者列表能夠改成以下代碼 :
suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
val service = createGitHubService(req.username, req.password)
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
複製代碼
全部協程仍在主UI
線程上運行。要想在公共線程池中的不一樣線程上運行協程很是容易。 指定Dispatchers.Default
做爲異步函數的上下文參數:
async(Dispatchers.Default) { ... }
複製代碼
CoroutineDispatcher
肯定了相應的協程應在哪一個或哪些線程上運行。 若是不指定,那麼async
將使用外部做用域的調度程序。
Dispatchers.Default
表示JVM
上的共享線程池。 該池提供了一種並行執行的方法。 它包含與CPU
可用內核數量同樣多的線程,可是若是隻有一個內核,它仍然具備兩個線程。
要僅在主UI
線程上運行協程,應指定Dispatchers.Main
做爲參數:
launch(Dispatchers.Main) {
updateResults()
}
複製代碼
若是在主線程上啓動新協程時主線程正忙,則協程將被掛起並計劃在此線程上執行。 協程僅在線程空閒時恢復。應該從外部範圍使用調度程序,這是一種好習慣:
launch(Dispatchers.Default) {
val users = loadContributorsConcurrent(service, req)
withContext(Dispatchers.Main) {
updateResults(users, startTime)
}
}
複製代碼
應該在主UI
線程上調用updateResults
,所以在Dispatchers.Main
的上下文中進行調用。 withContext
使用指定的協程上下文調用給定代碼,掛起直到完成,而後返回結果。 另外一種更冗長的表達方式是啓動一個新的協程並顯式等待(經過掛起)直到完成:
launch(context) { ... }.join()
複製代碼
協程範圍(Coroutine scope
)負責不一樣協程之間的結構和父子關係,老是在範圍內開始新的協程。 協程上下文(Coroutine context
)存儲用於運行給定協程的其餘信息,例如調度程序指定應在其上調度協程的一個或多個線程。
使用launch
、async
或runBlocking
啓動新協程時,它們會自動建立相應的做用域。 全部這些函數都將lambda
用做接收器的參數,隱式接收器類型爲CoroutineScope
:
launch { /* this: CoroutineScope */
}
複製代碼
新協程只能在範圍內啓動。 launch
和async
被聲明爲CoroutineScope
的擴展,所以在調用它們時必須始終傳遞隱式或顯式接收器。 由runBlocking
啓動的協程是惟一的例外:runBlocking
被定義爲頂級函數。 可是由於它阻塞了當前線程,因此它主要用於主要功能並做爲橋接功能進行測試。
在runBlocking
、launch
或async
中啓動新的協程時,它將在範圍內自動啓動:
import kotlinx.coroutines.*
fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
複製代碼
當在runBlocking
內部調用launch
時,將其稱爲對CoroutineScope
類型的隱式接收器的擴展。 或者,能夠顯式編寫this.launch
。
能夠說嵌套的協程(launch
)是外部協程(runBlocking
)的子級。 這種父子關係經過做用域起做用:子協程從與父協程相對應的做用域開始。
可使用GlobalScope.async
或GlobalScope.launch
從全局範圍啓動新的協程。 這將建立一個頂級的獨立協程。
提供協程結構的機制稱爲結構化併發,結構化併發在全局範圍內的好處以下:
scope
一般負責子協程,而且子協程的生命週期依附於scope
的生命週期。scope
能夠自動取消子協程操做。scope
會自動等待全部子協程的完成。所以,若是scope
對應於一個協程,則父協程將不會完成,直到在其範圍內啓動的全部協程都執行完成爲止。 使用GlobalScope.async
時,從全局範圍定義的協程都是獨立的。它們的壽命僅受整個應用程序壽命的限制。 能夠存儲從全局範圍開始的協程的引用,等待其完成或顯式取消它,可是它不會像結構化的那樣自動發生。
scope
的Context
由coroutineScope
或由coroutine
構建器建立的新做用域始終從外部做用域繼承上下文。 在這種狀況下,調用suspended loadContributorsConcurrent
的scope
是外部的scope
:
launch(Dispatchers.Default) { //外部scope
val users = loadContributorsConcurrent(service, req)
// ...
}
複製代碼
全部嵌套的協程都是從繼承的上下文自動開始的。 dispatcher
就是這種狀況的一部分。 這就是爲何async
啓動的全部協程都使用默認dispatcher
的上下文啓動的緣由:
suspend fun loadContributorsConcurrent(req: RequestData): List<User> = coroutineScope {
// 該scope從外部scope繼承上下文
// ...
async { // 嵌套協程從繼承的上下文開始
// ...
}
// ...
}
複製代碼
使用結構化併發,當建立頂級協程時,能夠一次指定主要上下文元素(例如dispatcher
)。 全部嵌套的協程都繼承上下文並僅在須要時對其進行修改。
當爲
UI
應用程序(例如Android
)使用協程編寫代碼時,一般的作法是默認使用CoroutineDispatchers.Main
做爲頂層協程,而後在須要在不一樣線程上運行代碼時顯式放置其餘dispatcher
。
衆所周知,編寫具備共享可變狀態的代碼很是困難且容易出錯。 經過通訊共享信息而不是使用公共可變狀態共享信息能夠簡化此過程。 協程能夠經過channels
相互通訊。
channels
容許在不一樣協程之間傳遞數據。 一個協程能夠向channel
發送一些信息,而另外一個協程能夠從該channel
接收此信息:
發送(產生)信息的協程一般被稱爲生產者,而接收(消費)信息的協程被稱爲消費者。 在須要時,許多協程能夠將信息發送到同一channel
,許多協程也能夠從該channel
接收信息:
當許多協程從同一
channel
接收信息時,每一個元素僅由一個使用者處理一次;處理意味着從會從channel
中刪除此元素。
能夠認爲channel
相似於元素集合(直接的模擬將是一個隊列:將元素添加到一端並從另外一端接收)。 可是,有一個重要的區別:與集合不一樣,即便在同步版本中,通道也能夠暫停send
和receive
操做。 當channel
爲空或已滿(channel
的大小可能受到限制,可能已滿)時,會發生這種狀況。
Channel
經過三個不一樣的接口表示:SendChannel
、ReceiveChannel
和繼承前兩個接口的Channel
。 一般,建立一個channel
並將其做爲SendChannel
實例提供給生產者,以便只有他們能夠發送,並做爲ReceiveChannel
實例提供給消費者,以便只有他們能夠從中接收。
注意,
send
和接收receive
都聲明爲suspend
方法
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
複製代碼
生產者能夠關閉通道以指示再也不有元素。
Library
中定義了幾種類型的Channel
。 它們在內部能夠存儲多種元素,只是在send
調用是否能夠掛起方面有所不一樣。 對於全部通道類型,receive
調用的行爲方式相同:若是通道不爲空,則接收元素,不然將掛起。
無限制通道(Unlimited channel
)是最接近隊列的模擬:生產者能夠將元素髮送到此通道,而且它將無限增加。 send
方法將永遠不會被掛起。 若是沒有更多的內存,則會拋出OutOfMemoryException
。 和隊列不一樣的是當使用者嘗試從空通道接收消息並被掛起直到有一些新元素髮送到該通道時繼續使用。
緩衝通道(Buffered channel
)的大小受指定數字的限制。 生產者能夠將元素髮送到此通道,直到達到最大限制。 全部元素都在內部存儲。 通道已滿時,下一個send
呼叫將被掛起,直到出現更多可用空間。
"約定"通道(Rendezvous channel
)是沒有緩衝區的通道。 這與建立大小爲零的緩衝通道(Buffered channel
)相同。 其中一個功能(send
或receive
)始終被掛起,直到調用另外一個功能爲止。 若是調用了send
函數,但消費者沒有準備好處理該元素則receive
會掛起,而且send
也會被掛起。 一樣,若是調用了receive
函數且通道爲空,換句話說,沒有準備好發送該元素的的send
被掛起-receive
也會被掛起。
Conflated channel
)的新元素將覆蓋先前發送的元素,所以接收方將始終僅能獲取最新元素。
send
調用將永遠不會被掛起。
建立通道時,指定其類型或緩衝區大小(若是須要緩衝的通道):
val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)
複製代碼
默認狀況下,會建立一個"約定"通道(
Rendezvous channel
)。
在如下示例中,將建立一個"約定"通道,兩個生產者協程和一個消費者協程:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
複製代碼
以上將會打印以下結果:
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
複製代碼