RxJava裝飾者模式

語言: CN / TW / HK

theme: cyanosis


開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第33天,點擊查看活動詳情

1.裝飾者模式

  • 裝飾者模式時在保留原有結構的前提下添加新的功能,這些功能作為其原有結構的包裝。

2.RxJava的裝飾者模式

1.被觀察者Observable

  • 根據Observerable的源碼可知Observable的結構接口是Observerablesource<T>,裏面有一個方法subscribe用於和觀察者實現訂閲,源碼如下

``` /* * Represents a basic, non-backpressured {@link Observable} source base interface, * consumable via an {@link Observer}. * * @param the element type * @since 2.0 / public interface ObservableSource {

/**
 * Subscribes the given Observer to this ObservableSource instance.
 * @param observer the Observer, not null
 * @throws NullPointerException if {@code observer} is null
 */
void subscribe(Observer<? super T> observer);

} ```

  • 然後需要一個包裝類,就是實現ObservableSource接口的類,就是Observable<T>,它實現了ObservableSource並在subscribe方法中調用了subscribeActual方法與觀察者實現訂閲關係,源碼如下

``` public abstract class Observable implements ObservableSource { @Override public final void subscribe(Observer<? super T> observer) { ... subscribeActual(observer); ... }

protected abstract void subscribeActual(Observer<? super T> observer);

} ```

  • 第三步就是包裝類了,包裝類有很多有一百多個,如ObservableAllObservableAnyObservableCache

2.觀察者Observer

  • 第一步,Observer的結構的接口有EmitterObserver,兩個接口中的方法差不多,都是onNextOnErrorOnComplete,用於被觀察者的回調
  • 第二步,實現Emitter或者Observer接口的包裝類,觀察者中沒有實現這兩個接口的基礎包裝類,而是直接封裝了很多包裝類

3.被觀察者和觀察者的包裝類有在創建的時候進行包裝也有在調用的時候包裝,那麼他們的結構又是怎麼樣的

以RxJava的最基礎用法來分析,Observable.create().subscribeOn().observeOn().subscribe()為例,層層調用後它的結構如下:

  • 首先是Observable.create,通過創建ObservableCreate對象進行第一層包裝,把ObservableOnSubscribe包在了裏面

  • 然後是Observable.create().subscribeOn(),調用時又進行了一層包裝,把ObservableCreate包進去了

  • 再然後就分別是observeOn()了,結構如下

  • 總共進行了4層包裝,可以理解為每調用一次操作符就會進行一層被觀察者的包裝,這樣包裝的好處就是為了添加額外的功能,那麼每一層又添加了哪些額外的功能呢

4.被觀察者的subscribe方法

調用subscribe方法後會從最外層的包裝類一步一步的往裏面調用,從被觀察者的subscribe方法中可以得知額外功能的實現是在subscribeActual方法中,那麼上面幾層包裝的subscribeActual方法中又做了什麼呢,分析如下

  • 先看最外層的包裝observerOnsubscribeActual方法做了什麼,先看源碼:

``` public final class ObservableObserveOn extends AbstractObservableWithUpstream { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; }

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
...

} ```

  • 源碼中有一個source,這個source是上一層包裝類的實例,在source.subscribe()中對觀察者進行了一層包裝,也就是ObserveOnObserver,它在onNext方法裏面實現了線程切換,這個onNext是在被觀察者在通知觀察者時會被回調,然後通過包裝類實現額外的線程切換,這裏是切換到了主線程執行。此時觀察者的結構如下:

``` @Override public void onNext(T t) { if (done) { return; }

if (sourceMode != QueueDisposable.ASYNC) {
    queue.offer(t);
}
schedule();

} ```

  • 再看下一層的包裝subscribeOnsubscribeActual方法做了什麼,先看源碼

``` public final class ObservableSubscribeOn extends AbstractObservableWithUpstream { final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}
...

} ```

這裏又對觀察者進行了一層包裝,也就是SubscribeOnObserver,這裏面的額外功能就是資源釋放,包裝完後的結構如下

``` static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {

    private static final long serialVersionUID = 8094547886072529208L;

        ...

    @Override
    public void dispose() {
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }

    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}

```

subscribeActual方法中有一個調用是source.subscribe(parent),這個source就是它的上一層的包裝類ObservableCreate,那麼ObservableCreatesubscribeActual方法就會在子線程執行。

ObservableCreatesubscribeActual方法做了什麼,先看源碼

``` public final class ObservableCreate extends Observable { final ObservableOnSubscribe source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
    ...

} ```

源碼中的source就是創建最原始的ObservableOnSubscribe,這裏會回調到ObservableOnSubscribesubscribe方法,在subscribeActual方法中又對觀察者進行了一層包裝也就是CreateEmitter,這個類裏面做的事情是判斷線程是否被釋放,如果釋放了則不再進行回調,這時候結構如下圖

@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }

這裏由於上面的包裝類已經切換到了子線程所以ObservableOnSubscribesubscribe方法的執行也是在子線程;

3.總結

在創建被觀察者的時候會對被觀察者進行層層的包裝,創建幾次就包裝幾次,然後在被觀察者調用subscribe方法時,一層層回調被觀察者的subscribeActual方法,而在被觀察者subscribeActual方法中會對觀察者做一層包裝。也就是説被觀察者是創建的時候包裝,在subscribeActual方法中實現額外的功能,觀察者是在被觀察者調用subscribeActual方法時進行包裝的,然後針對觀察者實現自己的額外的功能,流程圖如下:

最終的結構如下:

  • 第一步:創建被觀察者時或者使用操作符時會對被觀察者進行包裝

  • 第二步:當被觀察者和觀察者產生訂閲關係後,被觀察者會一層層的回調被觀察者的subscribeActual方法,在這個方法中對觀察者進行包裝,此時被觀察者的功能實現是在subscribeActual中,觀察者的實現是在包裝類裏

  • 第三步:被觀察者和觀察者不同的是,被觀察者是在訂閲成功後就執行了包裝類相應的功能,而觀察者是在事件回調的時候,會在觀察者的包裝類裏實現相應的功能
  • 最終流程圖