Kotlin 學習筆記(六)—— Flow 資料流學習實踐指北(二)StateFlow 與 SharedFlow

語言: CN / TW / HK

要說最近圈內大事件,那就非 chatGPT 莫屬了!人工智慧領域最新的大突破了吧?很可能引發下一場的技術革命,因為大家都懂的原因現在還不能在中國大陸使用,不過國內的度廠正在積極跟進了,預計3月份能面世,且期待一下吧~

上節主要講述了 Flow 的組成、Flow 常用操作符以及冷流的具體使用。這節自然就要介紹熱流了。先來溫習下:

冷流(Cold Flow):在資料被消費者訂閱後,即呼叫 collect 方法之後,生產者才開始執行傳送資料流的程式碼,通常是呼叫 emit 方法。即不消費,不生產,多次消費才會多次生產。消費者和生產者是一對一的關係。

上次說的例子不太直觀,所以這次換了個更直觀的對比例子,先來看第一個: kotlin //code 1 val coldFlow = flow { println("coldFlow begin emitting") emit(40) println("coldFlow 40 is emitted") emit(50) println("coldFlow 50 is emitted") } binding.btn2.setOnClickListener { lifecycleScope.launch { coldFlow.collect { println("coldFlow = $it") } } } 只有當點選按鈕時,才會如圖打印出資訊,即冷流只有呼叫了 collect 方法收集流後,emit 才會開始執行。 圖1 冷流特點日誌圖

熱流(Hot Flow)就不一樣了,無論有無消費者,生產者都會生產資料。它不像冷流,Flow 必須在呼叫末端操作符之後才會去執行;而是可以自己控制是否傳送或者生產資料流。並且熱流可以有多個訂閱者;而冷流只有一個。再來看看熱流的例子: ```kotlin //code 2 val hotFlow = MutableStateFlow(0) lifecycleScope.launch { println("hotFlow begin emitting") hotFlow.emit(40) println("hotFlow 40 is emitted")

hotFlow.emit(50)
println("hotFlow 50 is emitted")

} binding.btn2.setOnClickListener { lifecycleScope.launch { hotFlow.collect { println("hotFlow collects $it") } } } ``` MutableStateFlow 就是熱流中的一種,當沒有點選按鈕時,便會輸出下圖中的前三行資訊。 圖2 熱流特點日誌圖 當點選兩下按鈕後,就會依次輸出如圖第 4,5 行的資訊,至於為什麼只會接收到 50,這跟 MutableStateFlow 的特性有關,後面再說。

通過這兩個例子就可清楚地知道冷熱流之間的區別。熱流有兩種物件,分別是 StateFlow 和 SharedFlow。

1. SharedFlow

先來看看 SharedFlow,它是一個 subscriber 訂閱者的角色,當一個 SharedFlow 呼叫了 collect 方法後,它就不會正常地結束完成;但可以 cancel 掉 collect 所在的協程,這樣就可以取消掉訂閱了。SharedFlow 在每次 emit 時都會去 check 一下所在協程是否已經取消。絕大多數的終端操作符,例如 Flow.toList() 都不會使得 SharedFlow 結束完成,但 Flow.take() 之類的截斷操作符是例外,它們是可以強制完成一個 SharedFlow 的。

SharedFlow 的簡單使用樣例: ```kotlin //code 3 class EventBus { private val _events = MutableSharedFlow() // private mutable shared flow val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

suspend fun produceEvent(event: Event) {
    _events.emit(event) // suspends until all subscribers receive it
}

} ``` 與 LiveData 相似的使用方式。但 SharedFlow 的功能更為強大,它有 replay cache 和 buffer 機制。

1.1 Replay cache

可以理解為是一個粘性事件的快取。每個新的訂閱者會首先收到 replay cache 中之前發出並接收到的事件,再才會收到新的發射出的值。可以在 MutableSharedFlow 的建構函式中設定 cache 的大小,不能為負數,預設為 0. kotlin //code 4 public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) replay 重播之前最新的 n 個事件,見字知義。下面是例子: ```kotlin //code 5 private fun testSharedFlow() { val sharedFlow = MutableSharedFlow(replay = 2) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") } }

    launch {
        (1..3).forEach{
            sharedFlow.emit(it)
        }
    }

    delay(200)
    launch {
        sharedFlow.collect {
            println("++++ sharedFlow2 collected $it")
        }
    }
}

} 結果為:kotlin com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1 com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2 com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3 com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2 com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3 `` 在emit` 發射資料前後分別設定了一個訂閱者,後面還延時了 200ms 才進行訂閱。第一個訂閱者 1、2、3都收到了;而第二個訂閱者卻只收到了 2 和 3. 這是因為在第二個訂閱者開始訂閱時,資料已經都發射完了,而 SharedFlow 的重播 replay 為 2,就可將最近發射的兩個資料再依次傳送一遍,這就可以收到 2 和 3 了。

1.2 extraBufferCapacity

SharedFlow 建構函式的第二個引數 extraBufferCapacity 的作用是,在 replay cache 之外還能額外設定的快取。常用於當生產者生產資料的速度 > 消費者消費資料的速度時的情況,可以有效提升吞吐量。

所以,若 replay = m,extraBufferCapacity = n,那麼這個 SharedFlow 總共的 BufferSize = m + n. replay 會儲存最近發射的資料,如果滿了就會往 extraBuffer 中存。接下來看一個例子: ```kotlin //code 6 private fun coroutineStudy() { val sharedFlow = MutableSharedFlow(replay = 1, extraBufferCapacity = 1) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") delay(6000) } }

    launch {
        (1..4).forEach{
            sharedFlow.emit(it)
            println("+++emit $it")
            delay(1000)
        }
    }

    delay(4000)
    launch {
        sharedFlow.collect {
            println("++++ sharedFlow2 collected $it")
            delay(20000)
        }
    }
}

} 執行結果為:kotlin 17:32:09.283 28184-28184 System.out com.wen.testdemo I +++emit 1 17:32:09.284 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 1 17:32:10.285 28184-28184 System.out com.wen.testdemo I +++emit 2 17:32:11.289 28184-28184 System.out com.wen.testdemo I +++emit 3 17:32:13.286 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 3 17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4 17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2 17:32:21.301 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 3 17:32:27.311 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 4 17:32:33.292 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 4 `` 列印結果可能會有點懵,對照著時序圖更容易理解(此圖來自於參考文獻3,感謝 fundroid 大佬的輸出~): ![圖 3 SharedFlow快取時序圖](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/470efed5448e453b80194a8e2dfdec79~tplv-k3u1fbpfcp-zoom-1.image) 1)Emitter 傳送 1,因為 Subscriber1 在 Emitter 傳送資料前就已開始訂閱,所以 Subscriber1 可馬上接收;此時replay儲存 1; 2)Emitter 傳送 2,Subscriber1 還在處理中處於掛起態,此時replay儲存 2; 3)Emitter 傳送 3,此時還沒有任何消費者能消費,則replay儲存 3,將 2 放入extra中; 4)Emitter 想要傳送 4,但發現 SharedFlow 的 Buffer 已滿,則按照預設的策略進行掛起等待(預設策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND); 5)Subscriber2 開始訂閱,接收到replay中的 3,此時 Subscriber1 還是掛起態,Buffer 中資料沒變化,即replay儲存 3,extra儲存 2; 6)Subscriber1 處理完 1 後,依次處理 Buffer 中 的下一個資料,即消費extra中的 2,這時 Buffer 終於有空間了,Emitter 結束掛起,傳送 4,replay儲存 4,將 3 放入extra中; 7)Subscriber1 消費完 2 後接著再消費extra` 中的 3,此時 Buffer 中就只有 4 了。後面的就不用多說了

比較繞,需要多看幾次思考一下。需要注意的是,程式碼執行結果中下面兩行輸出到底誰先誰後的問題: kotlin 17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4 17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2 打印出的時間戳幾乎是一樣的,若嚴格按照 log 列印的時間戳順序,應該是 Emitter 先發送的 4,Subscriber1 再才接收到的 2,但根據反覆實踐的結果來看,實際上是 Subscriber1 先接收緩衝區中的 2,等緩衝區有剩餘空間後,Emitter 才結束掛起繼續傳送 4. 把上面的例子簡化一下,再改改資料: ```kotlin //code 7 private fun coroutineStudy() { val sharedFlow = MutableSharedFlow(replay = 1, extraBufferCapacity = 1) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") delay(10000) } }

    launch {
        (1..4).forEach{
            sharedFlow.emit(it)
            println("+++emit $it")
            delay(1000)
        }
    }
}

} 列印結果如下所示,因為把 sharedFlow delay 的時長設定為 10s,所以很明顯地看到 Emitter 在傳送 1、2、3 時時間間隔均是 1s,傳送 4 時足足過了 8s,這段時間就是 Emitter 被掛起了,一直等到 sharedFlow1 接收到 2 之後,4 才被 Emitter 傳送,而 sharedFlow1 的每次接收都是間隔 10s,所以是先接收的 2,再結束掛起傳送的 4.kotlin 00:25:52.481 29483-29483/com.example.myapplication I/System.out: +++emit 1 00:25:52.482 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1 00:25:53.483 29483-29483/com.example.myapplication I/System.out: +++emit 2 00:25:54.486 29483-29483/com.example.myapplication I/System.out: +++emit 3 00:26:02.487 29483-29483/com.example.myapplication I/System.out: +++emit 4 00:26:02.488 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2 00:26:12.497 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3 00:26:22.516 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 4 通過原始碼也可看出這個結論,從 `collect` 方法進入,最終可以找到實際上是呼叫了 SharedFlowImpl 中的 `collect` 方法:kotlin //code 8 override suspend fun collect(collector: FlowCollector) { val slot = allocateSlot() try { if (collector is SubscribedFlowCollector) collector.onSubscription() val collectorJob = currentCoroutineContext()[Job] while (true) { var newValue: Any? while (true) { newValue = tryTakeValue(slot) //首先嚐試直接獲取值 if (newValue !== NO_VALUE) break awaitValue(slot) //沒獲取到則只能掛起等待新值到來 } collectorJob?.ensureActive() collector.emit(newValue as T) } } finally { freeSlot(slot) } } `` 在內層while迴圈中,首先是通過tryTakeValue方法直接取值,如果沒取到則通過awaitValue方法掛起等待新值,awaitValue是個掛起函式。取到新值之後,才會跳出內層while迴圈,並執行collector.emit(newValue as T),而這一段程式碼,實際上就是呼叫的 code 7 中的sharedFlow.emit(it)` 程式碼。

此處原始碼還可以看出,SharedFlow 每次在 emit 之前,確實都會檢視所在協程是否還在執行;且它確實是不會停止的,哪怕沒有接收到新值,也會一直處於掛起等待的狀態,想要結束則得使用截斷型別的操作符。

1.3 onBufferOverflow

SharedFlow 建構函式的第三個引數就是設定超過 Buffer 之後的策略,預設是將生產者掛起暫時不再發送資料,即 BufferOverflow.SUSPEND。

還有另外兩個資料丟棄策略:
1)BufferOverflow.DROP_LATEST 丟棄最新資料; 圖 4 BufferOverflow.DROP_LATEST 策略 Emitter 在傳送 4 時,因為 Buffer 已滿,所以只能按照策略將最新的資料 4 丟棄。而在傳送 3 時,由於 1 已經被消費過,所以可以從 Buffer 中移除,從而騰出儲存空間快取 3。

2)BufferOverflow.DROP_OLDEST 丟棄最老資料: 圖 5 BufferOverflow.DROP_OLDEST 策略 這個策略就比較簡單,Buffer 中只會儲存最新的資料。不管較老的資料是否被消費,當 Buffer 已滿而又有新的資料到達時,老資料都會從 Buffer 中移除,騰出空間讓給新資料。

注意點:當 replay、extra 都為 0,即沒有 Buffer 的時候,那麼 onBufferOverflow 只能是 BufferOverflow.SUSPEND。丟棄策略啟動的前提是 SharedFlow 至少有 Buffer 且 Buffer 已滿。

1.4 emit 與 tryEmit

由前一節可知,當 SharedFlow 的 Buffer 已滿且 onBufferOverflow 為 BufferOverflow.SUSPEND 的時候,emit 會被掛起(emit 是個掛起函式),但這會影響到 Emitter 的速度。如果不想在傳送資料的時候被掛起,除了設定 onBufferOverflow 丟棄策略外,還可以使用 tryEmit 方法。 ```kotlin //code 9 override fun tryEmit(value: T): Boolean { var resumes: Array?> = EMPTY_RESUMES val emitted = synchronized(this) { if (tryEmitLocked(value)) { resumes = findSlotsToResumeLocked(resumes) true } else { false } } for (cont in resumes) cont?.resume(Unit) return emitted }

@Suppress("UNCHECKED_CAST")
private fun tryEmitLocked(value: T): Boolean {
    // Fast path without collectors -> no buffering
    // 1.沒有訂閱者時,直接返回 true,因為沒有人接收,發了也沒用,也不用快取
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
    // With collectors we'll have to buffer
    // 2.有訂閱者,就得考慮快取傳送的值了
    // cannot emit now if buffer is full & blocked by slow collectors
    // 3.如果快取空間已滿,且訂閱者還在掛起處理上次的資料,則不能 emit
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false // will suspend
            BufferOverflow.DROP_LATEST -> return true // just drop incoming
            BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
        }
    }
    // 4.程式碼能走到這裡,說明快取還有空間或丟棄策略為DROP_OLDEST
    enqueueLocked(value)
    bufferSize++ // value was added to buffer
    // drop oldest from the buffer if it became more than bufferCapacity
    if (bufferSize > bufferCapacity) dropOldestLocked()
    // keep replaySize not larger that needed
    if (replaySize > replay) { // increment replayIndex by one
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}

`` 由程式碼可見tryEmit不是一個掛起函式,它有返回值,如果返回 true 則說明發送資料成功了;如果返回 false,則說明這時傳送資料需要被掛起等待。其中最主要的就是tryEmitLocked` 方法。

tryEmitLocked 方法主要邏輯已在註釋中說明,需要額外說明的是,bufferCapacity 就是 replay + extraBufferCapacity 的大小;replayIndex 指的是最近開始訂閱的訂閱者在 replay cache 快取陣列中需要重播的最小 index。所以當使用預設構造的 SharedFlow 時,replayextraBufferCapacity 都為 0,如果這時再使用 tryEmit 方法進行傳送,則會使得 if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) 判斷為 true,預設的丟棄策略又是 BufferOverflow.SUSPEND,就會導致這裡會直接返回 false,永遠都不會發送出值。所以,在使用預設構造的 SharedFlow 時,不能使用 tryEmit 傳送值,否則無法傳送。 一般使用 emit 即可。

在 SharedFlow 具體實現中,emit 方法就是先嚐試使用 tryEmit 來發送值,如果不能馬上傳送再使用掛起函式 emitSuspend 方法: kotlin //code 10 class SharedFlowImpl override suspend fun emit(value: T) { if (tryEmit(value)) return // fast-path emitSuspend(value) }

2. StateFlow

看完 SharedFlow 再來看 StateFlow 的話就比較簡單了。因為 StateFlow 就是 SharedFlow 的一種特殊子類,特點有三:
1)它的 replay cache 容量為 1;即可快取最近的一次粘性事件;
2)初始化時必須給它設定一個初始值;
3)每次傳送資料都會與上次快取的資料作比較,如果不一樣才會傳送,自動過濾掉沒有發生變化的資料。
它還可直接訪問它自己的 value 引數獲取當前結果值,總體來說,在使用上與 LiveData 相似,下面是它倆的異同點對比。

2.1 與 LiveData 比較的相同點

  1. 均提供了 可讀可寫 和 僅可讀 兩個版本:MutableStateFlow、StateFlow 與 MutableLiveData、LiveData;
  2. 允許被多個觀察者觀察,即生產者對消費者可以為一對多的關係;
  3. 都只會把最新的值給到觀察者,即使沒有觀察者,也會更新自己的值;
  4. 都會產生粘性事件問題;
  5. 都可能產生丟失值的問題;

粘性事件問題:因為 StateFlow 初始化時必須給定初始值,且 replay 為 1,所以每個觀察者進行觀察時,都會收到最近一次的回播資料。如果想避免粘性事件問題,換用 SharedFlow 即可,replay 使用預設值 0 。

值丟失問題:出現在消費者處理資料比生產者生產資料慢的情況,消費者來不及處理資料,就會把之前生產者傳送的舊資料丟棄掉,看個例子: ```kotlin //code 11 private fun stateFlowDemo1() { val stateFlow = MutableStateFlow(0) CoroutineScope(Dispatchers.Default).launch { var count = 1 while (true) { val tmp = count++ delay(1000) println("+++++ tmp = $tmp") stateFlow.value = tmp } }

    CoroutineScope(Dispatchers.Default).launch {
        stateFlow.collect{
            println("++++ count = $it")
            delay(5000)  //模擬耗時操作
        }
    }
}

``` 圖 6 StateFlow丟失值log 可以從列印結果看出,StateFlow 會丟棄掉生產者之前傳送的值,其實 MutableStateFlow 的丟棄策略就是設定的 BufferOverflow.DROP_OLDEST。

2.2 與 LiveData 比較的不同點

  1. StateFlow 必須在構建的時候傳入初始值,LiveData 不需要;
  2. StateFlow 預設是防抖的,LiveData 預設不防抖;
  3. 對於 Android 來說 StateFlow 預設沒有和生命週期繫結,直接使用會有問題;

StateFlow 預設防抖:即如果傳送的值與上次相同,則生產者並不會真正傳送。在原始碼中也有說明,具體在 StateFlow.kt -> class StateFlowImpl -> private fun updateState -> if (oldState == newState) return true 感興趣的可以自行查閱,我看的版本是 1.5.0.

與 LiveData 相比,沒有和 Activity 的生命週期繫結恐怕是使用 StateFlow 最不方便的地方了。當 View 進入 STOPPED 狀態時,LiveData.observe() 會自動取消註冊使用方,這樣就不會再接收到資料了,也符合常理。因為使用者此時已經離開頁面,再接收資料已沒有意義,如果繼續處理後續邏輯可能還會出 bug。

而如果使用的是 StateFlow 或其他資料流,在 View 進入 STOPPED 狀態時,收集資料的操作並不會自動停止。如需實現相同的行為,則需要從 Lifecycle.repeatOnLifecycle 塊收集資料流。如下是來自官方文件的例子: kotlin //code 12 class LatestNewsActivity : AppCompatActivity() { private val latestNewsViewModel = // getViewModel() override fun onCreate(savedInstanceState: Bundle?) { ... // Start a coroutine in the lifecycle scope lifecycleScope.launch { // repeatOnLifecycle launches the block in a new coroutine every time the // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED. repeatOnLifecycle(Lifecycle.State.STARTED) { // Trigger the flow and start listening for values. // Note that this happens when lifecycle is STARTED and stops // collecting when the lifecycle is STOPPED latestNewsViewModel.uiState.collect { uiState -> // New value received when (uiState) { is LatestNewsUiState.Success -> showFavoriteNews(uiState.news) is LatestNewsUiState.Error -> showError(uiState.exception) } } } } } } //注意:repeatOnLifecycle API 僅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0 庫及更高版本中提供。 英文部分註釋說的比較明確了,repeatOnLifecycle(Lifecycle.State.STARTED) 的作用就是每次進入 STARTED 可見狀態時都會重新觀察並收集資料;而在 STOPPED 狀態時就會 cancel 掉 StateFlow 收集流所在的協程從而停止收集。

總結

最後總結一下 Flow 第二小節的內容吧:
1)熱流有無消費者都可傳送資料,生產者和消費者的關係可以是一對多;
2)SharedFlow 可構建熱流,可設定 replay 重播資料量及 extraBufferCapacity 緩衝區大小,以及 onBufferOverflow 緩衝區滿的策略;
3)emittryEmit 傳送方法的異同,前者是掛起函式,注意在使用預設構造的 SharedFlow 時不要使用 tryEmit
4)StateFlow 是 SharedFlow 的一個子類,replay = 1,必須給定初始值,自帶防抖;
5)使用 StateFlow 或 SharedFlow 收集值時,記得在 repeatOnLifecycle(Lifecycle.State.STARTED) 方法中,防止出現崩潰等問題。

更多內容,歡迎關注公眾號:修之竹

贊人玫瑰,手留餘香!歡迎點贊、轉發~ 轉發請註明出處~

參考文獻

  1. Reactive Streams on Kotlin: SharedFlow and StateFlow; Ricardo Costeira; http://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow
  2. Kotlin中 Flow、SharedFlow與StateFlow區別;五問;http://juejin.cn/post/7142038525997744141
  3. 一看就懂!圖解 Kotlin SharedFlow 快取系統;fundroid;http://juejin.cn/post/7156408785886511111
  4. Kotlin:深入理解StateFlow與SharedFlow,StateFlow和LiveData使用差異區分,SharedFlow實現原始碼解析; pumpkin的玄學; http://blog.csdn.net/weixin_44235109/article/details/121594988?spm=1001.2014.3001.5502
  5. StateFlow 和 SharedFlow 官方文件 http://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn