RxJava裝飾者模式
theme: cyanosis
開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第33天,點選檢視活動詳情
1.裝飾者模式
- 裝飾者模式時在保留原有結構的前提下新增新的功能,這些功能作為其原有結構的包裝。
2.RxJava的裝飾者模式
1.被觀察者Observable
- 根據
Observerable
的原始碼可知Observable
的結構介面是Observerablesource<T>
,裡面有一個方法subscribe
用於和觀察者實現訂閱,原始碼如下
```
/*
* Represents a basic, non-backpressured {@link Observable} source base interface,
* consumable via an {@link Observer}.
*
* @param
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
} ```
- 然後需要一個包裝類,就是實現
ObservableSource
介面的類,就是Observable<T>
,它實現了ObservableSource
並在subscribe方法中呼叫了subscribeActual
方法與觀察者實現訂閱關係,原始碼如下
```
public abstract class Observable
protected abstract void subscribeActual(Observer<? super T> observer);
} ```
- 第三步就是包裝類了,包裝類有很多有一百多個,如
ObservableAll
、ObservableAny
、ObservableCache
2.觀察者Observer
- 第一步,
Observer
的結構的介面有Emitter
和Observer
,兩個介面中的方法差不多,都是onNext
、OnError
、OnComplete
,用於被觀察者的回撥 - 第二步,實現
Emitter
或者Observer
介面的包裝類,觀察者中沒有實現這兩個介面的基礎包裝類,而是直接封裝了很多包裝類
3.被觀察者和觀察者的包裝類有在建立的時候進行包裝也有在呼叫的時候包裝,那麼他們的結構又是怎麼樣的
以RxJava的最基礎用法來分析,Observable.create().subscribeOn().observeOn().subscribe()
為例,層層呼叫後它的結構如下:
- 首先是
Observable.create
,通過建立ObservableCreate
物件進行第一層包裝,把ObservableOnSubscribe
包在了裡面
- 然後是
Observable.create().subscribeOn()
,呼叫時又進行了一層包裝,把ObservableCreate包進去了
- 再然後就分別是
observeOn()
了,結構如下
- 總共進行了4層包裝,可以理解為每呼叫一次操作符就會進行一層被觀察者的包裝,這樣包裝的好處就是為了新增額外的功能,那麼每一層又添加了哪些額外的功能呢
4.被觀察者的subscribe
方法
呼叫subscribe
方法後會從最外層的包裝類一步一步的往裡面呼叫,從被觀察者的subscribe
方法中可以得知額外功能的實現是在subscribeActual
方法中,那麼上面幾層包裝的subscribeActual
方法中又做了什麼呢,分析如下
- 先看最外層的包裝
observerOn
的subscribeActual
方法做了什麼,先看原始碼:
```
public final class ObservableObserveOn
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
} ```
- 原始碼中有一個
source
,這個source
是上一層包裝類的例項,在source.subscribe()
中對觀察者進行了一層包裝,也就是ObserveOnObserver
,它在onNext
方法裡面實現了執行緒切換,這個onNext
是在被觀察者在通知觀察者時會被回撥,然後通過包裝類實現額外的執行緒切換,這裡是切換到了主執行緒執行。此時觀察者的結構如下:
``` @Override public void onNext(T t) { if (done) { return; }
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
} ```
- 再看下一層的包裝
subscribeOn
的subscribeActual
方法做了什麼,先看原始碼
```
public final class ObservableSubscribeOn
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
...
} ```
這裡又對觀察者進行了一層包裝,也就是SubscribeOnObserver
,這裡面的額外功能就是資源釋放,包裝完後的結構如下
```
static final class SubscribeOnObserver
private static final long serialVersionUID = 8094547886072529208L;
...
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
```
在subscribeActual
方法中有一個呼叫是source.subscribe(parent)
,這個source
就是它的上一層的包裝類ObservableCreate
,那麼ObservableCreate
的subscribeActual
方法就會在子執行緒執行。
ObservableCreate
的subscribeActual
方法做了什麼,先看原始碼
```
public final class ObservableCreate
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
} ```
原始碼中的source
就是建立最原始的ObservableOnSubscribe
,這裡會回撥到ObservableOnSubscribe
的subscribe方法
,在subscribeActual
方法中又對觀察者進行了一層包裝也就是CreateEmitter
,這個類裡面做的事情是判斷執行緒是否被釋放,如果釋放了則不再進行回撥,這時候結構如下圖
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
這裡由於上面的包裝類已經切換到了子執行緒所以ObservableOnSubscribe
的subscribe
方法的執行也是在子執行緒;
3.總結
在建立被觀察者的時候會對被觀察者進行層層的包裝,建立幾次就包裝幾次,然後在被觀察者呼叫subscribe
方法時,一層層回撥被觀察者的subscribeActual
方法,而在被觀察者subscribeActual
方法中會對觀察者做一層包裝。也就是說被觀察者是建立的時候包裝,在subscribeActual
方法中實現額外的功能,觀察者是在被觀察者呼叫subscribeActual
方法時進行包裝的,然後針對觀察者實現自己的額外的功能,流程圖如下:
最終的結構如下:
- 第一步:建立被觀察者時或者使用操作符時會對被觀察者進行包裝
- 第二步:當被觀察者和觀察者產生訂閱關係後,被觀察者會一層層的回撥被觀察者的
subscribeActual
方法,在這個方法中對觀察者進行包裝,此時被觀察者的功能實現是在subscribeActual
中,觀察者的實現是在包裝類裡
- 第三步:被觀察者和觀察者不同的是,被觀察者是在訂閱成功後就執行了包裝類相應的功能,而觀察者是在事件回撥的時候,會在觀察者的包裝類裡實現相應的功能
- 最終流程圖