一看就懂!圖解 Kotlin SharedFlow 快取系統

語言: CN / TW / HK

theme: vuepress

我正在參加「掘金·啟航計劃」

前言

Kotlin 為我們提供了兩種建立“熱流”的工具:StateFlowSharedFlow。StateFlow 經常被用來替代 LiveData 充當架構元件使用,所以大家相對熟悉。其實 StateFlow 只是 SharedFlow 的一種特化形式,SharedFlow 的功能更強大、使用場景更多,這得益於其自帶的快取系統,本文用圖解的方式,帶大家更形象地理解 SharedFlow 的快取系統。

建立 SharedFlow 需要使用到 MutableSharedFlow() 方法,我們通過方法的三個引數配置快取:

kotlin fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T> 接下來,我們通過時序圖的形式介紹這三個關鍵引數對快取的影響。正文之前讓我們先統一一下用語: - Emitter:Flow 資料的生產者,從上游發射資料 - Subcriber:Flow 資料的消費者,在下游接收資料

replay

當 Subscriber 訂閱 SharedFlow 時,有機會接收到之前已傳送過的資料,replay 指定了可以收到 subscribe 之前資料的數量。replay 不能為負數,預設值為 0 表示 Subscriber 只能接收到 subscribe 之後 emit 的資料:

上圖展示的是 replay = 0 的情況,Subscriber 無法收到 subscribe 之前 emit 的 ❶,只能接收到 ❷ 和 ❸。

當 replay = n ( n > 0)時,SharedFlow 會啟用快取,此時 BufferSize 為 n,意味著可以快取發射過的最近 n 個數據,併發送給新增的 Subscriber。

上圖以 n = 1 為例 :

  1. Emitter 傳送 ❶ ,並被 Buffer 快取
  2. Subscriber 訂閱 SharedFlow 後,接收到快取的 ❶
  3. Emitter 相繼傳送 ❷ ❸ ,Buffer 快取的資料相繼依次被更新

在生產者消費者模型中,有時消費的速度趕不及生產,此時要加以控制,要麼停止生產,要麼丟棄資料。SharedFlow 也同樣如此。有時 Subscriber 的處理速度較慢,Buffer 快取的資料得不到及時處理,當 Buffer 為空時,emit 預設將會被掛起 ( onBufferOverflow = SUSPEND)

上面的圖展示了 replay = 1 時 emit 發生 suspend 場景:

  1. Emitter 傳送 ❶ 並被快取
  2. Subscriber 訂閱 SharedFlow ,接收 replay 的 ❶ 開始處理
  3. Emitter 傳送 ❷ ,快取資料更新為 ❷ ,由於 Subscriber 對 ❶ 的處理尚未結束,❷ 在快取中沒有及時被消費
  4. Emitter 傳送 ❸,由於快取的 ❷ 尚未被 Subscriber 消費,emit 發生掛起
  5. Subscriber 開始消費 ❷ ,Buffer 快取 ❸ , Emitter 可以繼續 emit 新資料

注意 SharedFlow 作為一個多播可以有多個 Subscriber,所以上面例子中,❷ 被消費的時間點,取決於最後一個開始處理的 Subscriber。

extraBufferCapacity

extraBufferCapacity 中的 extra 表示 replay-cache 之外為 Buffer 還可以額外追加的快取。

若 replay = n, extraBufferCapacity = m,則 BufferSize = m + n

extraBufferCapacity 預設為 0,設定 extraBufferCapacity 有助於提升 Emitter 的吞吐量

在上圖的基礎之上,我們再設定 extraBufferCapacity = 1,效果如下圖:

上圖中 BufferSize = 1 + 1 = 2 :

  1. Emitter 傳送 ❶ 並得到 Subscriber1 的處理 ,❶ 作為 replay 的一個數據被快取,
  2. Emitter 傳送 ❷,Buffer 中 replay-cache 的資料更新為 ❷
  3. Emitter 傳送 ❸,Buffer 在儲存了 replay 資料 ❷ 之上,作為 extra 又儲存了 ❸
  4. Emitter 傳送 ❹,此時 Buffer 已沒有空餘位置,emit 掛起
  5. Subscriber2 訂閱 SharedFlow。雖然此時 Buffer 中存有 ❷ ❸ 兩個資料,但是由於 replay = 1,所以 Subscriber2 只能收到最近的一個數據 ❸
  6. Subscriber1 處理完 ❶ 後,依次處理 Buffer 中的下一個資料,開始消費 ❷
  7. 對於 SharedFlow 來說,已經不存在沒有消費 ❷ 的 Subscriber,❷ 移除快取,❹ 的 emit 繼續,並進入快取,此時 Buffer 又有兩個資料 ❸ ❹ ,
  8. Subscriber1 處理完 ❷ ,開始消費 ❸
  9. 不存在沒有消費 ❸ 的 Subscriber, ❸ 移除快取。

onBufferOverflow

前面的例子中,當 Buffer 被填滿時,emit 會被掛起,這都是建立在 onBufferOverflow 為 SUSPEND 的前提下的。onBufferOverflow 用來指定快取移除時的策略,除了預設的 SUSPEND,還有兩個資料丟棄策略:

  • DROP_LATEST:丟棄最新的資料
  • DROP_OLDEST:丟棄最老的資料

需要特別注意的是,當 BufferSize = 0 時,extraBufferCapacity 只支援 SUSPEND,其他丟棄策略是無效的。這很好理解,因為 Buffer 中沒有資料,所以丟棄無從下手,所以啟動丟棄策略的前提是 Buffer 至少有一個緩衝區,且資料被填滿

上圖展示 DROP_LATEST 的效果。假設 replay = 2,extra = 0

  1. Emitter 傳送 ❸ 時,由於 ❶ 已經被消費,所以 Buffer 資料從 ❶❷ 變為 ❷❸
  2. Emitter 傳送 ❹ 時,由於 ❷ 還未被消費,Buffer 處於填滿狀態, ❹ 直接被丟棄
  3. Emitter 傳送 ❺ 時,由於 ❷ 已經被費,可以移除快取,Buffer 資料變為 ❸❺

上圖展示了 DROP_OLDEST 的效果,與 DROP_LATEST 比較後非常明顯,快取中永遠會儲存最新的兩個資料,但是較老的資料不管有沒有被消費,都可能會從 Buffer 移除,所以 Subscriber 可以消費當前最新的資料,但是有可能漏掉中間的資料,比如圖中漏掉了 ❷

注意:當 extraBufferCapacity 設為 SUSPEND 可以保證 Subscriber 一個不漏的消費掉所有資料,但是會影響 Emitter 的速度;當設定為 DROP_XXX 時,可以保證 emit 呼叫後立即返回,但是 Subscriber 可能會漏掉部分資料。

如果我們不想讓 emit 發生掛起,除了設定 DROP_XXX 之外,還有一個方法就是呼叫 tryEmit,這是一個非 suspend 版本的 emit

```kotlin abstract suspend override fun emit(value: T)

abstract fun tryEmit(value: T): Boolean ```

tryEmit 返回一個 boolean 值,你可以這樣判斷返回值,當使用 emit 會掛起時,使用 tryEmit 會返回 false,其餘情況都是 true。這意味著 tryEmit 返回 false 的前提是 extraBufferCapacity 必須設為 SUSPEND,且 Buffer 中空餘位置為 0 。此時使用 tryEmit 的效果等同於 DROP_LATEST。

SharedFlow Buffer

前面介紹的 MutableSharedFlow 的三個引數,其本質都是圍繞 SharedFlow 的 Buffer 進行工作的。那麼這個 Buffer 具體結構是怎樣的呢?

上面這個圖是 SharedFlow 原始碼中關於 Buffer 的註釋,這個圖形象地告訴了我們 Buffer 是一個線性資料結構(就是一個普通的陣列 Array<Any?>),但是這個圖不能直觀反應 Buffer 執行機制。下面通過一個例子,看一下 Buffer 在執行時的具體更新過程:

```kotlin val sharedFlow = MutableSharedFlow( replay = 2, extraBufferCapacity = 2, onBufferOverflow = BufferOverflow.SUSPEND ) var emitValue = 1

fun main() { runBlocking { launch { sharedFlow.onEach { delay(200) // simulate the consume of data }.collect() }

    repeat(12) {
        sharedFlow.emit(emitValue)
        emitValue++
        delay(50)
    }
}

} ```

上面的程式碼很簡單,SharedFlow 的 BufferSize = 2+2 = 4,Emitter 生產的速度大於 Subscriber 消費的速度,所以過程中會出現 Buffer 的填充和更新,下面依舊用圖的方式展示 Buffer 的變化

先看一下程式碼對應的時序圖:

有前面的介紹,相信這個時序圖很容易理解,這裡就不再贅述了,下面重點圖解一下 Buffer 的記憶體變化。SharedFlow 的 Buffer 本質上是一個基於 Array 實現的 queue,通過指標移動從往佇列增刪元素,避免了元素在實際陣列中的移動。這裡關鍵的指標有三個:

  • head:佇列的 head 指向 Buffer 的第一個有效資料,這是時間上最早進入快取的資料,在資料被所有的 Subscriber 消費之前不會移除快取。因此 head 也代表了最慢的 Subscriber 的處理進度
  • replay:Buffer 為 replay-cache 預留空間的其實位置,當有新的 Subscriber 訂閱發生時,從此位置開始處理資料。
  • end:新資料進入快取時的位置,end 這也代表了最快的 Subscriber 的處理進度。

如果 bufferSize 表示當前 Buffer 中儲存資料的個數,則我們可知三指標 index 符合如下關係: - replay <= head + bufferSize
- end = head + bufferSize

瞭解了三指標的含義後,我們再來看上圖中的 Buffer 是如何工作的:

最後,總結一下 Buffer 的特點: - 基於陣列實現,當陣列空間不夠時進行 2n 的擴容 - 元素進入陣列後的位置保持不變,通過移動指標,決定資料的消費起點 - 指標移動到陣列尾部後,會重新指向頭部,陣列空間可迴圈使用