RxJava的執行緒切換

語言: CN / TW / HK

RxJava 執行緒切換

前言

在上篇文章對RxJava 的工作流程進行的簡單的分析,今天來分享一下執行緒切換的流程。如果覺得原始碼枯燥可以直接移至文末看圖理解。

例項程式碼

Observable.create(new ObservableOnSubscribe<Object>() {             @Override             public void subscribe(@NonNull ObservableEmitter<Object> emitter) {                 emitter.onNext("123");             }         })         .subscribeOn(Schedulers.io())         .observeOn(AndroidSchedulers.mainThread())         .subscribe(new Observer<Object>() {         .........         }

我們都subscribeOn 是切換上游執行緒,observeOn是切換下游環境,接下來我們就看下它是怎麼切換的。

我們先看下 Schedulers.io() 是什麼

@NonNull public static Scheduler io() {     return RxJavaPlugins.onIoScheduler(IO); } -> Scheduler IO = RxJavaPlugins.initIoScheduler(new IOTask());   -> static final class IOTask implements Supplier<Scheduler> {         @Override         public Scheduler get() {             return IoHolder.DEFAULT;         }     }   -> static final class IoHolder {         static final Scheduler DEFAULT = new IoScheduler();  }

我們通過層層分解,層層遞進,瞭解到 Schedulers.io() 最終返回的是一個 IoScheduler()

可以暫時將它理解為一個任務排程器,用來執行我們的任務。

接下來我們在看下 AndroidSchedulers.mainThread() 。

public static Scheduler mainThread() {     return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); } ->  private static final Scheduler MAIN_THREAD =         RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT); ->  Scheduler DEFAULT  = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);

可以看到這裡返回的是一個 HandlerScheduler ,這裡是對Handler進行了一個封裝,所以歸根結底,向主執行緒切換任務還是通過handler 來完成的,接下來我們就看看其中的細枝末節。

兩個引數分析完了,然後來看下兩個操作符,看看它倆做了些什麼事情。

首先我們來看下subscribeOn

public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {     Objects.requireNonNull(scheduler, "scheduler is null");     return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler)); }

這裡是對 IoScheduler() 和上游封裝的包裹進行的二次封裝

上游的包裹:這裡指的通過create建立的 ObservableCreate

observeO操作符:

public final Observable<T> observeOn(@NonNull Scheduler scheduler) {     return observeOn(scheduler, false, bufferSize()); } ->  public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {         Objects.requireNonNull(scheduler, "scheduler is null");         ObjectHelper.verifyPositive(bufferSize, "bufferSize");         return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));     }

在這裡是對 HandlerScheduler 和 上游封裝的包裹進行的二次封裝

上游的包裹:這裡指的是通過subscribeOn 操作符 建立的 ObservableSubscribeOn

然後我們開始看訂閱這裡了,我們的流程從 subscribe 這裡才剛剛開始。

經過上一篇文章的分析,我們可以知道呼叫的 subscribeActual 方法都是在上游操作符建立的封裝物件裡,所以我們直接看 ObservableObserveOn 的 subscribeActual 方法。

如果感覺這段講解有些跳躍,可以先看一下上篇文章《淺析RxJava》

@Override     protected void subscribeActual(Observer<? super T> observer) {          .........          Scheduler.Worker w = scheduler.createWorker();          source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));     }

在這裡是將我們自定義的 Observer 封裝成 ObserveOnObserver ,這裡的source 是我們上游的封裝的包裹,這裡指的就是通過subscribeOn 操作符建立的 ObservableSubscribeOn。最終會呼叫到 ObservableSubscribeOn 的 subscribeActual

方法。

@Override public void subscribeActual(final Observer<? super T> observer) {     final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);     observer.onSubscribe(parent);     parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }

這裡是將下游傳上來的 ObserveOnObserver 再次進行封裝,封裝成 SubscribeOnObserver ,然後再將 SubscribeOnObserver 封裝成 SubscribeTask,其實就是一個Runnable。

流程我們稍後再進行分析,我們先來看看任務是非同步任務是怎麼切換的。

通過上文分析得知,此處的 scheduler 為 subscribeOn 操作符傳入的引數,也就是 IoScheduler() 。

接下來我們再看 scheduler的 scheduleDirect 方法。

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {     final Worker w = createWorker();     final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);     DisposeTask task = new DisposeTask(decoratedRun, w);     w.schedule(task, delay, unit);     return task; }

在這裡是通過 createWorker() 建立了一個 Worker ,由這個 Worker 去執行 具體的任務。

createWork()是抽象方法,我們需要看IoScheduler 的具體實現。

``` public Worker createWorker() {     return new EventLoopWorker(pool.get()); } ->  public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {  ....   return threadWorker.scheduleActual(action, delayTime, unit, tasks);  } ->   NewThreadWorker類   public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);            .........            if (delayTime <= 0) {                 f = executor.submit((Callable)sr);             } else {                 f = executor.schedule((Callable)sr, delayTime, unit);             }             sr.setFuture(f);         return sr;     }   ```

經過層層挖掘,我們看到任務最後是通過 executor 來執行的,executor 就是內部維護的執行緒池

private final ScheduledExecutorService executor;

至此,整個工作流 就切換為了子執行緒來工作。

接下來我們繼續分析封裝的SubscribeTask

final class SubscribeTask implements Runnable {     private final SubscribeOnObserver<T> parent;     SubscribeTask(SubscribeOnObserver<T> parent) {         this.parent = parent;     }     @Override     public void run() {         source.subscribe(parent);     } }

在任務執行的時候,會通過source 繼續將 SubscribeOnObserver向上遊傳送。這裡的source 指的是create 建立的ObservableCreate ,source.subscribe 就會直接呼叫到 ObservableCreate 的 subscribeActual

@Override protected void subscribeActual(Observer<? super T> observer) {     CreateEmitter<T> parent = new CreateEmitter<>(observer);     observer.onSubscribe(parent);     try {         source.subscribe(parent);     } catch (Throwable ex) {         Exceptions.throwIfFatal(ex);         parent.onError(ex);     } }

一直到這裡,跟我們在上篇文章分析的流程就一樣了。

將下游傳過來的SubscribeOnObserver 再次封裝成 CreateEmitter 發射器,然後通過source 繼續向上傳遞,這裡的souce 就是指的是我們在create 中傳遞進去的ObservableOnSubscribe。

然後在ObservableOnSubscribe 的 subscribe 中,通過 emitter.onNext 將我們的資料開始進行下發。

ObservableEmitter @Override public void onNext(T t) {     ........     if (!isDisposed()) {         observer.onNext(t);     } }

這裡的observer 是SubscribeOnObserver

SubscribeOnObserver @Override public void onNext(T t) {     downstream.onNext(t); }

這裡的downstream 是指 ObservableObserveOn

ObserveOnObserver @Override public void onNext(T t) {    ......     if (sourceMode != QueueDisposable.ASYNC) {      queue.offer(t);    }       schedule(); }

這裡的 sourceMode 未被賦值,會呼叫 queue.offer(t) ,將資料放入到佇列中。

接下來再看 schedule() 做了些什麼 ?

void schedule() {     if (getAndIncrement() == 0) {         worker.schedule(this);     } }

通過上文的分析,我們可以知道 observeOn 操作符建立的 Scheduler 為 HandlerScheduler ,所以這裡的 worker.schedule(this) 方法呼叫的是 HandlerScheduler 的內部靜態子類 HandlerWorker 的 schedule 方法。

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {     ...........     if (disposed) {         return Disposable.disposed();     }     run = RxJavaPlugins.onSchedule(run);     ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);     Message message = Message.obtain(handler, scheduled);     message.obj = this;     if (async) {         message.setAsynchronous(true);     }     handler.sendMessageDelayed(message, unit.toMillis(delay));     if (disposed) {         handler.removeCallbacks(scheduled);         return Disposable.disposed();     }     return scheduled; }

最終是在這裡通過Handler將任務切換到了主執行緒執行。

ObserveOnObserver 類實現了Runnable 介面, worker.schedule(this) 是將自身交給Handler 去執行。所以最終的結果還會由 ObserveOnObserver 的run方法來執行。

public void run() {     if (outputFused) {         drainFused();     } else {         drainNormal();     } }

這裡我們是典型的使用方式,我們直接來看下 drainNormal();

void drainNormal() {     final SimpleQueue<T> q = queue;     final Observer<? super T> a = downstream;     for (;;) {         ............         for (;;) {             T v = q.poll();             a.onNext(v);         }         ..............     } }

在這裡 將資料從佇列中取出,然後呼叫下游的 onNext ,這裡的 downstream 也就是我們最後自定義的觀察者 Observer 了。

整個過程也好比是一個封包裹和拆包裹的過程。用洋蔥模型表示一下會更加的形象。

image-20220618111944299

最後上圖!

image-20220618114225467

可能文字的敘述還是太抽象, 用這樣一張圖來表示整個流程可能相對好理解一些。

寫在最後

紙上得來終覺淺,絕知此事要躬行。如果有時間還是建議自己跟一遍原始碼流程,這樣才能真正理解。

今天就水到這裡,希望對大家有所幫助。