Channel
實際上就是協程在生產消費這模型上的應用,把過去你用BloockingQuqe
實現的功能替換成Channel
, 也許會有新的發現~數據庫
Channel 實際上就是一個隊列,並且是併發安全的,他能夠用來鏈接協程,實現不一樣協程的通訊。廢話很少說,直接看例子:數組
suspend fun main(){
val channel = Channel<Int>()
val producer = GlobalScope.launch{
var i = 0
while (true){
channel.send(i++)
delay(1000)
}
}
val consumer = GlobalScope.launch{
while(true){
val element = channel.receive()
Logger.debug(element)
}
}
producer.join()
consumer.join()
}
複製代碼
咱們構造了倆個協程,分別叫他們 producer 和 consumer, 咱們沒有明確的指定調度器,因此他們的調度器都是默認的,在 Java 虛擬機上就是那個你們都熟悉的線程池:他們能夠運行在不一樣的線程上,固然也能夠運行在同一個線程上。緩存
例子的運行機制是,producer 當中每隔 1s 向 Channel
中發送一個數字,而 consumer 那邊則是一直在讀取 Channel 來獲取這個數字並打印,咱們可以發現這裏發端比收端慢的,在沒有值能夠讀到的時候,receive 是掛起的,直到有新元素 send 過來--因此你知道了 receive 是一個掛起函數,那麼 send 呢?安全
若是你本身去 IDE 寫了這段代碼,你會發現 send 也是掛起函數。額,發端爲何會掛起?想一想咱們之前熟知的 BlockingQueue
,咱們往裏面添加元素的時候,元素在隊列裏其實是佔用了空間的,若是這個隊列空間不足,那麼在往裏面添加的時候就是倆種狀況:1. 阻塞,等待隊列騰出空間;2. 拋異常,拒絕添加元素。send 也會面臨一樣的問題,咱們說 Channel 實際上就是一個隊列嘛,隊列不該該有緩衝區嗎,那麼這個緩衝區一旦滿了,而且也一直沒有人調用 receive 取走元素的話,send 不久掛起了嘛。那麼接下來我麼看下 Channel
的緩存區定義:bash
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
複製代碼
咱們構造 Channel
的時候調用了u 一個叫 Channel
的哈數,hmm,這玩意兒確實不是它的構造器,在 Kotlin 當中咱們能夠隨便定義一個頂級函數跟某些類名同樣來假裝成構造器,這本質上就是個工廠方法。微信
相似的還有 String, 不信你去試試 它有一個參數叫 capacity,指定緩衝區的容量,默認值
RENDEZVOUS
就是 0,這個詞本意就是描述「不見不散」的場景,因此你不來 receive,我這 send 就一直擱這兒掛起等着。換句話,咱們開頭的例子裏面,若是 consumer 不 receive,produver 裏面的第一個 send 就給掛起了:網絡
val producer = GlobalScope.launch {
var i = 0
while (true) {
i++ // 爲了方便輸出日誌,咱們將自增放到前面
Logger.debug("before send $i")
channel.send(i)
Logger.debug("before after $i")
delay(1000)
}
}
val consumer = GlobalScope.launch {
while (true) {
delay(2000) // receive 以前延遲 2s
val element = channel.receive()
Logger.debug(element)
}
}
複製代碼
咱們故意讓收端的節奏放慢,你就會發現,send 老是會掛起,直到 receive 以後纔會繼續往下執行:併發
07:11:23:119 [DefaultDispatcher-worker-2 @coroutine#1] before send 1
07:11:24:845 [DefaultDispatcher-worker-2 @coroutine#2] 1
07:11:24:846 [DefaultDispatcher-worker-2 @coroutine#1] before after 1
07:11:25:849 [DefaultDispatcher-worker-4 @coroutine#1] before send 2
07:11:26:850 [DefaultDispatcher-worker-2 @coroutine#2] 2
07:11:26:850 [DefaultDispatcher-worker-3 @coroutine#1] before after 2
複製代碼
UNLIMITED
比較好理解,來者不拒,從他給出的實現 LinkedListChannel
來看,這一點也與咱們的 LinkedBlockingQueue
有殊途同歸之妙。框架
CONFLATED
, 這個詞是合併的意思,更 inflate 是同一個詞根,con- 前綴表示反着來,那是否是說我發了個一、二、三、四、5 那邊收的時候就會收到一個 [1,2,3,4,5] 的集合呢?畢竟字面意思是合併嘛。但實際上這個的小鍋是隻保留最後一個元素,不是合併,應該是置換,換句話說,這個類型的 Channel 有一個元素大小的緩衝區,但每次有新元素過來,都會用新的替換舊的,也就是說我發了個一、二、三、四、5 以後收端才接收的話,就只能收到 5 了。ide
剩下的就是 ArrayChannel
了,他接收一個值做爲緩衝區容量的大小,這也比較相似與 ArrayBlockingQueue
。
前面咱們在發送和讀取 Channel
的時候用了 while(true)
,由於咱們想要去不斷的進行讀寫操做,Channel
自己實際上也有點兒像序列,能夠一個一個讀,因此咱們在讀取的時候也能夠直接獲取一個 Channel
的 iterator:
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while(iterator.hansNext()) { // 掛起點
val element = iterator.next()
Logger.debug(element)
delay(2000)
}
}
複製代碼
那麼這個時候,iterator.hasNext() 是掛起函數,在判斷是否有下一個元素的時候實際上就須要去 Channel
當中讀取元素了。 這個寫法天然能夠簡化成 for each:
val consumer = GlobalScope.launch {
for (element in channel){
Logger.debug(element)
delay(2000)
}
}
複製代碼
前面咱們在協程外部定義 Channel
,並在協程當中訪問它,實現了一個簡單的生產-消費者的示例,那麼有沒有便捷的辦法構造生產者和消費者呢?
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
while(true){
delay(2000)
send(2)
}
}
複製代碼
咱們能夠經過 produce
這個方法啓動一個生產者協程,並返回一個 ReceiveChannel
,其餘協程就能夠拿着這個 Channel
來接收數據了。反過來,咱們能夠用 actor
啓動一個消費者協程:
val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
while(true){
val element = receive()
}
}
複製代碼
ReceiveChannel 和 SendChannel 都是 Channel 的父接口,前者定義了 receive,後者定義了 send, Channel 也所以既能夠 receive 又能夠 send。
produce
和actor
與launch
同樣都被稱做「協程啓動器」。經過這個協程的啓動器啓動的協程也天然的與返回的Channel
綁定到了一塊兒,所以Channel
的關閉也會在協程結束時自動完成,以produce
爲例,它構造出了一個ProducerCoroutine
的對象:
internal open class ProducerCoroutine<E>(
parentContext: CoroutineContext,channel:Channel<E>
) : ChannelCoroutine<E>(parentContext,channel,active = true),ProducerScope<E>{
...
override fun onCompleted(value: Unit) {
_channel.close() // 協程完成時
}
override fun onCancelled(cause: Throwable,handled:Boolean) {
val processed = _channel.close(cause) // 協程取消時
if (!processed && !handled) handleCoroutineException(context,cause)
}
}
複製代碼
注意到在協程完成和取消的方法調用中,對應的 _channel
都會被關閉。
這樣看上去仍是挺有用的。不過截止目前這倆 API produce
和 actor
都沒有穩定,前者仍被標記爲 ExperimentalCoroutinesApi
,後者則標記爲 ObsoleteCoroutinesApi
,這就比較尷尬了,明擺着不讓用嘛。actor
的文檔中提到的 issue 的討論也說明相比基於 Actor 模型的併發框架,Kotlin 協程提供的這個 actor
API 也不過就是提供了一個 SendChannel
的返回值而已。固然,協程的負責人也有實現一套更復雜的 Actor 的想法,只是這一段時間的高優明顯是 Flow
——這貨從協程框架的 v1.2 開始公測,到協程 v1.3 就穩定,真是神速,咱們後面的文章會介紹它。
雖然 produce
沒有被標記爲 ObsoleteCoroutinesApi
,顯然它做爲 actor
的另外一半,不可能單獨轉正的,這倆 API 個人建議是看看就行了。
前咱們提到了 produce
和 actor
返回的 Channel
都會伴隨着對應的協程執行完畢而關閉。哦,原來 Channel
還有一個關閉的概念。
Channel
和咱們後面的文章即將要探討的 Flow
不一樣,它是在線的,是一個熱數據源,換句話說就是有想要收數據,就要有人在對面給他發,就像發微信同樣。既然這樣,就不免曲終人散,對於一個 Channel
,若是咱們調用了它的 close
,它會當即中止接受新元素,也就是說這時候它的 isClosedForSend
會當即返回 true
,而因爲 Channel
緩衝區的存在,這時候可能還有一些元素沒有被處理完,因此要等全部的元素都被讀取以後 isClosedForReceive
纔會返回 true
。
val channel = Channel<Int>(3)
val producer = GlobalScope.launch {
List(5){
channel.send(it)
Logger.debug("send $it")
}
channel.close()
Logger.debug("close channel. ClosedForSend = ${channel.isClosedForSend} ClosedForReceive = ${channel.isClosedForReceive}")
}
val consumer = GlobalScope.launch {
for (element in channel){
Logger.debug("receiveb: $element")
delay(1000)
}
Logger.debug(""After Consuming. ClosedForSend = ${channel.isClosedForSend} ClosedForReceive = ${channel.isClosedForReceive})
}
複製代碼
咱們把例子稍做修改,開了一個緩衝區大小爲 3 的 Channel
,在 producer 協程裏面快速的發送元素出去,發送5個以後關閉 Channel
,而在 consumer 協程當中每秒讀取一個, 結果以下:
11:05:20:678 [DefaultDispatcher-worker-1] send 0
11:05:20:678 [DefaultDispatcher-worker-3] receive:0
11:05:20:678 [DefaultDispatcher-worker-1] send 1
11:05:20:678 [DefaultDispatcher-worker-1] send 2
11:05:20:678 [DefaultDispatcher-worker-1] send 3
11:05:21:688 [DefaultDispatcher-worker-3] receive:1
11:05:21:688 [DefaultDispatcher-worker-3] send 4
11:05:21:689 [DefaultDispatcher-worker-3] close channel.ClosedForSend = true ClosedForReceive = false
11:05:22:693 [DefaultDispatcher-worker-3] receive:2
11:05:23:694 [DefaultDispatcher-worker-4] receive:3
11:05:24:698 [DefaultDispatcher-worker-4] receive:4
11:05:25:700 [DefaultDispatcher-worker-4] After Consuming.ClosedForSend = true ClosedForReceive = true
複製代碼
下面咱們來探討下 Channel
關閉的意義。
一提及關閉,咱們就容易想到 IO,若是不關閉可能形成資源泄露,那麼 Channel
的關閉是個什麼概念呢?咱們前面提到過, Channel
其實內部的資源就是個緩衝區,這個東西本質上就是個線性表,就是一起內存,因此若是咱們開了一個 Channel
而不去關閉它,其實也不會形成什麼資源泄露,發端若是本身已經發完,它就能夠不理會這個 Channel
了。嗯,看上去好像沒什麼問題是吧?
But,這時候在接收端就比較尷尬了,它不知道會不會有數據發過來,若是 Channel
是微信,那麼接收端打開微信的窗口可能一直看到的是『對方正在輸入』,而後它就一直這樣了,孤獨終老。因此這裏的關閉更多像是一種約定:
女: 咱倆沒戲,你別傻等了。
男:哦。(您的消息未發送成功)
那麼 Channel
的關閉究竟應該有誰來處理呢?正常的通訊,若是是單向的,就比如領導講話,講完都會說『我講完了』,你不能在領導還沒講完的時候就說『我聽完了』,因此單向通訊的狀況比較推薦由發端處理關閉;而對於雙向通訊的狀況,就要考慮協商了,雙向通訊從技術上兩端是對等的,但業務場景下一般來講不是,建議由主導的一方處理關閉。
還有一些複雜的狀況,前面咱們看到的例子都是一對一的收發,其實還有一對多,多對多的狀況,這種也仍然存在主導一方, Channel
的生命週期最好由主導方來維護。官方文檔給出的扇入(fan-in)和扇出(fan-out),其實就是這種狀況。
扇入和扇出的概念可能你們不是很熟悉,網上的說法不是很通俗,你們就想象它是一把摺扇,摺扇的邊射向圓心就是扇入,這種狀況圓心若是是通訊的一端,那它就是接收方,若是是一個函數,那它就是被調用方。扇入越大,說明模塊的複用程度越高,以函數爲例,若是一個函數被調用的次數越多,那說明覆用的程度越高。扇出就是反過來的狀況,描述的是複雜度高的情形,例如一個 Model,負責調用網絡模塊、數據庫、文件等不少模塊。
前面提到了一對多的情形,從數據處理的自己來說,雖然有多個接收端,同一個元素只會被一個接收端讀到。廣播則否則,多個接收端不存在互斥行爲。
直接建立 broadcastChannel
的方法跟普通的 Channel
彷佛也沒什麼太多的不同:
val broadcastChannel = broadcastChannel<Int>(5)
複製代碼
若是要訂閱,那麼只須要調用:
val receiveChannel = broadcastChannel.openSubscription()
複製代碼
這樣咱們就獲得了一個 ReceiveChannel
,獲取訂閱的消息,只須要調用它的 receive
。
咱們看一個完整一點兒的例子,例子中咱們在發端發送 1 - 5,並啓動 3 個協程同時接收廣播:
val producer = GlobalScope.launch {
List(5) {
broadcastChannel.send(it)
Logger.debug("send it")
}
channel.close()
}
List(3) { index ->
GlobalScope.launch {
val receiveChannel = broadcast.openSubscription()
for (element in receiveChannel) {
Logger.debug("[$index] receive: $element")
delay(1000)
}
}
}.forEach { it.join() }
producer.join()
複製代碼
輸出結果以下:
12:34:59:656 [DefaultDispatcher-worker-6] [2] receive: 0
12:34:59:656 [DefaultDispatcher-worker-3] [1] receive: 0
12:34:59:656 [DefaultDispatcher-worker-5] [0] receive: 0
12:34:59:656 [DefaultDispatcher-worker-7] send 0
12:34:59:657 [DefaultDispatcher-worker-7] send 1
12:34:59:658 [DefaultDispatcher-worker-7] send 2
12:35:00:664 [DefaultDispatcher-worker-3] [0] receive: 1
12:35:00:664 [DefaultDispatcher-worker-5] [1] receive: 1
12:35:00:664 [DefaultDispatcher-worker-6] [2] receive: 1
12:35:00:664 [DefaultDispatcher-worker-8] send 3
12:35:01:669 [DefaultDispatcher-worker-8] [0] receive: 2
12:35:01:669 [DefaultDispatcher-worker-3] [1] receive: 2
12:35:01:669 [DefaultDispatcher-worker-6] [2] receive: 2
12:35:01:669 [DefaultDispatcher-worker-8] send 4
12:35:02:674 [DefaultDispatcher-worker-8] [0] receive: 3
12:35:02:674 [DefaultDispatcher-worker-7] [1] receive: 3
12:35:02:675 [DefaultDispatcher-worker-3] [2] receive: 3
12:35:03:678 [DefaultDispatcher-worker-8] [1] receive: 4
12:35:03:678 [DefaultDispatcher-worker-3] [0] receive: 4
12:35:03:678 [DefaultDispatcher-worker-1] [2] receive: 4
複製代碼
這裏請你們重點關注每個收端協程均可以讀取到每個元素。
日誌順序不能很是直觀的反映數據的讀寫順序,若是你們本身再次運行,順序上可能也有出入。
除了直接建立之外,咱們也能夠直接用前面定義的普通的 Channel
來作個轉換:
val channel = Channel<Int>()
val broadcast = channel.broadcast(3)
複製代碼
其中,參數表示緩衝區的大小。
實際上這裏獲得的這個 broadcastChannel
能夠認爲與原 Channel
是級聯關係,這個擴展方法的源碼其實很清晰的爲咱們展現了這一點:
fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY ): broadcastChannel<E> =
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start
= start, onCompletion = consumes()) {
for (e in this@broadcast) { // 這實際上就是在讀取原 Channel
send(e)
}
}
複製代碼
哦~原來對於 BroadcastChannel
,官方也提供相似於 produce
和 actor
的方式,咱們能夠經過 CoroutineScope.broadcast
來直接啓動一個協程,並返回一個 BroadcastChannel
。
須要注意的是,從原始的 Channel
轉換到 BroadcastChannel
其實就是對原 Channel
的一個讀取操做,若是還有其餘協程也在讀這個原始的 Channel
,那麼會與 BroadcastChannel
產生互斥關係。
另外, BroadcastChannel
相關的 API 大部分被標記爲 ExperimentalCoroutinesApi
,後續也許還會有調整。
前面的文章咱們講到過 Sequence
,它的生成器是基於標準庫的協程的 API 實現的,實際上 Channel
自己也能夠用來生成序列,例如:
val channel = GlobalScope.produce(Dispatchers.Unconfined) {
Logger.debug("A")
send(1)
Logger.debug("B")
send(2)
Logger.debug("Done")
}
for (item in channel) { Logger.debug("Got $item")
}
複製代碼
有了前面的基礎這個就很容易看懂了, produce
建立的協程返回了一個緩衝區大小爲 0 的 Channel
,爲了問題描述起來比較容易,咱們傳入了一個 Dispatchers.Unconfined
調度器,意味着協程會當即在當前協程執行到第一個掛起點,因此會當即輸出 A 並在 send(1)
處掛起,直到後面的 for 循環讀到第一個值時,實際上就是 channel
的 iterator
的 hasNext
方法的調用,這個 hasNext
方法會檢查是否有下一個元素,是一個掛起函數,在檢查的過程當中就會讓前面啓動的協程從send(1)
掛起的位置繼續執行,所以會看到日誌 B
輸出,而後再掛起到 send(2)
這裏,這時候 hasNext
結束掛起,for 循環終於輸出第一個元素,依次類推。輸出結果以下:
22:33:56:073 [main @coroutine#1] A
22:33:56:172 [main @coroutine#1] B
22:33:56:173 [main] Got 1
22:33:56:173 [main @coroutine#1] Done
22:33:56:176 [main] Got 2
複製代碼
咱們看到 B
竟然比 Got1
先輸出,一樣, Done
也比 Got2
先輸出,這個看上去比較不符合直覺,不過掛起恢復的執行順序確實如此,關鍵點就是咱們前面提到的 hasNext
方法會掛起並觸發了協程內部從掛起點繼續執行的操做。若是你選擇了其餘調度器,固然也會有其餘合理的結果輸出。無論怎麼樣,咱們體驗了一把用 Channel
模擬 sequence
。若是相似的代碼換做 sequence
,是這樣的:
val sequence = sequence {
Logger.debug("A")
yield(1)
Logger.debug("B")
yield(2)
Logger.debug("Done")
}
Logger.debug("before sequence")
for (item in sequence) {
Logger.debug("Got $item")
}
複製代碼
sequence
的執行順序要直觀的多,它沒有調度器的概念,並且 sequence
的 iterator
的 hasNext
和 next
都不是掛起函數,在 hasNext
的時候一樣會觸發元素的查找,這時候就會觸發 sequence
內部邏輯的執行,所以此次其實是先觸發了 hasNext
纔會輸出 A, yield
把 1 傳出來做爲 sequence
的第一個元素,這樣就會有 Got 1 這樣的輸出,完整輸出以下:
22:33:55:600 [main] A
22:33:55:603 [main] Got 1
22:33:55:604 [main] B
22:33:55:604 [main] Got 2
22:33:55:604 [main] Done
複製代碼
sequence
本質上就是基於標準庫的協程 API 實現的,沒有上層協程框架的做用域以及 Job 這樣的概念。
因此咱們能夠在 Channel
的例子裏面切換不一樣的調度器來生成元素,例如:
val channel = GlobalScope.produce(Dispatchers.Unconfined) { Logger.debug(1)
send(1)
withContext(Dispatchers.IO){
Logger.debug(2)
send(2)
}
Logger.debug("Done")
}
複製代碼
sequence 就不行了。
固然,單純的用 Channel
當作序列生成器來使用有點兒小題大作,這裏更多的是告訴你們存在這樣的可能性,你們在未來遇到合適的場景時,就能夠靈活運用了。
前面咱們提到 sequence
沒法享受更上層的協程框架概念下的各類能力,還有一點 sequence
顯然不是線程安全的,而 Channel
能夠在併發場景下使用。
Channel
內部結構咱們主要說下緩衝區分別是鏈表和數組的版本。鏈表版本的定義主要是在 AbstractSendChannel
當中:
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
protected val queue = LockFreeLinkedListHead()
...
}
複製代碼
LockFreeLinkedListHead
自己其實就是一個雙向鏈表的節點,實際上 Channel
把它首尾相連成爲了循環鏈表,而這個 queque
就是哨兵(sentinel)節點。有新的元素添加時,就在 queue 的前面插入,實際上就至關於在整個隊列的最後插入元素了。
它所謂的 LockFree
在 Java 虛擬機上實際上是經過原子讀寫來實現的, 對於鏈表來講,須要修改的無非就是先後節點的引用:
public actual open class LockFreeLinkedListNode {
private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor private val _prev = atomic<Any>(this) // Node | Removed
...
}
複製代碼
它的實現基於一篇論文中提到的無鎖鏈表的實現,因爲 CAS 原子操做一般只能修改一個引用,對於須要原子同時修改先後節點引用的情形是不適用的,例如單鏈表插入節點時須要修改兩個引用,分別是操做節點的前一個節點的 next 和本身的 next,即 Head -> A -> B -> C 在 A 、B 之間插件 X 時會須要先修改 X -> B 再修改 A -> X,若是這個過程當中 A 被刪除,那麼可能的結果是 X 一併被刪除,獲得的鏈表是 Head -> B -> C。
這個無鎖鏈表的實現經過引入 prev 來輔助解決這個問題,即在 A 被刪除的問題發生的同時,其實咱們是能夠作到 X.next = B,X.prev = A 的,這時候判斷若是 A 已經被移除了,那麼 B.prev 原本是 A,結果就變成了 Head,這時候就能夠將 X.prev 再次賦值爲 B.prev 來修復,固然這個過程稍稍有些複雜,有興趣的同窗也能夠參考 LockFreeLinkedListNode 在 Jvm 上的實現。
而對於數組版本, ArrayChannel
就相對粗暴了,內部就是一個數組:
// 若是緩衝區大小大於 8,會先分配大小爲 8 的數組,在後續進行擴容
private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))
複製代碼
對這個數組讀寫時則直接用了一個 ReentrantLock
進行加鎖。
這裏是否是有優化的空間呢?其實對於數組的元素,咱們一樣能夠進行 CAS 讀寫,若是你們有興趣,能夠參考下 ConcurrentHashMap
的實現,JDK 7 的實現中對於段數組的讀寫採用了 UnSafe
的 CAS 讀寫,JDK 1.8 直接幹掉了分段,對於桶的讀寫也採用了 UnSafe
的 CAS。
協程在 Js 和 Native 上的實現就要簡單得多,由於它們的協程都只是在單線程上運行,基本不須要處理併發問題。
Channel
的出現,應該說爲協程注入了靈魂。每個獨立的協程再也不是孤獨的個體, Channel
可讓他們更加方便的協做起來。實際上 Channel
的概念並非 Kotlin 原創的,且不說 Golang 裏面的 channel
,就說 Java NIO 當中也存在 Channel
這樣的概念,其實這時候你們很容易就應該想到多路複用,多路複用的時候咱們還能像前面那樣簡單的掛起嗎?或者不掛起咱們該怎麼辦呢?且看下回分解。
歡迎關注 Kotlin 中文社區!
中文官網:www.kotlincn.net/
中文官方博客:www.kotliner.cn/
公衆號:Kotlin
知乎專欄:Kotlin
CSDN:Kotlin中文社區
掘金:Kotlin中文社區
簡書:Kotlin中文社區