即學即用Kotlin - 協程

前言

上週在內部分享會上大佬同事分享了關於 Kotlin 協程的知識,以前有看過 Kotlin 協程的一些知識,覺得本身還挺了解協程的,結果...html

打臉
打臉

在這一次分享中,發現 FlowChannel 這一起知識是本身不怎麼了解的,本文也將着重和你們聊一聊這一起的內容,協程部分將分爲三篇,本文是第一篇:android

《即學即用Kotlin - 協程》
《抽絲剝繭Kotlin - 協程基礎篇》
《抽絲剝繭Kotlin - 協程Flow篇》程序員

目錄

目錄
目錄

1、基礎

1. 概念

相信你們或多或少的都瞭解過,協程是什麼,官網上這麼說:web

Essentially, coroutines are light-weight threads.數據庫

協程是輕量級的線程,爲何是輕量的?能夠先告訴你們結論,由於它基於線程池API,因此在處理併發任務這件事上它真的遊刃有餘。編程

有可能有的同窗問了,既然它基於線程池,那我直接使用線程池或者使用 Android 中其餘的異步任務解決方式,好比 HandlerRxJava等,不更好嗎?數組

協程能夠使用阻塞的方式寫出非阻塞式的代碼,解決併發中常見的回調地獄,這是其最大的優勢,後面介紹。微信

2. 使用

GlobalScope.launch(Dispatchers.Main) {
 val res = getResult(2)  mNumTv.text = res.toString() } 複製代碼

啓動協程的代碼就是如此的簡單。上面的代碼中能夠分爲三部分,分別是 GlobalScopeDispatcherlaunch,他們分別對應着協程的做用域、調度器和協程構建器,咱們挨個兒介紹。網絡

協程做用域

協程的做用域有三種,他們分別是:併發

  • runBlocking:頂層函數,它和 coroutineScope 不同,它會阻塞當前線程來等待,因此這個方法在業務中並不適用 。
  • GlobalScope:全局協程做用域,能夠在整個應用的聲明週期中操做,且不能取消,因此仍不適用於業務開發。
  • 自定義做用域:自定義協程的做用域,不會形成內存泄漏。

顯然,咱們不能在 Activity 中調用 GlobalScope,這樣可能會形成內存泄漏,看一下如何自定義做用域,具體的步驟我在註釋中已給出:

class MainActivity : AppCompatActivity() {
 // 1. 建立一個 MainScope  val scope = MainScope()   override fun onCreate(savedInstanceState: Bundle?) {  super.onCreate(savedInstanceState)  setContentView(R.layout.activity_main)   // 2. 啓動協程  scope.launch(Dispatchers.Unconfined) {  val one = getResult(20)  val two = getResult(40)  mNumTv.text = (one + two).toString()  }  }   // 3. 銷燬的時候釋放  override fun onDestroy() {  super.onDestroy()   scope.cancel()  }   private suspend fun getResult(num: Int): Int {  delay(5000)  return num * num  } } 複製代碼

調度器

調度器的做用是將協程限制在特定的線程執行。主要的調度器類型有:

  • Dispatchers.Main:指定執行的線程是主線程,如上面的代碼。
  • Dispatchers.IO:指定執行的線程是 IO 線程。
  • Dispatchers.Default:默認的調度器,適合執行 CPU 密集性的任務。
  • Dispatchers.Unconfined:非限制的調度器,指定的線程可能會隨着 掛起的函數的發生變化。

什麼是掛起?咱們就以九心吃飯爲例,若是到公司對面的廣場吃飯,九心得通過:

  • 走到廣場 10min > 點餐 5min > 等待上餐 10min > 就餐 30min > 回來 10 min

若是九心點廣場的外賣呢?

  • 九心:下單 5min > 等待(等待的時候能夠工做) 30min > 就餐 30min
  • 外賣騎手:到店 > 取餐 > 送外賣

從九心吃飯的例子能夠看出,若是點了外賣,九心花費的時間較少了,能夠空閒出更多的時間作本身的事。再仔細分析一下,其實從公司到廣場和等待取餐這個過程並無省去,只是九心把這個過程交給了外賣員。

協程的原理跟九心點外賣的原理是一致的,耗時阻塞的操做並無減小,只是交給了其餘線程: 協程請求數據過程

launch

launch 的做用從它的名稱就能夠看的出來,啓動一個新的協程,它返回的是一個 Job對象,咱們能夠調用 Job#cancel() 取消這個協程。

除了 launch,還有一個方法跟它很像,就是 async,它的做用是建立一個協程,以後返回一個 Deferred<T>對象,咱們能夠調用 Deferred#await()去獲取返回的值,有點相似於 Java 中的 Future,稍微改一下上面的代碼:

class MainActivity : AppCompatActivity() {
 // 1. 建立一個 MainScope  val scope = MainScope()   override fun onCreate(savedInstanceState: Bundle?) {  super.onCreate(savedInstanceState)  setContentView(R.layout.activity_main)   // 2. 啓動協程  scope.launch(Dispatchers.Unconfined) {  val one = async { getResult(20) }  val two = async { getResult(40) }  mNumTv.text = (one.await() + two.await()).toString()  }  }   // 3. 銷燬的時候釋放  override fun onDestroy() {  super.onDestroy()   scope.cancel()  }   private suspend fun getResult(num: Int): Int {  delay(5000)  return num * num  } } 複製代碼

與修改前的代碼相比,async 可以併發執行任務,執行任務的時間也所以縮短了一半。

除了上述的併發執行任務,async 還能夠對它的 start 入參設置成懶加載

val one = async(start = CoroutineStart.LAZY) { getResult(20) }
複製代碼

這樣系統就能夠在調用它的時候再爲它分配資源了。

suspend

suspend 是修飾函數的關鍵字,意思是當前的函數是能夠掛起的,可是它僅僅起着提醒的做用,好比,當咱們的函數中沒有須要掛起的操做的時候,編譯器回給咱們提醒 Redudant suspend modifier,意思是當前的 suspend 是沒有必要的,能夠把它刪除。

那咱們何時須要使用掛起函數呢?常見的場景有:

  • 耗時操做:使用 withContext 切換到指定的 IO 線程去進行網絡或者數據庫請求。
  • 等待操做:使用 delay方法去等待某個事件。

withContext 的代碼:

private suspend fun getResult(num: Int): Int {
 return withContext(Dispatchers.IO) {  num * num  } } 複製代碼

delay 的代碼:

private suspend fun getResult(num: Int): Int {
 delay(5000)  return num * num } 複製代碼

結合 Android Jetpack

在介紹自定義協程做用域的時候,咱們須要主動在 Activity 或者 Fragment 中的 onDestroy 方法中調用 job.cancel(),忘記處理多是程序員常常會犯的錯誤,如何避免呢?

Google 老是可以解決程序員的痛點,在 Android Jetpack 中的 lifecycleLiveDataViewModel 已經集成了快速使用協程的方法,若是咱們已經引入了 Android Jetpack,能夠引入依賴:

dependencies {
 def lifecycle_version = "2.2.0"   // ViewModel  implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"  // LiveData  implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version"  // Lifecycles only (without ViewModel or LiveData)  implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version"  } 複製代碼

使用能夠結合具體的場景,好比結合 Lifecycle,須要使用 lifecycleScope 協程做用域:

lifecycleScope.launch {
 // 表明當前生命週期處於 Resumed 的時候纔會執行(選擇性使用)  whenResumed {  // ... 具體的協程代碼  } } 複製代碼

即便你不使用 Android Jetpack 組件,因爲 Lifecycles 在很早以前就內置在 Android 系統的代碼中,因此你仍然能夠僅僅引入 Lifecycle 的協程擴展庫,由於它會幫助你很好的處理 Activity 或者 Fragment 的生命週期。

引入 Android Jetpack 協程擴展庫官方文檔:點我打開

2、流

長期以來,在 Android 中響應式編程的首選方案是 RxJava,咱們今天就來了解一下 Kotlin中的響應式編程 Flow。若是你能熟練使用 RxJava,那你確定能快速上手 Flow。

曾經我在《即學即用Android Jetpack - ViewModel & LiveData》一文中說過,LiveData 的使用相似於 RxJava,如今我收回這句話,事實上,LiveData 更加簡單和純粹,它創建單一的生產消費模型,Flow 纔是相似於 RxJava 的存在。

1. 基礎

先上一段代碼:

lifecycleScope.launch {
 // 建立一個協程 Flow<T>  createFlow()  .collect {num->  // 具體的消費處理  // ...  }  } } 複製代碼

我在 createFlow 這個方法中,返回了 Flow<Int> 的對象,因此咱們能夠這樣對比。

對比 Flow RxJava
數據源 Flow<T> Observable<T>
訂閱 collect subscribe

建立 Flow 對象

咱們暫不考慮 RxJava中的背壓和非背壓,直接先將 Flow 對標 RxJava 中的 Observable

和 RxJava 同樣,在建立 Flow 對象的時候咱們也須要調用 emit 方法發射數據:

fun createFlow(): Flow<Int> = flow {
 for (i in 1..10)  emit(i) } 複製代碼

一直調用 emit 可能不便捷,由於 RxJava 提供了 Observable.just() 這類的操做符,顯然,Flow 也爲咱們提供了快速建立操做:

  • flowof(vararg elements: T):幫助可變數組生成 Flow 實例
  • 擴展函數 .asFlow():面向數組、列表等集合

好比可使用 (1..10).asFlow() 代替上述的 Flow 對象的建立。

消費數據

collect 方法和 RxJava 中的 subscribe 方法同樣,都是用來消費數據的。

除了簡單的用法外,這裏有兩個問題得注意一下:

  • collect 函數是一個 suspend 方法,因此它必須發生在協程或者帶有 suspend 的方法裏面,這也是我爲何在一開始的時候啓動了 lifecycleScope.launch
  • lifecycleScope 是我使用的 Lifecycle 的協程擴展庫當中的,你能夠替換成自定義的協程做用域。

2. 線程切換

咱們學習 RxJava 的時候,大佬們都會說,RxJava 牛逼,牛逼在哪兒呢?

切換線程,一樣的,Flow 的協程切換也很牛逼。Flow 是這麼切換協程的:

lifecycleScope.launch {
 // 建立一個協程 Flow<T>  createFlow()  // 將數據發射的操做放到 IO 線程中的協程  .flowOn(Dispatchers.IO)  .collect { num ->  // 具體的消費處理  // ...  }  } } 複製代碼

和 RxJava 對比:

操做 Flow RxJava
改變數據發射的線程 flowOn subscribeOn
改變消費數據的線程 observeOn

改變數據發射的線程

flowOn 使用的參數是協程對應的調度器,它實質改變的是協程對應的線程。

改變消費數據的線程

我在上面的表格中並無寫到在 Flow 中如何改變消費線程,並不意味着 Flow 不能夠指定消費線程?

Flow 的消費線程在咱們啓動協程指定調度器的時候就確認好了,對應着啓動協程的調度器。好比在上面的代碼中 lifecycleScope 啓動的調度器是 Dispatchers.Main,那麼 collect 方法就消費在主線程。

3. 異常和完成

異常捕獲

對比 Flow RxJava
異常 catch onError

Flow 中的 catch 對應着 RxJava 中的 onErrorcatch 操做:

lifecycleScope.launch {
 flow {  //...  }.catch {e->   }.collect(   ) } 複製代碼

除此之外,你可使用聲明式捕獲 try { } catch (e: Throwable) { } 去捕獲異常,不過 catch 本質上是一個擴展方法,它是對聲明式捕獲的封裝。

完成

對比 Flow RxJava
完成 onCompletion onComplete

Flow 中的 onCompletion 對應這 RxJava 中的 onComplete 回調,onCompletion操做:

lifecycleScope.launch {
 createFlow()  .onCompletion {  // 處理完成操做  }  .collect {   } } 複製代碼

除此之外,咱們還能夠經過捕獲式 try {} finally {} 去獲取完成狀況。

4. Flow的特色

咱們在對 Flow 已經有了一些基礎的認知了,再來聊一聊 Flow 的特色,Flow 具備如下特色:

  • 冷流
  • 有序
  • 協做取消

若是你對 Kotlin 中的 Sequence 有一些認識,那麼你應該能夠輕鬆的 Get 到前兩個點。

冷流

有點相似於懶加載,當咱們觸發 collect 方法的時候,數據纔開始發射。

lifecycleScope.launch {
 val flow = (1..10).asFlow().flowOn(Dispatchers.Main)   flow.collect { num ->  // 具體的消費處理  // ...  }  } } 複製代碼

也就是說,在第2行的時候,雖然流建立好了,可是數據一直到第四行發生 collect 纔開始發射。

有序

看代碼比較容易理解:

lifecycleScope.launch {
 flow {  for(i in 1..3) {  Log.e("Flow","$i emit")  emit(i)  }  }.filter {  Log.e("Flow","$it filter")  it % 2 != 0  }.map {  Log.e("Flow","$it map")  "${it * it} money"  }.collect {  Log.e("Flow","i get $it")  } } 複製代碼

獲得的日誌:

E/Flow: 1 emit
E/Flow: 1 filter E/Flow: 1 map E/Flow: i get 1 money E/Flow: 2 emit E/Flow: 2 filter E/Flow: 3 emit E/Flow: 3 filter E/Flow: 3 map E/Flow: i get 9 money 複製代碼

從日誌中,咱們很容易得出這樣的結論,每一個數據都是通過 emitfiltermapcollect 這一套完整的處理流程後,下個數據纔會開始處理,而不是全部的數據都先統一 emit,完了再統一 filter,接着 map,最後再 collect

協做取消

Flow 採用和協程同樣的協做取消,也就是說,Flow 的 collect 只能在可取消的掛起函數中掛起的時候取消,不然不能取消。

若是咱們想取消 Flow 得藉助 withTimeoutOrNull 之類的頂層函數,不妨猜一下,下面的代碼最終會打印出什麼?

lifecycleScope.launch {
 val f = flow {  for (i in 1..3) {  delay(500)  Log.e(TAG, "emit $i")  emit(i)  }  }  withTimeoutOrNull(1600) {  f.collect {  delay(500)  Log.e(TAG, "consume $it")  }  }  Log.e(TAG, "cancel") } 複製代碼

5. 操做符對比

限於篇幅,我僅介紹一下 Flow 中操做符的做用,就不一一介紹每一個操做符具體怎麼使用了。

普通操做符:

Flow 操做符 做用
map 轉換操做符,將 A 變成 B
take 後面跟 Int 類型的參數,表示接收多少個 emit 出的值
filter 過濾操做符

特殊的操做符

總會有一些特殊的狀況,好比我只須要取前幾個,我只要最新的數據等,不過在這些狀況下,數據的發射就是併發執行的。

Flow 操做符 做用
buffer 數據發射併發,collect 不併發
conflate 發射數據太快,只處理最新發射的
collectLatest 接收處理太慢,只處理最新接收的

組合操做符

Flow 操做符 做用
zip 組合兩個流,雙方都有新數據纔會發射處理
combine 組合兩個流,在通過第一次發射之後,任意方有新數據來的時候就能夠發射,另外一方有多是已經發射過的數據

展平流操做符

展平流有點相似於 RxJava 中的 flatmap,將你發射出去的數據源轉變爲另外一種數據源。

Flow 操做符 做用
flatMapConcat 串行處理數據
flatMapMerge 併發 collect 數據
flatMapLatest 在每次 emit 新的數據之後,會取消先前的 collect

末端操做符

顧名思義,就是幫你作 collect 處理,collect 是最基礎的末端操做符。

末端流操做符 做用
collect 最基礎的消費數據
toList 轉化爲 List 集合
toSet 轉化爲 Set 集合
first 僅僅取第一個值
single 確保流發射單個值
reduce 規約,若是發射的是 Int,最終會獲得一個 Int,可作累加操做
fold 規約,能夠說是 reduce 的升級版,能夠自定義返回類型

其餘還有一些操做符,我這裏就不一一介紹了,感興趣能夠查看 API。

3、通道

Channel是一個面向多協程之間數據傳輸的 BlockQueue。它的使用方式超級簡單:

lifecycleScope.launch {
 // 1. 生成一個 Channel  val channel = Channel<Int>()   // 2. Channel 發送數據  launch {  for(i in 1..5){  delay(200)  channel.send(i * i)  }  channel.close()  }   // 3. Channel 接收數據  launch {  for( y in channel)  Log.e(TAG, "get $y")  } } 複製代碼

實現協程之間的數據傳輸須要三步:

1.建立 Channel

建立的 Channel的方式能夠分爲兩種:

  • 直接建立對象:方式跟上述代碼一致。
  • 擴展函數 produce

若是使用了擴展函數,代碼就變成了:

lifecycleScope.launch {
 // 1. 生成一個 Channel  val channel = produce<Int> {  for(i in 1..5){  delay(200)  send(i * i)  }  close()  }   // 2. 接收數據  // ... 省略 跟以前代碼一致 } 複製代碼

直接將第一步和第二步合併了。

2. 發送數據

發送數據使用的 Channel#send() 方法,當咱們數據發送完畢的時候,可使用 Channel#close() 來代表通道已經結束數據的發送。

3. 接收數據

正常狀況下,咱們僅須要調用 Channel#receive() 獲取數據,可是該方法只能獲取一次傳遞的數據,若是咱們僅需獲取指定次數的數據,能夠這麼操做:

repeat(4){
 Log.e(TAG, "get ${channel.receive()}") } 複製代碼

但若是發送的數據不能夠預估呢?這個時候咱們就須要迭代 Channel

for( y in channel)
 Log.e(TAG, "get $y") 複製代碼

4、多協程數據處理

多協程處理併發數據的時候,原子性一樣也得不到保證,協程中出了一種叫 Mutex 的鎖,區別是它的 lock 操做是掛起的,非阻塞的,感興趣的同窗能夠自行查看。

總結

我的感受協層的主要做用是簡化代碼的邏輯,減小了代碼的回調地獄,結合 Kotlin,既能夠寫出優雅的代碼,還能下降咱們犯錯的機率。至於提高多協程開發的性能?

不存在的
不存在的

若是以爲本文不錯,「三連」是對我最大的鼓勵。我將會在下一篇文章中和你們討論協程的原理,歡迎你們關注。

學習協程和 kotlin 仍是頗有必要的,咱們團隊在開發新的功能的時候,也所有選擇了 Kotlin。

關於我

我是九心,新晉互聯網碼農,若是想要進階和了解更多的乾貨,歡迎關注個人公衆號接收到的個人最新文章。

微信二維碼
微信二維碼

參考文章:

《最全面的Kotlin協程: Coroutine/Channel/Flow 以及實際應用》
《Kotlin中文站》
《Kotlin 的協程用力瞥一眼》

相關文章
相關標籤/搜索