Kotlin 學習筆記(六)—— Flow 數據流學習實踐指北(二)StateFlow 與 SharedFlow
要説最近圈內大事件,那就非 chatGPT 莫屬了!人工智能領域最新的大突破了吧?很可能引發下一場的技術革命,因為大家都懂的原因現在還不能在中國大陸使用,不過國內的度廠正在積極跟進了,預計3月份能面世,且期待一下吧~
上節主要講述了 Flow 的組成、Flow 常用操作符以及冷流的具體使用。這節自然就要介紹熱流了。先來温習下:
冷流(Cold Flow):在數據被消費者訂閲後,即調用
collect
方法之後,生產者才開始執行發送數據流的代碼,通常是調用emit
方法。即不消費,不生產,多次消費才會多次生產。消費者和生產者是一對一的關係。
上次説的例子不太直觀,所以這次換了個更直觀的對比例子,先來看第一個:
kotlin
//code 1
val coldFlow = flow {
println("coldFlow begin emitting")
emit(40)
println("coldFlow 40 is emitted")
emit(50)
println("coldFlow 50 is emitted")
}
binding.btn2.setOnClickListener {
lifecycleScope.launch {
coldFlow.collect {
println("coldFlow = $it")
}
}
}
只有當點擊按鈕時,才會如圖打印出信息,即冷流只有調用了 collect
方法收集流後,emit
才會開始執行。
熱流(Hot Flow)就不一樣了,無論有無消費者,生產者都會生產數據。它不像冷流,Flow 必須在調用末端操作符之後才會去執行;而是可以自己控制是否發送或者生產數據流。並且熱流可以有多個訂閲者;而冷流只有一個。再來看看熱流的例子: ```kotlin //code 2 val hotFlow = MutableStateFlow(0) lifecycleScope.launch { println("hotFlow begin emitting") hotFlow.emit(40) println("hotFlow 40 is emitted")
hotFlow.emit(50)
println("hotFlow 50 is emitted")
} binding.btn2.setOnClickListener { lifecycleScope.launch { hotFlow.collect { println("hotFlow collects $it") } } } ``` MutableStateFlow 就是熱流中的一種,當沒有點擊按鈕時,便會輸出下圖中的前三行信息。 當點擊兩下按鈕後,就會依次輸出如圖第 4,5 行的信息,至於為什麼只會接收到 50,這跟 MutableStateFlow 的特性有關,後面再説。
通過這兩個例子就可清楚地知道冷熱流之間的區別。熱流有兩種對象,分別是 StateFlow 和 SharedFlow。
1. SharedFlow
先來看看 SharedFlow,它是一個 subscriber 訂閲者的角色,當一個 SharedFlow 調用了 collect
方法後,它就不會正常地結束完成;但可以 cancel 掉 collect
所在的協程,這樣就可以取消掉訂閲了。SharedFlow 在每次 emit
時都會去 check 一下所在協程是否已經取消。絕大多數的終端操作符,例如 Flow.toList()
都不會使得 SharedFlow 結束完成,但 Flow.take()
之類的截斷操作符是例外,它們是可以強制完成一個 SharedFlow 的。
SharedFlow 的簡單使用樣例:
```kotlin
//code 3
class EventBus {
private val _events = MutableSharedFlow
suspend fun produceEvent(event: Event) {
_events.emit(event) // suspends until all subscribers receive it
}
} ``` 與 LiveData 相似的使用方式。但 SharedFlow 的功能更為強大,它有 replay cache 和 buffer 機制。
1.1 Replay cache
可以理解為是一個粘性事件的緩存。每個新的訂閲者會首先收到 replay cache 中之前發出並接收到的事件,再才會收到新的發射出的值。可以在 MutableSharedFlow 的構造函數中設置 cache 的大小,不能為負數,默認為 0.
kotlin
//code 4
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
replay 重播之前最新的 n 個事件,見字知義。下面是例子:
```kotlin
//code 5
private fun testSharedFlow() {
val sharedFlow = MutableSharedFlow
launch {
(1..3).forEach{
sharedFlow.emit(it)
}
}
delay(200)
launch {
sharedFlow.collect {
println("++++ sharedFlow2 collected $it")
}
}
}
}
結果為:
kotlin
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2
com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3
``
在
emit` 發射數據前後分別設置了一個訂閲者,後面還延時了 200ms 才進行訂閲。第一個訂閲者 1、2、3都收到了;而第二個訂閲者卻只收到了 2 和 3. 這是因為在第二個訂閲者開始訂閲時,數據已經都發射完了,而 SharedFlow 的重播 replay 為 2,就可將最近發射的兩個數據再依次發送一遍,這就可以收到 2 和 3 了。
1.2 extraBufferCapacity
SharedFlow 構造函數的第二個參數 extraBufferCapacity
的作用是,在 replay cache 之外還能額外設置的緩存。常用於當生產者生產數據的速度 > 消費者消費數據的速度時的情況,可以有效提升吞吐量。
所以,若 replay = m,extraBufferCapacity = n
,那麼這個 SharedFlow 總共的 BufferSize = m + n. replay 會存儲最近發射的數據,如果滿了就會往 extraBuffer
中存。接下來看一個例子:
```kotlin
//code 6
private fun coroutineStudy() {
val sharedFlow = MutableSharedFlow
launch {
(1..4).forEach{
sharedFlow.emit(it)
println("+++emit $it")
delay(1000)
}
}
delay(4000)
launch {
sharedFlow.collect {
println("++++ sharedFlow2 collected $it")
delay(20000)
}
}
}
}
運行結果為:
kotlin
17:32:09.283 28184-28184 System.out com.wen.testdemo I +++emit 1
17:32:09.284 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 1
17:32:10.285 28184-28184 System.out com.wen.testdemo I +++emit 2
17:32:11.289 28184-28184 System.out com.wen.testdemo I +++emit 3
17:32:13.286 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 3
17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2
17:32:21.301 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 3
17:32:27.311 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 4
17:32:33.292 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 4
``
打印結果可能會有點懵,對照着時序圖更容易理解(此圖來自於參考文獻3,感謝 fundroid 大佬的輸出~):
![圖 3 SharedFlow緩存時序圖](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/470efed5448e453b80194a8e2dfdec79~tplv-k3u1fbpfcp-zoom-1.image)
1)Emitter 發送 1,因為 Subscriber1 在 Emitter 發送數據前就已開始訂閲,所以 Subscriber1 可馬上接收;此時
replay存儲 1;
2)Emitter 發送 2,Subscriber1 還在處理中處於掛起態,此時
replay存儲 2;
3)Emitter 發送 3,此時還沒有任何消費者能消費,則
replay存儲 3,將 2 放入
extra中;
4)Emitter 想要發送 4,但發現 SharedFlow 的 Buffer 已滿,則按照默認的策略進行掛起等待(默認策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND);
5)Subscriber2 開始訂閲,接收到
replay中的 3,此時 Subscriber1 還是掛起態,Buffer 中數據沒變化,即
replay存儲 3,
extra存儲 2;
6)Subscriber1 處理完 1 後,依次處理 Buffer 中 的下一個數據,即消費
extra中的 2,這時 Buffer 終於有空間了,Emitter 結束掛起,發送 4,
replay存儲 4,將 3 放入
extra中;
7)Subscriber1 消費完 2 後接着再消費
extra` 中的 3,此時 Buffer 中就只有 4 了。後面的就不用多説了
比較繞,需要多看幾次思考一下。需要注意的是,代碼運行結果中下面兩行輸出到底誰先誰後的問題:
kotlin
17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4
17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2
打印出的時間戳幾乎是一樣的,若嚴格按照 log 打印的時間戳順序,應該是 Emitter 先發送的 4,Subscriber1 再才接收到的 2,但根據反覆實踐的結果來看,實際上是 Subscriber1 先接收緩衝區中的 2,等緩衝區有剩餘空間後,Emitter 才結束掛起繼續發送 4. 把上面的例子簡化一下,再改改數據:
```kotlin
//code 7
private fun coroutineStudy() {
val sharedFlow = MutableSharedFlow
launch {
(1..4).forEach{
sharedFlow.emit(it)
println("+++emit $it")
delay(1000)
}
}
}
}
打印結果如下所示,因為把 sharedFlow delay 的時長設置為 10s,所以很明顯地看到 Emitter 在發送 1、2、3 時時間間隔均是 1s,發送 4 時足足過了 8s,這段時間就是 Emitter 被掛起了,一直等到 sharedFlow1 接收到 2 之後,4 才被 Emitter 發送,而 sharedFlow1 的每次接收都是間隔 10s,所以是先接收的 2,再結束掛起發送的 4.
kotlin
00:25:52.481 29483-29483/com.example.myapplication I/System.out: +++emit 1
00:25:52.482 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1
00:25:53.483 29483-29483/com.example.myapplication I/System.out: +++emit 2
00:25:54.486 29483-29483/com.example.myapplication I/System.out: +++emit 3
00:26:02.487 29483-29483/com.example.myapplication I/System.out: +++emit 4
00:26:02.488 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2
00:26:12.497 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3
00:26:22.516 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 4
通過源碼也可看出這個結論,從 `collect` 方法進入,最終可以找到實際上是調用了 SharedFlowImpl 中的 `collect` 方法:
kotlin
//code 8
override suspend fun collect(collector: FlowCollector``
在內層
while循環中,首先是通過
tryTakeValue方法直接取值,如果沒取到則通過
awaitValue方法掛起等待新值,
awaitValue是個掛起函數。取到新值之後,才會跳出內層
while循環,並執行
collector.emit(newValue as T),而這一段代碼,實際上就是調用的 code 7 中的
sharedFlow.emit(it)` 代碼。
此處源代碼還可以看出,SharedFlow 每次在 emit
之前,確實都會查看所在協程是否還在運行;且它確實是不會停止的,哪怕沒有接收到新值,也會一直處於掛起等待的狀態,想要結束則得使用截斷類型的操作符。
1.3 onBufferOverflow
SharedFlow 構造函數的第三個參數就是設置超過 Buffer 之後的策略,默認是將生產者掛起暫時不再發送數據,即 BufferOverflow.SUSPEND。
還有另外兩個數據丟棄策略:
1)BufferOverflow.DROP_LATEST 丟棄最新數據;
Emitter 在發送 4 時,因為 Buffer 已滿,所以只能按照策略將最新的數據 4 丟棄。而在發送 3 時,由於 1 已經被消費過,所以可以從 Buffer 中移除,從而騰出存儲空間緩存 3。
2)BufferOverflow.DROP_OLDEST 丟棄最老數據: 這個策略就比較簡單,Buffer 中只會存儲最新的數據。不管較老的數據是否被消費,當 Buffer 已滿而又有新的數據到達時,老數據都會從 Buffer 中移除,騰出空間讓給新數據。
注意點:當 replay、extra 都為 0,即沒有 Buffer 的時候,那麼 onBufferOverflow 只能是 BufferOverflow.SUSPEND。丟棄策略啟動的前提是 SharedFlow 至少有 Buffer 且 Buffer 已滿。
1.4 emit 與 tryEmit
由前一節可知,當 SharedFlow 的 Buffer 已滿且 onBufferOverflow 為 BufferOverflow.SUSPEND 的時候,emit
會被掛起(emit
是個掛起函數),但這會影響到 Emitter 的速度。如果不想在發送數據的時候被掛起,除了設置 onBufferOverflow 丟棄策略外,還可以使用 tryEmit
方法。
```kotlin
//code 9
override fun tryEmit(value: T): Boolean {
var resumes: Array
@Suppress("UNCHECKED_CAST")
private fun tryEmitLocked(value: T): Boolean {
// Fast path without collectors -> no buffering
// 1.沒有訂閲者時,直接返回 true,因為沒有人接收,發了也沒用,也不用緩存
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
// With collectors we'll have to buffer
// 2.有訂閲者,就得考慮緩存發送的值了
// cannot emit now if buffer is full & blocked by slow collectors
// 3.如果緩存空間已滿,且訂閲者還在掛起處理上次的數據,則不能 emit
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
}
}
// 4.代碼能走到這裏,説明緩存還有空間或丟棄策略為DROP_OLDEST
enqueueLocked(value)
bufferSize++ // value was added to buffer
// drop oldest from the buffer if it became more than bufferCapacity
if (bufferSize > bufferCapacity) dropOldestLocked()
// keep replaySize not larger that needed
if (replaySize > replay) { // increment replayIndex by one
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
``
由代碼可見
tryEmit不是一個掛起函數,它有返回值,如果返回 true 則説明發送數據成功了;如果返回 false,則説明這時發送數據需要被掛起等待。其中最主要的就是
tryEmitLocked` 方法。
tryEmitLocked
方法主要邏輯已在註釋中説明,需要額外説明的是,bufferCapacity
就是 replay + extraBufferCapacity 的大小;replayIndex
指的是最近開始訂閲的訂閲者在 replay cache 緩存數組中需要重播的最小 index。所以當使用默認構造的 SharedFlow 時,replay
和 extraBufferCapacity
都為 0,如果這時再使用 tryEmit
方法進行發送,則會使得 if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex)
判斷為 true,默認的丟棄策略又是 BufferOverflow.SUSPEND,就會導致這裏會直接返回 false,永遠都不會發送出值。所以,在使用默認構造的 SharedFlow 時,不能使用 tryEmit 發送值,否則無法發送。 一般使用 emit
即可。
在 SharedFlow 具體實現中,emit
方法就是先嚐試使用 tryEmit
來發送值,如果不能馬上發送再使用掛起函數 emitSuspend
方法:
kotlin
//code 10 class SharedFlowImpl
override suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path
emitSuspend(value)
}
2. StateFlow
看完 SharedFlow 再來看 StateFlow 的話就比較簡單了。因為 StateFlow 就是 SharedFlow 的一種特殊子類,特點有三:
1)它的 replay cache 容量為 1;即可緩存最近的一次粘性事件;
2)初始化時必須給它設置一個初始值;
3)每次發送數據都會與上次緩存的數據作比較,如果不一樣才會發送,自動過濾掉沒有發生變化的數據。
它還可直接訪問它自己的 value 參數獲取當前結果值,總體來説,在使用上與 LiveData 相似,下面是它倆的異同點對比。
2.1 與 LiveData 比較的相同點
- 均提供了 可讀可寫 和 僅可讀 兩個版本:MutableStateFlow、StateFlow 與 MutableLiveData、LiveData;
- 允許被多個觀察者觀察,即生產者對消費者可以為一對多的關係;
- 都只會把最新的值給到觀察者,即使沒有觀察者,也會更新自己的值;
- 都會產生粘性事件問題;
- 都可能產生丟失值的問題;
粘性事件問題:因為 StateFlow 初始化時必須給定初始值,且 replay
為 1,所以每個觀察者進行觀察時,都會收到最近一次的回播數據。如果想避免粘性事件問題,換用 SharedFlow 即可,replay
使用默認值 0 。
值丟失問題:出現在消費者處理數據比生產者生產數據慢的情況,消費者來不及處理數據,就會把之前生產者發送的舊數據丟棄掉,看個例子: ```kotlin //code 11 private fun stateFlowDemo1() { val stateFlow = MutableStateFlow(0) CoroutineScope(Dispatchers.Default).launch { var count = 1 while (true) { val tmp = count++ delay(1000) println("+++++ tmp = $tmp") stateFlow.value = tmp } }
CoroutineScope(Dispatchers.Default).launch {
stateFlow.collect{
println("++++ count = $it")
delay(5000) //模擬耗時操作
}
}
}
``` 可以從打印結果看出,StateFlow 會丟棄掉生產者之前發送的值,其實 MutableStateFlow 的丟棄策略就是設置的 BufferOverflow.DROP_OLDEST。
2.2 與 LiveData 比較的不同點
- StateFlow 必須在構建的時候傳入初始值,LiveData 不需要;
- StateFlow 默認是防抖的,LiveData 默認不防抖;
- 對於 Android 來説 StateFlow 默認沒有和生命週期綁定,直接使用會有問題;
StateFlow 默認防抖:即如果發送的值與上次相同,則生產者並不會真正發送。在源碼中也有説明,具體在 StateFlow.kt -> class StateFlowImpl
-> private fun updateState
-> if (oldState == newState) return true
感興趣的可以自行查閲,我看的版本是 1.5.0.
與 LiveData 相比,沒有和 Activity 的生命週期綁定恐怕是使用 StateFlow 最不方便的地方了。當 View 進入 STOPPED
狀態時,LiveData.observe()
會自動取消註冊使用方,這樣就不會再接收到數據了,也符合常理。因為用户此時已經離開頁面,再接收數據已沒有意義,如果繼續處理後續邏輯可能還會出 bug。
而如果使用的是 StateFlow 或其他數據流,在 View 進入 STOPPED
狀態時,收集數據的操作並不會自動停止。如需實現相同的行為,則需要從 Lifecycle.repeatOnLifecycle 塊收集數據流。如下是來自官方文檔的例子:
kotlin
//code 12
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
//注意:repeatOnLifecycle API 僅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0 庫及更高版本中提供。
英文部分註釋説的比較明確了,repeatOnLifecycle(Lifecycle.State.STARTED)
的作用就是每次進入 STARTED
可見狀態時都會重新觀察並收集數據;而在 STOPPED
狀態時就會 cancel 掉 StateFlow 收集流所在的協程從而停止收集。
總結
最後總結一下 Flow 第二小節的內容吧:
1)熱流有無消費者都可發送數據,生產者和消費者的關係可以是一對多;
2)SharedFlow 可構建熱流,可設置 replay 重播數據量及 extraBufferCapacity 緩衝區大小,以及 onBufferOverflow 緩衝區滿的策略;
3)emit
與 tryEmit
發送方法的異同,前者是掛起函數,注意在使用默認構造的 SharedFlow 時不要使用 tryEmit
;
4)StateFlow 是 SharedFlow 的一個子類,replay = 1,必須給定初始值,自帶防抖;
5)使用 StateFlow 或 SharedFlow 收集值時,記得在 repeatOnLifecycle(Lifecycle.State.STARTED)
方法中,防止出現崩潰等問題。
更多內容,歡迎關注公眾號:修之竹
贊人玫瑰,手留餘香!歡迎點贊、轉發~ 轉發請註明出處~
參考文獻
- Reactive Streams on Kotlin: SharedFlow and StateFlow; Ricardo Costeira; http://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow
- Kotlin中 Flow、SharedFlow與StateFlow區別;五問;http://juejin.cn/post/7142038525997744141
- 一看就懂!圖解 Kotlin SharedFlow 緩存系統;fundroid;http://juejin.cn/post/7156408785886511111
- Kotlin:深入理解StateFlow與SharedFlow,StateFlow和LiveData使用差異區分,SharedFlow實現源碼解析; pumpkin的玄學; http://blog.csdn.net/weixin_44235109/article/details/121594988?spm=1001.2014.3001.5502
- StateFlow 和 SharedFlow 官方文檔 http://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn