即學即用Kotlin - 協程

目錄

1、基礎

1. 概念

相信你們或多或少的都瞭解過,協程是什麼,官網上這麼說:android

Essentially, coroutines are light-weight threads.

協程是輕量級的線程,爲何是輕量的?能夠先告訴你們結論,由於它基於線程池API,因此在處理併發任務這件事上它真的遊刃有餘。
有可能有的同窗問了,既然它基於線程池,那我直接使用線程池或者使用 Android 中其餘的異步任務解決方式,好比 HandlerRxJava等,不更好嗎?
協程可使用阻塞的方式寫出非阻塞式的代碼,解決併發中常見的回調地獄,這是其最大的優勢,後面介紹。程序員

2. 使用

<pre data-tool="mdnice編輯器" class="custom" >\`GlobalScope.launch(Dispatchers.Main) {
 val res = getResult(2)
 mNumTv.text = res.toString()
}

啓動協程的代碼就是如此的簡單。上面的代碼中能夠分爲三部分,分別是 GlobalScopeDispatcherlaunch,他們分別對應着協程的做用域、調度器和協程構建器,咱們挨個兒介紹。數據庫

協程做用域

協程的做用域有三種,他們分別是:編程

  • runBlocking:頂層函數,它和 coroutineScope 不同,它會阻塞當前線程來等待,因此這個方法在業務中並不適用 。
  • GlobalScope:全局協程做用域,能夠在整個應用的聲明週期中操做,且不能取消,因此仍不適用於業務開發。
  • 自定義做用域:自定義協程的做用域,不會形成內存泄漏。

顯然,咱們不能在 Activity 中調用 GlobalScope,這樣可能會形成內存泄漏,看一下如何自定義做用域,具體的步驟我在註釋中已給出:數組

<pre data-tool="mdnice編輯器" class="custom" >\`class MainActivity : AppCompatActivity() {
 // 1\\. 建立一個 MainScope
 val scope = MainScope()
 override fun onCreate(savedInstanceState: Bundle?) {
 super.onCreate(savedInstanceState)
 setContentView(R.layout.activity\_main)
 // 2\\. 啓動協程
 scope.launch(Dispatchers.Unconfined) {
 val one = getResult(20)
 val two = getResult(40)
 mNumTv.text = (one + two).toString()
 }
 }
 // 3\\. 銷燬的時候釋放
 override fun onDestroy() {
 super.onDestroy()
 scope.cancel()
 }
 private suspend fun getResult(num: Int): Int {
 delay(5000)
 return num \* num
 }
} </pre>

調度器

調度器的做用是將協程限制在特定的線程執行。主要的調度器類型有:網絡

  • Dispatchers.Main:指定執行的線程是主線程,如上面的代碼。
  • Dispatchers.IO:指定執行的線程是 IO 線程。
  • Dispatchers.Default:默認的調度器,適合執行 CPU 密集性的任務。
  • Dispatchers.Unconfined:非限制的調度器,指定的線程可能會隨着掛起的函數的發生變化。

什麼是掛起?咱們就以九心吃飯爲例,若是到公司對面的廣場吃飯,九心得通過:併發

  • 走到廣場 10min > 點餐 5min > 等待上餐 10min > 就餐 30min > 回來 10 min

若是九心點廣場的外賣呢?異步

  • 九心:下單 5min > 等待(等待的時候能夠工做) 30min > 就餐 30min
  • 外賣騎手:到店 > 取餐 > 送外賣

從九心吃飯的例子能夠看出,若是點了外賣,九心花費的時間較少了,能夠空閒出更多的時間作本身的事。再仔細分析一下,其實從公司到廣場和等待取餐這個過程並無省去,只是九心把這個過程交給了外賣員。
協程的原理跟九心點外賣的原理是一致的,耗時阻塞的操做並無減小,只是交給了其餘線程async

launch

launch 的做用從它的名稱就能夠看的出來,啓動一個新的協程,它返回的是一個 Job對象,咱們能夠調用 Job#cancel() 取消這個協程。
除了 launch,還有一個方法跟它很像,就是 async,它的做用是建立一個協程,以後返回一個 Deferred<T>對象,咱們能夠調用 Deferred#await()去獲取返回的值,有點相似於 Java 中的 Future,稍微改一下上面的代碼:編輯器

<pre data-tool="mdnice編輯器" class="custom" >\`class MainActivity : AppCompatActivity() {
 // 1\\. 建立一個 MainScope
 val scope = MainScope()
 override fun onCreate(savedInstanceState: Bundle?) {
 super.onCreate(savedInstanceState)
 setContentView(R.layout.activity\_main)
 // 2\\. 啓動協程
 scope.launch(Dispatchers.Unconfined) {
 val one = async { getResult(20) }
 val two = async { getResult(40) }
 mNumTv.text = (one.await() + two.await()).toString()
 }
 }
 // 3\\. 銷燬的時候釋放
 override fun onDestroy() {
 super.onDestroy()
 scope.cancel()
 }
 private suspend fun getResult(num: Int): Int {
 delay(5000)
 return num \* num
 }
}

與修改前的代碼相比,async 可以併發執行任務,執行任務的時間也所以縮短了一半。
除了上述的併發執行任務,async 還能夠對它的 start 入參設置成懶加載

<pre data-tool="mdnice編輯器" class="custom" >\`val one = async(start = CoroutineStart.LAZY) { getResult(20) }

這樣系統就能夠在調用它的時候再爲它分配資源了。

suspend

suspend 是修飾函數的關鍵字,意思是當前的函數是能夠掛起的,可是它僅僅起着提醒的做用,好比,當咱們的函數中沒有須要掛起的操做的時候,編譯器回給咱們提醒 Redudant suspend modifier,意思是當前的 suspend 是沒有必要的,能夠把它刪除。
那咱們何時須要使用掛起函數呢?常見的場景有:

  • 耗時操做:使用 withContext 切換到指定的 IO 線程去進行網絡或者數據庫請求。
  • 等待操做:使用delay方法去等待某個事件。

withContext 的代碼:

`<pre data-tool="mdnice編輯器" class="custom" >\private suspend fun getResult(num: Int): Int {
 return withContext(Dispatchers.IO) {
 num \* num
 }
} 

delay 的代碼:

<pre data-tool="mdnice編輯器" class="custom" >\private suspend fun getResult(num: Int): Int {
 delay(5000)
 return num \* num
} `

結合 Android Jetpack

在介紹自定義協程做用域的時候,咱們須要主動在 Activity 或者 Fragment 中的 onDestroy 方法中調用 job.cancel(),忘記處理多是程序員常常會犯的錯誤,如何避免呢?
Google 老是可以解決程序員的痛點,在 Android Jetpack 中的 lifecycleLiveDataViewModel 已經集成了快速使用協程的方法,若是咱們已經引入了 Android Jetpack,能夠引入依賴:

<pre data-tool="mdnice編輯器" class="custom" > \`dependencies {
 def lifecycle\_version = "2.2.0"
 // ViewModel
 implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle\_version"
 // LiveData
 implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle\_version"
 // Lifecycles only (without ViewModel or LiveData)
 implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle\_version"
 }

使用能夠結合具體的場景,好比結合 Lifecycle,須要使用 lifecycleScope 協程做用域:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 // 表明當前生命週期處於 Resumed 的時候纔會執行(選擇性使用)
 whenResumed { 
 // ... 具體的協程代碼
 }
}

即便你不使用 Android Jetpack 組件,因爲 Lifecycles 在很早以前就內置在 Android 系統的代碼中,因此你仍然能夠僅僅引入 Lifecycle 的協程擴展庫,由於它會幫助你很好的處理 Activity 或者 Fragment 的生命週期。
引入 Android Jetpack 協程擴展庫官方文檔:點我打開

2、流

長期以來,在 Android 中響應式編程的首選方案是 RxJava,咱們今天就來了解一下 Kotlin中的響應式編程 Flow。若是你能熟練使用 RxJava,那你確定能快速上手 Flow。
LiveData 更加簡單和純粹,它創建單一的生產消費模型,Flow 纔是相似於 RxJava 的存在。

1. 基礎

先上一段代碼:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 // 建立一個協程 Flow<T>
 createFlow()
 .collect {num->
 // 具體的消費處理
 // ...
 }
 }
}

我在 createFlow 這個方法中,返回了 Flow<Int> 的對象,因此咱們能夠這樣對比。
image.png

建立 Flow 對象

咱們暫不考慮 RxJava中的背壓和非背壓,直接先將 Flow 對標 RxJava 中的 Observable
和 RxJava 同樣,在建立 Flow 對象的時候咱們也須要調用 emit 方法發射數據:

<pre data-tool="mdnice編輯器" class="custom" >\`fun createFlow(): Flow<Int> = flow {
 for (i in 1..10)
 emit(i)
}

一直調用 emit 可能不便捷,由於 RxJava 提供了 Observable.just() 這類的操做符,顯然,Flow 也爲咱們提供了快速建立操做:

  • flowof(vararg elements: T):幫助可變數組生成 Flow 實例
  • 擴展函數 .asFlow():面向數組、列表等集合

好比可使用 (1..10).asFlow() 代替上述的 Flow 對象的建立。

消費數據

collect 方法和 RxJava 中的 subscribe 方法同樣,都是用來消費數據的。
除了簡單的用法外,這裏有兩個問題得注意一下:

  • collect 函數是一個 suspend 方法,因此它必須發生在協程或者帶有 suspend 的方法裏面,這也是我爲何在一開始的時候啓動了 lifecycleScope.launch
  • lifecycleScope 是我使用的 Lifecycle 的協程擴展庫當中的,你能夠替換成自定義的協程做用域。

2. 線程切換

咱們學習 RxJava 的時候,大佬們都會說,RxJava 牛逼,牛逼在哪兒呢?
切換線程,一樣的,Flow 的協程切換也很牛逼。Flow 是這麼切換協程的:

pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 // 建立一個協程 Flow<T>
 createFlow()
 // 將數據發射的操做放到 IO 線程中的協程
 .flowOn(Dispatchers.IO)
 .collect { num ->
 // 具體的消費處理
 // ...
 }
 }
}

和 RxJava 對比:
image.png

改變數據發射的線程

flowOn 使用的參數是協程對應的調度器,它實質改變的是協程對應的線程。

改變消費數據的線程

我在上面的表格中並無寫到在 Flow 中如何改變消費線程,並不意味着 Flow 不能夠指定消費線程?
Flow 的消費線程在咱們啓動協程指定調度器的時候就確認好了,對應着啓動協程的調度器。好比在上面的代碼中 lifecycleScope 啓動的調度器是 Dispatchers.Main,那麼 collect 方法就消費在主線程。

3. 異常和完成

異常捕獲

image.png
Flow 中的 catch 對應着 RxJava 中的 onErrorcatch 操做:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 flow {
 //...
 }.catch {e->
 }.collect(
 )
}

除此之外,你可使用聲明式捕獲 try { } catch (e: Throwable) { } 去捕獲異常,不過 catch 本質上是一個擴展方法,它是對聲明式捕獲的封裝。

完成

image.png
Flow 中的 onCompletion 對應這 RxJava 中的 onComplete 回調,onCompletion操做:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 createFlow()
 .onCompletion {
 // 處理完成操做
 }
 .collect {
 }
}

除此之外,咱們還能夠經過捕獲式 try {} finally {} 去獲取完成狀況。

4. Flow的特色

咱們在對 Flow 已經有了一些基礎的認知了,再來聊一聊 Flow 的特色,Flow 具備如下特色:

  • 冷流
  • 有序
  • 協做取消

若是你對 Kotlin 中的 Sequence 有一些認識,那麼你應該能夠輕鬆的 Get 到前兩個點。

冷流

有點相似於懶加載,當咱們觸發 collect 方法的時候,數據纔開始發射。

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 val flow = (1..10).asFlow().flowOn(Dispatchers.Main)
 flow.collect { num ->
 // 具體的消費處理
 // ...
 }
 }
}

也就是說,在第2行的時候,雖然流建立好了,可是數據一直到第四行發生 collect 纔開始發射。

有序

看代碼比較容易理解:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 flow {
 for(i in 1..3) {
 Log.e("Flow","$i emit")
 emit(i)
 }
 }.filter {
 Log.e("Flow","$it filter")
 it % 2 != 0
 }.map {
 Log.e("Flow","$it map")
 "${it \* it} money"
 }.collect {
 Log.e("Flow","i get $it")
 }
}

獲得的日誌:

<pre data-tool="mdnice編輯器" class="custom" >\`E/Flow: 1 emit
E/Flow: 1 filter
E/Flow: 1 map
E/Flow: i get 1 money
E/Flow: 2 emit
E/Flow: 2 filter
E/Flow: 3 emit
E/Flow: 3 filter
E/Flow: 3 map
E/Flow: i get 9 money

從日誌中,咱們很容易得出這樣的結論,每一個數據都是通過 emitfiltermapcollect 這一套完整的處理流程後,下個數據纔會開始處理,而不是全部的數據都先統一 emit,完了再統一 filter,接着 map,最後再 collect

協做取消

Flow 採用和協程同樣的協做取消,也就是說,Flow 的 collect 只能在可取消的掛起函數中掛起的時候取消,不然不能取消。
若是咱們想取消 Flow 得藉助 withTimeoutOrNull 之類的頂層函數,不妨猜一下,下面的代碼最終會打印出什麼?

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 val f = flow {
 for (i in 1..3) {
 delay(500)
 Log.e(TAG, "emit $i")
 emit(i)
 }
 }
 withTimeoutOrNull(1600) {
 f.collect {
 delay(500)
 Log.e(TAG, "consume $it")
 }
 }
 Log.e(TAG, "cancel")
}

5. 操做符對比

限於篇幅,我僅介紹一下 Flow 中操做符的做用,就不一一介紹每一個操做符具體怎麼使用了。

普通操做符:

image.png

特殊的操做符

總會有一些特殊的狀況,好比我只須要取前幾個,我只要最新的數據等,不過在這些狀況下,數據的發射就是併發執行的。
image.png

組合操做符

image.png

展平流操做符

展平流有點相似於 RxJava 中的 flatmap,將你發射出去的數據源轉變爲另外一種數據源。
image.png

末端操做符

顧名思義,就是幫你作 collect 處理,collect 是最基礎的末端操做符。
image.png
其餘還有一些操做符,我這裏就不一一介紹了,感興趣能夠查看 API。

3、通道

Channel是一個面向多協程之間數據傳輸的 BlockQueue。它的使用方式超級簡單:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 // 1\\. 生成一個 Channel
 val channel = Channel<Int>()
 // 2\\. Channel 發送數據
 launch {
 for(i in 1..5){
 delay(200)
 channel.send(i \* i)
 }
 channel.close()
 }
 // 3\\. Channel 接收數據
 launch {
 for( y in channel)
 Log.e(TAG, "get $y")
 }
}

實現協程之間的數據傳輸須要三步:

1.建立 Channel

建立的 Channel的方式能夠分爲兩種:

  • 直接建立對象:方式跟上述代碼一致。
  • 擴展函數 produce

若是使用了擴展函數,代碼就變成了:

<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch {
 // 1\\. 生成一個 Channel
 val channel = produce<Int> {
 for(i in 1..5){
 delay(200)
 send(i \* i)
 }
 close()
 }
 // 2\\. 接收數據
 // ... 省略 跟以前代碼一致
}

直接將第一步和第二步合併了。

2. 發送數據

發送數據使用的 Channel#send() 方法,當咱們數據發送完畢的時候,可使用 Channel#close() 來代表通道已經結束數據的發送。

3. 接收數據

正常狀況下,咱們僅須要調用 Channel#receive() 獲取數據,可是該方法只能獲取一次傳遞的數據,若是咱們僅需獲取指定次數的數據,能夠這麼操做:

<pre data-tool="mdnice編輯器" class="custom" >\`repeat(4){
 Log.e(TAG, "get ${channel.receive()}")
}

但若是發送的數據不能夠預估呢?這個時候咱們就須要迭代 Channel

<pre data-tool="mdnice編輯器" class="custom" >\`for( y in channel)
 Log.e(TAG, "get $y")

4、多協程數據處理

多協程處理併發數據的時候,原子性一樣也得不到保證,協程中出了一種叫 Mutex 的鎖,區別是它的 lock 操做是掛起的,非阻塞的,感興趣的同窗能夠自行查看。
https://shimo.im/docs/TG8PDh9...

相關文章
相關標籤/搜索