你們元旦快樂,去年(幾天前)寫了篇Kotlin協程快速入門,簡單介紹了下協程的一些基本概念,今天來介紹下一些其餘重要的知識點。java
在協程裏面開啓另外一個協程是很方便的,但若是想在它們之間傳遞消息,或者說協程間通訊該怎麼作呢?Channel(通道)就能夠用做在協程之間簡單的發送接收數據:數據庫
fun main() = runBlocking {
val channel = Channel<String>()
launch {
channel.send("apple")
}
println("I like ${channel.receive()}")
}
複製代碼
這種作法是很像消費者與生產者模式。生產者一方生成併發送必定量的數據放到緩衝區中,與此同時,消費者也在緩衝區消耗這些數據。這一點經過它所繼承的接口定義也能很好地體現:編程
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
public companion object Factory {
public const val UNLIMITED = Int.MAX_VALUE
public const val RENDEZVOUS = 0
public const val CONFLATED = -1
}
}
複製代碼
通道緩衝區 通道是一個接口,根據緩衝區容量不一樣,有四種不一樣的具體實現。bash
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
複製代碼
Channel的緩衝區默認是0個,當有信息send
進去後,協程就會被掛起,只有被調receive
後纔會繼續執行。若是容量大於0,當達到容量最大值時也一樣會被掛起:網絡
fun main() = runBlocking {
val channel = Channel<Int>(2)
launch {
for (x in 1..5) {
channel.send(x * x)
println("send $x")
}
}
delay(200L)
repeat(2) { println("receive ${channel.receive()}") }
複製代碼
結果以下,在發送兩個數據後,只有收到一個數據後纔會繼續發送:併發
2019-01-01 19:01:11.176 30809-30809/com.renny.kotlin I/System.out: send 1
2019-01-01 19:01:11.176 30809-30809/com.renny.kotlin I/System.out: send 2
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: receive 1
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: receive 4
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: send 3
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: send 4
複製代碼
以上就是Channel的基本用法了,看到這,熟悉Java併發編程的同窗很容易聯想到阻塞隊列,它們的做用是很類似的。Channel實現的阻塞隊列並非真正的阻塞,而是協程被掛起,而且它是能夠被關閉的。app
上面說道Channel
繼承了SendChannel
和ReceiveChannel
,它自己沒有實現邏輯,因此咱們來看下這兩個接口的一些重要方法:異步
public fun offer(element: E): Boolean
複製代碼
這也是發送消息的方法,不過和send
不一樣,它有返回值,在Channel緩衝區容量滿了的時候不會掛起而是直接返回false。async
public fun close(cause: Throwable? = null): Boolean
複製代碼
關閉通道,關閉通道後再調用send
或者offer
會拋出異常。在發送方能夠用isClosedForSend
來判斷通道是否關閉。對應的, 還有isClosedForReceive
,但它會在全部以前發送的元素收到以後才返回 "true"。post
public fun poll(): E?
複製代碼
和offer
對應,從緩衝區取不到消息會返回空,而不是像receive
同樣掛起協程。
public fun cancel(): Unit
複製代碼
會取消接受消息並移除緩衝區的全部元素,所以isClosedForReceive
也會當即返回"true"。
public operator fun iterator(): ChannelIterator<E>
複製代碼
經過返回一個迭代器來接受緩衝區的消息,其實直接用for循環也是能夠的(Channel
並非一個集合,多是對協程的特殊支持吧):
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close()
}
for (y in channel) println(y)
println("Done!")
}
複製代碼
再回到最初,緩衝區容量定義,大於等於0的值都很好理解,但Channel.CONFLATED = -1
是什麼鬼? 咱們來改造下上面的demo:
fun main() = runBlocking {
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (x in 1..5) {
channel.send(x * x)
println("send $x")
}
}
delay(200L)
repeat(2) { println("receive ${channel.receive()}") }
}
複製代碼
輸出以下:
2019-01-01 20:10:29.922 1314-1314/com.renny.kotlin I/System.out: send 1
2019-01-01 20:10:29.922 1314-1314/com.renny.kotlin I/System.out: send 2
2019-01-01 20:10:29.927 1314-1314/com.renny.kotlin I/System.out: send 3
2019-01-01 20:10:29.927 1314-1314/com.renny.kotlin I/System.out: send 4
2019-01-01 20:10:29.928 1314-1314/com.renny.kotlin I/System.out: send 5
2019-01-01 20:10:30.117 1314-1314/com.renny.kotlin I/System.out: receive 25
複製代碼
send
方法並無被掛起,但咱們只收到了一個消息。事實上,定義爲Channel.CONFLATED
時,緩衝區的的容量也是1,但當容量已經有消息,但又有新消息來的的時候,它會用新消息來替代當前的消息。因此根據這個特性,接收方老是能接收到最新的消息。具體有啥用嘛?好比點擊一次按鈕觸發一次動畫,在動畫播放期間的點擊事件都將被合併成一次,當動畫結束後,又會開始最新點擊的動畫,之間的點擊都被略掉了。
上面發送和接受代碼寫的多少有些繁瑣,官方還提供了擴展方法produce
和consumeEach
,咱們來該寫下例子,不須要手動再開啓發送消息一方的協程了:
fun main() = runBlocking {
val squares = produce {
for (x in 1..5) send(x * x)
}
squares.consumeEach { println(it) }
println("Done!")
}
複製代碼
async 異步, await 等待 ,這兩個方法是協程爲了更好解決異步任務而推出的,熟悉JS、C#等語言的人對這兩個方法確定很熟悉,用法也是差很少的。
fun main() = runBlocking{
var time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Sync completed in $time ms")
time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Async completed in $time ms")
}
suspend fun doSomethingUsefulOne(): Int {
delay(1000L)
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L)
return 29
}
複製代碼
結果是同樣的,但耗時卻差了一半。咱們就像調用同步任務同樣啓用異步,不得不說比java原生實現優雅多了
2019-01-01 20:52:15.482 3520-3520/com.renny.kotlin I/System.out: The answer is 42
2019-01-01 20:52:15.483 3520-3520/com.renny.kotlin I/System.out: Sync completed in 2006 ms
2019-01-01 20:52:16.489 3520-3520/com.renny.kotlin I/System.out: The answer is 42
2019-01-01 20:52:16.489 3520-3520/com.renny.kotlin I/System.out: Async completed in 1006 ms
複製代碼
async
是一個擴展方法,在裏面啓動了一個子協程,看下定義:
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
複製代碼
返回的也再也不是Job
對象,而是Deferred
,這二者大概像就是runnable和callable的關係吧,無返回值和有返回值,其餘都差很少。而await
會掛起當前的協程,直到子協程代碼結束並拿到返回結果,和join
也相似。
今天就介紹這麼多啦,正如標題所說,這幾篇文章的目的就是讓你們一塊兒更簡單地和快速地對協程有個初步的瞭解。同RxJava同樣,協程也是Kotlin爲了更好地處理異步任務而推出的功能庫,如何將其用在網絡請求、數據庫、文件IO等方面,讓代碼變得更簡潔更優雅纔是最終目的。