RxJava 由淺入深之 Single 序列執行 (一)

語言: CN / TW / HK

RxJava3 Single 簡介

本文的程式碼是在IntellJ環境中測試的,可以直接copy過去執行看看

IntellJ新建Gradle工程,引入依賴

implementation "io.reactivex.rxjava3:rxjava:3.0.0"

在RxJava中,Single可以描述只發送一次事件值的事件,如一次網路請求,它有2個可訂閱介面,onSuccess和onError,在任何一個介面觸發後會自動解除訂閱關係。 ``` //建立Single Single single = Single.create(emitter -> { emitter.onSuccess("ok"); });

//訂閱single single.subscribe(System.out::println); ```

多個Single序列執行

首先建立多個Single ``` public Single task1() { return Single.create((SingleOnSubscribe) em -> { em.onSuccess("task-1"); }).doOnSuccess(s -> { System.out.println("發射 ->" + s); }); }

public Single task2() { return Single.create((SingleOnSubscribe) em -> { em.onSuccess("task-2"); }).doOnSuccess(s -> { System.out.println("發射 ->" + s); }); } 宣告多個Single任務 Single single1 = task1(); Single single2 = task2(); 工作執行緒,通常直接使用Schedulers.io()```,這裡用一個執行緒池,方便在一個執行緒中列印日誌

Executor executor = Executors.newSingleThreadExecutor();

場景一:序列執行並接收每個結果

Single.concat(single1, single2) .subscribeOn(Schedulers.from(executors)) .subscribe(s -> { System.out.println("接收到->" + s); }); 執行結果:

發射 ->task-1 接收到->task-1 發射 ->task-2 接收到->task-2

場景二:根據前一個single的結果,判斷是否執行後一個

### 如先從資料庫快取中讀取資料,如果沒有再從網路進行請求 Single.concat(single1, single2) .subscribeOn(Schedulers.from(executors)) .takeUntil(s -> { return !"".equals(s); }) .filter(s -> { return !"".equals(s); }) .subscribe(s -> { System.out.println("接收到->" + s); });

執行結果: 發射 ->task-1 接收到->task-1 這裡使用takeUntilfilter操作符實現條件判斷和過濾不需要的請求結果。

takeUntil在條件返回true時,不再執行下一個Single,並直接返回結果;如果takeUntil條件返回false,其發射的值在filter操作符中被過濾掉,並繼續執行下一個Single。

提供預設返回值和異常處理

如果多個single的執行結果都被過濾,則訂閱者將收不到任何值,這時可以提供一個預設返回值;

使用single()操作符提供結果為空時的預設返回值

使用onErrorReturn()操作符處理異常

Single.concat(single1, single2) .subscribeOn(Schedulers.from(executors)) .takeUntil(s -> { return false;//此處測試全都執行 }) .filter(s -> { return true;//此處測試全都不滿足過濾條件的情況 }) .single("default-value") .onErrorReturn(Throwable::getLocalizedMessage) .subscribe(s -> { System.out.println("接收到->" + s); }); 執行結果 發射 ->task-1 發射 ->task-2 接收到->default

阻塞的方式實現序列 (不建議使用)

使用blockingGet阻塞工作執行緒,該方式會使執行緒阻塞而非掛起,線上程完成前系統其他資源無法使用該執行緒。

這裡很像Kotlin中的協程,但是協程是非阻塞的。

String str1 = single1.subscribeOn(Schedulers.from(executors)).blockingGet(); if (!"success".equals(str1)) { single2.subscribeOn(Schedulers.from(executors)) .subscribe(System.out::println); } 執行結果:

``` 發射 ->task-1 發射 ->task-2 task-2

```