【日常小問題】談談Rxjava中的操作符以及幾種常見的Subject,應用到實際場景中

語言: CN / TW / HK

highlight: a11y-light theme: orange


提出問題

最近做專案中有個小需求,簡單說下,有A,B兩個介面需要同時請求,一般情況下B介面返回資料會比A介面快,但是我們需要優先拿取A的資料,只有當A介面沒有資料返回或者異常的情況下,才會拿取B介面的資料;

  • 首先考慮直接用Rxjava中merge或者zip操作符進行併發執行,但是返回的一直都是時間較快的B介面資料,所以不太滿足需求,需要加些條件操作符,那麼我們需要使用怎麼樣的邏輯呢,值得商榷下
  • 目前要滿足兩個條件:1.兩個介面同時執行,優先A介面資料 2.當A介面沒有返回時才返回B介面

這裡就需要涉及到Rxjava相關的操作符來實現邏輯,感興趣的小夥伴可以提供下好的思路,最後我會給出大佬教我的思路來參考,在此之前,我們先來回顧學習下Rxjava中的操作符和常見的Subject,只有知道它們是幹什麼的才能知道如何去合理的運用它們,工欲善其事必先利其器;

Rxjava操作符

對於Rxjava想必大家都不陌生吧,它提供了豐富的操作符,幾乎能勝任所有的功能需求,但是對於操作符的應用場景是千變萬化的,如何在不同場景下使用合適的操作符是大家掌握Rxjava操作符使用的基本條件

話不多說,我們根據Rxjava操作符的官方文件,來看看有那些以及它們分別有什麼作用呢,貼上官網地址 -->https://reactivex.io/documentation/operators.html

結合文件,我們可以看到每個操作符都有詳細的介紹使用,並且提供了魚骨流程圖方便我們更直觀的理解每一個操作符的作用。

大致有如下分類

| 分類 | 作用 | | ------------------------------------------------------------------- | ---------------------------------------------- | | 建立操作符(Creating Observables) | 建立(被觀察者)Observable物件&傳送事件 | | 轉變操作符(Transforming Observables) | 轉變(被觀察者)Observable傳送的事件 | | 過濾操作符(Filtering Observables) | 過濾(被觀察者)Observable傳送的事件,以及篩選(觀察者)Observer接收的事件 | | 組合操作符(Combining Observables) | 組合多個被觀察者(Observable)&合併需要傳送的事件 | | 一些功能性操作符(Error Handling Operators,Connectable Observable Operators) | 對被觀察者(Observable)傳送事件的時候進行些處理(如錯誤處理,執行緒排程等等) | | 布林操作符(Conditional and Boolean Operators) | 設定條件,判斷被觀察者(Observable)傳送的事件是否符合條件 |

這些日常開發中常用的一些操作符,類似justdefaultIfEmpty,takeUntil等等,這裡我就不多說了,相信網上已經有非常多的詳細講解了,今天主要是通過這些操作符來解決我們上面提到的問題,在這之前,我們繼續看下Rxjava中常見的Subject

Rxjava常見的Subject

現在我們來回顧下Rxjava中的Subject,那麼Rxjava中的Subject是什麼呢? 從官方解釋來看,簡單來說,它就是使用序列來組成非同步的、基於事件的程式的庫。

既然是基於事件的,又是觀察者模式的一種,那麼EventBus的功能它都能夠W事件咯,可以看下下圖關於它的官方介紹:

image-20220515230432491

可以看到在它的子類有AsyncSubject,BehaviorSubject,PublishSubejct,ReplaySubject,dUnicastSubject, 它們分別有什麼作用呢? 下面來說說我們常用的幾種Subject(作為觀察者亦或者是被觀察者)

PublishSubject

publishSubject作為最常用的Subject, 大致作用就是它會發送所有資料給訂閱者,一旦某個觀察者者訂閱了該Subject,如下圖所示

可以看到,這個訂閱者只能接收訂閱之後收到的所有資料,訂閱之前的資料就無法收到了,看下下面簡單的例子

java        val publishSubject: PublishSubject<String> = PublishSubject.create()        publishSubject.subscribe()        publishSubject.onNext("one")        publishSubject.onNext("two")       //這時候觀察者可以接收到來自PublishObject發射的one,two的事件 ​        publishSubject.subscribe()        publishSubject.onNext("one")        publishSubject.onComplete()        //這時候觀察者僅僅可以接收到來自Publish發射的one,onComplete()事件

這也恰恰說明了,訂閱者只會接收到訂閱之後來自PublishSubject的資料,那麼如果我想有沒有Subject可以接收訂閱之前的資料呢,答案毋庸置疑,有且不止一個,但是最常見運用最多的還是BehaviorSubject

BehaviorSubject

下面我們來簡單瞭解下BehaviorSubject,官方給出的描述大致意思是它會發送訂閱最近的上一個事件,如果沒有的話,會直接使用給出的預設值,如下圖所示

它的Observer接收的是BehaviorSubject被訂閱前傳送的最後一個數據,看下下面的例子方便理解

java   fun testBehaviorObject() {        val behaviorSubject1 = BehaviorSubject.createDefault("default")        behaviorSubject1.subscribe()        behaviorSubject1.onNext("one")        behaviorSubject1.onNext("two")        behaviorSubject1.onNext("three")        //這時候會接收被訂閱之前的事件,但是還是接收之後傳送的onNext事件,不需要我們手動呼叫onComplete() ​        val behaviorSubject2 = BehaviorSubject.createDefault("default")        behaviorSubject2.onNext("zeus");        behaviorSubject2.onNext("one");        behaviorSubject2.subscribe();        behaviorSubject2.onNext("two");        behaviorSubject2.onNext("three");        /**         * 由於behaviorSubject定義就是可以接收離訂閱最近的一個數據,並且之後還會繼續接收資料,所以這時候我們接收         * 的事件就有one,two,three,就沒有zeus這個事件,因為它不是離訂閱最近的一個數據         */ ​   }

AsyncSubject

對於AsyncSubject來說,它的作用其實和BehaviorSubject類似,都是接收最近的一個事件資料,但是不同的是,無論有多少事件,它始終只能接收到最後一個事件資料,且必須要我們手動呼叫onComplete(),否則是無法接收到任何資料;另外如果在傳送過程中遇到錯誤,則觀察者僅僅會接收到錯誤資訊,可以看下官方的示例圖

可以看到它的Observer會接收到onCompleted()前傳送的最後一個數據,之後不會再接收資料,看下下面的例子方便理解

java   fun testAsyncSubject() {        val asyncSubject1 = AsyncSubject.create<String>()        asyncSubject1.subscribe()        asyncSubject1.onNext("1")        asyncSubject1.onNext("2")        asyncSubject1.onNext("3")        //由於沒有呼叫onComplete方法,所以此時觀察者不會接收到事件,如果有異常,則會接收錯誤資訊 ​        val asyncSubject2 = AsyncSubject.create<String>()        asyncSubject2.subscribe()        asyncSubject2.onNext("1")        asyncSubject2.onNext("2")        asyncSubject2.onNext("3")        asyncSubject2.onComplete()        //因為呼叫了onComplete方法,此時觀察者接收到的事件是3,它是離Complete最近的一個數據   }

ReplaySubject

根據官方解釋,ReplaySubject會發射所有事件資料給觀察者,不管它什麼時候訂閱的,它都會快取所有的發射資料,如下圖所示

話不多說,我們舉個例子來看下

java  fun testReplaySubject() {        val replaySubject = ReplaySubject.create<String>()        replaySubject.onNext("one")        replaySubject.onNext("two")        replaySubject.onNext("three")        replaySubject.onComplete()                replaySubject.subscribe()        //在這之前的所有發射的資料都能接收到,包括one,two,three,以及onComplete,不管它什麼時候訂閱   }

回到最初的問題

經過以上Rxjava的操作符和Subject的學習瞭解,我們回到一開始提出的問題,我們需要滿足兩個條件

  1. 兩個介面同時執行,優先A介面資料
  1. 要A介面沒有資料返回時,考慮失敗後丟擲異常,然後返回B介面的資料,亦或者兩者都失敗了丟擲異常

  2. 首先要保證兩個介面同時進行請求,這樣可用到的操作符就有merge和zip,但是merge操作符雖然是並行,但是是按照時間順序來進行傳送,所以優先都是請求時間短的介面拿到資料,這裡的場景暫時不適合,所以我們使用zip操作符,對結果進行合併進行傳送,那麼我們可以對兩個介面的資料來源進行分開判斷,通過PublishSubject訂閱傳送標誌位Boolean值,它會接收所有在訂閱之後傳送的資料,在a介面的被觀察者中只有它在doOnNext對該subject傳送true,這時候在b介面的被觀察者中設定takeUntil操作符,它的值就是前面設定的PublishSubject,這樣的話只要a介面返回資料了,b介面就會停止傳送資料;綜上,我們已經滿足了第一個條件了

  3. 接著就要考慮下如果A介面失敗了,這時候是沒有資料返回的,由於是zip操作符,所以我們自定義資料來源item,將異常都記錄起來,在a介面的被觀察中的OnErrorReturn操作符中傳送這個資料來源item,(這個攔截是不會觸發onError傳送給觀察者的異常,它會發送出一個正常的item),如下

    java data class SourceResult(val result: String?, val exception: Throwable? = null)

    然後在b介面被觀察者中同樣也是使用OnErrorReturn,但值得注意的是當a介面拿到資料了,此時b介面就不傳送事件了,我們也需要它傳送一個預設事件(自定義異常),不傳送任何有效事件doOnNext,僅僅傳送Complete事件的前提上,這裡就用到了defaultIfEmpty操作符,它解決了這個問題,這樣我們就滿足了第二個條件,至此,這個小問題就得以解決了,可以看下下面的例項程式碼,簡單梳理了下

    java private val publishSubject = PublishSubject.create<Boolean>() fun getResult(imageUrl: String): Observable<String> { ​        return Observable.just(imageUrl)           .flatMap { ​                val aSource = getAResult(imageUrl)                    //這時候a介面如果有資料返回的話,將publishSubject傳送true資料                   .doOnNext {                        publishSubject.onNext(true)                   }                   .map {                        SourceResult(imageUrl, null)                   }                    //攔截對應的error進行返回                       .onErrorReturn {                        SourceResult(null, it)                   } ​                val bSource = getBResult(imageUrl)                   .map {                        SourceResult(imageUrl, null)                   }                   .onErrorReturn {                        SourceResult(null, it)                   }                    //如果該條件為true的話,就停止傳送資料,意味著A介面有資料了                   .takeUntil(publishSubject)                   .defaultIfEmpty(SourceResult(null, RuntimeException())) ​                //zip進行合併                val result = Observable.zip(aSource, bSource) { a, b ->                    ZipResult(a, b)               } ​                return@flatMap result.map {                    val aResult = it.aSource?.result                    val bResult = it.bSource?.result ​                    when {                        aResult != null -> {                            aResult                       }                        bResult != null -> {                            bResult                       }                        else -> {                            //丟擲對應的異常,這裡我簡單丟擲b介面的異常                            it.bSource?.exception?.let {                                throw it                           }                       }                   }               }           }   } ​ ​    fun getAResult(imageUrl: String): Observable<String> {        return Observable.just(imageUrl)           .map {                //....                return@map it           }   } ​ ​    fun getBResult(imageUrl: String): Observable<String> {        return Observable.just(imageUrl)           .map {                return@map it           }   } } ​ data class SourceResult(val result: String?, val exception: Throwable? = null) data class ZipResult(val aSource: SourceResult?, val bSource: SourceResult? = null)

    至此,這個小問題就得到了解決,對於Rxjava的操作符和Subject的運用也更加得心應手了