上週在內部分享會上大佬同事分享了關於 Kotlin 協程的知識,以前有看過 Kotlin 協程的一些知識,覺得本身還挺了解協程的,結果...html
在這一次分享中,發現 Flow
和 Channel
這一起知識是本身不怎麼了解的,本文也將着重和你們聊一聊這一起的內容,協程部分將分爲三篇,本文是第一篇:android
「《即學即用Kotlin - 協程》
《抽絲剝繭Kotlin - 協程基礎篇》
《抽絲剝繭Kotlin - 協程Flow篇》程序員
相信你們或多或少的都瞭解過,協程是什麼,官網上這麼說:web
「Essentially, coroutines are light-weight threads.數據庫
協程是輕量級的線程,爲何是輕量的?能夠先告訴你們結論,由於它基於線程池API,因此在處理併發任務這件事上它真的遊刃有餘。編程
有可能有的同窗問了,既然它基於線程池,那我直接使用線程池或者使用 Android 中其餘的異步任務解決方式,好比 Handler
、RxJava
等,不更好嗎?數組
協程能夠使用阻塞的方式寫出非阻塞式的代碼,解決併發中常見的回調地獄,這是其最大的優勢,後面介紹。微信
GlobalScope.launch(Dispatchers.Main) {
val res = getResult(2) mNumTv.text = res.toString() } 複製代碼
啓動協程的代碼就是如此的簡單。上面的代碼中能夠分爲三部分,分別是 GlobalScope
、Dispatcher
和 launch
,他們分別對應着協程的做用域、調度器和協程構建器,咱們挨個兒介紹。網絡
協程的做用域有三種,他們分別是:併發
runBlocking
:頂層函數,它和
coroutineScope
不同,它會阻塞當前線程來等待,因此這個方法在業務中並不適用 。
GlobalScope
:全局協程做用域,能夠在整個應用的聲明週期中操做,且不能取消,因此仍不適用於業務開發。
顯然,咱們不能在 Activity
中調用 GlobalScope
,這樣可能會形成內存泄漏,看一下如何自定義做用域,具體的步驟我在註釋中已給出:
class MainActivity : AppCompatActivity() {
// 1. 建立一個 MainScope val scope = MainScope() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // 2. 啓動協程 scope.launch(Dispatchers.Unconfined) { val one = getResult(20) val two = getResult(40) mNumTv.text = (one + two).toString() } } // 3. 銷燬的時候釋放 override fun onDestroy() { super.onDestroy() scope.cancel() } private suspend fun getResult(num: Int): Int { delay(5000) return num * num } } 複製代碼
調度器的做用是將協程限制在特定的線程執行。主要的調度器類型有:
Dispatchers.Main
:指定執行的線程是主線程,如上面的代碼。
Dispatchers.IO
:指定執行的線程是 IO 線程。
Dispatchers.Default
:默認的調度器,適合執行 CPU 密集性的任務。
Dispatchers.Unconfined
:非限制的調度器,指定的線程可能會隨着
掛起的函數的發生變化。
什麼是掛起?咱們就以九心吃飯爲例,若是到公司對面的廣場吃飯,九心得通過:
若是九心點廣場的外賣呢?
從九心吃飯的例子能夠看出,若是點了外賣,九心花費的時間較少了,能夠空閒出更多的時間作本身的事。再仔細分析一下,其實從公司到廣場和等待取餐這個過程並無省去,只是九心把這個過程交給了外賣員。
協程的原理跟九心點外賣的原理是一致的,耗時阻塞的操做並無減小,只是交給了其餘線程:
launch
的做用從它的名稱就能夠看的出來,啓動一個新的協程,它返回的是一個 Job
對象,咱們能夠調用 Job#cancel()
取消這個協程。
除了 launch
,還有一個方法跟它很像,就是 async
,它的做用是建立一個協程,以後返回一個 Deferred<T>
對象,咱們能夠調用 Deferred#await()
去獲取返回的值,有點相似於 Java 中的 Future
,稍微改一下上面的代碼:
class MainActivity : AppCompatActivity() {
// 1. 建立一個 MainScope val scope = MainScope() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) // 2. 啓動協程 scope.launch(Dispatchers.Unconfined) { val one = async { getResult(20) } val two = async { getResult(40) } mNumTv.text = (one.await() + two.await()).toString() } } // 3. 銷燬的時候釋放 override fun onDestroy() { super.onDestroy() scope.cancel() } private suspend fun getResult(num: Int): Int { delay(5000) return num * num } } 複製代碼
與修改前的代碼相比,async
可以併發執行任務,執行任務的時間也所以縮短了一半。
除了上述的併發執行任務,async
還能夠對它的 start
入參設置成懶加載
val one = async(start = CoroutineStart.LAZY) { getResult(20) }
複製代碼
這樣系統就能夠在調用它的時候再爲它分配資源了。
suspend
是修飾函數的關鍵字,意思是當前的函數是能夠掛起的,可是它僅僅起着提醒的做用,好比,當咱們的函數中沒有須要掛起的操做的時候,編譯器回給咱們提醒 Redudant suspend modifier,意思是當前的 suspend
是沒有必要的,能夠把它刪除。
那咱們何時須要使用掛起函數呢?常見的場景有:
withContext
切換到指定的 IO 線程去進行網絡或者數據庫請求。
delay方法
去等待某個事件。
withContext
的代碼:
private suspend fun getResult(num: Int): Int {
return withContext(Dispatchers.IO) { num * num } } 複製代碼
delay
的代碼:
private suspend fun getResult(num: Int): Int {
delay(5000) return num * num } 複製代碼
在介紹自定義協程做用域的時候,咱們須要主動在 Activity
或者 Fragment
中的 onDestroy
方法中調用 job.cancel()
,忘記處理多是程序員常常會犯的錯誤,如何避免呢?
Google 老是可以解決程序員的痛點,在 Android Jetpack 中的 lifecycle
、LiveData
和 ViewModel
已經集成了快速使用協程的方法,若是咱們已經引入了 Android Jetpack,能夠引入依賴:
dependencies {
def lifecycle_version = "2.2.0" // ViewModel implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version" // LiveData implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version" // Lifecycles only (without ViewModel or LiveData) implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version" } 複製代碼
使用能夠結合具體的場景,好比結合 Lifecycle
,須要使用 lifecycleScope
協程做用域:
lifecycleScope.launch { // 表明當前生命週期處於 Resumed 的時候纔會執行(選擇性使用) whenResumed { // ... 具體的協程代碼 } } 複製代碼
即便你不使用 Android Jetpack 組件,因爲 Lifecycles
在很早以前就內置在 Android 系統的代碼中,因此你仍然能夠僅僅引入 Lifecycle
的協程擴展庫,由於它會幫助你很好的處理 Activity
或者 Fragment
的生命週期。
引入 Android Jetpack 協程擴展庫官方文檔:點我打開
長期以來,在 Android 中響應式編程的首選方案是 RxJava,咱們今天就來了解一下 Kotlin中的響應式編程 Flow。若是你能熟練使用 RxJava,那你確定能快速上手 Flow。
曾經我在《即學即用Android Jetpack - ViewModel & LiveData》一文中說過,LiveData 的使用相似於 RxJava,如今我收回這句話,事實上,LiveData 更加簡單和純粹,它創建單一的生產消費模型,Flow 纔是相似於 RxJava 的存在。
先上一段代碼:
lifecycleScope.launch {
// 建立一個協程 Flow<T> createFlow() .collect {num-> // 具體的消費處理 // ... } } } 複製代碼
我在 createFlow
這個方法中,返回了 Flow<Int>
的對象,因此咱們能夠這樣對比。
對比 | Flow | RxJava |
---|---|---|
數據源 | Flow<T> |
Observable<T> |
訂閱 | collect |
subscribe |
咱們暫不考慮 RxJava
中的背壓和非背壓,直接先將 Flow
對標 RxJava 中的 Observable
。
和 RxJava 同樣,在建立 Flow
對象的時候咱們也須要調用 emit
方法發射數據:
fun createFlow(): Flow<Int> = flow {
for (i in 1..10) emit(i) } 複製代碼
一直調用 emit
可能不便捷,由於 RxJava 提供了 Observable.just()
這類的操做符,顯然,Flow 也爲咱們提供了快速建立操做:
flowof(vararg elements: T)
:幫助可變數組生成
Flow
實例
.asFlow()
:面向數組、列表等集合
好比可使用 (1..10).asFlow()
代替上述的 Flow
對象的建立。
collect
方法和 RxJava 中的 subscribe
方法同樣,都是用來消費數據的。
除了簡單的用法外,這裏有兩個問題得注意一下:
collect
函數是一個
suspend
方法,因此它必須發生在協程或者帶有
suspend
的方法裏面,這也是我爲何在一開始的時候啓動了
lifecycleScope.launch
。
lifecycleScope
是我使用的
Lifecycle
的協程擴展庫當中的,你能夠替換成自定義的協程做用域。
咱們學習 RxJava 的時候,大佬們都會說,RxJava 牛逼,牛逼在哪兒呢?
切換線程,一樣的,Flow 的協程切換也很牛逼。Flow 是這麼切換協程的:
lifecycleScope.launch { // 建立一個協程 Flow<T> createFlow() // 將數據發射的操做放到 IO 線程中的協程 .flowOn(Dispatchers.IO) .collect { num -> // 具體的消費處理 // ... } } } 複製代碼
和 RxJava 對比:
操做 | Flow | RxJava |
---|---|---|
改變數據發射的線程 | flowOn |
subscribeOn |
改變消費數據的線程 | 無 | observeOn |
flowOn
使用的參數是協程對應的調度器,它實質改變的是協程對應的線程。
我在上面的表格中並無寫到在 Flow 中如何改變消費線程,並不意味着 Flow 不能夠指定消費線程?
Flow 的消費線程在咱們啓動協程指定調度器的時候就確認好了,對應着啓動協程的調度器。好比在上面的代碼中 lifecycleScope
啓動的調度器是 Dispatchers.Main
,那麼 collect
方法就消費在主線程。
對比 | Flow | RxJava |
---|---|---|
異常 | catch |
onError |
Flow 中的 catch
對應着 RxJava 中的 onError
,catch
操做:
lifecycleScope.launch { flow { //... }.catch {e-> }.collect( ) } 複製代碼
除此之外,你可使用聲明式捕獲 try { } catch (e: Throwable) { }
去捕獲異常,不過 catch
本質上是一個擴展方法,它是對聲明式捕獲的封裝。
對比 | Flow | RxJava |
---|---|---|
完成 | onCompletion |
onComplete |
Flow 中的 onCompletion
對應這 RxJava 中的 onComplete
回調,onCompletion
操做:
lifecycleScope.launch { createFlow() .onCompletion { // 處理完成操做 } .collect { } } 複製代碼
除此之外,咱們還能夠經過捕獲式 try {} finally {}
去獲取完成狀況。
咱們在對 Flow 已經有了一些基礎的認知了,再來聊一聊 Flow 的特色,Flow 具備如下特色:
若是你對 Kotlin 中的 Sequence
有一些認識,那麼你應該能夠輕鬆的 Get 到前兩個點。
有點相似於懶加載,當咱們觸發 collect
方法的時候,數據纔開始發射。
lifecycleScope.launch { val flow = (1..10).asFlow().flowOn(Dispatchers.Main) flow.collect { num -> // 具體的消費處理 // ... } } } 複製代碼
也就是說,在第2行的時候,雖然流建立好了,可是數據一直到第四行發生 collect
纔開始發射。
看代碼比較容易理解:
lifecycleScope.launch {
flow { for(i in 1..3) { Log.e("Flow","$i emit") emit(i) } }.filter { Log.e("Flow","$it filter") it % 2 != 0 }.map { Log.e("Flow","$it map") "${it * it} money" }.collect { Log.e("Flow","i get $it") } } 複製代碼
獲得的日誌:
E/Flow: 1 emit E/Flow: 1 filter E/Flow: 1 map E/Flow: i get 1 money E/Flow: 2 emit E/Flow: 2 filter E/Flow: 3 emit E/Flow: 3 filter E/Flow: 3 map E/Flow: i get 9 money 複製代碼
從日誌中,咱們很容易得出這樣的結論,每一個數據都是通過 emit
、filter
、map
和 collect
這一套完整的處理流程後,下個數據纔會開始處理,而不是全部的數據都先統一 emit
,完了再統一 filter
,接着 map
,最後再 collect
。
Flow 採用和協程同樣的協做取消,也就是說,Flow 的 collect
只能在可取消的掛起函數中掛起的時候取消,不然不能取消。
若是咱們想取消 Flow 得藉助 withTimeoutOrNull
之類的頂層函數,不妨猜一下,下面的代碼最終會打印出什麼?
lifecycleScope.launch {
val f = flow { for (i in 1..3) { delay(500) Log.e(TAG, "emit $i") emit(i) } } withTimeoutOrNull(1600) { f.collect { delay(500) Log.e(TAG, "consume $it") } } Log.e(TAG, "cancel") } 複製代碼
限於篇幅,我僅介紹一下 Flow 中操做符的做用,就不一一介紹每一個操做符具體怎麼使用了。
Flow 操做符 | 做用 |
---|---|
map |
轉換操做符,將 A 變成 B |
take |
後面跟 Int 類型的參數,表示接收多少個 emit 出的值 |
filter |
過濾操做符 |
總會有一些特殊的狀況,好比我只須要取前幾個,我只要最新的數據等,不過在這些狀況下,數據的發射就是併發執行的。
Flow 操做符 | 做用 |
---|---|
buffer |
數據發射併發,collect 不併發 |
conflate |
發射數據太快,只處理最新發射的 |
collectLatest |
接收處理太慢,只處理最新接收的 |
Flow 操做符 | 做用 |
---|---|
zip |
組合兩個流,雙方都有新數據纔會發射處理 |
combine |
組合兩個流,在通過第一次發射之後,任意方有新數據來的時候就能夠發射,另外一方有多是已經發射過的數據 |
展平流有點相似於 RxJava 中的 flatmap
,將你發射出去的數據源轉變爲另外一種數據源。
Flow 操做符 | 做用 |
---|---|
flatMapConcat |
串行處理數據 |
flatMapMerge |
併發 collect 數據 |
flatMapLatest |
在每次 emit 新的數據之後,會取消先前的 collect |
顧名思義,就是幫你作 collect
處理,collect
是最基礎的末端操做符。
末端流操做符 | 做用 |
---|---|
collect |
最基礎的消費數據 |
toList |
轉化爲 List 集合 |
toSet |
轉化爲 Set 集合 |
first |
僅僅取第一個值 |
single |
確保流發射單個值 |
reduce |
規約,若是發射的是 Int ,最終會獲得一個 Int ,可作累加操做 |
fold |
規約,能夠說是 reduce 的升級版,能夠自定義返回類型 |
其餘還有一些操做符,我這裏就不一一介紹了,感興趣能夠查看 API。
Channel
是一個面向多協程之間數據傳輸的 BlockQueue
。它的使用方式超級簡單:
lifecycleScope.launch {
// 1. 生成一個 Channel val channel = Channel<Int>() // 2. Channel 發送數據 launch { for(i in 1..5){ delay(200) channel.send(i * i) } channel.close() } // 3. Channel 接收數據 launch { for( y in channel) Log.e(TAG, "get $y") } } 複製代碼
實現協程之間的數據傳輸須要三步:
建立的 Channel
的方式能夠分爲兩種:
produce
若是使用了擴展函數,代碼就變成了:
lifecycleScope.launch {
// 1. 生成一個 Channel val channel = produce<Int> { for(i in 1..5){ delay(200) send(i * i) } close() } // 2. 接收數據 // ... 省略 跟以前代碼一致 } 複製代碼
直接將第一步和第二步合併了。
發送數據使用的 Channel#send()
方法,當咱們數據發送完畢的時候,可使用 Channel#close()
來代表通道已經結束數據的發送。
正常狀況下,咱們僅須要調用 Channel#receive()
獲取數據,可是該方法只能獲取一次傳遞的數據,若是咱們僅需獲取指定次數的數據,能夠這麼操做:
repeat(4){
Log.e(TAG, "get ${channel.receive()}") } 複製代碼
但若是發送的數據不能夠預估呢?這個時候咱們就須要迭代 Channel
了
for( y in channel)
Log.e(TAG, "get $y") 複製代碼
多協程處理併發數據的時候,原子性一樣也得不到保證,協程中出了一種叫 Mutex
的鎖,區別是它的 lock
操做是掛起的,非阻塞的,感興趣的同窗能夠自行查看。
我的感受協層的主要做用是簡化代碼的邏輯,減小了代碼的回調地獄,結合 Kotlin,既能夠寫出優雅的代碼,還能下降咱們犯錯的機率。至於提高多協程開發的性能?
若是以爲本文不錯,「三連」是對我最大的鼓勵。我將會在下一篇文章中和你們討論協程的原理,歡迎你們關注。
學習協程和 kotlin 仍是頗有必要的,咱們團隊在開發新的功能的時候,也所有選擇了 Kotlin。
我是九心,新晉互聯網碼農,若是想要進階和了解更多的乾貨,歡迎關注個人公衆號接收到的個人最新文章。
參考文章:
「《最全面的Kotlin協程: Coroutine/Channel/Flow 以及實際應用》
《Kotlin中文站》
《Kotlin 的協程用力瞥一眼》