Kotlin Flow啊,你將流向何方?

語言: CN / TW / HK

前言

協程系列文章:

前邊一系列的協程文章鋪墊了很久,終於要分析Flow了。如果說協程是Kotlin的精華,那麼Flow就是協程的精髓。
通過本篇文章,你將瞭解到:

  1. 什麼是流?
  2. 為什麼引進Flow?
  3. Fow常見的操作
  4. 為什麼說Flow是冷流?

1. 什麼是流

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7229da2cd41c45259ef6a64106c0c615~tplv-k3u1fbpfcp-zoom-1.image)

自然界的流水,從高到低,從上游到下游流動。

而對於計算機世界的流:

資料的傳遞過程構成了資料流,簡稱流

比如想要查詢1~1000內的偶數,可以這麼寫:
kotlin var i = 0 var list = mutableListOf<Int>() while (i < 1000) { if (i % 2 == 0) list.add(i) i++ } 此處對資料的處理即為找出其中的偶數。
若想要在偶數中找到>500的數,則繼續篩選:
kotlin var i = 0 var list = mutableListOf<Int>() while (i < 1000) { if (i > 500 && i % 2 == 0) list.add(i) i++ } 可以看出,原始資料是1~1000,我們對它進行了一些操作:過濾偶數、過濾>500的數。當然還可以進行其它操作,如對映、變換等。
提取上述過程三要素:

  1. 原始資料
  2. 對資料的一系列操作
  3. 最終的資料

把這一系列的過程當做流:

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b7f46949488e4a2da6d3853cc5e97812~tplv-k3u1fbpfcp-zoom-1.image)

從流的方向來觀察,我們稱原始資料為上流,對資料進行一系列處理後,最終的資料為下流。
從流的屬性來觀察,我們認為生產者在上流生產資料,消費者在下流消費資料。

2. 為什麼引進Flow?

由前面的文章我們知道,Java8提供了StreamAPI,專用來操作流,而Kotlin也提供了Sequence來處理流。
那為什麼還要引進Flow呢?
在Kotlin的世界裡當然不會想再依賴Java的StreamAPI了,主要來對比Kotlin裡的各種方案選擇。
先看應用場景的演變。

a、集合獲取多個值
想要獲取多個值,很顯而易見的想到了集合。
kotlin fun testList() { //構造集合 fun list(): List<Int> = listOf(1, 2, 3) list().forEach { //獲取多個值 println("value = $it") } } 以上函式功能涉及兩個物件:生產者和消費者。
生產者:負責將1、2、3構造為集合。
消費者:負責從集合裡將1、2、3取出。
若此時想要控制生產者的速度,比如先將1放到集合裡,過1秒後再講2放進集合,在此種場景下該函式顯得不那麼靈活了。

b、Sequence控制生成速度
Sequence可以生產資料,先看看它是怎麼控制生產速度的。
kotlin fun testSequence() { fun sequence():Sequence<Int> = sequence { for (i in 1..3) { Thread.sleep(1000) yield(i) } } sequence().forEach { println("value = $it") } } 通過阻塞執行緒控制了生產者的速度。
你可能會說:在協程體裡為啥要用Thread.sleep()阻塞執行緒呢,用delay()不香嗎?
看起來很香,我們來看看實際效果:

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/16481ae8c7de4eb5a1fa3a56c54ebb08~tplv-k3u1fbpfcp-zoom-1.image)

直接報編譯錯誤了,提示是:受限制的掛起函式只能呼叫自己協程作用域內的成員和其它掛起函式。
而sequence的作用域是SequenceScope,檢視其定義發現:

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/fda8e182f5a74e99a467b2c996da306d~tplv-k3u1fbpfcp-zoom-1.image)

究其原因,SequenceScope 被RestrictsSuspension 修飾限制了。

c、集合配合協程使用
sequence 因為協程作用域的限制,不能非同步生產資料,而使用集合卻沒此限制。 kotlin suspend fun testListDelay() { suspend fun list():List<Int> { delay(1000) return listOf(1, 2, 3) } list().forEach { println("value = $it") } } 但也暴露了一個缺陷,只能一次性的返回集合元素。

綜上所述:

不管是集合還是Sequence,都不能完全覆蓋流的需求,此時Flow閃亮登場了

3. Fow常見的操作

最簡單的Flow使用

```kotlin suspend fun testFlow1() { //生產者 var flow = flow { //發射資料 emit(5) }

    //消費者
    flow.collect {
        println("value=$it")
    }
}

``` 通過flow函式構造一個flow物件,然後通過呼叫flow.collect收集資料。
flow函式的閉包為生產者的生產邏輯,collect函式的閉包為消費者的消費邏輯。

當然,還有更簡單的寫法:
kotlin suspend fun testFlow2() { //生產者 flow { //發射資料 emit(5) }.collect { //消費者 println("value=$it") } } 執行流程:

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b9f6fa1fa1f843ed87a330986417c488~tplv-k3u1fbpfcp-zoom-1.image)

Flow操作符

上面只提到了flow資料的傳送以及接收,並沒有提及對flow資料的操作。
flow提供了許多操作符方便我們對資料進行處理(對流進行加工)。
我們以尋找1~1000內大於500的偶數為例:
```kotlin suspend fun testFlow3() { //生產者 var flow = flow { for (i in 1..1000) { emit(i) } }.filter { it > 500 && it % 2 == 0 }

    //消費者
    flow.collect {
        println("value=$it")
    }
}

filter函式的作用根據一定的規則過濾資料,一般稱這種函式為flow的操作符。 當然還可以對flow進行對映、變換、異常處理等。kotlin suspend fun testFlow3() { //生產者 var flow = flow { for (i in 1..1000) { emit(i) } }.filter { it > 500 && it % 2 == 0 } .map { it - 500 } .catch { //異常處理 }

    //消費者
    flow.collect {
        println("value=$it")
    }
}

```

中間操作符
前面說過流的三要素:原始資料、對資料的操作、最終資料,對應到Flow上也是一樣的。
flow的閉包裡我們看做是原始資料,而filter、map、catch等看做是對資料的操作,collect閉包裡看做是最終的資料。
filter、map等操作符屬於中間操作符,它們負責對資料進行處理。

中間操作符僅僅只是預先定義一些對流的操作方式,並不會主動觸發動作執行

末端操作符
末端操作符也叫做終端操作符,呼叫末端操作符後,Flow將從上流發出資料,經過一些列中間操作符處理後,最後流到下流形成最終資料。
如上面的collect操作符就是其中一種末端操作符。

怎麼區分中間操作符和末端操作符呢?
和Sequence操作符類似,可以通過返回值判斷。
先看看中間操作符filter:
```kotlin public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value -> if (predicate(value)) [email protected] emit(value) }

internal inline fun Flow.unsafeTransform( @BuilderInference crossinline transform: suspend FlowCollector.(value: T) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use collect { value -> // kludge, without it Unit will be returned and TCE won't kick in, KT-28938 [email protected] transform(value) } } ``` 可以看出,filter操作符僅僅只是構造了Flow物件,並重寫了collect函式。

再看末端操作符collect:
kotlin public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) }) 返回值為Unit,並且通過呼叫collect最終呼叫了emit,觸發了流。

Flow相比Sequence、Collection的優勢

Sequence對於協程的支援不夠好,不能呼叫其作用域外的suspend函式,而Collection生產資料不夠靈活,來看看Flow是如何解決這些問題的。
```kotlin suspend fun testFlow4() { //生產者 var flow = flow { for (i in 1..1000) { delay(1000) emit(i) } }.flowOn(Dispatchers.IO)//切換到io執行緒執行

    //消費者
    flow.collect {
        delay(1000)
        println("value=$it")
    }
}

``` 如上,flow的生產者、消費者閉包裡都支援呼叫協程的suspend函式,同時也支援切換執行緒執行。
再者,flow可以將集合裡的值一個個發出,可調整其流速。
當然,flow還提供了許多操作符幫助我們實現各種各樣的功能,此處限於篇幅就不再深入。
萬變不離其宗,知道了原理,一切迎刃而解。

4. 為什麼說Flow是冷流?

flow 的流動

在sequence的分析裡有提到過sequence是冷流,那麼什麼是冷流呢?

沒有消費者,生產者不會生產資料
沒有觀察者,被觀察者不會發送資料

kotlin suspend fun testFlow5() { //生產者 var flow = flow { println("111") for (i in 1..1000) { emit(i) } }.filter { println("222") it > 500 && it % 2 == 0 }.map { println("333") it - 500 }.catch { println("444") //異常處理 } 如上程式碼,只要生產者沒有消費者,該函式執行後不會有任何列印語句輸出。
這個時候將消費者加上,就會觸發流的流動。

還是以最簡單的flow demo為例,看看其呼叫流程:

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c7c1c99c6aa0480f9fe0f8b540b947e8~tplv-k3u1fbpfcp-zoom-1.image)

圖上1~6步驟即為最簡單的flow呼叫流程。
可以看出,只有呼叫了末端操作符(如collect)之後才會觸發flow的流動,因此flow是冷流。

flow 的原理

```kotlin suspend fun testFlow1() { //生產者 var flow = flow { //發射資料 emit(5) }

    //消費者
    flow.collect {
        println("value=$it")
    }
}

``` 以上程式碼涉及到三個關鍵函式(flow、emit、collect),兩個閉包(flow閉包、collect閉包。
從上面的呼叫圖可知,以上五者的呼叫關係:

flow-->collect-->flow閉包-->emit-->collect閉包

接下來逐一分析在程式碼裡的關係。

先看生產者動作(flow函式)
flow函式實現:
```kotlin

public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)

傳入的引數型別為:FlowCollector的擴充套件函式,而FlowCollector是介面,它有唯一的函式:emit(xx)。因此在flow函式的閉包裡可以呼叫emit(xx)函式,flow閉包作為SafeFlow的成員變數block。 flow 函式返回SafeFlow,SafeFlow繼承自AbstractFlow,並實現了collect函式:kotlin

Flow.kt

public final override suspend fun collect(SafeCollector: FlowCollector<T>) {
    //構造SafeCollector
    //collector 作為SafeCollector的成員變數
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        //抽象函式,子類實現
        collectSafely(safeCollector)
    } finally {
        safeCollector.releaseIntercepted()
    }
}

``` collect的閉包作為SafeCollector的成員變數collector,後面會用到。
由此可見:flow函式僅僅只是構造了flow物件並返回。

再看消費者動作(collect)
當消費者呼叫flow.collect函式時:
kotlin public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> { override suspend fun emit(value: T) = action(value) }) 此時呼叫的collect即為flow裡定義的collect函式,並構造了匿名物件FlowCollector,實現了emit函式,而emit函式的真正實現為action,也就是外層傳入的collect的閉包。

上面分析到的collect原始碼裡呼叫了collectSafely:
kotlin private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() } } 此處的block即為在構造flow物件時傳入的閉包。
此時,消費者通過collect函式已經呼叫到生產者的閉包裡

還剩下最後一個問題:生產者的閉包是如何流轉到消費者的閉包裡呢?

最後看發射動作(emit)
在生產者的閉包裡呼叫了emit函式:
```kotlin override suspend fun emit(value: T) { //掛起函式 return suspendCoroutineUninterceptedOrReturn [email protected]{ uCont -> try { //uCont為當前協程續體 emit(uCont, value) } catch (e: Throwable) { // Save the fact that exception from emit (or even check context) has been thrown lastEmissionContext = DownstreamExceptionElement(e) throw e } } }

private fun emit(uCont: Continuation<Unit>, value: T): Any? {
    val currentContext = uCont.context
    currentContext.ensureActive()
    // This check is triggered once per flow on happy path.
    val previousContext = lastEmissionContext
    if (previousContext !== currentContext) {
        checkContext(currentContext, previousContext, value)
    }
    completion = uCont
    //collector.emit 最終呼叫collect的閉包
    return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}

``` 如此一來,生產者的閉包裡呼叫emit函式後,將會呼叫到collect的閉包裡,此時資料從flow的上游流轉到下游。
總結以上步驟,其實本質還是物件呼叫。

中間操作符的原理
以filter為例:
```kotlin public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value -> //判斷過濾條件是否滿足,若是則傳送資料 if (predicate(value)) [email protected] emit(value) }

internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    //呼叫當前物件collect
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        [email protected] transform(value)
    }
}

internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    //構造flow,重寫collect
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

``` filter操作符構造了新的flow物件,該物件重寫了collect函式。
當呼叫flow.collect時,先呼叫到filter物件的collect,進而呼叫到原始flow的collect,接著呼叫到原始flow物件的閉包,在閉包裡呼叫的emit即為filter的閉包,若filter閉包裡條件滿足則調動emit函式,最後呼叫到collect的閉包。

![image.png](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c7230ee5a80a4dffac55b746e445ef2d~tplv-k3u1fbpfcp-zoom-1.image)

理解中間操作符的要點:

  1. 中間操作符返回新的flow物件,重寫了collect函式
  2. collect函式會呼叫當前flow(呼叫filter的flow物件)的collect
  3. collect函式做其它的處理

與sequence類似,使用了裝飾者模式。
以上以filter為例闡述了原理,其它中間操作符的原理類似,此處就不再細說。

下篇將分析Flow的背壓與執行緒切換,相信分析的邏輯會讓大家耳目一新,敬請期待~

本文基於Kotlin 1.5.3,文中完整Demo請點選

您若喜歡,請點贊、關注、收藏,您的鼓勵是我前進的動力

持續更新中,和我一起步步為營系統、深入學習Android/Kotlin

1、Android各種Context的前世今生
2、Android DecorView 必知必會
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分發全套服務
6、Android invalidate/postInvalidate/requestLayout 徹底釐清
7、Android Window 如何確定大小/onMeasure()多次執行原因
8、Android事件驅動Handler-Message-Looper解析
9、Android 鍵盤一招搞定
10、Android 各種座標徹底明瞭
11、Android Activity/Window/View 的background
12、Android Activity建立到View的顯示過
13、Android IPC 系列
14、Android 儲存系列
15、Java 併發系列不再疑惑
16、Java 執行緒池系列
17、Android Jetpack 前置基礎系列
18、Android Jetpack 易學易懂系列
19、Kotlin 輕鬆入門系列
20、Kotlin 協程系列全面解讀