深入理解RxJava與RxKotlin

語言: CN / TW / HK

What is Rx?

基本概念

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

按照官方介紹RxJava是由Netflix發起的一個在JVM平臺上以Observable序列來處理非同步任務的響應式編碼庫

Why Rx?

現在Android社群都在廣泛的推廣和使用RxJava,但是使用它究竟能能帶來什麼樣的好處呢,我們通過一個具體的案例來研究

使用者登入系統需求

假設現在我們有一個微信使用者第三方登入功能的需求,這個需求很簡單,就是實現一個登入介面,使用者點選登入跳轉到微信進行使用者認證,拿到微信的openId然後再接入到自己後臺伺服器進行登入。這裡我們設計一個 AccountManager 類,這個類裡面有2個方法,首先是 requestWxOpenId() 方法用於跳轉微信獲得微信的openId,另外一個就是通過openId登入認證的 login() 方法拿到使用者資訊

```kotlin interface AccountManager { fun requestWxOpenId(): String

fun login(wxOpenId: String): User

} ```

傳統程式碼實現

因為AccountManager這些方法都是網路訪問的介面,是個非同步任務,如果直接呼叫會堵塞主執行緒肯定是不行的,所以我們會封裝一個callback出來,回撥方法用來處理當介面請求結束後的操作,因為介面請求有可能成功也有可能失敗,所以回撥介面中包含了兩個方法 onSuccessonFaillure

```kotlin interface OpenIdCallback { fun onSuccess(openId: String) fun onFailure() }

interface AccountCallback { fun onSuccess(resultl: String)

fun onFailure()

}

interface AccountManager { fun requestWxOpenId(callback: OpenIdCallback): String

fun login(wxOpenId: String, callback: AccountCallback)

} ```

AccountManager的實現很簡單首先呼叫requestWxOpenId方法獲取到openId後再呼叫login方法獲取使用者資訊,呼叫程式碼如下所示

```kotlin private fun clickLogin() { val accountManager = AccountManagerIml() accountManager.requestWxOpenId(object : OpenIdCallback { override fun onSuccess(openId: String) { accountManager.login(openId, object : AccountCallback { override fun onSuccess(user: User) { if (!isActivityDestroyed) { runOnUiThread { //update UI } } }

                override fun onFailure() {
                    //handle error
                }

            })
        }

        override fun onFailure() {
            //handle error
        }
    })
}

```

可以看到整套流程下來,有兩個痛點:

第一是每一個非同步請求我們都需要宣告一個callback介面,另外一個是callback的多層巢狀使得程式碼很不清晰,如果再巢狀多幾層會使得程式碼很難看,這種callback的多層巢狀還有個專有的名詞叫 回撥地獄(callback hell)

第二就是像在Android應用開發中需要在主執行緒操作UI元件以及Activity生命週期的判斷,這就需要在各個回撥中加上各種判斷。

為了解決這些問題Rx應運而生

使用RX的實現

為了解決上述提到哪些問題,我們看看Rx怎麼解決的,首先為了解決不同的callback問題,Rx定義了統一的回撥介面 Observer

```kotlin public interface Observer { void onCompleted();

void onError(Throwable e);

void onNext(T t);

} ```

通過泛型包裝來統一回調介面,Observer回撥中封裝了三個方法onErro用來處理錯誤情況,onNext用來處理正常情況下成功返回的結果,onComplete表示整個流程結束時的回撥。

通過統一泛型回撥介面解決了宣告多個回撥的問題,那麼如何解決回撥巢狀的問題呢,因為回撥巢狀產生的原因是由於非同步方法的的鏈式呼叫產生的,如果把非同步方法的的鏈式呼叫都能直接反映在鏈式的資料處理上,Rx是通過將每一個非同步方法直接返回Observable物件,非同步方法的組合呼叫通過Observable的組合,最終得到一個Observable來表示鏈式任務處理的最終結果。

Observable表示一個數據源,在AccountManager的方法中,我們對應的介面就需要修改

```kotlin interface AccountManager { fun requestWxOpenId(): Observable

fun login(wxOpenId: String):Observable<User>

} ```

我們呼叫程式碼就只需要一個subscriber就行了

```kotlin private fun clickLogin(){ val accountManager = AccountManagerIml2() accountManager.requestWxOpenId() .flatMap { accountManager.login(it) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object: Subscriber() { override fun onNext(t: User?) { //update ui }

                override fun onCompleted() {
                }

                override fun onError(e: Throwable?) {
                }
            })
}

```

看到上面的程式碼是不是變得清晰了,如果使用RxKotlin,會更加清晰

kotlin private fun clickLogin() { val accountManager = AccountManagerIml2() accountManager.requestWxOpenId() .flatMap { accountManager.login(it) } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeBy( onError = {}, onNext = { //update ui }) }

可以看到,使用RxJava來封裝這些非同步操作後,整個程式碼跟同步操作的程式碼都非常接近,結構很清晰和明瞭

What's Rx inside?

要想更好的使用RxJava,就有必要更好更深入的瞭解RxJava的api,這一部分將深入瞭解RxJava和RxKotlin的api和部分實現原理

RxJava1 Api探究

事件源

Observable

Observable是最為常用的資料來源,Observable物件提供了常用的建立方法,create,just,from等方法

```kotlin Observable.just("hello")

Observable.from(arrayOf(1, 2, 3, 4, 5))

Observable.create(Observable.OnSubscribe { t -> try { t.onNext("hello") t.onStart() } catch (e: Exception) { t.onError(e) } })

```

Completable

Completable資料來源只會觸發onComplete和onError回撥,通過 from() 方法可以建立Completable資料來源

kotlin Completable.fromAction { println("action done") }

Single

Single資料來源只會發射一條資料,Single類也有create,just,from等方法用來建立Single資料來源物件,subscribe方法只有onSuccess和onError,比方說如下程式碼

kotlin Single.just(1) .subscribeBy( onSuccess ={ println(it) }, onError = {} )

操作符

操作符可以說是是RxJava中的核心了,通過不同的操作符來實現資料的拼接,轉換等等操作從而達到最終的資料結果,RxJava提過了很多的操作符,這裡就不一一講解,主要列出一些常用的

map

用於資料轉換,常見的應用場景比方說id應設成名稱

kotlin Observable.just(1, 2, 3, 4) .map { when (it) { 1 -> "Jane" 2 -> "Mike" 3 -> "Mary" 4 -> "tom" else -> "noName" } } .subscribeBy(onNext = { println(it) })

輸出內容為

Jane Mike Mary tom

flatMap

flatMap操作類似於Map,但是Map轉換是 T -> D ,但是flatMap是 T -> Observable

```kotlin fun getInfoById(id: Int): Observable { //模擬耗時請求 return Observable.just(id) }

Observable.just(1, 2, 3, 4) .flatMap { getInfoById(it) } .subscribeBy(onNext = { println(it)})

```

這裡getInfoById我們假設是一個網路請求或者資料庫操作之類的耗時操作,它返回Observable物件,最後subscriber中onNext中接受到的還是Int資料

輸出結果

1 2 3 4

filter

filter用於過濾資料

kotlin Observable.just(1,2,3,4) .filter { it > 2 } .subscribeBy(onNext = { println(it)})

輸出結果

3 4

RxJava中操作符可以說是非常非常多,想了解更多可以檢視官方文件 Rx操作符

執行緒排程

執行緒排程在RxJava可以說是非常精簡,簡單到一兩行程式碼來實現執行緒切換

kotlin Observable.just(1,2,3,4) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeBy(onNext = { println(it)})

上面程式碼中通過切換android ui執行緒和io執行緒是的回撥方法都執行在ui執行緒,Observable資料來源操作在io執行緒中執行

Transformer

我們在使用RxJava的過程中,通常都會用到執行緒切換,耗時操作在io執行緒,subscriber回撥在主執行緒

kotlin Observable.just(1,2,3,4) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeBy(onNext = { println(it)})

對於多個地方都總是要重複呼叫subscirbeOn和observeOn這兩個方法,我們會想到去封裝一下,比方說增加一個方法

kotlin fun schedule(observable: Observable<Any>): Observable<Any> { return observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) }

這樣我們就可以這樣呼叫

kotlin schedule(observable) .subscribeBy(onNext = { println(it)})

這樣做雖然加了一層封裝,但是打破了整個Observable的鏈式呼叫,這就顯得不太好看

好在RxJava有Transformer,通過與Observable的compose()操作符使用來組合Observable操作

Transformer就是一個Func1, Observable\>,可以通過它將一種型別的Observable轉換成另一種型別的Observable

kotlin fun <T> applySchedulers(): Observable.Transformer<T, T> { return Observable.Transformer { observable -> observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } }

我們建立一個applySchedulers方法返回一個Transformer物件,這時候我們就可以直接鏈式切換執行緒了

kotlin observable.compose(applySchedulers()) .subscribeBy(onNext = { println(it)})

Subject

Subject是一個很特殊的介面,它即實現了Observable介面又實現了Observer介面,也就是說它既是一個數據發射方又是一個數據接收方

AsyncSubject

AsyncSubject註冊到的Observer只會在onComplete回撥之後接收到最後一個onNext發射出來的資料

```kotlin // observer不會接收到資料,因為subject沒有呼叫onComplete AsyncSubject subject = AsyncSubject.create(); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three");

// observer只會接收到最後一個數據,也就是“three” AsyncSubject subject = AsyncSubject.create(); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onCompleted(); ```

BehaviorSubject

註冊到BehaviorSubject的observer只會接收到subscribe之前發射的最後一個item和之後發射的所有item

```kotlin // observer會接收到所有4個事件(包括"default"). BehaviorSubject subject = BehaviorSubject.create("default"); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three");

// observer will receive the "one", "two" and "three" events, but not "default" and "zero" BehaviorSubject subject = BehaviorSubject.create("default"); subject.onNext("zero"); subject.onNext("one"); subject.subscribe(observer); subject.onNext("two"); subject.onNext("three");

// observer will receive only onCompleted BehaviorSubject subject = BehaviorSubject.create("default"); subject.onNext("zero"); subject.onNext("one"); subject.onCompleted(); subject.subscribe(observer);

// observer will receive only onError BehaviorSubject subject = BehaviorSubject.create("default"); subject.onNext("zero"); subject.onNext("one"); subject.onError(new RuntimeException("error")); subject.subscribe(observer); ```

PublishSubject

observer會接收到subscribe後所有發射出來的item

kotlin PublishSubject<Object> subject = PublishSubject.create(); // observer1 will receive all onNext and onCompleted events subject.subscribe(observer1); subject.onNext("one"); subject.onNext("two"); // observer2 will only receive "three" and onCompleted subject.subscribe(observer2); subject.onNext("three"); subject.onCompleted();

TestSubject

TestSubject是專門為單元測試開發而來的

```kotlin @Test public void cancelShouldUnsubscribe() { TestSubject observable = TestSubject.create(new TestScheduler()); assertFalse(observable.hasObservers());

T future = toFuture(observable);
assertTrue(observable.hasObservers());

future.cancel(true);

assertFalse(observable.hasObservers());

} ```

RxJava2

Reactive Streams

Reactive Streams是一套在JVM上建立的響應式標準協議,旨在標準化以無阻塞式back pressre的形式來處理非同步事件流協議,類似於網路的標準http協議一樣,只是Reactive Streams是在JVM和JavaScript平臺上的響應式協議

規範概覽
Publisher

kotlin public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }

Publisher介面用來定義發射事件物件,通過subscribe方法接收想要接收事件的subscriber

Subscriber

kotlin public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }

Subscription

kotlin public interface Subscription { public void request(long n); public void cancel(); }

Processor

kotlin public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

Java9 Flow

Reactive Streams定義了規範,JDK也實現了這個規範,在最新的Java9中引入了一個 java.util.concurrent.Flow 模組,這就意味著如果你使用Java9,可以直接引用Flow模組來應用響應式api,而不必須引入第三方的RxJava包

api變動

資料來源
  • Observable和Flowable兩種資料來源,Observable是沒有back pressure支援的,Flowable是需要設定back pressure選項的,也就是說對於事件發射方是否會產生back pressure問題需要使用者來決定,使用Flowable就必須要指定back pressure模式,不指定會直接報錯
  • Single資料來源的subscriber在RxJava2中改為SingleObserver,增加了onSubscribe方法

kotlin interface SingleObserver<T> { void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); }

  • Completable的Subscriber在RxJava2中改為CompletableObserver

kotlin interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); }

  • Maybe : Maybe是在RxJava2中新加的一種資料型別,他是Single和Completable的結合體Maybe可能會發射0或者1個事件或者error
Subject和Processor

在RxJava1中的Subject,在RxJava2中新增了Processor,Processor與Subject類似,Processor支援back pressure而Subject不支援back pressure

Subscriber和Subscription

Subscriber增加了一個onSubscribe(Subscription)的回撥方法,因為原有的Observable在subscribe一個Subscriber的時候會返回Subscription,但是在RxJava2中subscribe方法返回的是void,所以對應在Subscriber中增加了Subscription的回撥方法

其他
  • RxJava2不再接收直接發射null,例如 Observable.just(null) 如果發射的物件是null,在RxJava2中會直接報錯或者將error傳到onError中
  • Schedules將immediate改為trampoline
  • 增加了test的操作符

RxJava2還有許多變更,這裡未能一一講述,更詳細文件請參考官方 What's-different-in-2.0

RxKotlin

RxKotlin是基於RxJava輕量級框架,在基於Kotlin的專案中,本身可以直接使用RxJava,但是RxKotlin通過增加各類擴充套件函式和規範使得RxJava在Kotlin專案中更易使用和標準化

```kotlin import io.reactivex.rxkotlin.subscribeBy import io.reactivex.rxkotlin.toObservable

fun main(args: Array) {

val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

list.toObservable() // extension function for Iterables
        .filter { it.length >= 5 }
        .subscribeBy(  // named arguments for lambda Subscribers
                onNext = { println(it) },
                onError =  { it.printStackTrace() },
                onComplete = { println("Done!") }
        )

} ```

completable擴充套件

kotlin fun Action0.toCompletable(): Completable = Completable.fromAction(this) fun <T> Callable<T>.toCompletable(): Completable = Completable.fromCallable { this.call() } fun <T> Future<T>.toCompletable(): Completable = Completable.fromFuture(this) fun (() -> Any).toCompletable(): Completable = Completable.fromCallable(this)

completable擴充套件主要是為各種物件增加 toCompletable() 的轉換方法

observables擴充套件

observabled擴充套件方法主要是對各個物件增加toObservable物件,包括各個Array物件,iterator物件,Sequence物件等

operators擴充套件

```kotlin fun Observable>.mergeAll() = flatMap { it }

fun Observable>.concatAll() = concatMap { it }

fun Observable>.switchLatest() = switchMap { it }

fun Observable>.switchOnNext(): Observable = Observable.switchOnNext(this) ```

操作符主要增加了對Observable>操作的擴充套件方法

single擴充套件

kotlin fun <T> T.toSingle(): Single<T> = Single.just(this) fun <T> Future<T>.toSingle(): Single<T> = Single.from(this) fun <T> Callable<T>.toSingle(): Single<T> = Single.fromCallable { this.call() } single擴充套件增加了Future,Callable和T的toSingle方法

subscribers擴充套件

subscribers增加了 subscribeBy 及其相應的重構方法,使用方法可參考如下程式碼

kotlin list.toObservable() // extension function for Iterables .filter { it.length >= 5 } .subscribeBy( // named arguments for lambda Subscribers onNext = { println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } )

subscription擴充套件

subscription增加了兩個擴充套件方法

```kotlin operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)

fun Subscription.addTo(compositeSubscription: CompositeSubscription) : Subscription { compositeSubscription.add(this) return this } ```

一個是kotlin符號過載,可以直接使用 += 將subscription新增到CompositeSubscription中

kotlin subscription += observable.subscribe{}

另外一個是在Subscription中增加了addTo方法

kotlin list.toObservable() .subscribeBy( // named arguments for lambda Subscribers onNext = { println(it) }, onError = { it.printStackTrace() }, onComplete = { println("Done!") } ) .addTo(compositeSubscription)

How to apply Rx?

Android專案架構

anddroid-arch.jpeg

上圖是Google IO 2017上新提出的Android Architecture,旨在幫助開發者如何更快更穩定的基於此架構開發應用

從上面的架構圖中,我們可以看到資料來源的處理統一是通過Repository來封裝,不管是資料庫的資料還是伺服器介面的資料,通過通過Repository封裝再以LiveData的形式提供,注意這裡的LiveData其實可以理解為RxJava的Observable。所以對於Android專案結構中資料來源的封裝,建議都採用Repository的形式,如果採用Android Architecture那可以採用LiveData,如果採用RxJava就以Observable來組合Repository

Rx應用

Retrofit

Retrofit是一個基於RestFul Api的Java框架,我們可以直接使用Observable返回

kotlin public interface GitHubService { @GET("users/{user}/repos") Observable<List<Repo>> listRepos(@Path("user") String user); }

定義介面請求後,我們就可以通過Retrofit獲取service例項做請求

```kotlin Retrofit retrofit = new Retrofit.Builder() .baseUrl("https://api.github.com/") .build();

GitHubService service = retrofit.create(GitHubService.class);

service.listRepost("test") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeBy(onNext={}) ```

Retrofit具體介紹可檢視其專案https://github.com/square/retrofit 這裡就不一一介紹了

RxBinding

RxBinding是一個將Android UI控制元件事件轉換為Rx事件的Java庫

kotlin Button button = ...; RxView.clickEvents(button) .subscribe(new Action1<ViewClickEvent>() { @Override public void call(ViewClickEvent event) { // Click handling } });

上面的程式碼中就是通過RxBinding將按鈕的點選事件轉換為Observable發射,這麼做就可以通過一些操作符來顯示快速點選或遮蔽指定的點選動作等等

RxLifecycle

RxLifecycle是一個管理Android Activity或Fragment元件週期的應用庫

kotlin myObservable .compose(RxLifecycle.bind(lifecycle)) .subscribe();

通過RxLifecycle的繫結到指定生命週期來處理比方說在Activity destroy的時候unsubscribe,在Activity onCreate的時候進行繫結

RxBus

RxBus是一個通過RxJava來實現時間匯流排的開源庫,使用方式與大多是EventBus庫類似,只是內部是通過RxJava來處理事件,這裡就不做篇幅講解了,有興趣的同學可以參考https://github.com/AndroidKnife/RxBus

注意事項

  • 在使用RxJava的時候,有時候可能會寫出如下程式碼

kotlin fun getUser(uId: String, subscriber: Subscriber<User>)

在呼叫時傳入Subscriber,這種方式還是傳統的callback形式,這種方式完全背棄了Rx的初衷,這樣修正會更貼切

kotlin fun getUser(uId: String):Observable<User>

  • 在使用RxJava的時候忽視back pressure問題,可能會產生莫名的問題,需要注意實際事件發射場景是否會產生back pressure然後採用正確的操作符或者Flowable源來處理

總結

至此,整個RxJava的學習過程結束了,本篇文章站在RxJava應用的角度去分析,沒有太過深入原始碼解析,這也不是本篇文章的目的,後續可能會再出一些原始碼解析類的文章,再就是隨著Kotlin coroutine(協程)出現,Android有更多可以實現非同步任務的方式。

關於Android Coroutine相關內容,可以參考Android Coroutine開發實踐

更多Android相關技術文章,參考我的個人部落格