RxJava裝飾者模式

語言: CN / TW / HK

theme: cyanosis


開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第33天,點選檢視活動詳情

1.裝飾者模式

  • 裝飾者模式時在保留原有結構的前提下新增新的功能,這些功能作為其原有結構的包裝。

2.RxJava的裝飾者模式

1.被觀察者Observable

  • 根據Observerable的原始碼可知Observable的結構介面是Observerablesource<T>,裡面有一個方法subscribe用於和觀察者實現訂閱,原始碼如下

``` /* * Represents a basic, non-backpressured {@link Observable} source base interface, * consumable via an {@link Observer}. * * @param the element type * @since 2.0 / public interface ObservableSource {

/**
 * Subscribes the given Observer to this ObservableSource instance.
 * @param observer the Observer, not null
 * @throws NullPointerException if {@code observer} is null
 */
void subscribe(Observer<? super T> observer);

} ```

  • 然後需要一個包裝類,就是實現ObservableSource介面的類,就是Observable<T>,它實現了ObservableSource並在subscribe方法中呼叫了subscribeActual方法與觀察者實現訂閱關係,原始碼如下

``` public abstract class Observable implements ObservableSource { @Override public final void subscribe(Observer<? super T> observer) { ... subscribeActual(observer); ... }

protected abstract void subscribeActual(Observer<? super T> observer);

} ```

  • 第三步就是包裝類了,包裝類有很多有一百多個,如ObservableAllObservableAnyObservableCache

2.觀察者Observer

  • 第一步,Observer的結構的介面有EmitterObserver,兩個介面中的方法差不多,都是onNextOnErrorOnComplete,用於被觀察者的回撥
  • 第二步,實現Emitter或者Observer介面的包裝類,觀察者中沒有實現這兩個介面的基礎包裝類,而是直接封裝了很多包裝類

3.被觀察者和觀察者的包裝類有在建立的時候進行包裝也有在呼叫的時候包裝,那麼他們的結構又是怎麼樣的

以RxJava的最基礎用法來分析,Observable.create().subscribeOn().observeOn().subscribe()為例,層層呼叫後它的結構如下:

  • 首先是Observable.create,通過建立ObservableCreate物件進行第一層包裝,把ObservableOnSubscribe包在了裡面

  • 然後是Observable.create().subscribeOn(),呼叫時又進行了一層包裝,把ObservableCreate包進去了

  • 再然後就分別是observeOn()了,結構如下

  • 總共進行了4層包裝,可以理解為每呼叫一次操作符就會進行一層被觀察者的包裝,這樣包裝的好處就是為了新增額外的功能,那麼每一層又添加了哪些額外的功能呢

4.被觀察者的subscribe方法

呼叫subscribe方法後會從最外層的包裝類一步一步的往裡面呼叫,從被觀察者的subscribe方法中可以得知額外功能的實現是在subscribeActual方法中,那麼上面幾層包裝的subscribeActual方法中又做了什麼呢,分析如下

  • 先看最外層的包裝observerOnsubscribeActual方法做了什麼,先看原始碼:

``` public final class ObservableObserveOn extends AbstractObservableWithUpstream { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; }

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
...

} ```

  • 原始碼中有一個source,這個source是上一層包裝類的例項,在source.subscribe()中對觀察者進行了一層包裝,也就是ObserveOnObserver,它在onNext方法裡面實現了執行緒切換,這個onNext是在被觀察者在通知觀察者時會被回撥,然後通過包裝類實現額外的執行緒切換,這裡是切換到了主執行緒執行。此時觀察者的結構如下:

``` @Override public void onNext(T t) { if (done) { return; }

if (sourceMode != QueueDisposable.ASYNC) {
    queue.offer(t);
}
schedule();

} ```

  • 再看下一層的包裝subscribeOnsubscribeActual方法做了什麼,先看原始碼

``` public final class ObservableSubscribeOn extends AbstractObservableWithUpstream { final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}
...

} ```

這裡又對觀察者進行了一層包裝,也就是SubscribeOnObserver,這裡面的額外功能就是資源釋放,包裝完後的結構如下

``` static final class SubscribeOnObserver extends AtomicReference implements Observer, Disposable {

    private static final long serialVersionUID = 8094547886072529208L;

        ...

    @Override
    public void dispose() {
        DisposableHelper.dispose(s);
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }

    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}

```

subscribeActual方法中有一個呼叫是source.subscribe(parent),這個source就是它的上一層的包裝類ObservableCreate,那麼ObservableCreatesubscribeActual方法就會在子執行緒執行。

ObservableCreatesubscribeActual方法做了什麼,先看原始碼

``` public final class ObservableCreate extends Observable { final ObservableOnSubscribe source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
    ...

} ```

原始碼中的source就是建立最原始的ObservableOnSubscribe,這裡會回撥到ObservableOnSubscribesubscribe方法,在subscribeActual方法中又對觀察者進行了一層包裝也就是CreateEmitter,這個類裡面做的事情是判斷執行緒是否被釋放,如果釋放了則不再進行回撥,這時候結構如下圖

@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }

這裡由於上面的包裝類已經切換到了子執行緒所以ObservableOnSubscribesubscribe方法的執行也是在子執行緒;

3.總結

在建立被觀察者的時候會對被觀察者進行層層的包裝,建立幾次就包裝幾次,然後在被觀察者呼叫subscribe方法時,一層層回撥被觀察者的subscribeActual方法,而在被觀察者subscribeActual方法中會對觀察者做一層包裝。也就是說被觀察者是建立的時候包裝,在subscribeActual方法中實現額外的功能,觀察者是在被觀察者呼叫subscribeActual方法時進行包裝的,然後針對觀察者實現自己的額外的功能,流程圖如下:

最終的結構如下:

  • 第一步:建立被觀察者時或者使用操作符時會對被觀察者進行包裝

  • 第二步:當被觀察者和觀察者產生訂閱關係後,被觀察者會一層層的回撥被觀察者的subscribeActual方法,在這個方法中對觀察者進行包裝,此時被觀察者的功能實現是在subscribeActual中,觀察者的實現是在包裝類裡

  • 第三步:被觀察者和觀察者不同的是,被觀察者是在訂閱成功後就執行了包裝類相應的功能,而觀察者是在事件回撥的時候,會在觀察者的包裝類裡實現相應的功能
  • 最終流程圖