一看就懂!圖解 Kotlin SharedFlow 快取系統
theme: vuepress
我正在參加「掘金·啟航計劃」
前言
Kotlin 為我們提供了兩種建立“熱流”的工具:StateFlow 和 SharedFlow。StateFlow 經常被用來替代 LiveData 充當架構元件使用,所以大家相對熟悉。其實 StateFlow 只是 SharedFlow 的一種特化形式,SharedFlow 的功能更強大、使用場景更多,這得益於其自帶的快取系統,本文用圖解的方式,帶大家更形象地理解 SharedFlow 的快取系統。
建立 SharedFlow 需要使用到 MutableSharedFlow()
方法,我們通過方法的三個引數配置快取:
kotlin
fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
接下來,我們通過時序圖的形式介紹這三個關鍵引數對快取的影響。正文之前讓我們先統一一下用語:
- Emitter:Flow 資料的生產者,從上游發射資料
- Subcriber:Flow 資料的消費者,在下游接收資料
replay
當 Subscriber 訂閱 SharedFlow 時,有機會接收到之前已傳送過的資料,replay 指定了可以收到 subscribe 之前資料的數量。replay 不能為負數,預設值為 0 表示 Subscriber 只能接收到 subscribe 之後 emit 的資料:
上圖展示的是 replay = 0 的情況,Subscriber 無法收到 subscribe 之前 emit 的 ❶,只能接收到 ❷ 和 ❸。
當 replay = n ( n > 0)時,SharedFlow 會啟用快取,此時 BufferSize 為 n,意味著可以快取發射過的最近 n 個數據,併發送給新增的 Subscriber。
上圖以 n = 1 為例 :
- Emitter 傳送 ❶ ,並被 Buffer 快取
- Subscriber 訂閱 SharedFlow 後,接收到快取的 ❶
- Emitter 相繼傳送 ❷ ❸ ,Buffer 快取的資料相繼依次被更新
在生產者消費者模型中,有時消費的速度趕不及生產,此時要加以控制,要麼停止生產,要麼丟棄資料。SharedFlow 也同樣如此。有時 Subscriber 的處理速度較慢,Buffer 快取的資料得不到及時處理,當 Buffer 為空時,emit 預設將會被掛起 ( onBufferOverflow = SUSPEND)
上面的圖展示了 replay = 1 時 emit 發生 suspend 場景:
- Emitter 傳送 ❶ 並被快取
- Subscriber 訂閱 SharedFlow ,接收 replay 的 ❶ 開始處理
- Emitter 傳送 ❷ ,快取資料更新為 ❷ ,由於 Subscriber 對 ❶ 的處理尚未結束,❷ 在快取中沒有及時被消費
- Emitter 傳送 ❸,由於快取的 ❷ 尚未被 Subscriber 消費,emit 發生掛起
- Subscriber 開始消費 ❷ ,Buffer 快取 ❸ , Emitter 可以繼續 emit 新資料
注意 SharedFlow 作為一個多播可以有多個 Subscriber,所以上面例子中,❷ 被消費的時間點,取決於最後一個開始處理的 Subscriber。
extraBufferCapacity
extraBufferCapacity 中的 extra 表示 replay-cache 之外為 Buffer 還可以額外追加的快取。
若 replay = n, extraBufferCapacity = m,則 BufferSize = m + n。
extraBufferCapacity 預設為 0,設定 extraBufferCapacity 有助於提升 Emitter 的吞吐量
在上圖的基礎之上,我們再設定 extraBufferCapacity = 1,效果如下圖:
上圖中 BufferSize = 1 + 1 = 2 :
- Emitter 傳送 ❶ 並得到 Subscriber1 的處理 ,❶ 作為 replay 的一個數據被快取,
- Emitter 傳送 ❷,Buffer 中 replay-cache 的資料更新為 ❷
- Emitter 傳送 ❸,Buffer 在儲存了 replay 資料 ❷ 之上,作為 extra 又儲存了 ❸
- Emitter 傳送 ❹,此時 Buffer 已沒有空餘位置,emit 掛起
- Subscriber2 訂閱 SharedFlow。雖然此時 Buffer 中存有 ❷ ❸ 兩個資料,但是由於 replay = 1,所以 Subscriber2 只能收到最近的一個數據 ❸
- Subscriber1 處理完 ❶ 後,依次處理 Buffer 中的下一個資料,開始消費 ❷
- 對於 SharedFlow 來說,已經不存在沒有消費 ❷ 的 Subscriber,❷ 移除快取,❹ 的 emit 繼續,並進入快取,此時 Buffer 又有兩個資料 ❸ ❹ ,
- Subscriber1 處理完 ❷ ,開始消費 ❸
- 不存在沒有消費 ❸ 的 Subscriber, ❸ 移除快取。
onBufferOverflow
前面的例子中,當 Buffer 被填滿時,emit 會被掛起,這都是建立在 onBufferOverflow 為 SUSPEND 的前提下的。onBufferOverflow 用來指定快取移除時的策略,除了預設的 SUSPEND,還有兩個資料丟棄策略:
- DROP_LATEST:丟棄最新的資料
- DROP_OLDEST:丟棄最老的資料
需要特別注意的是,當 BufferSize = 0 時,extraBufferCapacity 只支援 SUSPEND,其他丟棄策略是無效的。這很好理解,因為 Buffer 中沒有資料,所以丟棄無從下手,所以啟動丟棄策略的前提是 Buffer 至少有一個緩衝區,且資料被填滿
上圖展示 DROP_LATEST 的效果。假設 replay = 2,extra = 0
- Emitter 傳送 ❸ 時,由於 ❶ 已經被消費,所以 Buffer 資料從 ❶❷ 變為 ❷❸
- Emitter 傳送 ❹ 時,由於 ❷ 還未被消費,Buffer 處於填滿狀態, ❹ 直接被丟棄
- Emitter 傳送 ❺ 時,由於 ❷ 已經被費,可以移除快取,Buffer 資料變為 ❸❺
上圖展示了 DROP_OLDEST 的效果,與 DROP_LATEST 比較後非常明顯,快取中永遠會儲存最新的兩個資料,但是較老的資料不管有沒有被消費,都可能會從 Buffer 移除,所以 Subscriber 可以消費當前最新的資料,但是有可能漏掉中間的資料,比如圖中漏掉了 ❷
注意:當 extraBufferCapacity 設為 SUSPEND 可以保證 Subscriber 一個不漏的消費掉所有資料,但是會影響 Emitter 的速度;當設定為 DROP_XXX 時,可以保證 emit 呼叫後立即返回,但是 Subscriber 可能會漏掉部分資料。
如果我們不想讓 emit 發生掛起,除了設定 DROP_XXX 之外,還有一個方法就是呼叫 tryEmit
,這是一個非 suspend 版本的 emit
```kotlin abstract suspend override fun emit(value: T)
abstract fun tryEmit(value: T): Boolean ```
tryEmit 返回一個 boolean 值,你可以這樣判斷返回值,當使用 emit 會掛起時,使用 tryEmit 會返回 false,其餘情況都是 true。這意味著 tryEmit 返回 false 的前提是 extraBufferCapacity 必須設為 SUSPEND,且 Buffer 中空餘位置為 0 。此時使用 tryEmit 的效果等同於 DROP_LATEST。
SharedFlow Buffer
前面介紹的 MutableSharedFlow 的三個引數,其本質都是圍繞 SharedFlow 的 Buffer 進行工作的。那麼這個 Buffer 具體結構是怎樣的呢?
上面這個圖是 SharedFlow 原始碼中關於 Buffer 的註釋,這個圖形象地告訴了我們 Buffer 是一個線性資料結構(就是一個普通的陣列 Array<Any?>
),但是這個圖不能直觀反應 Buffer 執行機制。下面通過一個例子,看一下 Buffer 在執行時的具體更新過程:
```kotlin
val sharedFlow = MutableSharedFlow
fun main() { runBlocking { launch { sharedFlow.onEach { delay(200) // simulate the consume of data }.collect() }
repeat(12) {
sharedFlow.emit(emitValue)
emitValue++
delay(50)
}
}
} ```
上面的程式碼很簡單,SharedFlow 的 BufferSize = 2+2 = 4,Emitter 生產的速度大於 Subscriber 消費的速度,所以過程中會出現 Buffer 的填充和更新,下面依舊用圖的方式展示 Buffer 的變化
先看一下程式碼對應的時序圖:
有前面的介紹,相信這個時序圖很容易理解,這裡就不再贅述了,下面重點圖解一下 Buffer 的記憶體變化。SharedFlow 的 Buffer 本質上是一個基於 Array 實現的 queue,通過指標移動從往佇列增刪元素,避免了元素在實際陣列中的移動。這裡關鍵的指標有三個:
- head:佇列的 head 指向 Buffer 的第一個有效資料,這是時間上最早進入快取的資料,在資料被所有的 Subscriber 消費之前不會移除快取。因此 head 也代表了最慢的 Subscriber 的處理進度
- replay:Buffer 為 replay-cache 預留空間的其實位置,當有新的 Subscriber 訂閱發生時,從此位置開始處理資料。
- end:新資料進入快取時的位置,end 這也代表了最快的 Subscriber 的處理進度。
如果 bufferSize 表示當前 Buffer 中儲存資料的個數,則我們可知三指標 index 符合如下關係: - replay <= head + bufferSize
- end = head + bufferSize
瞭解了三指標的含義後,我們再來看上圖中的 Buffer 是如何工作的:
最後,總結一下 Buffer 的特點: - 基於陣列實現,當陣列空間不夠時進行 2n 的擴容 - 元素進入陣列後的位置保持不變,通過移動指標,決定資料的消費起點 - 指標移動到陣列尾部後,會重新指向頭部,陣列空間可迴圈使用
- Android Studio Electric Eel 起支援手機投屏
- Compose 為什麼可以跨平臺?
- 一看就懂!圖解 Kotlin SharedFlow 快取系統
- 深入淺出 Compose Compiler(2) 編譯器前端檢查
- 深入淺出 Compose Compiler(1) Kotlin Compiler & KCP
- Jetpack MVVM七宗罪之三:在 onViewCreated 中載入資料
- 為什麼說 Compose 的宣告式程式碼最簡潔 ?Compose/React/Flutter/SwiftUI 語法對比
- Compose 型別穩定性註解:@Stable & @Immutable
- Fragment 這些 API 已廢棄,你還在使用嗎?
- 告別KAPT!使用 KSP 為 Kotlin 編譯提速
- 探索 Jetpack Compose 核心:深入 SlotTable 系統
- 盤點 Material Design 3 帶來的新變化
- Compose 動畫邊學邊做 - 夏日彩虹
- Google I/O :Android Jetpack 最新變化(二) Performance
- Google I/O :Android Jetpack 最新變化(一) Architecture
- Google I/O :Android Jetpack 最新變化(四)Compose
- Google I/O :Android Jetpack 最新變化(三)UI
- 一文看懂 Jetpack Compose 快照系統
- 聊聊 Kotlin 代理的“缺陷”與應對
- AAB 扶正!APK 再見!