一文讀懂Kotlin的數據流

語言: CN / TW / HK

一、Android分層架構

不管是早期的MVC、MVP,還是最新的MVVM和MVI架構,這些框架一直解決的都是一個數據流的問題。一個良好的數據流框架,每一層的職責是單一的。例如,我們可以在表現層(Presentation Layer)的基礎上添加一個領域層(Domain Layer) 來保存業務邏輯,使用數據層(Data Layer)對上層屏蔽數據來源(數據可能來自遠程服務,可能是本地數據庫)。

在Android中,一個典型的Android分層架構圖如下:

image.png

其中,我們需要重點看下Presenter 和 ViewModel, Presenter 和 ViewModel向 View 提供數據的機制是不同的。

  • Presenter: Presenter通過持有 View 的引用並直接調用操作 View,以此向 View 提供和更新數據。
  • ViewModel:ViewModel 通過將可觀察的數據暴露給觀察者來向 View 提供和更新數據。

目前,官方提供的可觀察的數據組件有LiveData、StateFlow和SharedFlow。可能大家對LiveData比較熟悉,配合ViewModel可以很方便的實現數據流的流轉。不過,LiveData也有很多常見的缺陷,並且使用場景也比較固定,如果網上出現了KotlinFlow 替代 LiveData的聲音。那麼 Flow 真的會替代 LiveData嗎?Flow 真的適合你的項目嗎?看完下面的分析後,你定會有所收穫。

二、ViewModel + LiveData

ViewModel的作用是將視圖和邏輯進行分離,Activity或者Fragment只負責UI顯示部分,網絡請求或者數據庫操作則有ViewModel負責。ViewModel旨在以注重生命週期的方式存儲和管理界面相關的數據,讓數據可在發生屏幕旋轉等配置更改後繼續留存。並且ViewModel不持有View層的實例,通過LiveData與Activity或者Fragment通訊,不需要擔心潛在的內存泄漏問題。

而LiveData 則是一種可觀察的數據存儲器類,與常規的可觀察類不同,LiveData 具有生命週期感知能力,它遵循其他應用組件(如 Activity、Fragment 或 Service)的生命週期。這種感知能力可確保LiveData當數據源發生變化的時候,通知它的觀察者更新UI界面。同時它只會通知處於Active狀態的觀察者更新界面,如果某個觀察者的狀態處於Paused或Destroyed時那麼它將不會收到通知,所以不用擔心內存泄漏問題。

下面是官方發佈的架構組件庫的生命週期的説明:

image.png

2.1 LiveData 特性

通過前面的介紹可以知道,LiveData 是 Android Jetpack Lifecycle 組件中的內容,具有生命週期感知能力。一句話概括就是:LiveData 是可感知生命週期的,可觀察的,數據持有者。特點如下: - 觀察者的回調永遠發生在主線程 - 僅持有單個且最新的數據 - 自動取消訂閲 - 提供「可讀可寫」和「僅可讀」兩個版本收縮權限 - 配合 DataBinding 實現「雙向綁定」

觀察者的回調永遠發生在主線程

因為LiveData 是被用來更新 UI的,因此 Observer 接口的 onChanged() 方法必須在主線程回調。

public interface Observer<T> { void onChanged(T t); }

背後的道理也很簡單,LiveData 的 setValue() 發生在主線程(非主線程調用會拋異常),而如果調用postValue()方法,則它的內部會切換到主線程調用 setValue()。

protected void postValue(T value) { boolean postTask; synchronized (mDataLock) { postTask = mPendingData == NOT_SET; mPendingData = value; } if (!postTask) { return; } ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable); }

可以看到,postValue()方法的內部調用了postToMainThread()實現線程的切換,之後遍歷所有觀察者的 onChanged() 方法。

僅持有單個且最新數據

作為數據持有者,LiveData僅持有【單個且最新】的數據。單個且最新,意味着 LiveData 每次只能持有一個數據,如果有新數據則會覆蓋上一個。並且,由於LiveData具備生命週期感知能力,所以觀察者只會在活躍狀態下(STARTED 到 RESUMED)才會接收到 LiveData 最新的數據,在非活躍狀態下則不會收到。

自動取消訂閲

可感知生命週期的重要優勢就是可以自動取消訂閲,這意味着開發者無需手動編寫那些取消訂閲的模板代碼,降低了內存泄漏的可能性。背後的實現邏輯是在生命週期處於 DESTROYED 時,移除觀察者。

@Override public void onStateChanged(@NonNull LifecycleOwner source, @NonNull Lifecycle.Event event) { Lifecycle.State currentState = mOwner.getLifecycle().getCurrentState(); if (currentState == DESTROYED) { removeObserver(mObserver); return; } ... //省略其他代碼 }

提供「可讀可寫」和「僅可讀」兩種方式

LiveData 提供了setValue() 和 postValue()兩種方式來操作實體數據,而為了細化權限,LiveData又提供了mutable(MutableLiveData) 和 immutable(LiveData) 兩個類,前者「可讀可寫」,後者則「僅可讀」。

image.png

配合 DataBinding 實現「雙向綁定」

LiveData 配合 DataBinding 可以實現更新數據自動驅動UI變化,如果使用「雙向綁定」還能實現 UI 變化影響數據的變化功能。

2.2 LiveData的缺陷

正如前面説的,LiveData有自己的使用場景,只有滿足使用場景才會最大限度的發揮它的功能,而下面這些則是在設計時將自帶的一些缺陷: - value 可以是 nullable 的 - 在 fragment 訂閲時需要傳入正確的 lifecycleOwner - 當 LiveData 持有的數據是「事件」時,可能會遇到「粘性事件」 - LiveData 是不防抖的 - LiveData 的 transformation 需要工作在主線程

value 可以是 nullable 的

由於LiveData的getValue() 是可空的,所以在使用時應該注意判空,否則容易出現空指針的報錯。

@Nullable public T getValue() { Object data = mData; if (data != NOT_SET) { return (T) data; } return null; }

傳入正確的 lifecycleOwner

Fragment 調用 LiveData的observe() 方法時傳入 this 和 viewLifecycleOwner 的含義是不一樣的。因為Fragment與Fragment中的View的生命週期並不一致,有時候我們需要的讓observer感知Fragment中的View的生命週期而非Fragment。

粘性事件

粘性事件的定義是,發射的事件如果早於註冊,那麼註冊之後依然可以接收到的事件,這一現象稱為粘性事件。解決辦法是:將事件作為狀態的一部分,在事件被消費後,不再通知觀察者。推薦兩種解決方式: - KunMinX/UnPeek-LiveData - 使用kotlin 擴展函數和 typealias 封裝解決「粘性」事件的 LiveData

默認不防抖

當setValue()/postValue() 傳入相同的值且多次調用時,觀察者的 onChanged() 也會被多次調用。不過,嚴格來講,這也不算一個問題,我們只需要在調用 setValue()/postValue() 前判斷一下 vlaue 與之前是否相同即可。

transformation 工作在主線程

有些時候,我們需要對從Repository 層得到的數據進行處理。例如,從數據庫獲得 User列表,我們需要根據 id 獲取某個 User, 那麼就需要用到MediatorLiveData 和 Transformatoins 來實現。 - Transformations.map - Transformations.switchMap

並且,map 和 switchMap 內部均是使用 MediatorLiveData的addSource() 方法實現的,而該方法會在主線程調用,使用不當會有性能問題。

@MainThread public <S> void addSource(@NonNull LiveData<S> source, @NonNull Observer<? super S> onChanged) { Source<S> e = new Source<>(source, onChanged); Source<?> existing = mSources.putIfAbsent(source, e); if (existing != null && existing.mObserver != onChanged) { throw new IllegalArgumentException( "This source was already added with the different observer"); } if (existing != null) { return; } if (hasActiveObservers()) { e.plug(); } }

2.3 LiveData 小結

LiveData 是一種可觀察的數據存儲器類,與常規的可觀察類不同,LiveData 具有生命週期感知能力,它遵循其他應用組件(如 Activity、Fragment 或 Service)的生命週期。這種感知能力可確保LiveData當數據源發生變化的時候,通知它的觀察者更新UI界面。同時它只會通知處於Active狀態的觀察者更新界面,如果某個觀察者的狀態處於Paused或Destroyed時那麼它將不會收到通知,所以不用擔心內存泄漏問題。

同時,LiveData 專注單一功能,因此它的一些方法使用上是有侷限性的,並且需要配合 ViewModel 使用才能顯示其價值。

三、Flow

3.1 簡介

Flow是Google官方提供的一套基於kotlin協程的響應式編程模型,它與RxJava的使用類似,但相比之下Flow使用起來更簡單,另外Flow作用在協程內,可以與協程的生命週期綁定,當協程取消時,Flow也會被取消,避免了內存泄漏風險。

協程是輕量級的線程,本質上協程、線程都是服務於併發場景下,其中協程是協作式任務,線程是搶佔式任務。默認協程用來處理實時性不高的數據,請求到結果後整個協程就結束了。比如,有下面一個例子:

image.png

其中,紅框中需要展示的內容實時性不高,而需要交互的,比如轉發和點贊屬於實時性很高的數據需要定時刷新。對於實時性不高的場景,直接使用 Kotlin 的協程處理即可,比如。

``` suspend fun loadData(): Data

uiScope.launch { val data = loadData() updateUI(data) } ```

而對於實時性要求較高的場景,上面的方式就不起作用了,此時需要用到Kotlin提供的Flow數據流。

fun dataStream(): Flow<Data>uiScope.launch { dataStream().collect { data -> updateUI(data) } }

3.2 基本概念

Kotlin的數據流主要由三個成員組成,分別是生產者、消費者和中介。 生產者:生成添加到數據流中的數據,可以配合得協程使用,使用異步方式生成數據。 中介(可選):可以修改發送到數據流的值,或修正數據流本身。 消費者:使用方則使用數據流中的值。

其中,中介可以對數據流中的數據進行更改,甚至可以更改數據流本身,他們的架構示意圖如下。

image.png

在Kotlin中,Flow 是一種冷流,不過有一種特殊的Flow( StateFlow/SharedFlow) 是熱流。什麼是冷流,他和熱流又有什麼關係呢?

冷流:只有訂閲者訂閲時,才開始執行發射數據流的代碼。並且冷流和訂閲者只能是一對一的關係,當有多個不同的訂閲者時,消息是重新完整發送的。也就是説對冷流而言,有多個訂閲者的時候,他們各自的事件是獨立的。 熱流:無論有沒有訂閲者訂閲,事件始終都會發生。當 熱流有多個訂閲者時,熱流與訂閲者們的關係是一對多的關係,可以與多個訂閲者共享信息。

3.3 StateFlow

前面説過,冷流和訂閲者只能是一對一的關係,當我們要實現一個流多個訂閲者的場景時,就需要使用熱流了。

StateFlow 是一個狀態容器式可觀察數據流,可以向其收集器發出當前狀態更新和新狀態更新。可以通過其 value 屬性讀取當前狀態值,如需更新狀態並將其發送到數據流,那麼就需要使用MutableStateFlow。

3.3.1 基本使用

在Android 中,StateFlow 非常適合需要讓可變狀態保持可觀察的類。由於StateFlow並不是系統API,所以使用前需要添加依賴:

``` dependencies { ... //省略其他

implementation "androidx.activity:activity-ktx:1.3.1"
implementation "androidx.fragment:fragment-ktx:1.4.1"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1'

} ```

接着,我們需要創建一個ViewModel,比如:

class StateFlowViewModel: ViewModel() { val data = MutableStateFlow<Int>(0) fun add(v: View) { data.value++ } fun del(v: View) { data.value-- } }

可以看到,我們使用MutableStateFlow包裹需要操作的數據,並添加了add()和del()兩個方法。然後,我們再編寫一段測試代碼實現數據的修改,並自動刷新數據。

``` class StateFlowActivity : AppCompatActivity() { private val viewModel by viewModels() private val mBinding : ActivityStateFlowBinding by lazy { ActivityStateFlowBinding.inflate(layoutInflater) } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(mBinding.root) initFlow() } private fun initFlow() { mBinding.apply { btnAdd.setOnClickListener { viewModel.add(it) } btnDel.setOnClickListener { viewModel.del(it) } } }

} ```

上面代碼中涉及到的佈局代碼如下:

```

<FrameLayout
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:orientation="vertical">
    <TextView
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_marginLeft="200dp"
        android:layout_marginTop="30dp"
        android:text="@{String.valueOf(stateFlowViewModel.data)}"
        android:textSize="24sp" />

    <com.google.android.material.floatingactionbutton.FloatingActionButton
        android:id="@+id/btn_add"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="bottom|start"
        android:layout_marginStart="10dp"
        android:layout_marginBottom="10dp"
        android:contentDescription="start"
        android:src="@android:drawable/ic_input_add" />

    <com.google.android.material.floatingactionbutton.FloatingActionButton
        android:id="@+id/btn_del"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="bottom|end"
        android:layout_marginEnd="10dp"
        android:layout_marginBottom="10dp"
        android:contentDescription="cancel"
        android:src="@android:drawable/ic_menu_close_clear_cancel" />
</FrameLayout>

```

上面代碼中,我們使用了DataBing寫法,因此不需要再手動的綁定數據和刷新數據。

3.4 SharedFlow

3.4.1 SharedFlow基本概念

SharedFlow提供了SharedFlow 與 MutableSharedFlow兩個版本,平時使用較多的是MutableSharedFlow。它們的區別是,SharedFlow可以保留歷史數據,MutableSharedFlow 沒有起始值,發送數據時需要調用 emit()/tryEmit() 方法。

首先,我們來看看SharedFlow的構造函數:

public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T>

可以看到,MutableSharedFlow需要三個參數:

  • replay:表示當新的訂閲者Collect時,發送幾個已經發送過的數據給它,默認為0,即默認新訂閲者不會獲取以前的數據
  • extraBufferCapacity:表示減去replay,MutableSharedFlow還緩存多少數據,默認為0
  • onBufferOverflow:表示緩存策略,即緩衝區滿了之後Flow如何處理,默認為掛起。除此之外,還支持DROP_OLDEST 和DROP_LATEST 。

``` //ViewModel val sharedFlow=MutableSharedFlow() viewModelScope.launch{ sharedFlow.emit("Hello") sharedFlow.emit("SharedFlow") }

//Activity lifecycleScope.launch{ viewMode.sharedFlow.collect { print(it) } } ```

3.4.2 基本使用

SharedFlow並不是系統API,所以使用前需要添加依賴:

``` dependencies { ... //省略其他

implementation "androidx.activity:activity-ktx:1.3.1"
implementation "androidx.fragment:fragment-ktx:1.4.1"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1'

} ```

接下來,我們創建一個SharedFlow,由於需要一對多的進行通知,所以我們MutableSharedFlow,然後重寫postEvent()方法,代碼如下:

object LocalEventBus { private val events= MutableSharedFlow< Event>() suspend fun postEvent(event: Event){ events.emit(event) } } data class Event(val timestamp:Long)

接下來,我們再創建一個ViewModel,裏面添加startRefresh()和cancelRefresh()兩個方法,如下。

``` class SharedViewModel: ViewModel() { private lateinit var job: Job

fun startRefresh(){
    job=viewModelScope.launch (Dispatchers.IO){
        while (true){
            LocalEventBus.postEvent(Event(System.currentTimeMillis()))
        }
    }
}

fun cancelRefresh(){
    job.cancel()
}

} ```

前面説過,一個典型的Flow是由三部分構成的。所以,此處我們先新建一個用於數據消費的Fragment,代碼如下:

``` class FlowFragment: Fragment() { private val mBinding : FragmentFlowBinding by lazy { FragmentFlowBinding.inflate(layoutInflater) }

override fun onCreateView(
    inflater: LayoutInflater, container: ViewGroup?,
    savedInstanceState: Bundle?
): View? {
    return mBinding.root
}
override fun onStart() {
    super.onStart()
    lifecycleScope.launchWhenCreated {
        LocalEventBus.events.collect {
            mBinding.tvShow.text=" ${it.timestamp}"
        }
    }
}

} ```

FlowFragment的主要作用就是接收LocalEventBus的數據,並顯示到視圖上。接下來,我們還需要創建一個數據的生產者,為了簡單,我們只在生產者頁面中開啟協程,代碼如下:

``` class FlowActivity : AppCompatActivity() { private val viewModel by viewModels() private val mBinding : ActivityFlowBinding by lazy { ActivityFlowBinding.inflate(layoutInflater) }

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(mBinding.root)
    initFlow()
}

private fun initFlow() {
    mBinding.apply {
        btnStart.setOnClickListener {
            viewModel.startRefresh()
        }
        btnStop.setOnClickListener {
            viewModel.cancelRefresh()
        }
    }
}

} ```

其中,FlowActivity代碼中涉及的佈局如下:

```

    <LinearLayout
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:orientation="vertical">

        <fragment
            android:name="com.xzh.demo.FlowFragment"
            android:layout_width="match_parent"
            android:layout_height="0dp"
            android:layout_weight="1" />
    </LinearLayout>

    <com.google.android.material.floatingactionbutton.FloatingActionButton
        android:id="@+id/btn_start"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="bottom|start"
        android:layout_marginStart="10dp"
        android:layout_marginBottom="10dp"
        android:src="@android:drawable/ic_input_add"
        android:contentDescription="start" />

    <com.google.android.material.floatingactionbutton.FloatingActionButton
        android:id="@+id/btn_stop"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="bottom|end"
        android:layout_marginEnd="10dp"
        android:layout_marginBottom="10dp"
        android:src="@android:drawable/ic_menu_close_clear_cancel"
        android:contentDescription="cancel" />
</FrameLayout>

```

最後,當我們運行上面的代碼時,就會在FlowFragment的頁面上顯示當前的時間戳,並且頁面的數據會自動進行刷新。

3.5 冷流轉熱流

前文説過,Kotlin的Flow是一種冷流,而StateFlow/SharedFlow則屬於熱流。那麼有人會問:怎麼將冷流轉化為熱流呢?答案就是kotlin提供的shareIn()和stateIn()兩個方法。

首先,來看一下StateFlow的shareIn的定義:

public fun <T> Flow<T>.stateIn( scope: CoroutineScope, started: SharingStarted, initialValue: T ): StateFlow<T>

shareIn方法將流轉換為SharedFlow,需要三個參數,我們重點看一下started參數,表示流啟動的條件,支持三種: - SharingStarted.Eagerly:無論當前有沒有訂閲者,流都會啟動,訂閲者只能接收到replay個緩衝區的值。 - SharingStarted.Lazily:當有第一個訂閲者時,流才會開始,後面的訂閲者只能接收到replay個緩衝區的值,當沒有訂閲者時流還是活躍的。 - SharingStarted.WhileSubscribed:只有滿足特定的條件時才會啟動。

接下來,我們在看一下SharedFlow的shareIn的定義:

public fun <T> Flow<T>.shareIn( scope: CoroutineScope, started: SharingStarted, replay: Int = 0 ): SharedFlow<T>

此處,我們重點看下replay參數,該參數表示轉換為SharedFlow之後,當有新的訂閲者的時候發送緩存中值的個數。

3.6 StateFlow與SharedFlow對比

從前文的介紹可以知道,StateFlow與SharedFlow都是熱流,都是為了滿足流的多個訂閲者的使用場景的,一時間讓人有些傻傻分不清,那StateFlow與SharedFlow究竟有什麼區別呢?總結起來,大概有以下幾點: - SharedFlow配置更為靈活,支持配置replay、緩衝區大小等,StateFlow是SharedFlow的特殊化版本,replay固定為1,緩衝區大小默認為0。 - StateFlow與LiveData類似,支持通過myFlow.value獲取當前狀態,如果有這個需求,必須使用StateFlow。 - SharedFlow支持發出和收集重複值,而StateFlow當value重複時,不會回調collect給新的訂閲者,StateFlow只會重播當前最新值,SharedFlow可配置重播元素個數(默認為0,即不重播)。

從上面的描述可以看出,StateFlow為我們做了一些默認的配置,而SharedFlow澤添加了一些默認約束。總的來説,SharedFlow相比StateFlow更靈活。

四、總結

目前,官方提供的可觀察的數據組件有LiveData、StateFlow和SharedFlow。LiveData是Android早期的數據流組件,具有生命週期感知能力,需要配合ViewModel才能實現它的價值。不過,LiveData也有很多使用場景缺陷,常見的有粘性事件、不支持防抖等。

於是,Kotlin在1.4.0版本,陸續推出了StateFlow與SharedFlow兩個組件,StateFlow與SharedFlow都是熱流,都是為了滿足流的多個訂閲者的使用場景,不過它們也有微妙的區別,具體參考前面內容的説明。

參考資料
Kotlin 的數據流