相信你們或多或少的都瞭解過,協程是什麼,官網上這麼說:android
Essentially, coroutines are light-weight threads.
協程是輕量級的線程,爲何是輕量的?能夠先告訴你們結論,由於它基於線程池API,因此在處理併發任務這件事上它真的遊刃有餘。
有可能有的同窗問了,既然它基於線程池,那我直接使用線程池或者使用 Android 中其餘的異步任務解決方式,好比 Handler
、RxJava
等,不更好嗎?
協程可使用阻塞的方式寫出非阻塞式的代碼,解決併發中常見的回調地獄,這是其最大的優勢,後面介紹。程序員
<pre data-tool="mdnice編輯器" class="custom" >\`GlobalScope.launch(Dispatchers.Main) { val res = getResult(2) mNumTv.text = res.toString() }
啓動協程的代碼就是如此的簡單。上面的代碼中能夠分爲三部分,分別是 GlobalScope
、Dispatcher
和 launch
,他們分別對應着協程的做用域、調度器和協程構建器,咱們挨個兒介紹。數據庫
協程的做用域有三種,他們分別是:編程
runBlocking
:頂層函數,它和 coroutineScope
不同,它會阻塞當前線程來等待,因此這個方法在業務中並不適用 。GlobalScope
:全局協程做用域,能夠在整個應用的聲明週期中操做,且不能取消,因此仍不適用於業務開發。顯然,咱們不能在 Activity
中調用 GlobalScope
,這樣可能會形成內存泄漏,看一下如何自定義做用域,具體的步驟我在註釋中已給出:數組
<pre data-tool="mdnice編輯器" class="custom" >\`class MainActivity : AppCompatActivity() { // 1\\. 建立一個 MainScope val scope = MainScope() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity\_main) // 2\\. 啓動協程 scope.launch(Dispatchers.Unconfined) { val one = getResult(20) val two = getResult(40) mNumTv.text = (one + two).toString() } } // 3\\. 銷燬的時候釋放 override fun onDestroy() { super.onDestroy() scope.cancel() } private suspend fun getResult(num: Int): Int { delay(5000) return num \* num } } </pre>
調度器的做用是將協程限制在特定的線程執行。主要的調度器類型有:網絡
Dispatchers.Main
:指定執行的線程是主線程,如上面的代碼。Dispatchers.IO
:指定執行的線程是 IO 線程。Dispatchers.Default
:默認的調度器,適合執行 CPU 密集性的任務。Dispatchers.Unconfined
:非限制的調度器,指定的線程可能會隨着掛起的函數的發生變化。什麼是掛起?咱們就以九心吃飯爲例,若是到公司對面的廣場吃飯,九心得通過:併發
若是九心點廣場的外賣呢?異步
從九心吃飯的例子能夠看出,若是點了外賣,九心花費的時間較少了,能夠空閒出更多的時間作本身的事。再仔細分析一下,其實從公司到廣場和等待取餐這個過程並無省去,只是九心把這個過程交給了外賣員。
協程的原理跟九心點外賣的原理是一致的,耗時阻塞的操做並無減小,只是交給了其餘線程async
launch
的做用從它的名稱就能夠看的出來,啓動一個新的協程,它返回的是一個 Job
對象,咱們能夠調用 Job#cancel()
取消這個協程。
除了 launch
,還有一個方法跟它很像,就是 async
,它的做用是建立一個協程,以後返回一個 Deferred<T>
對象,咱們能夠調用 Deferred#await()
去獲取返回的值,有點相似於 Java 中的 Future
,稍微改一下上面的代碼:編輯器
<pre data-tool="mdnice編輯器" class="custom" >\`class MainActivity : AppCompatActivity() { // 1\\. 建立一個 MainScope val scope = MainScope() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity\_main) // 2\\. 啓動協程 scope.launch(Dispatchers.Unconfined) { val one = async { getResult(20) } val two = async { getResult(40) } mNumTv.text = (one.await() + two.await()).toString() } } // 3\\. 銷燬的時候釋放 override fun onDestroy() { super.onDestroy() scope.cancel() } private suspend fun getResult(num: Int): Int { delay(5000) return num \* num } }
與修改前的代碼相比,async
可以併發執行任務,執行任務的時間也所以縮短了一半。
除了上述的併發執行任務,async
還能夠對它的 start
入參設置成懶加載
<pre data-tool="mdnice編輯器" class="custom" >\`val one = async(start = CoroutineStart.LAZY) { getResult(20) }
這樣系統就能夠在調用它的時候再爲它分配資源了。
suspend
是修飾函數的關鍵字,意思是當前的函數是能夠掛起的,可是它僅僅起着提醒的做用,好比,當咱們的函數中沒有須要掛起的操做的時候,編譯器回給咱們提醒 Redudant suspend modifier,意思是當前的 suspend
是沒有必要的,能夠把它刪除。
那咱們何時須要使用掛起函數呢?常見的場景有:
withContext
切換到指定的 IO 線程去進行網絡或者數據庫請求。delay方法
去等待某個事件。withContext
的代碼:
`<pre data-tool="mdnice編輯器" class="custom" >\private suspend fun getResult(num: Int): Int { return withContext(Dispatchers.IO) { num \* num } } delay 的代碼: <pre data-tool="mdnice編輯器" class="custom" >\private suspend fun getResult(num: Int): Int { delay(5000) return num \* num } `
在介紹自定義協程做用域的時候,咱們須要主動在 Activity
或者 Fragment
中的 onDestroy
方法中調用 job.cancel()
,忘記處理多是程序員常常會犯的錯誤,如何避免呢?
Google 老是可以解決程序員的痛點,在 Android Jetpack 中的 lifecycle
、LiveData
和 ViewModel
已經集成了快速使用協程的方法,若是咱們已經引入了 Android Jetpack,能夠引入依賴:
<pre data-tool="mdnice編輯器" class="custom" > \`dependencies { def lifecycle\_version = "2.2.0" // ViewModel implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle\_version" // LiveData implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle\_version" // Lifecycles only (without ViewModel or LiveData) implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle\_version" }
使用能夠結合具體的場景,好比結合 Lifecycle
,須要使用 lifecycleScope
協程做用域:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { // 表明當前生命週期處於 Resumed 的時候纔會執行(選擇性使用) whenResumed { // ... 具體的協程代碼 } }
即便你不使用 Android Jetpack 組件,因爲 Lifecycles
在很早以前就內置在 Android 系統的代碼中,因此你仍然能夠僅僅引入 Lifecycle
的協程擴展庫,由於它會幫助你很好的處理 Activity
或者 Fragment
的生命週期。
引入 Android Jetpack 協程擴展庫官方文檔:點我打開
長期以來,在 Android 中響應式編程的首選方案是 RxJava,咱們今天就來了解一下 Kotlin中的響應式編程 Flow。若是你能熟練使用 RxJava,那你確定能快速上手 Flow。
LiveData 更加簡單和純粹,它創建單一的生產消費模型,Flow 纔是相似於 RxJava 的存在。
先上一段代碼:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { // 建立一個協程 Flow<T> createFlow() .collect {num-> // 具體的消費處理 // ... } } }
我在 createFlow
這個方法中,返回了 Flow<Int>
的對象,因此咱們能夠這樣對比。
咱們暫不考慮 RxJava
中的背壓和非背壓,直接先將 Flow
對標 RxJava 中的 Observable
。
和 RxJava 同樣,在建立 Flow
對象的時候咱們也須要調用 emit
方法發射數據:
<pre data-tool="mdnice編輯器" class="custom" >\`fun createFlow(): Flow<Int> = flow { for (i in 1..10) emit(i) }
一直調用 emit
可能不便捷,由於 RxJava 提供了 Observable.just()
這類的操做符,顯然,Flow 也爲咱們提供了快速建立操做:
flowof(vararg elements: T)
:幫助可變數組生成 Flow
實例.asFlow()
:面向數組、列表等集合好比可使用 (1..10).asFlow()
代替上述的 Flow
對象的建立。
collect
方法和 RxJava 中的 subscribe
方法同樣,都是用來消費數據的。
除了簡單的用法外,這裏有兩個問題得注意一下:
collect
函數是一個 suspend
方法,因此它必須發生在協程或者帶有 suspend
的方法裏面,這也是我爲何在一開始的時候啓動了 lifecycleScope.launch
。lifecycleScope
是我使用的 Lifecycle
的協程擴展庫當中的,你能夠替換成自定義的協程做用域。咱們學習 RxJava 的時候,大佬們都會說,RxJava 牛逼,牛逼在哪兒呢?
切換線程,一樣的,Flow 的協程切換也很牛逼。Flow 是這麼切換協程的:
pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { // 建立一個協程 Flow<T> createFlow() // 將數據發射的操做放到 IO 線程中的協程 .flowOn(Dispatchers.IO) .collect { num -> // 具體的消費處理 // ... } } }
和 RxJava 對比:
flowOn
使用的參數是協程對應的調度器,它實質改變的是協程對應的線程。
我在上面的表格中並無寫到在 Flow 中如何改變消費線程,並不意味着 Flow 不能夠指定消費線程?
Flow 的消費線程在咱們啓動協程指定調度器的時候就確認好了,對應着啓動協程的調度器。好比在上面的代碼中 lifecycleScope
啓動的調度器是 Dispatchers.Main
,那麼 collect
方法就消費在主線程。
Flow 中的 catch
對應着 RxJava 中的 onError
,catch
操做:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { flow { //... }.catch {e-> }.collect( ) }
除此之外,你可使用聲明式捕獲 try { } catch (e: Throwable) { }
去捕獲異常,不過 catch
本質上是一個擴展方法,它是對聲明式捕獲的封裝。
Flow 中的 onCompletion
對應這 RxJava 中的 onComplete
回調,onCompletion
操做:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { createFlow() .onCompletion { // 處理完成操做 } .collect { } }
除此之外,咱們還能夠經過捕獲式 try {} finally {}
去獲取完成狀況。
咱們在對 Flow 已經有了一些基礎的認知了,再來聊一聊 Flow 的特色,Flow 具備如下特色:
若是你對 Kotlin 中的 Sequence
有一些認識,那麼你應該能夠輕鬆的 Get 到前兩個點。
有點相似於懶加載,當咱們觸發 collect
方法的時候,數據纔開始發射。
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { val flow = (1..10).asFlow().flowOn(Dispatchers.Main) flow.collect { num -> // 具體的消費處理 // ... } } }
也就是說,在第2行的時候,雖然流建立好了,可是數據一直到第四行發生 collect
纔開始發射。
看代碼比較容易理解:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { flow { for(i in 1..3) { Log.e("Flow","$i emit") emit(i) } }.filter { Log.e("Flow","$it filter") it % 2 != 0 }.map { Log.e("Flow","$it map") "${it \* it} money" }.collect { Log.e("Flow","i get $it") } }
獲得的日誌:
<pre data-tool="mdnice編輯器" class="custom" >\`E/Flow: 1 emit E/Flow: 1 filter E/Flow: 1 map E/Flow: i get 1 money E/Flow: 2 emit E/Flow: 2 filter E/Flow: 3 emit E/Flow: 3 filter E/Flow: 3 map E/Flow: i get 9 money
從日誌中,咱們很容易得出這樣的結論,每一個數據都是通過 emit
、filter
、map
和 collect
這一套完整的處理流程後,下個數據纔會開始處理,而不是全部的數據都先統一 emit
,完了再統一 filter
,接着 map
,最後再 collect
。
Flow 採用和協程同樣的協做取消,也就是說,Flow 的 collect
只能在可取消的掛起函數中掛起的時候取消,不然不能取消。
若是咱們想取消 Flow 得藉助 withTimeoutOrNull
之類的頂層函數,不妨猜一下,下面的代碼最終會打印出什麼?
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { val f = flow { for (i in 1..3) { delay(500) Log.e(TAG, "emit $i") emit(i) } } withTimeoutOrNull(1600) { f.collect { delay(500) Log.e(TAG, "consume $it") } } Log.e(TAG, "cancel") }
限於篇幅,我僅介紹一下 Flow 中操做符的做用,就不一一介紹每一個操做符具體怎麼使用了。
總會有一些特殊的狀況,好比我只須要取前幾個,我只要最新的數據等,不過在這些狀況下,數據的發射就是併發執行的。
展平流有點相似於 RxJava 中的 flatmap
,將你發射出去的數據源轉變爲另外一種數據源。
顧名思義,就是幫你作 collect
處理,collect
是最基礎的末端操做符。
其餘還有一些操做符,我這裏就不一一介紹了,感興趣能夠查看 API。
Channel
是一個面向多協程之間數據傳輸的 BlockQueue
。它的使用方式超級簡單:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { // 1\\. 生成一個 Channel val channel = Channel<Int>() // 2\\. Channel 發送數據 launch { for(i in 1..5){ delay(200) channel.send(i \* i) } channel.close() } // 3\\. Channel 接收數據 launch { for( y in channel) Log.e(TAG, "get $y") } }
實現協程之間的數據傳輸須要三步:
建立的 Channel
的方式能夠分爲兩種:
produce
若是使用了擴展函數,代碼就變成了:
<pre data-tool="mdnice編輯器" class="custom" >\`lifecycleScope.launch { // 1\\. 生成一個 Channel val channel = produce<Int> { for(i in 1..5){ delay(200) send(i \* i) } close() } // 2\\. 接收數據 // ... 省略 跟以前代碼一致 }
直接將第一步和第二步合併了。
發送數據使用的 Channel#send()
方法,當咱們數據發送完畢的時候,可使用 Channel#close()
來代表通道已經結束數據的發送。
正常狀況下,咱們僅須要調用 Channel#receive()
獲取數據,可是該方法只能獲取一次傳遞的數據,若是咱們僅需獲取指定次數的數據,能夠這麼操做:
<pre data-tool="mdnice編輯器" class="custom" >\`repeat(4){ Log.e(TAG, "get ${channel.receive()}") }
但若是發送的數據不能夠預估呢?這個時候咱們就須要迭代 Channel
了
<pre data-tool="mdnice編輯器" class="custom" >\`for( y in channel) Log.e(TAG, "get $y")
多協程處理併發數據的時候,原子性一樣也得不到保證,協程中出了一種叫 Mutex
的鎖,區別是它的 lock
操做是掛起的,非阻塞的,感興趣的同窗能夠自行查看。
https://shimo.im/docs/TG8PDh9...