協程這個概念在1958年就開始出現, 目前某些語言開始原生支持(目前主流語言我感受只有Java徹底不支持協程). Java沒有原生協程可是能夠大型公司都本身或者使用第三方庫來支持協程編程, 可是Kotlin原生支持協程.java
Android領域的網絡請求庫通常由Rxjava實現, 包括我本身寫的網絡請求庫一樣也是採用的RxJava. 可是這些RxJava實現的網絡請求庫一樣很難方便的實現併發android
我認爲協程的核心就是一個詞: 做用域, 理解什麼是做用域就理解協程了git
什麼是協程:github
線程和協程的關係屬於一對多關係, 一個線程上容許存在多個協程, 即主線程你也能異步執行代碼. 可是讓某個線程執行太多協程效率過低下, 因此針對不一樣的場景建議使用調度器切換線程, 使用協程開始就不須要考慮線程的問題, 只須要在不一樣場景使用不一樣的調度器(調度器會對特定任務進行優化)就好, 協程英文名是Coroutine.數據庫
使用場景編程
假設首頁存在七個接口網絡請求(後端人員處理差)的狀況一個個使用串行網絡請求的時間比並髮網絡請求慢了接近七倍.後端
目前計算機都是經過多核CPU提高計算能力, 因此熟練掌握併發編程是將來的趨勢緩存
協程優點安全
實驗特性bash
協程在Kotlin1.3時候放出正式版本, 可是目前仍然存在不穩定的函數變更, 不過這個我認爲不影響項目中實際使用
@FlowPreview 表明可能之後存在Api函數變更
@ExperimentalCoroutinesApi 表明目前可能存在不穩定的因素的函數
@ObsoleteCoroutinesApi 可能存在被廢棄的可能
複製代碼
Kotlin的協程主要構成分爲三部分
爲方便網絡請求和簡化異步做用域開啓可使用我實現的一個庫: Net
1.0+版本爲RxJava實現, 2.0+版本爲Coroutine實現
本文章後續會根據Kotlin的版本中的協程迭代進行更新
經常使用事件分發框架爲EventBus或者RxBus, 我以前使用RxJava的時候也寫了RxBus來使用, 使用協程後我又用協程實現一個: Channel
展望
協程對於後端高併發優點很大, 相信Spring的Kt版本後續會跟進
至於Google的Jetpack基本上都有針對協程擴展
咱們公司項目屬於 MVVM+Kotlin+Coroutine+JetPack, 最明顯的是併發網絡請求速度翻倍. 同時代碼更加結構清晰
開啓主協程的三種方式
生命週期和App一致, 沒法取消(不存在Job), 不存在線程阻塞
fun main() {
GlobalScope.launch { // 在後臺啓動一個新的協程並繼續
delay(1000L)
println("World!")
}
Thread.sleep(2000) // 防止JVM虛擬機退出
}
複製代碼
不存在線程阻塞, 能夠取消, 能夠經過CoroutineContext控制協程生命週期
fun main() {
CoroutineScope(Dispatchers.IO).launch {
}
Thread.sleep(1000)
}
複製代碼
線程阻塞, 適用於單元測試, 不須要延遲阻塞防止JVM虛擬機退出. runBlocking屬於全局函數能夠在任意地方調用
通常咱們在項目中是不會使用runBlocking, 由於阻塞主線程沒有開啓的任何意義
fun main() = runBlocking {
// 阻塞線程直到協程做用域內部全部協程執行完畢
}
複製代碼
協程內部還可使用函數建立其餘協程做用域, 分爲兩種建立函數:
CoroutineScope
的擴展函數, 只有在做用域內部才能建立其餘的做用域suspend
修飾的函數內部在主協程內還能夠建立子協程做用域, 建立函數分爲兩種
阻塞做用域(串行): 會阻塞當前做用域
掛起做用域(併發): 不會阻塞當前做用域
同步做用域函數
都屬於suspend函數
withContext
: 能夠切換調度器, 有返回結果coroutineScope
: 建立一個協程做用域, 該做用域會阻塞當前所在做用域而且等待其子協程執行完纔會恢復, 有返回結果supervisorScope
: 使用SupervisorJob的coroutineScope, 異常不會取消父協程public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T
): T
// 返回結果. 能夠和當前協程的父協程存在交互關係, 主要做用爲來回切換調度器
public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函數而已
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
複製代碼
異步做用域函數
這兩個函數都不屬於suspend, 只須要CoroutineScope就能夠調用
launch
: 異步併發, 沒有返回結果async
: 異步併發, 有返回結果public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>
複製代碼
同一個協程做用域中的異步任務遵照順序原則開始執行. 適用於串行網絡請求, 在一個異步任務須要上個異步任務的結果時.
協程掛起須要時間, 因此異步協程永遠比同步代碼執行慢
fun main() = runBlocking<Unit> {
launch {
System.err.println("(Main.kt:34) 後執行")
}
System.err.println("(Main.kt:37) 先執行")
}
複製代碼
當在協程做用域中使用async
函數時能夠建立併發任務
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>
複製代碼
示例
fun main() = runBlocking<Unit> {
val name = async { getName() }
val title = async { getTitle() }
System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
delay(2000)
}
複製代碼
Deferred
. 經過函數await
獲取結果值.awaitAll()
等待所有完成.await
任務也會等待執行完協程關閉await
則async內部拋出的異常不會被logCat和tryCatch捕獲, 可是依然會致使做用域取消和異常崩潰. 但當執行await
時異常信息會從新拋出.惰性併發
將async函數中的start
設置爲CoroutineStart.LAZY
時則只有調用Deferred對象的await
時纔會開始執行異步任務(或者執行start
函數).
啓動模式
異常
協程中發生異常, 則父協程取消而且父協程其餘的子協程一樣所有取消
繼承自Job
提供一個全局函數用於建立CompletableDeferred對象, 該對象能夠實現自定義Deferred功能
示例
fun main() = runBlocking<Unit> {
val deferred = CompletableDeferred<Int>()
launch {
delay(1000 )
deferred.complete(23)
}
System.err.println("(Demo.kt:72) 結果 = ${deferred.await()}")
}
複製代碼
建立CompletableDeferred的全局函數
public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
複製代碼
CompletableDeferred函數
public fun complete(value: T): Boolean
// 結果
public fun completeExceptionally(exception: Throwable): Boolean
// 拋出異常, 異常發生在`await()`時
複製代碼
建立此對象表示建立一個協程做用域
若是你看協程的教程可能會常常看到這個詞, 這就是做用域內部開啓新的協程. 父協程會限制子協程的生命週期, 子協程承接父協程的上下文, 這種層級關係就是結構化併發.
在一個協程做用域裏面開啓多個子協程進行併發行爲
協程上下文, 我認爲協程上下文能夠看作包含協程基本信息的一個Context(上下文). 其能夠決定協程的名稱或者運行
建立一個新的調度器
fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
複製代碼
建立新的調度器比較消耗資源, 建議複用且當不須要的時候使用close
函數釋放
Dispatchers
繼承自CoroutineContext, 該枚舉擁有三個實現. 表示不一樣的線程調度. 當函數不使用調度器時承接當前做用域的調度器
Dispatchers.Unconfined 不指定線程,
若是子協程切換線程那麼接下來的代碼也運行在該線程上
複製代碼
Dispatchers.IO
適用於IO讀寫
複製代碼
Dispatchers.Main
根據平臺不一樣而有所差, Android上爲主線程
複製代碼
Dispatchers.Default 默認調度器
在線程池中執行協程體, 適用於計算操做
複製代碼
當即執行
Dispatchers.Main.immediate
複製代碼
immediate
屬於全部調度器都有的屬性, 該屬性表明着若是當前正處於該調度器中不執行調度器切換直接執行, 能夠理解爲在同一調度器內屬於同步協程做用域
例如launch
函數開啓做用域會比後續代碼執行順序低, 可是使用該屬性協程屬於順序執行
示例
CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
// 執行順序 1
}
// 執行順序 2
CoroutineScope(Job() + Dispatchers.Main).launch {
// 執行順序 4
}
// 執行順序 3
複製代碼
經過建立一個CoroutineName
對象, 在構造函數中指定參數爲協程名稱, CoroutineName
繼承自CoroutineContext.
launch(CoroutineName("吳彥祖")){
}
複製代碼
協程上下文名稱用於方便調試使用
yield函數可讓當前協程暫時掛起執行其餘協程體, 若是沒有其餘正在併發的協程體則繼續執行當前協程體(至關於無效調用).
public suspend fun yield(): Unit
複製代碼
在協程中Job一般被稱爲做業, 表示一個協程工做任務, 他一樣繼承自CoroutineContext
val job = launch {
}
複製代碼
Job屬於接口
interface Job : CoroutineContext.Element
複製代碼
函數
public suspend fun join()
// 等待協程執行完畢都阻塞當前線程
public fun cancel(cause: CancellationException? = null)
// 取消協程
public suspend fun Job.cancelAndJoin()
// 阻塞而且在協程結束之後取消協程
public fun start(): Boolean
public val children: Sequence<Job>
// 所有子做業
public fun invokeOnCompletion( onCancelling: Boolean = false, // true則取消做用域不會執行 invokeImmediately: Boolean = true, // handler: CompletionHandler): DisposableHandle
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 當其做用域完成之後執行, 主協程指定纔有效, 直接給CoroutineScope指定時無效的
// 手動拋出CancellationException一樣會賦值給cause
public fun getCancellationException(): CancellationException
public val onJoin: SelectClause0
public val children: Sequence<Job>
// 後面說起的選擇器中使用
複製代碼
狀態
經過字段能夠獲取JOB當前處於狀態
public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean
複製代碼
擴展函數
public fun Job.cancelChildren(cause: CancellationException? = null)
public suspend fun Job.cancelAndJoin()
複製代碼
每一個協程做用域都存在coroutineContext. 而協程上下文中都存在Job對象
coroutineContext[Job]
複製代碼
若是協程做用域內存在計算任務(一直打日誌也算)則沒法被取消, 若是使用delay函數則能夠被取消.
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (true){
delay(100) // 這行代碼存在則能夠成功取消協程, 不存在則沒法取消
System.err.println("(Main.kt:30) ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42) 結束")
}
複製代碼
經過使用協程內部isActive
屬性來判斷是否應該結束
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (isActive) { // 一旦協程被取消則爲false
System.err.println("(Main.kt:30) ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42) 結束")
}
複製代碼
協程存在被手動取消的狀況, 可是有些資源須要在協程取消的時候釋放資源, 這個操做能夠在finally
中執行.
不管如何finally都會被執行
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
try {
repeat(1000){
System.err.println("(Main.kt:31) it = $it")
delay(500)
}
} finally {
// 已被取消的協程沒法繼續掛起
}
}
delay(1500)
job.cancel()
System.err.println("(Main.kt:42) ")
}
複製代碼
再次開啓協程
經過withContext
和NonCancellable
能夠在已被取消的協程中繼續掛起協程. 這種用法其實能夠看作建立一個沒法取消的任務
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
複製代碼
協程做用域能夠接收多個CoroutineContext做爲上下文參數. CoroutineContext自己屬於接口, 不少上下文相關的類都實現與他.
配置多個CoroutineContext能夠經過+
符號同時指定多個協程上下文, 每一個實現對象可能包含一部分信息能夠存在覆蓋行爲故相加時的順序存在覆蓋行爲.
val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
delay(1000)
System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
複製代碼
launch(Dispatchers.IO + CoroutineName("吳彥祖")){}
複製代碼
協程局部變量
使用ThreadLocal
能夠獲取線程的局部變量, 可是要求使用擴展函數asContextElement
轉爲協程上下文
做爲參數傳入在建立協程的時候.
該局部變量做用於持有該協程上下文的協程做用域內
public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
ThreadLocalElement(value, this)
複製代碼
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超過指定時間timeMillis自動結束協程.
// 當沒有超時時返回值獲取而且繼續執行協程.
// 當超時會拋出異常TimeoutCancellationException, 可是不會致使程序結束
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超時不會結束協程而是返回null
複製代碼
沒法手動拋出TimeoutCancellationException, 由於其構造函數私有
全局協程做用域
全局協程做用域屬於單例對象, 整個JVM虛擬機只有一份實例對象. 他的壽命週期也跟隨JVM. 使用全局協程做用域的時候注意避免內存泄漏
public object GlobalScope : CoroutineScope {
/** * Returns [EmptyCoroutineContext]. */
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
複製代碼
全局協程做用域不繼承父協程做用域的上下文, 因此也不會由於父協程被取消而自身被取消.
啓動模式
DEFAULT
當即執行協程體ATOMIC
當即執行協程體,但在開始執行協程以前沒法取消協程UNDISPATCHED
當即在當前線程執行協程體,第一個掛起函數執行在函數所在線程, 後面執行在函數指定線程LAZY
手動執行start
或join
纔會執行協程協程體若是已經執行實際上屬於不可取消的, 在協程體中經過isActive
來判斷協程是否處於活躍中
經過取消函數的參數指定異常CancellationException能夠自定義異常對象
不可取消的協程做用域
NonCancellable該單例對象用於withContext
函數建立一個沒法被取消的協程做用域
withContext(NonCancellable) {
delay(2000)
}
複製代碼
示例
fun main() = runBlocking {
launch {
delay(1000)
System.err.println("(Main.kt:19) ")
}
launch {
withContext(NonCancellable) {
delay(2000)
System.err.println("(Main.kt:26) ")
}
}
delay(500) // 防止launch還未開啓withContext就被取消
cancel()
}
複製代碼
GlobalScope取消
GlobalScope屬於全局協程, 由他開啓的協程都不擁有Job, 因此沒法取消協程. 可是能夠經過給GlobalScope開啓的協程做用域指定Job而後就可使用Job取消協程.
經過CoroutineExceptionHandler
函數能夠建立一個同名的對象, 該接口繼承自CoroutineContext
. 一樣經過制定上下文參數傳遞給全局協程做用域使用, 看成用域拋出異常時會被該對象的回調函數接收到, 而且不會拋出異常.
只要發生異常就會致使父協程和其全部子協程都被取消, 這種屬於雙向的異常取消機制, 後面提到的監督做業屬於發生異常只會取消全部子協程, 屬於單向.
CoroutineExceptionHandler異常處理器並不能阻止協程取消, 只是監聽到協程的異常信息避免JVM拋出異常退出程序而已
不要嘗試直接使用try/catch
捕捉協程做用域的異常
try {
launch {
throw NullPointerException()
}
} catch (e: Exception) {
e.printStackTrace()
}
複製代碼
錯誤示例, 沒法捕捉到異常
協程取消異常
取消協程的做業(Job)會引起異常, 可是會被默認的異常處理器給忽略, 可是咱們能夠經過捕捉能夠看到異常信息
fun main() = runBlocking<Unit> {
val job = GlobalScope.launch {
try {
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
}
}
job.cancel(CancellationException("自定義一個用於取消協程的異常"))
delay(2000)
}
複製代碼
Job取消函數
public fun cancel(cause: CancellationException? = null)
複製代碼
全局協程做用域的異常處理
CoroutineExceptionHandler沒法捕捉async|produce
拋出的異常
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
}
GlobalScope.launch(exceptionHandler) {
}
複製代碼
子協程沒法設置異常處理器, 即便設置了也會被父協程覆蓋而沒有意義. 除非使用異常處理器+Job對象, 可是第一個子協程launch容許設置
異常聚合和解包
全局協程做用域也存在嵌套子父級關係, 故異常可能也會依次拋出多個異常
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
// 這裏的異常是第一個被拋出的異常對象
println("捕捉的異常: $exception 和被嵌套的異常: ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally { // 當父協程被取消時其全部子協程都被取消, finally被取消以前或者完成任務以後必定會執行
throw ArithmeticException() // 再次拋出異常, 異常被聚合
}
}
launch {
delay(100)
throw IOException() // 這裏拋出異常將致使父協程被取消
}
delay(Long.MAX_VALUE)
}
job.join() // 避免GlobalScope做用域沒有執行完畢JVM虛擬機就退出
}
複製代碼
通常狀況子協程發生異常會致使父協程被取消, 同時父協程發生異常會取消全部的子協程. 可是有時候子協程發生異常咱們並不但願父協程也被取消, 而是僅僅全部子協程取消, 這個使用就是用SupervisorJob
做業.
建立監督做業對象
val job = SupervisorJob()
launch(job) { }
複製代碼
監督做業在withContext
和async
中添加無效
直接建立 SupervisorJob()
對象傳入做用域中會致使該做用域和父協程生命週期不統一的問題, 即父協程取消之後該子協程依然處於活躍狀態.
該函數能夠解決我上面提到的生命週期不統一問題, 直接建立向下傳播異常的協程做用域
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
複製代碼
supervisorScope
函數使用的依然是當前做用域的Job, 因此跟隨當前做用域生命週期, 能夠被取消.下面我寫個即便拋出異常也不會致使協程取消的
supervisorScope {
try {
throw NullPointerException()
} catch (e: Exception) {
e.printStackTrace()
}
}
複製代碼
解決線程不安全問題
至關於Java中的Lock替代品: Mutex
建立互斥對象
public fun Mutex(locked: Boolean = false): Mutex
// `locked`是否當即上鎖
複製代碼
使用擴展函數能夠自動加鎖和解鎖
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner 鑰匙
複製代碼
經過在不一樣的協程內部使用Channel實例能夠數據通信. 通道能夠連續順序傳輸N個元素
多個協程容許發送和接收同一個通道數據
Channel屬於接口沒法直接建立, 咱們須要經過函數Channel()
來建立其實現類
源碼
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel() // 無緩存
UNLIMITED -> LinkedListChannel() // 無限制
CONFLATED -> ConflatedChannel() // 合併
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
else -> ArrayChannel(capacity) // 指定緩存大小
}
複製代碼
capacity
緩衝大小, 默認0
當Channel發送一條數據時就會掛起通道(不繼續執行發送後續代碼), 只有在接收這條數據時纔會解除掛起繼續執行. 可是咱們能夠設置緩存大小
複製代碼
通道容許被遍歷獲取當前發送數據
val channel = Channel<Int>()
for (c in channel){
}
複製代碼
public suspend fun yield(): Unit
複製代碼
Channel
Channel接口同時實現發送渠道(SendChannel)和接收渠道(ReceiveChannel)兩個接口
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
複製代碼
public val isClosedForSend: Boolean
public suspend fun send(element: E)
// 發送消息
public fun offer(element: E): Boolean
public fun close(cause: Throwable? = null): Boolean
// 關閉發送通道
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
public val onSend: SelectClause2<E, SendChannel<E>>
複製代碼
ClosedReceiveChannelException
拋出public val isClosedForReceive: Boolean
// SendChannel是否已經關閉通道, 若是關閉通道之後還存在緩存則會接收完緩存以後返回false
public val isEmpty: Boolean
public suspend fun receive(): E
// 接受當前被髮送的消息
public val onReceive: SelectClause1<E>
// 監聽事件發送
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 會拋出異常, 不推薦使用
public val onReceiveOrNull: SelectClause1<E?>
// Null表示通道已關閉
public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`對象能夠判斷通道是否已關閉
public fun poll(): E?
public fun cancel(cause: CancellationException? = null)
// 取消
複製代碼
close
函數後不容許再發送或者接收數據, 不然拋出異常send|receive
函數所在做用域被取消cancel
不會致使通道結束(isClosedForReceive返回false)這個通道和通常的通道區別在於他的每一個數據能夠被每一個做用域所有接收到. 默認的通道一個數據被接收後其餘的協程是沒法再接收到數據的
廣播通道經過全局函數建立對象
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
複製代碼
自己廣播通道繼承自SendChannel, 只能發送數據, 經過函數能夠拿到接收通道
public fun openSubscription(): ReceiveChannel<E>
複製代碼
取消通道
public fun cancel(cause: CancellationException? = null)
複製代碼
將Channel轉成BroadcastChannel
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>
複製代碼
經過擴展函數在協程做用域中快速建立一個廣播發送通道
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E>
複製代碼
迭代通道
接收通道實現操做符重載可使用迭代
public operator fun iterator(): ChannelIterator<E>
複製代碼
示例
for (i in produce){
// 收到每一個髮型的消息
}
複製代碼
當多個協程接收同一個渠道數據會依次輪流接收到數據, 渠道對於多個協程是公平的
上面介紹的屬於建立Channel對象來發送和接收數據, 可是還能夠經過擴展函數快速建立並返回一個具有發送數據的ReceiveChannel
對象
public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
複製代碼
ProducerScope 該接口繼承自SendChannel以及CoroutineScope, 具有發送通道數據以及協程做用域做用.
當produce做用域執行完成會關閉通道, 前面已經說起關閉通道沒法繼續接收數據
等待取消
該函數會在通道被取消時回調其函數參數. 前面說起協程取消時能夠經過finally
來釋放內存等操做, 可是通道取消沒法使用finally只能使用該函數.
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {})
// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
複製代碼
能夠經過actor
函數建立一個具有渠道做用的協程做用域
public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
複製代碼
該函數和produce
函數類似,
channel:Channel
, 既能夠發送數據又能夠接收數據, produce的屬性channel屬於SendChannelproduce
或者actor
他們的通道都屬於Channel, 既能夠發送又能夠接收數據, 只須要類型強轉便可.不管是RxJava仍是協程都支持輪循器的功能, 在個人網絡請求庫中還賦予了輪循器暫停|繼續|多個觀察者|重置等功能
這裏的協程輪循器就比較簡陋
public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
複製代碼
該通道返回的數據是Unit
默認狀況下能夠理解爲通道會在指定間隔時間後一直髮送Unit
數據
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
// 每秒打印
for (unit in tickerChannel) {
System.err.println("unit = $unit")
}
}
複製代碼
可是若是下游不是在發送數據之後當即接收數據, 而是延遲使用receive函數來接收通道數據
TickerMode
該枚舉擁有兩個字段
這個輪循器不支持多訂閱|暫停|繼續|重置|完成, 可是個人Net庫中Interval
對象已實現該功能.
在select
閉包中能夠建立多個協程做用域或者通道, 且只會執行最快接收數據的通道或者結果.
通道
@InternalCoroutinesApi
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> {
a.onReceiveOrNull {
if (it == null) "Channel 'a' is closed"
else "a -> '$it.valueOrNull'"
}
b.onReceiveOrNull {
if (it == null) "Channel 'b' is closed"
else "b -> '$it.valueOrNull'"
}
}
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(8) {
// 打印最先的八個結果
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
複製代碼
Flow不屬於掛起函數能夠在任意位置建立. 默認執行在當前協程上下文中.
Flow和RxJava很類似, 分爲上游和下游及中間操做符, 可是Flow內部屬於協程做用域, 其調度器依靠掛起函數切換異步.
JetBrains公司也認可Flow屬於參考Reactive Stream等框架的產物, 這樣我相信不少人就能理解Flow的存在乎義了. 這種上下游觀察者模式在ROOM官方數據庫中一樣支持.
Flow的操做符不如RxJava豐富, 可是Flow的開發時間還很短還未正式完成. 後面能夠跟進
建立Flow
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
複製代碼
示例
fun shoot() = flow {
for (i in 1..3) {
delay(1000) // 僞裝咱們在這裏作了一些有用的事情
emit(i) // 發送下一個值
}
}
複製代碼
集合或者Sequence均可以經過asFlow
函數轉成Flow對象
也能夠像建立集合同樣經過fowOf
直接建立Flow對象
Channel通道轉成Flow
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>
複製代碼
甚至掛起函數也能夠轉成Flow
public fun <T> (suspend () -> T).asFlow(): Flow<T>
複製代碼
Flow的完成和異常不須要經過發射器Emitter去發送, RxJava須要.
和RxJava同樣理解基本用法和思想以後就僅須要熟悉不一樣的操做符了. 基本的操做符每一個ReactiveStream框架都是雷同
下面介紹Flow的函數(操做符).
收集數據
Flow是冷數據, 要求調用函數collect
收集數據時纔會進行數據的發射. 該系列函數也成爲末端操做符.
shoot().collect {
System.err.println("(Demo.kt:9) it = $it")
}
複製代碼
函數
public suspend fun Flow<*>.collect()
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 將Flow運行在指定的協程做用域內
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
複製代碼
調度器
Flow默認使用的是其所在的當前線程或者協程上下文, Flow不容許在內部使用withContext
來切換調度器, 而是應該使用flowOn
函數
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
複製代碼
該函數改變的是Flow函數內部發射時的線程, 而在collect
收集數據時會自動切回建立Flow時的線程.
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>
public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T>
複製代碼
緩存
不須要等待收集執行就當即執行發射數據. 只是數據暫時被緩存而已. 提升性能.
默認切換調度器時會自動緩存
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
複製代碼
合併函數, 這個函數實際上就是buffer
當下遊沒法及時處理上游的數據時會丟棄掉該數據
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
複製代碼
將多個事件合併後發送給下游
zip
將兩個Flow在回調函數中進行處理返回一個新的值 R
當兩個flow的長度不等時只發送最短長度的事件
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
複製代碼
示例
val nums = (1..3).asFlow().onEach { delay(300) } // 發射數字 1..3,間隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒發射一次字符串
val startTime = System.currentTimeMillis() // 記錄開始的時間
nums.zip(strs) { a, b -> "$a -> $b" } // 使用「zip」組合單個字符串
.collect { value -> // 收集並打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
複製代碼
combine
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R>
public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
複製代碼
Flow直接轉成集合函數
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
複製代碼
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`爲上次回調函數返回值, 第一次爲初始值, 等同於疊加效果. 該函數和reduce的區別就是支持初始值. reduce累計兩次元素纔會回調函數
public suspend fun <T> Flow<T>.single(): T
// 期待只有一個元素, 不然拋出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不拋出異常, 但若是不是僅有元素則返回null
public suspend fun <T> Flow<T>.first(): T
// 若是不存在一個元素則會拋出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回調函數判斷爲true的第一個條件符合的元素
複製代碼
public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先發送全部的元素, 而後上游每一個元素會致使回調函數中的Flow發送全部元素一次
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同於RxJava的FlatMap
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>
public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
複製代碼
public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 轉換函數, 能夠在回調函數中發送新的元素
public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 開始
public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回調函數中的參數`cause`若是爲null表示正常完成沒有拋出異常, 反之則拋出異常非正常結束
// 和catch函數同樣只能監聽到上游發生的異常, 可是沒法避免異常拋出只能在異常拋出以前執行回調函數
複製代碼
限制流發送
public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定數量事件
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丟棄指定數量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回調函數判斷是否丟棄或者接收, 只要丟棄或者接收後面就不會繼續發送事件(結束流)
複製代碼
捕捉異常
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
複製代碼
該函數只能捕獲上游異常, 若是異常處於函數調用之下則依然會被拋出
重試
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重試次數 predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
複製代碼
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
複製代碼
Google發行的Jetpack庫中不少組件都附有KTX擴展依賴, 這種依賴主要是增長kotlin和協程支持
官方提供生命週期協程做用域的快速建立實現.
onDestory
取消協程引入ktx依賴庫
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
複製代碼
當執行到某個生命週期時運行協程
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job
suspend fun <T> Lifecycle.whenStateAtLeast(
minState: Lifecycle.State,
block: suspend CoroutineScope.() -> T
)
複製代碼
這些函數都屬於Lifecycle
和LifecycleOwner
的擴展函數
依賴
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
複製代碼
提供開發者使用的只有這兩個函數, 兩個函數功能同樣, 只是每一個參數接收時間單位不一致
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
複製代碼
Dispatchers.Main.immediate
調度器liveData做用域具有發射數據和LiveData的做用
interface LiveDataScope<T> {
/** * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]. * * @param value The new value for the [LiveData] * * @see emitSource */
suspend fun emit(value: T)
/** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this * method will remove any source that was yielded before via [emitSource]. * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]. * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
suspend fun emitSource(source: LiveData<T>): DisposableHandle
/** * References the current value of the [LiveData]. * * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]. */
val latestValue: T?
}
複製代碼