為什麼 RxJava 有 Single / Maybe 等單發資料型別,而 Flow 沒有?

語言: CN / TW / HK

theme: juejin highlight: androidstudio


Coroutine Flow 與 RxJava 都是流式資料處理框架, 所以經常被拿來做比較,本文比較一下他們的資料型別。

Rx 與 Flow 在單發資料支援上的不同

RxJava 支援多種資料型別 - Observable :流式資料型別 - Flowable:與 Observable 類似,支援背壓 - Single:單發資料型別,只能且必須發射一個數據 - Maybe:單發資料型別,發射零個或一個數據 - Completable:不發射任何資料,只通知流的結束。

以上,Single<T>Maybe<T> 以及 Completable 都至多隻能發射一個數據(單發資料型別)。而反觀 Coroutine Flow,只提供了 Flow<T> 這一種型別,對標 Observable<T>Flowable<T> (Flow 天然支援背壓)的流式資料。同為流式框架,為什麼 Rx 需要支援單發資料型別,而 Flow 不提供不支援呢?

Rx 支援單發資料主要源於以下三個原因(或目的),而在這幾點對於 Flow 卻構不成問題:

||RxJava 支援單發的原因| Flow 不支援單發的原因| |--|--|--| |執行緒切換|RxJava 同時是一個非同步框架,提供 observeOnsubscribeOn 等執行緒操作符。在 Java 時代,缺少稱手的多執行緒工具,Rx 對於單發資料也是最好的選擇之一。|進入 Kotlin 時代 Coroutine 提供了足夠的非同步處理工具,單發資料使用掛起函式實現足矣。Flow 的執行緒切換也是構築在 Coroutine 之上。| |程式碼直觀|RxJava 的操作符幫助單發資料實現鏈式呼叫,避免回撥。比如通過 zip, concat 等實現單發資料的組合,或者基於 switchIfEmpty 等實現單發資料的選擇邏輯等。|Coroutine 可以使用同步呼叫的方式完成非同步,無需再借助鏈式呼叫語法來規避回撥。| |型別轉換|很多業務場景都涉及單發與流式資料的轉換,RxJava 為這些轉換提供操作符支援。比如 toObservable 或者 flatMapObservable 將單發資料轉成流式資料,反之則可以通過 firsttoList 等將流式資料轉成單發資料|Flow 也提供了雙向轉換,而且更加簡單,比如 toList 直接輸出拆箱後的資料型別 T,無需為單發資料專門定義裝箱型別。|

總結起來,RxJava 在很多方面彌補了語言本身的不足,能力越大責任也越大,Rx 對於單發或是流式資料的場景都要有所考慮。而 Kotlin 通過 Coroutine 解決了大部分非同步場景的開發需要。Flow 只專心於流式資料處理即可,雖然你依然可以使用 Flow 接受或傳送單發資料,但是官方並不推薦這麼做,自然也就不提供額外的單發資料型別。

接下來,通過與 Rx 的對比來具體瞭解一下 Coroutine 是如何對單發資料提供支援的。

執行緒切換

下面通過例子對比一下 Rx 與 Coroutine 的執行緒切換。

首先,我們模擬一個 RxJava 中的單發資料請求:

kotlin fun readFromRemoteRx(data: String = "data"): Single<String> { return Single.create { it.onSuccess(data) } .delay(100, TimeUnit.MILLISECONDS) .doOnSuccess { println("read from remote: $it") } .subscribeOn(Schedulers.io()) } 如上,delay 模擬 IO 的延時,subscribeOn 指定資料請求發生在 IO 執行緒。

前面說過,執行緒切換這種事情已經不是 Flow 的主要職責了。在 Coroutine 中,單發請求使用掛起函式即可:

kotlin suspend fun readFromRemote(data: String = "data"): String { delay(100) println("read from remote: $data") return data } 如上,我們用掛起函式定義單發資料,在協程中通過 withContext 就可以切換到 IO 執行緒。

程式碼直觀

Coroutine 處理單發資料的程式碼相對於 Rx 更加簡潔。

選擇邏輯

先看一個單發資料選擇邏輯的例子, 在 Rx 我們通過操作符進行選擇: ```kotlin fun readFromCacheRx(data: String? = null): Maybe { return run { if (data != null) Maybe.just(data) else Maybe.empty() }.delay(100, TimeUnit.MILLISECONDS) .doOnSuccess { println("read from cache: $it") } }

fun test() { readFromCacheRx(null) // pass "data" to check when cache has data .switchIfEmpty(readFromRemoteRx()) .subscribeOn(Schedulers.io()) .test() } ```

如上,readFromCacheRx 使用 Maybe 型別模擬本地資料來源的請求結果,當本地沒有資料時請求網路遠端資料。 Rx 基於 switchIfEmpty 完成條件選擇邏輯,否則我們只能在非同步回撥中做判斷。

在 Kotlin 時代,我們在 Coroutine 中用掛起函式實現選擇邏輯:

```kotlin suspend fun readFromCache(data: String? = null): String? { delay(100) println("read from cache: $data") return data }

fun test() { runBlocking { withContext(Dispatchers.IO) { val data = readFromCache() ?: readFromRemote() } } } ```

readFromCache 返回一個 Nullable 型別,直接使用 ?: 即可,基於協程的同步呼叫優勢,可以命令式地寫任何控制語句。

組合邏輯

再看一個組合邏輯的例子,Rx 使用 zip 將兩個單發資料組合成成一個新的單發資料:

```kotlin fun test() { readFromRemoteRx().zipWith(readFromRemote2Rx()) { res, res2 -> "$res & $res2" }.doOnSuccess { println("read from remote: $it") } .subscribeOn(Schedulers.io()) .test() }

/* output:


read from remote: data & data */ ```

Coroutine 的實現同樣的邏輯則非常簡單,使用 async + await 用命令式語句即可:

kotlin fun test() { runBlocking { val res = async { readFromRemote() } val res2 = async { readFromRemote2() } val data = "${res.await()} & ${res.await()}" println("read from remote: $data") } }

型別轉換

接下來對比一下單發與流式的資料轉換。

單發 > 流式

Rx 可以使用 toObservable 或者 flatMapObservable 將單發型別轉成 Observable

```kotlin readFromCacheRx() .flatMapObservable { Observable.just(it) } .doOnNext { println("read from cache: $it") } .doOnComplete { println("complete") } .test()

/* output:


read from cache: null complete */ `` 由於readFromCacheRx沒有發射任何資料,所以沒有doOnNext` 的日誌輸出。

協程的單發轉流式資料很簡單,flow {...} 是 Flow 的構造器,內部可以直接呼叫掛起函式,如果需要還可以使用 withContext 切換執行緒。

kotlin runBlocking { flow { readFromCache()?.let { emit(it) } } .onCompletion { println("complete") } .collect { println("next: $it") } }

我們常常會組合多個單發資料來實現某些業務邏輯。比如 Rx 中使用 merge 組合多個數據源的讀取結果,當本地 Cache 有資料時會先行傳送,這有利於冷啟後的首屏快速顯示

kotlin Observable.merge( readFromCacheRx().toObservable(), readFromRemoteRx().toObservable() ).test()

同樣的邏輯,在 Flow 中同樣可以基於掛起函式實現。

kotlin flowOf( flow { emit(readFromRemote()) }, flow { emit(readFromRemote()) }) .flattenMerge() .collect { println("$it") }

流式 > 單發

Rx 中我們可以將一個 Observable 轉化成 Single 資料:

```kotlin fun test() { Observable.just(1, 2, 3) .toList() .doOnSuccess { println("$it") } .test()

Observable.just(1, 2, 3)
    .first()
    .doOnSuccess { println("$it") }
    .test()

}

/* output:


[1, 2, 3] 1 */ ```

Flow 也提供了類似的操作符比如 firsttoList 等,而且直接輸出拆箱後的資料,不必再通過 collect 進行收集

kotlin data = flowOf(1, 2, 3).toList() println("$data")

流式 > 單發 > 流式

有一些業務場景中,可能需要流式 > 單發 > 流式這樣的多次轉換,這裡面通常涉及 flatMapconcatMap 等的非同步轉換。

```kotlin Observable.just(1, 3, 5) .concatMapSingle { readFromRemoteRx("$it") } .doOnComplete { println("complete") } .subscribe { println("next: $it") }

/* output:


read from remote: 1 next: 1 read from remote: 3 next: 3 read from remote: 5 next: 5 complete */

```

上面例子中,我們在資料流中序列的進行了三次單發請求並返回結果。相對於序列的 concatMapSingle, Rx 同時還提供了並行版本的 flatMapSingle 。 同樣的邏輯如果用 Flow 實現,如下: kotlin runBlocking { flowOf(1, 3, 5) .flatMapConcat { flow { emit(readFromRemote("$it")) } } .onCompletion { println("complete") } .collect { println("next: $it") } } Flow 的 flatMapConcat 與 Rx 的同名方法功能一樣,都是將 flatMap 後的資料流再次進行序列方式。Flow 也提供了 flatMapMerge 處理並行的場景,相當於 Rx 中的 flatMap。出於命名清晰的考慮,Flow 的 flatMap 方法已經 Deprecate 改名為 flatMapMerge

flatMapConcatflatMapMerge 在轉換時每次都要構建一個 Flow<T> ,這對於單發資料是沒必要的開銷,因此我們可以使用 map 簡化:

kotlin runBlocking { flowOf(1, 3, 5) .map { readFromRemote("$it") } .onCompletion { println("complete") } .collect { println("next: $it") } } 效果等價於 flatMapConcat,注意 map 無法在並行場景中使用,即使你在 map 中切換了新的執行緒。Flow 的 map { } 內可呼叫掛起函式,所以可以基於協程實現非同步邏輯,而 Rx 的 map 內只能同步執行,所以有人會將 Flow 的 map 比作 Rx 的 flatMap,這是不準確的,因為 Flow 的 map 並不能使整個資料流序列發射,map 會掛起等待當前資料執行結束後再繼續。

流式 > Comletable

Rx 還提供了 Completable 型別,我們可以在流式處理中插入無需返回結果的邏輯,例如下面這種場景

```kotlin fun saveToCacheRx(data: String): Completable { return Completable .fromAction { println("saved to cache: $data") } .delay(100, TimeUnit.MILLISECONDS) }

Observable.just(1, 2, 3) .flatMapCompletable { saveToCacheRx("$it") } .doOnComplete { println("complete") } .subscribe { println("next: $it") }

/* output:


saved to cache: 1 saved to cache: 2 saved to cache: 3 complete */ ``saveToCacheRx模擬一個數據儲存,Completable 沒有任何實際返回值,只用來通知儲存已結束,因此日誌中沒有next,只有最後的complete` 。

Flow 如何實現同樣的邏輯呢?

```kotlin suspend fun saveToCache(data: String) { delay(100) println("saved to cache: $data") }

runBlocking { flowOf(1, 2, 3) .flatMapMerge { flow { saveToCache("$it") } } .onCompletion { println("complete") } .collect { println("next: $it") }

/* output:


saved to cache: 1 saved to cache: 2 saved to cache: 3 complete */ ```

如上,掛起函式的 saveToCache 沒有任何返回值。flow { ... } 中呼叫的掛起函式執行結束後,Flow 的後續執行就會繼續,無需像 Rx 那樣通過 onComplete 通知。由於掛起函式沒有返回任何數值,next 日誌也不會輸出。

總結

在 Java 時代,由於語言能力的缺失 RxJava 需要承包包括單發資料在內的處理, 而進入 Kotlin 時代,掛起函式處理單發資料已經足矣,Flow 不是處理單發資料的最佳方案,我們在今後選型時因該避免對 Flow 的濫用。管中窺豹,可以預見 Kotlin 及協程的強大似的今後 RxJava 的使用場景將越來越少。