日常思考,目前Kotlin協程能完全取代Rxjava嗎
theme: channing-cyan highlight: arduino-light
前言
自從jetbrains
公司提出Kotlin
協程用來解決非同步執行緒問題,並且衍生出來了Flow
作為響應式框架,引來了大量Android開發者的青睞;而目前比較穩定的響應式庫當屬Rxjava
,這樣以來目的就很明顯了,旨在用Kotlin
協程來逐步替代掉Rxjava
;
仔細思考下,真的可以完全替代掉Rxjava
麼,它的複雜性和多樣化的操作符,而協程的許多API
仍然是實驗性的,目前為止,隨著kt不斷地進行版本迭代,越來越趨於穩定,對此我不能妄下斷言;當然Rxjava
無疑也是一個非常優秀的框架,值得我們不斷深入思考,但是隨著協程的出現,就個人而言我會更喜歡使用協程來作為滿足日常開發的非同步解決方案。
協程的本質和
Rxjava
是截然不同的,所以直接拿它們進行對比是比較棘手的;換一種思路,本文我們從日常開發中的非同步問題出發,分別觀察協程與Rxjava
是如何提供相應的解決方案,依次來進行比對,探討下Kotlin
協程是否真的足以取代Rxjava
這個話題吧
流型別的比較
現在我們來看下Rxjava
提供的流型別有哪些,我們可以使用的基本流型別操作符如下圖所示
它們的基本實現在下文會提及到,這裡我們簡單來討論下在協程中是怎麼定義這些流操作符的
-
Single<T>
其實就是一個返回不可空值的suspend
函式 -
Maybe<T>
恰好相反,是一個返回可空的supspend
函式 -
Completable
不會發送事件,所以在協程中就是一個不返回任何東西的簡單掛起函式 -
對於
Observable
和Flowable
,兩者都可以發射多個事件,不同在於前者是沒有背壓管理的,後者才有,而他們在協程中我們可以直接使用Flow
來完成,在非同步資料流中按順序發出值,所以只需要一個返回當前Data
資料型別的Flow<T>
值得注意的是,該函式本身是不需要
supsend
修飾符的,由於Flow
是冷流,在進行收集\訂閱之前是不會發射資料,只要在collect
的時候才需要協程作用域中執行。為什麼說Flow
足以替代Observable
和Flowable
原因在與它處理背壓(backpressure
)的方式。這自然而然來源於協程中的設計與理念,不需要一些巧妙設計的解決方案來處理顯示背壓,Flow
中所有Api
基本上都帶有suspend
修復符,它也成為了解決背壓的關鍵先生。其目的就是在不阻塞執行緒的情況下暫停呼叫者的執行,因此,當Flow<T>
在同一個協程中發射和收集的時候,如果收集器跟不上資料流,它可以簡單地暫停元素的發射,直到它準備好接收更多。
流型別比較的基本實現
好的小夥伴們,上文我們簡單用協程寫出Rxjava
的幾個基本流型別,現在讓我們用幾個詳細的例項來看看他們的不同之處吧
Completable ---- 非同步任務完成沒有結果,可能會丟擲錯誤
在Rxjava
中,我們使用Completable.create
去建立,裡面的CompletableEmitter
中有onComplete
表示完成的方法和一個onError
傳遞異常的方法,如下程式碼所示
//completable in Rxjava
fun completableRequest(): Completable {
return Completable.create { emitter->
try {
emitter.onComplete()
}catch (e:Exception) {
emitter.onError(e)
}
}
}
fun main() {
completableRequest()
.subscribe {
println("I,am done")
println()
}
}
在協程當中,我們對應的就是呼叫一個不返回任何內容的掛起函式(returns Unit
),就類似於我們呼叫一個普通函式一樣
fun completableCoroutine() = runBlocking {
try {
delay(500L)
println("I am done")
} catch (e: Exception) {
println("Got an exception")
}
}
注意不要在生產環境程式碼使用
runBlocking
,你應該有一個合適的CoroutineScope
,由於是測試程式碼本文都將使用runBlocking
來輔助說明測試場景
Single ---- 必須返回或丟擲錯誤的非同步任務
在 RxJava
中,我們使用一個Single
,它裡面有一個onSuccess
傳遞返回值的方法和一個onError
傳遞異常的方法。
`
kotlin
/*
* Single in RxJava
/
fun main() {
singleResult()
.subscribe(
{ result -> println(result) },
{ println("Got an exception") }
)
}
fun singleResult(): Single
而在協程中,我們呼叫一個返回非空值的掛起函式:
``` /* * Single equivalent in coroutines / fun main() = runBlocking { try { val result = getResult() println(result) } catch (e: Exception) { println("Got an exception") } }
suspend fun getResult(): String { // process a request delay(100) return "Some result" } ```
Maybe --- 可能返回結果或丟擲錯誤的非同步任務
在 RxJava
中,我們使用一個Maybe
. 它裡面有一個onSuccess
傳遞返回值的方法onComplete
,一個在沒有值的情況下發出完成訊號的方法,以及一個onError
傳遞異常的方法。
``` /* * Maybe in RxJava / fun main() { maybeResult() .subscribe( { result -> println(result) }, { println("Got an exception") }, { println("Completed without a value!") } ) }
fun maybeResult(): Maybe
在協程中,我們呼叫一個返回可空值得掛起函式
``` /* * Maybe equivalent in coroutines / fun main() = runBlocking { try { val result = getNullableResult() if (result != null) { println(result) } else { println("Completed without a value!") } } catch (e: Exception) { println("Got an exception") } }
suspend fun getNullableResult(): String? { // process a request delay(100) return if (Random.nextBoolean()) { "Some value" } else { null } } ```
0..N事件的非同步流
由於在Rxjava
中,Flowable
和Observable
都是屬於0..N
事件的非同步流,但是Observable
幾乎沒有做相應的背壓管理,所以這裡我們主要以Flowable
為例子,onNext
發出下一個流值的方法,一個onComplete
表示流完成的方法,以及一個onError
傳遞異常的方法。
``` /* * Flowable in RxJava / fun main() { flowableValues() .subscribe( { value -> println(value) }, { println("Got an exception") }, { println("I'm done") } ) }
fun flowableValues(): Flowable
return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER) } ```
在協程中,我們只是建立一個Flow
就可以完成這個方法
``` /* * Flow in Kotlin / fun main() = runBlocking { try { eventFlow().collect { value -> println(value) } println("I'm done") } catch (e: Exception) { println("Got an exception") } }
fun eventFlow() = flow { for (i in 1..10) { emit(i) } } ```
在慣用的
Kotlin
中,建立上述流程的方法之一是:fun eventFlow() = (1..10).asFlow()
如上面這些程式碼所見,我們基本可以使用協程涵蓋Rxjava
所有的主要基本用法,此外,協程的設計允許我們使用所有標準的Kotlin
功能編寫典型的順序程式碼 ,它還消除了對onComplete
或onError
回撥的需要。我們可以像在普通程式碼中那樣捕獲錯誤或設定協程異常處理程式。並且,考慮到當掛起函式完成時,協程繼續按順序執行,我們可以在下一行繼續編寫我們的“完成邏輯”。
值得注意的是,當我們進行呼叫collect
收集的時候也是如此,在收集完所有元素後才會執行下一行程式碼
eventFlow().collect { value ->
println(value)
}
println("I'm done")
Flow
收集完所有元素後,才會呼叫列印I'm done
操作符的比較
總所周知,Rxjava
的主要優勢在於它擁有非常多的操作符,基本上可以應對日常開發中出現的各種情況,由於它種類特別繁多又比較難記憶,這裡我只簡單舉些常見的操作符進行比較
COMPLETABLE
,SINGLE
, MAYBE
這裡需要強調的是,在Rxjava
中Completable
,Single
和Maybe
都有許多相同的操作符,然而在協程中任何型別的操作符其實都是多餘的,我們以Single
中的map()
簡單操作符為例來看下:
/**
* Maps Single<String> to
* Single<User> synchronously
*/
fun main() {
getUsername()
.map { username ->
User(username)
}
.subscribe(
{ user -> println(user) },
{ println("Got an exception") }
)
}
map
作為Rxjava
中最常用的操作符,獲取一個值並將其轉換為另一個值,但是在協程中我們不需要.map()
操作符就可以實現這種操作
fun main() = runBlocking {
try {
val username = getUsername() // suspend fun
val user = User(username)
println(user)
} catch (e: Exception) {
println("Got an exception")
}
}
使用suspend
掛起函式可以掛起當前函式,當執行完畢後在按順序執行接下來的程式碼
Flow
操作符與Rxjava
操作符
現在讓我們看看Flow
中有哪些操作符,它們與Rxjava
相比有什麼不同,由於篇幅原因,這裡我簡單比較下日常開發中最常用的操作符
map()
對於map
操作符,Flow
中也具有相同的操作符
/**
* Maps Flow<String> to Flow<User>
*/
fun main() = runBlocking {
usernameFlow()
.map { username ->
User(username)
}
.collect { user ->
println(user)
}
}
Flow
中的map
操作符 相當於Rxjava
做了一定的簡化處理,這是它的一個主要優勢,可以看下它的原始碼
fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {
collect { value -> emit(transform(value)) }
}
是不是非常簡單,只是重新建立一個新的flow
,它從從上游收集值transform
並在當前函式應用後發出這些值;事實上大多數Flow
的操作符都是這樣工作的,不需要遵循嚴格的協議;對於大多數應用場景,標準Flow
操作符就已經足夠了,當然編寫自定義操作符也是非常簡單容易的;相對於Rxjava
,如果想要編寫自定義操作符,你必須非常瞭解Rxjava
的
flatmap()
另外,在Rxjava
中我們經常使用的操作符還有flatmap()
,同時還有很多種變體,例如.flatMapSingle()
,flatMapObservable()
,flatMapIterable()
等,簡單來說,在Rxjava
中我們如果需要對一個值進行同步轉換,就使用map
,進行非同步轉換的時候就需要使用flatMap()
;對此,Flow
進行同步或者非同步轉換的時候不需要不同的操作符,僅僅使用map
就足夠了,由於它們都有supsend
掛起函式進行修飾,不用擔心同步性
可以看下在Rxjava
中的示例
fun compareFlatMap() {
getUsernames() //Flowable<String>
.flatMapSingle { username ->
getUserFromNetwork(username) // Single<User>
}
.subscribe(
{ user -> println(user) },
{ println("Got an exception") }
)
}
好的,我們使用Flow
來轉換下上述的這一段程式碼,只需要使用map
就可以以任何方式進行轉換值,如下程式碼所示:
runBlocking {
flow {
emit(User("Jacky"))
}.map {
getUserFromName(it) //suspend
}.collect {
println(it)
}
}
suspend fun getUserFromName(user: User): String {
return user.userName
}
實際上使用Flow
中的map
操作符,就可以將上游流發出的值轉換為新流,然後將所有流扁平化為一個,這和flatMap
的功能幾乎可以達到同樣的效果
filter()
對於filter
操作符,我們在Rxjava
中並沒有直接的方法進行非同步過濾,這需要我們自己編寫程式碼來進行過濾判斷,如下所示
fun getUsernames(): Flowable<String> {
val flowableEmitter = { emitter: FlowableEmitter<String> ->
emitter.onNext("Jacky")
}
return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}
fun isCorrectUserName(userName: String): Single<Boolean> {
return Single.create { emitter ->
runCatching {
//名字判斷....
if (userName.isNotEmpty()) {
emitter.onSuccess(true)
} else {
emitter.onSuccess(false)
}
}.onFailure {
emitter.onError(it)
}
}
}
fun compareFilter() {
getUsernames()//Flowable<String>
.flatMapSingle { userName ->
isCorrectUserName(userName)
.flatMap { isCorrect ->
if (isCorrect) {
Single.just(userName)
} else {
Single.never()
}
}
}.subscribe {
println(it)
}
}
乍一看,是不是感覺有點麻煩,事實上這確實需要我們使用些小手段才能達到目的;而在Flow
中,我們能夠輕鬆地根據同步和非同步呼叫過濾流
runBlocking {
userNameFlow().filter { user ->
isCorrectName(user.userName)
}.collect { user->
println(user)
}
}
suspend fun isCorrectName(userName: String): Boolean {
return userName.isNotEmpty()
}
結語
由於篇幅原因,Rxjava
和協程都是一個非常龐大的思考話題,它們之間的不同比較可以永遠進行下去;事實上,在Kotlin
協程被廣泛使用之前,Rxjava
作為專案中主要的非同步解決方案,以至於到現在工作上還有很多專案用著Rxjava
, 所以即使切換到Kotlin
協程之後,還有相當長一段時間還在用著Rxjava
;這並不代表Rxjava
不夠好,而是協程讓程式碼變得更易讀,更易於使用;
暫時先告一段落了,事實上證明協程確實能夠滿足我們日常開發的主要需求,下次將會對Rxjava
中的背壓和之前所討論的Flow
背壓問題進行比較探討,還有非常多的東西要學,共勉!!!!
本文主要內容譯至 -> https://www.javaadvent.com/2021/12/are-kotlin-coroutines-enough-to-replace-rxjava.html
- 請收下這些Kotlin開發必知必會的編碼實踐方式
- 日常思考,目前Kotlin協程能完全取代Rxjava嗎
- 誇誇其談,簡單說說HTTP的優化歷程
- 【一起學習開源框架】Retrofit相對於OKHttp,解決了什麼問題
- Kotlin Sequences Api:入門
- 【日常小問題】談談Rxjava中的操作符以及幾種常見的Subject,應用到實際場景中
- 【一起學習Android開源框架】Retrofit原始碼解析-1(第四部分)
- 【日常小問題】解決BottomSheetDialogFragment中多個fragment滑動衝突
- 【一起學習Android開源框架】Retrofit註解解析(第三部分)
- 【一起學習Android開源框架】Retrofit使用的代理模式解析(第二部分)
- 【一起學習Android開源框架】Retrofit的簡單使用(第一部分)