Koltin Flow 基礎詳解

Koltin Flow 基礎詳解

kotlinx-coroutines-core 1.4.2版本出來了,沒有了以前的實驗方法警告,終於能夠能夠愉快的玩耍了,如今準備把有關flow的相關資料整理彙總一下java

1.概覽

Flow基礎詳解

2.Kotlin Flow 介紹

A cold asynchronous data stream that sequentially emits values and completes normally or with an exception。
複製代碼

意思是:按順序發出值並正常完成或異常完成的Cold異步數據流。markdown

與rxjava做用相似,可能會在之後的開發中逐步代替rxjava,使整個開發生態更加趨向一體化異步

4.Flow的建立

  • Empty Flow
emptyFlow<String>()
複製代碼
  • 經過flowOf函數
flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
複製代碼
  • Iterable調用asFlow函數
listOf(1, 2, 3).asFlow()
// 1, 2, 3
複製代碼
  • 無參可是有返回值的函數**(() -> T)**調用asFlow函數
fun flowBuilderFunction(): Int {
    return 10
}

::flowBuilderFunction.asFlow()

// 10
複製代碼
  • 無參可是有返回值的掛起函數**(() -> T)**調用asFlow函數
suspend fun flowBuilderFunction(): Int {
    return 10
}

::flowBuilderFunction.asFlow()

// 10
複製代碼
  • Array調用asFlow函數
LongRange(1, 5).asFlow().collect { value -> println(value) }
複製代碼

5.Flow操做符

Delay相關的操做符

  • debounceasync

    特性:函數

    1. 若是兩個相鄰的值生產出來的時間間隔超過了[timeout]毫秒,就忽過濾掉前一個值

    最後一個值不受影響,老是會被釋放emit。 [timeout]能夠傳毫秒,也能夠傳Durationui

    flow {
            emit(1)
            delay(3000)
            emit(2)
            delay(1000)
            emit(3)
            delay(1000)
            emit(4)
        }.debounce(2000)
    
        // 結果:1 4 
        // 解釋:
        // 2和1的間隔大於2000,1被釋放
        // 3和2的間隔小於2000, 2被忽略
        // 4和3的間隔小於2000, 3被忽略
        // 4是最後一個值不受timeout值的影響, 4被釋放
    
    
    
    flow {
        emit(1)
        delay(3000)
        emit(2)
        delay(1000)
        emit(3)
        delay(1000)
        emit(4)
    }.debounce(2000.milliseconds)
    
    // 結果:1 4 
    
    應用:可用於搜索框的反覆輸入內容篩選
    
    
    
    
    
    複製代碼

Distinct相關的操做符

  • distinctUntilChangedspa

    1.若是生產的值和上個發送的值相同,值就會被過濾掉code

    flow {
          emit(1)
          emit(1)
          emit(2)
          emit(2)
          emit(3)
          emit(4)
      }.distinctUntilChanged()
    
      // 結果:1 2 3 4
      // 解釋:
      // 第一個1被釋放
      // 第二個1因爲和第一個1相同,被過濾掉
      // 第一個2被釋放
      // 第二個2因爲和第一個2相同,被過濾掉
      // 第一個3被釋放
      // 第一個4被釋放
    複製代碼
    1. 能夠傳參(old: T, new: T) -> Boolean,進行自定義的比較
    private class Person(val age: Int, val name: String)
    
    flow {
        emit(Person(20, "張三"))
        emit(Person(21, "李四"))
        emit(Person(21, "王五"))
        emit(Person(22, "趙六"))
    }.distinctUntilChanged{old, new -> old.age == new.age }
    .collect{ value -> println(value.name) }
        
    // 結果:張三 李四 趙六
    // 解釋:本例子定義若是年齡相同就認爲是相同的值,因此王五被過濾掉了
    複製代碼
    1. 能夠用distinctUntilChangedBy轉換成年齡進行對比
    flow {
        emit(Person(20, "張三"))
        emit(Person(21, "李四"))
        emit(Person(21, "王五"))
        emit(Person(22, "趙六"))
    }.distinctUntilChangedBy { person -> person.age }
    
    // 結果:張三 李四 趙六
    複製代碼

Emitters相關的操做符

  • transformorm

    對每一個值進行轉換協程

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.transform {
        if (it % 2 == 0) {
            emit(it * it)
        }
    }
    
    // 結果:4 16
    // 解釋:
    // 1 不是偶數,被忽略
    // 2 是偶數,2的平方4
    // 3 不是偶數,被忽略
    // 4 是偶數,4的平方16
    複製代碼
  • onStart

    第一個值被釋放以前被執行

    flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }.onStart { emit(1000) }
    
        // 結果:1000 1 2 3 4
        // 解釋:
        // 第一個值1被釋放的時候調用了emit(10 00), 因此1000在1以前被釋放
    複製代碼
  • onCompletion

    最後一個值釋放完成以後被執行

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.onCompletion { emit(1000) }
    
      // 結果:1 2 3 4 1000
      // 解釋:
      // 第一個值4被釋放的時候調用了emit(100 0), 因此1000在4以後被釋放
    複製代碼

Limit相關的操做符

  • drop

    忽略最開始的[count]個值

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.drop(2)
    
      // 結果:3 4
      // 解釋:
      // 最開始釋放的兩個值(1,2)被忽略了
    複製代碼
  • dropWhile

    判斷第一個值若是知足(T) -> Boolean這個條件就忽略

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.dropWhile {
          it % 2 == 0
      }
    
      // 結果:1 2 3 4
      // 解釋:
      // 第一個值不是偶數,因此1被釋放
    
      flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.dropWhile {
          it % 2 != 0
      }
    
      // 結果:2 3 4
      // 解釋:
      // 第一個值是偶數,因此1被忽略
    複製代碼
  • take

    只釋放前面[count]個值

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.take(2)
    
      // 結果:1 2
      // 解釋:
      // 前面兩個值被釋放
    複製代碼
  • takeWhile

    判斷第一個值若是知足(T) -> Boolean這個條件就釋放

    flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.takeWhile { it%2 != 0 }
    
      // 結果:1
      // 解釋:
      // 第一個值知足是奇數條件
    
      flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.takeWhile { it%2 == 0 }
    
      // 結果:無
      // 解釋:
      // 第一個值不知足是奇數條件
    
    
    複製代碼

CoroutineContext相關的操做符

  • flowOn

    能夠切換CoroutineContext 說明:flowOn隻影響該運算符以前的CoroutineContext,對它以後的CoroutineContext沒有任何影響

  • buffer

    將flow的多個任務分配到不一樣的協程中去執行,加快執行的速度。

  • conflate

    若是值的生產速度大於值的消耗速度,就忽略掉中間將來得及處理的值,只處理最新的值。

val flow1 = flow {
        delay(2000)
        emit(1)
        delay(2000)
        emit(2)
        delay(2000)
        emit(3)
        delay(2000)
        emit(4)
    }.conflate()

    flow1.collect { value ->
        println(value)
        delay(5000)
    }

    // 結果: 1 3 4
    // 解釋:
    // 2000毫秒後生產了1這個值,交由collect 執行,花費了5000毫秒,當1這個值執行co llect完成後已經通過了7000毫秒。
    // 這7000毫秒中,生產了2,可是collect還 沒執行完成又生產了3,因此7000毫秒之後 會直接執行3的collect方法,忽略了2這 個值
    // collect執行完3後,還有一個4,繼續執 行。
複製代碼

Flatten相關的操做符

  • flatMapConcat
將原始的Flow<T>經過[transform]轉換成Flow<Flow<T>>,而後將Flow<Flow<T>>釋放的Flow<T>其中釋放的值一個個釋放。
  
  flow {
      delay(1000)
      emit(1)
      delay(1000)
      emit(2)
      delay(1000)
      emit(3)
      delay(1000)
      emit(4)
  }.flatMapConcat {
      flow {
          emit("$it 產生第一個flow值")
          delay(2500)
          emit("$it 產生第二個flow值")
      }
  }.collect { value ->
      println(value)
  }
  
  // 結果
  // I/System.out: 1 產生第一個flow值
  // I/System.out: 1 產生第二個flow值
  // I/System.out: 2 產生第一個flow值
  // I/System.out: 2 產生第二個flow值
  // I/System.out: 3 產生第一個flow值
  // I/System.out: 3 產生第二個flow值
  // I/System.out: 4 產生第一個flow值
  // I/System.out: 4 產生第二個flow值
  
  // 解釋:
  // 原始Flow<Int>經過flatMapConcat被轉換成Flow<Flow<Int>>
  // 原始Flow<Int>首先釋放1,接着Flow<Flow<Int>> 就會釋放 1產生第一個flow值 和 1產生第二個flow值 兩個值
  // Flow<Int>釋放2,...
  // Flow<Int>釋放3,...
  // Flow<Int>釋放4,...
  
複製代碼
  • flattenConcat

    和flatMapConcat相似,只是少了一步Map操做。

    flow {
        delay(1000)
        emit(flow {
            emit("1 產生第一個flow值")
            delay(2000)
            emit("1 產生第二個flow值") })
        delay(1000)
        emit(flow {
            emit("2 產生第一個flow值")
            delay(2000)
            emit("3 產生第二個flow值") })
        delay(1000)
        emit(flow {
            emit("3 產生第一個flow值")
            delay(2000)
            emit("3 產生第二個flow值") })
        delay(1000)
        emit(flow {
            emit("4 產生第一個flow值")
            delay(2500)
            emit("4 產生第二個flow值") })
        }.flattenConcat()
        
    // 結果
    // I/System.out: 1 產生第一個flow值
    // I/System.out: 1 產生第二個flow值
    // I/System.out: 2 產生第一個flow值
    // I/System.out: 2 產生第二個flow值
    // I/System.out: 3 產生第一個flow值
    // I/System.out: 3 產生第二個flow值
    // I/System.out: 4 產生第一個flow值
    // I/System.out: 4 產生第二個flow值
    複製代碼
  • flatMapMerge

    將原始的Flow經過[transform]轉換成Flow<Flow>,而後將Flow<Flow>釋放的Flow其中釋放的值一個個釋放。 它與flatMapConcat的區別是:Flow<Flow>釋放的Flow其中釋放的值沒有順序性,誰先產生誰先釋放。

flow {
      delay(1000)
      emit(1)
      delay(1000)
      emit(2)
      delay(1000)
      emit(3)
      delay(1000)
      emit(4)
  }.flatMapMerge {
      flow {
          emit("$it 產生第一個flow值")
          delay(2500)
          emit("$it 產生第二個flow值")
      }
  }.collect { value ->
      println(value)
  }
    
複製代碼
  • merge

    將Iterable<Flow>合併成一個Flow

val flow1 = listOf(
     flow {
         emit(1)
         delay(500)
         emit(2)
     },
     flow {
         emit(3)
         delay(500)
         emit(4)
     },
     flow {
         emit(5)
         delay(500)
         emit(6)
     }
 )
 flow1.merge().collect { value -> println("$value") }
 
 // 結果: 1 3 5 2 4 6
 // 解釋:
 // 按Iterable的順序和耗時順序依次釋放值
複製代碼
  • transformLatest

    原始flow會觸發transformLatest轉換後的flow, 當原始flow有新的值釋放後,transformLatest轉換後的flow會被取消,接着觸發新的轉換後的flow

  • flatMapLatest

    和transformLatest相似, 原始flow會觸發transformLatest轉換後的flow, 當原始flow有新的值釋放後,transformLatest轉換後的flow會被取消,接着觸發新的轉換後的flow

    區別:flatMapLatest的transform轉換成的是Flow, transformLatest的transform轉換成的是Unit

  • mapLatest

    和transformLatest相似, 原始flow會觸發transformLatest轉換後的flow, 當原始flow有新的值釋放後,transformLatest轉換後的flow會被取消,接着觸發新的轉換後的flow

    區別:mapLatest的transform轉換成的是T,flatMapLatest的transform轉換成的是Flow,transformLatest的transform轉換成的是Unit

Transform相關的操做符

  • filter

    經過predicate進行過濾,知足條件則被釋放

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.filter { it % 2 == 0 }
    
    // 結果: 2 4
    // 解釋:
    // 2和4知足it % 2 == 0,被釋放
    複製代碼
  • filterNot

    經過predicate進行過濾,不知足條件則被釋放

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.filterNot { it % 2 == 0 }
    
    // 結果: 1 3
    // 解釋:
    // 1和3不知足it % 2 == 0,被釋放
    複製代碼
  • filterIsInstance

    若是是某個數據類型則被釋放

    flow {
        emit(1)
        emit("2")
        emit("3")
        emit(4)
    }.filterIsInstance<String>()
    
    // 結果: "2" "3"
    // 解釋:
    // "2" "3"是String類型,被釋放
    複製代碼
  • filterNotNull

    若是數據是非空,則被釋放

    flow {
        emit(1)
        emit("2")
        emit("3")
        emit(null)
    }.filterNotNull()
    
    // 結果: 1 "2" "3"
    複製代碼
  • map

    將一個值轉換成另一個值

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.map { it * it }
    
    // 結果: 1 4 9 16
    // 解釋:
    // 將1,2,3,4轉換成對應的平方數
    複製代碼
  • mapNotNull

    將一個非空值轉換成另一個值

  • withIndex

    將值封裝成IndexedValue對象

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.withIndex()
    
    // 結果:
    // I/System.out: IndexedValue(index=0, value=1)
    // I/System.out: IndexedValue(index=1, value=2)
    // I/System.out: IndexedValue(index=2, value=3)
    // I/System.out: IndexedValue(index=3, value=4)
    複製代碼
  • onEach

    每一個值釋放的時候能夠執行的一段代碼

  • scan

    有一個初始值,而後每一個值都和初始值進行運算,而後這個值做爲後一個值的初始值

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.scan(100) { acc, value ->
        acc * value
    }
    
    // 結果: 100 100 200 600 2400
    // 解釋:
    // 初始值 100
    // 1 100 * 1 = 100
    // 2 100 * 2 = 200
    // 3 200 * 3 = 600
    // 4 600 * 4 = 2400
    複製代碼
  • runningReduce

    和scan相似,可是沒有初始值,最開始是它自己

    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
    }.runningReduce { acc, value ->
        acc * value
    }
    
    // 結果: 1 2 6 24
    // 解釋:
    // 1 1
    // 2 1 * 2 = 2
    // 3 2 * 3 = 6
    // 4 6 * 4 = 24
    複製代碼

合併的操做符

  • zip

    將兩個Flow在回調函數中進行處理返回一個新的值 R 當兩個flow的長度不等時只發送最短長度的事件

    val nums = (1..4).asFlow() 
    val strs = flowOf("one", "two", "three") 
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
    
    // 結果:
    1 -> one
    2 -> two
    3 -> three
    複製代碼
  • combine

    任意一個flow釋放值且都有釋放值後會調用combine後的代碼塊,且值爲每一個flow的最新值。 和zip的區別: 組合兩個流,在通過第一次發射之後,任意方有新數據來的時候就能夠發射,另外一方有多是已經發射過的數據

    val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) }
    val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) }
    
    flow1.combine(flow2) { first, second ->
        "$first$second"
    }.collect { println("$it") }
    
    // 結果:1a 2a 2b 3b 4b 4c 4d
    
    // 解釋:
    // 開始 --- flow1 釋放 1,flow2 釋放 a, 釋放1a
    // 10毫秒 --- flow1 釋放 2,釋放2a
    // 20毫秒 --- flow2 釋放 b,此時釋放2b
    // 30毫秒 --- flow1 釋放 3,此時釋放3b
    // 40毫秒 --- flow1 釋放 4,此時釋放4b
    // 40毫秒 --- flow2 釋放 c,此時釋放4c
    // 60毫秒 --- flow2 釋放 d,此時釋放4d
    複製代碼

retry相關操做符

  • retry
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重試次數 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T> 複製代碼
  • retryWhen
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
複製代碼

末端操做符

  • Collect相關的末端操做符

    • collect

      接收值

    • launchIn

      scope.launch { flow.collect() }的縮寫, 表明在某個協程上下文環境中去接收釋放的值

      val flow1 = flow {
          delay(1000)
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(1000)
          emit(4)
      }
      
      flow1.onEach { println("$it") }
          .launchIn(GlobalScope)
      
      // 結果:1 2 3 4
      複製代碼
    • collectIndexed

      和withIndex對應的,接收封裝的IndexedValue

      val flow1 = flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }.withIndex()
      
      flow1.collectIndexed { index, value ->
          println("index = $index, value = $value")
      }
      
      // 結果:
      // I/System.out: index = 0, value = IndexedValue(index=0, value=1)
      // I/System.out: index = 1, value = IndexedValue(index=1, value=2)
      // I/System.out: index = 2, value = IndexedValue(index=2, value=3)
      // I/System.out: index = 3, value = IndexedValue(index=3, value=4)
      複製代碼
    • collectLatest

      collectLatest與collect的區別是,若是有新的值釋放,上一個值的操做若是沒執行完則將會被取消

      val flow1 = flow {
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(2000)
          emit(4)
      }
      
      flow1.collectLatest {
          println("正在計算收到的值 $it")
          delay(1500)
          println("收到的值 $it")
      }
      
      // 結果:
      // I/System.out: 正在計算收到的值 1
      // I/System.out: 正在計算收到的值 2
      // I/System.out: 正在計算收到的值 3
      // I/System.out: 收到的值 3
      // I/System.out: 正在計算收到的值 4
      // I/System.out: 收到的值 4
      
      // 解釋:
      // 1間隔1000毫秒後釋放2,2間隔1000毫秒後釋放3,這間隔小於須要接收的時間1500毫秒,因此當2和3 到來後,以前的操做被取消了。
      // 3和4 之間的間隔夠長可以等待執行完畢,4是最後一個值也能執行
      複製代碼
  • Collection相關的末端操做符

    • toList

      將釋放的值轉換成List

      flow {
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(2000)
          emit(4)
      }
      
      println(flow1.toList())
      
      // 結果:[1, 2, 3, 4]
      複製代碼
    • toSet

      將釋放的值轉換成Set

      flow {
          emit(1)
          delay(1000)
          emit(2)
          delay(1000)
          emit(3)
          delay(2000)
          emit(4)
      }
      
      println(flow1.toSet())
      
      // 結果:[1, 2, 3, 4]
      
      複製代碼
  • Count相關的末端操做符

    • count

      1.計算釋放值的個數

    val flow1 = flow {
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
            delay(2000)
            emit(4)
        }
        
        println(flow1.count())
                
        // 結果:4
        
        2.計算知足某一條件的釋放值的個數
        val flow1 = flow {
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
            delay(2000)
            emit(4)
        }
        
        println(flow1.count { it % 2 == 0 })
                
        // 結果:2
        // 解釋:
        // 偶數有2個值 2 4
        ```
        
        
        
        
    
    複製代碼
  • Reduce相關的末端操做符

    • reduce

      和runningReduce相似,可是隻計算最後的結果。

      val flow1 = flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }
      println(flow1.reduce { acc, value -> acc * value })
      
      // 結果:24
      // 解釋:計算最後的結果,1 * 2 * 3 * 4 = 24
      複製代碼
    • fold

      和scan相似,有一個初始值,可是隻計算最後的結果。

      val flow1 = flow {
          emit(1)
          emit(2)
          emit(3)
          emit(4)
      }
      println(flow1.fold(100) { acc, value -> acc * value })
      
      // 結果:2400
      // 解釋:計算最後的結果,100 * 1 * 2 * 3 * 4 = 2400
      複製代碼
    • single

      只接收一個值的Flow 注意:多於1個或者沒有值都會報錯

      val flow1 = flow {
          emit(1)
      }
      println(flow1.single())
      
      // 結果:1
      複製代碼
    • singleOrNull

      接收一個值的Flow或者一個空值的Flow

    • first/firstOrNull

      1. 接收釋放的第一個值/接收第一個值或者空值
    val flow1 = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }
        println(flow1.first())
        
        // 結果:1
        ```
    
        2. 接收第一個知足某個條件的值
        
        val flow1 = flow {
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        }
        println(flow1.first { it % 2 == 0})
        
        // 結果:2
        ```
        
        
    
    複製代碼

Flow的錯誤異常處理

  • 能夠經過 try catch 捕獲錯誤異常

    try {
        flow {
           for (i in 1..3) {
            emit(i)
         }
        }.collect {
            println("接收值 $it")
            check(it <= 1) { "$it 大於1"  }
        }
    } catch (e: Throwable) {
        println("收到了異常: $e")
    }
    
    // 結果:
    // I/System.out: 接收值 1
    // I/System.out: 接收值 2
    // I/System.out: 收到了異常: java.lang.IllegalStateException: 2 大於1
    
    // 解釋:
    // 收到2的時候就拋出了異常,讓後flow被取消,異常被捕獲
    複製代碼
  • 經過catch函數

    catch函數可以捕獲以前產生的異常,以後的異常沒法捕獲。

    flow {
         for (i in 1..3) {
            emit(i)
         }
        }.map {
            check(it <= 1) { "$it 大於1" }
            it
        }
        .catch { e -> println("Caught $e") }
        .collect()
    
    // 結果:
    // Caught java.lang.IllegalStateException: 2 大於1
    複製代碼

Flow的取消

  • CoroutineScope.cancel

    GlobalScope.launch {
        val flow1 = flow {
            for(i in 1..4){
              emit(i)
            }
        }
        flow1.collect { value ->
            println("$value")
            if (value >= 3) {
                cancel()
            }
        }
    }
            
     // 結果:1 2 3 
    複製代碼
  • 流取消檢測

    在協程處於繁忙循環的狀況下,必須明確檢測是否取消。 能夠添加 .onEach { currentCoroutineContext().ensureActive() }, 可是這裏提供了一個現成的 cancellable 操做符來執行此操做:

    (1..5).asFlow().cancellable().collect { value -> 
            if (value == 3) cancel()  
            println(value)
        } 
    複製代碼
相關文章
相關標籤/搜索