淺析RxJava

語言: CN / TW / HK

前言

今天給大家分享一些RxJava的小知識,由淺入深了地解一下它的工作流程。

操作符:

我們先看下幾個操作符的簡單使用

map:資料轉換

Observable.just(path)                 .map(new Function<String, Integer>() {                     @Override                     public Integer apply(String s) {                         return 1;                     }                 })                 .subscribe(new Observer<Integer>() {                     @Override                     public void onSubscribe(@NonNull Disposable d) {                         System.out.println("onSubscribe:" + d.toString());                     }

Compose」使用自定義操作符:

定義 public static <UD> ObservableTransformer<UD, UD> redux() {     return new ObservableTransformer<UD, UD>() {         @Override         public @NonNull ObservableSource<UD> apply(@NonNull Observable<UD> upstream) {             return upstream.map(new Function<UD, UD>() {                 @Override                 public UD apply(UD ud) throws Throwable {                     System.out.println("監聽到了");                     return ud;                 }             });         }     }; } 使用      Observable.just(path)                 .map(new Function<String, Integer>() {                     @Override                     public Integer apply(String s) {                         System.out.println("map");                         return 1;                     }                 })                 .compose(redux()) //使用自定義操作符                 .subscribe(new Observer<Integer>() {                     @Override                     public void onSubscribe(@NonNull Disposable d) {                         System.out.println("onSubscribe:");                     }

flatMap 轉換

map可以一對一轉換為任何資料,flatMap 只能轉換為ObservableSource型別 ,可以一對一,一對多,多對多轉換。

Observable.just(list)         //遍歷集合         .flatMap(new Function<List<String>, ObservableSource<String>>() {             @Override             public ObservableSource<String> apply(List<String> integer) {                 return Observable.fromIterable(integer);             }         })         .subscribe(new Observer<String>() {             @Override             public void onSubscribe(@NonNull Disposable d) {                 System.out.println("onSubscribe:");             }                    // 資料變換         Observable.just(path)                 .map(new Function<String, String>() {                     @Override                     public String apply(String s) throws Throwable {                         return s + "測試";                     }                 })                 .flatMap(new Function<String, ObservableSource<String>>() {                     @Override                     public ObservableSource<String> apply(String s) throws Throwable {                         System.out.println("flatMap"+s);                         return Observable.just("1064902354") ;                     }                 })                 .subscribe(new Observer<String>() {                     @Override                     public void onSubscribe(@NonNull Disposable d) {                         System.out.println("onSubscribe:");                     }

流程分析

```            //起點 (Observable) Observable.create(new ObservableOnSubscribe() {     @Override     public void subscribe(@NonNull ObservableEmitter emitter) {         emitter.onNext("測試");     } })         .subscribe(                 //終點 (Observer)                 new Observer() {             @Override             public void onSubscribe(@NonNull Disposable d) {                 System.out.println("onSubscribe:");             }

@Override             public void onNext(@NonNull String integer) {                 System.out.println("onNext:" + integer);             }

@Override             public void onError(@NonNull Throwable e) {                 System.out.println("onError:" + e.toString());             }

@Override             public void onComplete() {                 System.out.println("onComplete");             }         }); ```

Observable當做起點,將 Observer 當做終點,以最簡潔的使用方式為背景,我們來分析一下 RxJava最基礎的工作流程。

我們先看create

public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {     Objects.requireNonNull(source, "source is null");     return RxJavaPlugins.onAssembly(new ObservableCreate<>(source)); }

RxJavaPlugins.onAssembly 是每個操作符都會涉及到的一個流程,這裡是Rxjava留給使用者的一個Hook點,可以通過設定 onObservableAssembly ,可以對每個操作符進行hook監聽。

在create中,以我們自定義的source 為引數 進行了 ObservableCreate 的建立。稍後我們會對ObservableCreate 原始碼進行分析。

接下來看看終點

public interface Observer<@NonNull T> {     void onSubscribe(@NonNull Disposable d);     void onNext(@NonNull T t);     void onError(@NonNull Throwable e);     void onComplete(); }

只是一個介面,提供了四個方法。

起點和終點搞清楚了,我們接下來看下訂閱方法。

public final void subscribe(@NonNull Observer<? super T> observer) {     Objects.requireNonNull(observer, "observer is null");     try {         observer = RxJavaPlugins.onSubscribe(this, observer);         Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");         //最終呼叫的是這裡         subscribeActual(observer);     } catch (NullPointerException e) { // NOPMD         throw e;     } catch (Throwable e) {         Exceptions.throwIfFatal(e);         RxJavaPlugins.onError(e);         NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");         npe.initCause(e);         throw npe;     } }

protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

這是一個抽象方法,所以實際呼叫的方法應該在子類中。這裡就是由create的建立的ObservableCreate

@Override protected void subscribeActual(Observer<? super T> observer) {     CreateEmitter<T> parent = new CreateEmitter<>(observer);     observer.onSubscribe(parent);     try {         source.subscribe(parent);     } catch (Throwable ex) {         Exceptions.throwIfFatal(ex);         parent.onError(ex);     } }

這裡首先將observer 進行一層封裝,封裝成發射器 CreateEmitter ,然後呼叫的 observer 的 onSubscribe 方法,這就是為什麼我們每次進行訂閱的時候首先會回撥 onSubscribe。

這個方法裡面的source 就是我們我們在起點 呼叫create 方法時 傳入的ObservableOnSubscribe

Observable.create(new ObservableOnSubscribe<String>() {         @Override         public void subscribe(@NonNull ObservableEmitter<String> emitter) {             emitter.onNext("測試");         }      }) ------------------------------------------------------------------------------           final ObservableOnSubscribe<T> source;     public ObservableCreate(ObservableOnSubscribe<T> source) {         this.source = source;     }

然後source 呼叫subscribe(parent) 方法,將發射器 CreateEmitter 會調到我們的介面中。

在起點的 subscribe 方法中 ,通過呼叫發射器的 onNext 方法 進行資料傳送,我們看下 emitter.onNext() 做了一些什麼事情。

``` final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {     this.observer = observer; }

@Override public void onNext(T t) {     if (t == null) {         onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));         return;     }     if (!isDisposed()) {         observer.onNext(t);     } } ```

在這裡是對資料進行判斷,如果資料正常,呼叫observer的onNext方法,也就是終點的onNext的方法。如果有異常會執行onError.

這樣我們最基本的流程就可以串起來了。

image-20220616155852536

接下來,我們再分析一個map 操作符

Observable.create(new ObservableOnSubscribe<String>() {     @Override     public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {         emitter.onNext("測試1");     } })         .map(new Function<String, String>() {             @Override             public String apply(String s) throws Throwable {                 return "測試1";             }         })         .subscribe(...)

我們來看原始碼

public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {     Objects.requireNonNull(mapper, "mapper is null");     return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper)); }

相同的配方,相同的味道,這裡也是進行了一層封裝,但是這裡封裝了兩個東西,一個是this,一個是我們傳入的介面。然後將封裝的包裹進行返回。

那this 是什麼呢?this就是整個上游,也就是通過create建立的ObservableCreate

接下來我們看訂閱 subscribe(...) ,它是由map操作符返回的 Observable 執行的。我們來詳細看下內部方法。

@Override public void subscribeActual(Observer<? super U> t) {     source.subscribe(new MapObserver<T, U>(t, function)); }

這裡是對我們的終點(觀察者) 和我們在map操作符中自定義的Function 進行了一個封裝。

這裡的source 就是前面提到的this。

source 通過呼叫 subscribe 方法 ,就會走到 ObservableCreate 的 subscribeActual 方法中。

protected void subscribeActual(Observer<? super T> observer) {     CreateEmitter<T> parent = new CreateEmitter<>(observer);     observer.onSubscribe(parent);     try {         source.subscribe(parent);     } catch (Throwable ex) {         Exceptions.throwIfFatal(ex);         parent.onError(ex);     } }

後續的就跟上文的流程一樣了:

對 MapObserver 進行再次封裝,封裝成 CreateEmitter

->先呼叫 觀察者的 onSubscribe

->然後呼叫source ( ObservableOnSubscribe ) 的 subscribe 方法 ,到我們自定義的ObservableOnSubscribe 介面的方法中

->然後 發射器 CreateEmitter 呼叫 onNext 進行資料傳輸

在上文講到這個步驟 呼叫的onNext 的方法是 我們自定義的觀察者的onNext方法,但是這裡呼叫的onNext方法是 MapObserver 中的onNext 方法,因為 CreateEmitter 是對 MapObserver 的二次封裝。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {     final Function<? super T, ? extends U> mapper;     //downstream == actual     MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {         super(actual);         this.mapper = mapper;     }          @Override     public void onNext(T t) {        ...............         U v;         try {             v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");         } catch (Throwable ex) {         ...............           }         downstream.onNext(v);     }

在 MapObserver 的onNext方法中,將上游傳入的引數經過mapper.apply 方法,也就是我們在map操作符中自定義的Function,進行資料轉換並返回轉換後的資料

//這個Function == mapper map(new Function<Object, Object>() {     @Override     public Object apply(Object o) throws Throwable {         return o;     } })

最後我們由 downstream 再將轉換後的資料分發下去。這裡的 downstream 就是我們的終點觀察者了。

最終的流程大概就是這個樣子

image-20220616224546729

寫在最後

RxJava工作流程的分析就暫時先到這裡了,希望能對大家有所幫助,如果有更好的見解,歡迎留言討論。