RxJava 系列九:執行緒切換的原始碼分析

語言: CN / TW / HK

本文概述:

  • 此文為RxJava 系列第九篇文章,為本系列難度最高,撰寫困難最多的一篇;文章以原始碼分析配合流程圖深入細緻分析了subscribeOn 原理;Schedulers.io() 到底幹了什麼?Schedulers 中的策略機制的具體實現;.subscribeOn():幹了什麼;任務與執行緒池是怎麼關聯起來的;ObservableObserveOn 原始碼分析

零碎問題

rxandroid 與 rxjava有什麼區別:為什麼要匯入rxandroid

  • 依賴匯入示意

    image-20220703142135694

  • 只有RxJava 沒有rxandroid是不全的:在android 中需要使用,兩個都要匯入進來

  • rxandroid 有什麼用:線上程切換時有用

    .observeOn(Schedulers.io()) // rxjava  .subscribeOn(AndroidSchedulers.mainThread()) // rxandroid

    • 原始碼示意:確實是從rxandroid 中匯入的

      image-20220703142842172

  • 在 Java 伺服器上就沒有必要匯入 rxandroid 了

中斷回收

  • 為什麼要使用中斷:不新增這個程式碼會報黃

    • 報黃示意:

      image-20220703143432160

    • 處理報黃:消極手段(在函式簽名前添加註解,進行鎮壓)

    • 處理報黃:因為沒有進行中斷處理,那麼接收中斷

      • 例項化中斷物件

      private Disposable disposable;

      • 接收中斷訊號

      圖片.png

  • 在訂閱函式內對disposable 進行賦值

    .subscribe(new Observer<String>() {      @Override      public void onSubscribe(Disposable d) {          disposable = d;     }

  • 在onDestroy 中進行回收:阻止事件繼續流動 ---> 解決OOM

    @Override  protected void onDestroy() {      super.onDestroy();  ​      // 最起碼的寫法      if (disposable != null)          if (!disposable.isDisposed())              disposable.dispose();  }

Just 與 Create操作符有什麼區別

  • 為什麼要學Create,不學其他的

    • Create 是一個最原始的東西,Just 等其他的都是封裝了的
  • Just 內部進行了封裝,不用手動呼叫onNext

    //Create 操作符  @Override  public void subscribe(ObservableEmitter<String> e) throws Exception {      e.onNext("Derry");      e.onNext("A");      e.onComplete();  }

  • Just 內部的封裝處理

    //new ObservableJust() ---> subscribeActual.run()  @Override  public void run() {      if (get() == START && compareAndSet(START, ON_NEXT)) {          observer.onNext(value);          if (get() == ON_NEXT) {              lazySet(ON_COMPLETE);              observer.onComplete();         }     }  }

subscribeOn 原理分析

整體流程概述:

  • 流程關係:

    • 將事件終點傳給訂閱處:.subscribe

    • 在訂閱處:

      • 執行抽象函式subscribeActual,跳轉到訂閱的具體實現類,也就是訂閱函式的實現類
    • 訂閱函式的實現類:

      • 抽象函式重寫:以事件終點作為引數
      • 將事件終點封裝成了包裹一
    • 將包裹一作為引數傳給任務:跳轉到

      • 實現了Runnable介面,在其run 方法中將包裹一作為引數,與執行緒池相關聯
    • 跳轉到 的實現類

      • 抽象函式重寫:以包裹一作為引數
      • 封裝了包裹二
    • 跳轉到 的呼叫者

      • 包裹二.onNext()
  • 示意圖:

    subscribeOn

基本程式碼準備:

  • subscribeOn 作用:為上面程式碼分配執行緒

  • 程式碼展示:onCreate 中

    Observable.create(      // 自定義source      new ObservableOnSubscribe<String>() {          @Override          public void subscribe(ObservableEmitter<String> e) throws Exception {              e.onNext("Derry");  ​              Log.d(L.TAG, "自定義source: " + Thread.currentThread().getName());         }     })  ​      // ObservbaleCreate.subscribeOn      // TODO 第二步     new IOScheduler ---> 執行緒池 傳遞進去     .subscribeOn(  ​      // TODO 第一步 到底幹了什麼 ( new IOScheduler ---> 執行緒池)      Schedulers.io() // 耗時讀取的非同步  ​      // Schedulers.newThread() // 開啟新執行緒  ​  )  ​      // A執行緒. subscribe      // ObservableSubscribeOn.subscribe     .subscribe(  ​      // 終點      new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {  ​              Disposable disposable = d;              Log.d(L.TAG, "onSubscribe: " +  Thread.currentThread().getName());         }  ​          @Override          public void onNext(String s) {              Log.d(L.TAG, "onNext: " + Thread.currentThread().getName());         }  ​          @Override          public void onError(Throwable e) {  ​         }  ​          @Override          public void onComplete() {  ​         }     });

Schedulers.io() 到底幹了什麼?

  • 最終返回:Schedulers 物件

  • 進入原始碼:Hook 技術 + IO

    @NonNull  public static Scheduler io() {      return RxJavaPlugins.onIoScheduler(IO);  }

    • Hook技術:因為f 預設為null,直接就返回回去了

      @NonNull  public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {      Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;      if (f == null) {          return defaultScheduler;     }      return apply(f, defaultScheduler);  }

      • 補充自定義 Hook 技術
    • final 關鍵字:檢視 IO

      public final class Schedulers {       ……      @NonNull      static final Scheduler IO;

      • 例項化部分:new IOTask();

        static {   ……          //在例項化的時候,又來一個Hook點          IO = RxJavaPlugins.initIoScheduler(new IOTask());     }

  • 為什麼來這麼多Hook,向用戶提供攔截點

//初始化 IO Hook  @NonNull      public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {          ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");          Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;          if (f == null) {              return callRequireNonNull(defaultScheduler);         }          return applyRequireNonNull(f, defaultScheduler);     }

  • 使用者Hook 思路:對 IO 進行初始化

// TODO hook 給 IO 初始化  RxJavaPlugins.setInitIoSchedulerHandler(new Function<Callable<Scheduler>, Scheduler>() {      @Override      public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {          Log.d(L.TAG, "apply: 全域性 監聽 init scheduler:" +schedulerCallable.call());          return schedulerCallable.call();     }  });

Schedulers 中的策略機制的具體實現

  • 結果:例項化具體的策略,最終通過執行緒池管控,但此時沒有觸發

  • 示意圖:

    Schedulers.io

  • IO策略、NEW_THREAD 策略、COMPUTATION 策略

  • IO策略:為耗時讀取的 IO操作,提供非同步執行緒,採用IOTask() 進行處理(有返回值(IOScheduler)的非同步任務)

    • IO 策略:

    public final class Schedulers {   @NonNull      static final Scheduler IO;

    • IOTask()

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    • new IOTask():有返回值的非同步任務

    static final class IOTask implements Callable<Scheduler> {      @Override      public Scheduler call() throws Exception {          return IoHolder.DEFAULT;     }  }

    • 返回值為:IOScheduler() 物件

    static final class IoHolder {      static final Scheduler DEFAULT = new IoScheduler();  }

  • NEW_THREAD 策略:為頻繁開啟執行緒,提供非同步執行緒

    • 類屬性

    public final class Schedulers {      @NonNull      static final Scheduler NEW_THREAD;

    • 在靜態塊內初始化

    static{  NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());  }

    • 建立一個物件:new NewThreadTask()

    static final class NewThreadTask implements Callable<Scheduler> {      @Override      public Scheduler call() throws Exception {          return NewThreadHolder.DEFAULT;     }  }

    • 返回:NewThreadScheduler 例項

    static final class NewThreadHolder {      static final Scheduler DEFAULT = new NewThreadScheduler();  }

  • 是怎麼呼叫到執行緒池的:以IOScheduler() 為例

    • 原始碼展示:IoScheduler

    public IoScheduler() {      this(WORKER_THREAD_FACTORY);  }

    • 通過this 指標進入建構函式

    //存在工廠設計模式  public IoScheduler(ThreadFactory threadFactory) {      //為這個工廠取個名字      this.threadFactory = threadFactory;      this.pool = new AtomicReference<CachedWorkerPool>(NONE);      start();  }

    • 進入start() 方法:其中做了許多的優化(快取池)

    @Override  public void start() {      CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);      if (!pool.compareAndSet(NONE, update)) {          update.shutdown();     }  }

    • 進入shutdown:找到所使用的執行緒池例項

    void shutdown() {      allWorkers.dispose();            if (evictorTask != null) {          evictorTask.cancel(true);     }            //就是這個evictorService      if (evictorService != null) {          evictorService.shutdownNow();     }  }

    • 證明這個東西確實是執行緒池

    static final class CachedWorkerPool implements Runnable {      ……      private final ScheduledExecutorService evictorService;

    • 因為這個存在繼承關係

      ScheduledExecutorService extends ExecutorService

    • 執行緒池鐵證:

    if (unit != null) {      //在這個地方      evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);      task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);  }  evictorService = evictor;

.subscribeOn():幹了什麼

  • 由具體操作符物件呼叫,將具體策略傳給執行緒池,並進行封裝

@CheckReturnValue  @SchedulerSupport(SchedulerSupport.CUSTOM)  public final Observable<T> subscribeOn(Scheduler scheduler) {      ObjectHelper.requireNonNull(scheduler, "scheduler is null");      return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));  }

  • 進入new ObservableSubscribeOn

//入參scheduler:具體策略  public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {      super(source);      this.scheduler = scheduler;  }

  • 訂閱流程:而訂閱是由Schedulers.io() 返回的物件 ObservableSubscribeOn 呼叫的

    • 傳入事件終點

    • 進行校驗

    • 呼叫抽象函式,跳轉到其實現類(ObservableSubscribeOn )的具體實現處

      • 傳入了事件終點 s,打包

      • 將包裹交給new SubscribeTask(parent)

        • 一定是交給執行緒池執行的

        image-20220703154215523

      //s 是事件的終點  @Override  public void subscribeActual(final Observer<? super T> s) {      final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);  ​      s.onSubscribe(parent);  ​      parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));  }

任務與執行緒池是怎麼關聯起來的

  • 關鍵程式碼:ObservableSubscribeOn.subscribeActual()

    //c(執行緒池點出來的)  scheduler.scheduleDirect(new SubscribeTask(parent))

  • 觀察是怎麼將任務提交到執行緒池的:進入scheduleDirect

    public Disposable scheduleDirect(@NonNull Runnable run) {      return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);  }

    • 存在繼承關係:父類(Scheduler) ---> 子類(IoScheduler)
  • 檢視具體實現:進入scheduleDirect

    @NonNull  public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {      //交給子類去執行:避免破壞設計模式      final Worker w = createWorker();  ​      //又是Hook 技術:將Runnable包裝一層      final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);  ​      //又將Runnable包裝一層      DisposeTask task = new DisposeTask(decoratedRun, w);  ​      //將包裝好的Runnable交給w ---> 最終呼叫到EventLoopWorker.schedule      w.schedule(task, delay, unit);  ​      return task;  }

  • 觀察子類的具體實現邏輯:進入createWorker

    • 發現這是一個抽象函式:肯定了上述猜想,一定是交給子類實現的

      @NonNull  public abstract Worker createWorker();

  • 檢視子類實現:進入 IoScheduler.reateWorker()

    • 建立了物件EventLoopWorker

    public Worker createWorker() {      return new EventLoopWorker(pool.get());  }

  • 最終呼叫到執行緒池中:NewThreadWorker.scheduleActual()

    if (delayTime <= 0) {      f = executor.submit((Callable<Object>)sr);  } else {      f = executor.schedule((Callable<Object>)sr, delayTime, unit);  }  sr.setFuture(f);

    • 為什麼使用submit 不用excute

      • 都是執行具體任務,在於同步/非同步,是否有返回值
  • 執行緒池實現了分配執行緒::ObservableSubscribeOn.SubscribeTask()

    final class SubscribeTask implements Runnable {   ……      @Override      public void run() {          //執行緒池執行任務          source.subscribe(parent);     }  }

    • 因為採用執行緒池 ---> 此處為非同步執行緒,並且將任務提交後,後續流程會立即觸發,因此後面的均處在非同步執行緒中了,事件終點都是非同步;
    • 可以在不同部分,列印執行緒名字
    • 重點:訂閱處的onSubscribe 是不參與上述的流程的,打印出來的是subscribe的呼叫者,此處為android主執行緒

    // 終點  new Observer<String>() {      @Override      public void onSubscribe(Disposable d) {  ​          Disposable disposable = d;          Log.d(L.TAG, "onSubscribe: " +  Thread.currentThread().getName());     }

終點是需要更新UI:分析observeOn(為下面的程式碼分配主執行緒)

  • 結果:拿到主執行緒的Handler,交給observeOn
  • 業務需求:現在終點是非同步執行緒,怎麼將它變成主執行緒;

案例分析:

  • 基礎環境搭建:營造子執行緒環境:整個鏈條全部都是子執行緒

@Override  protected void onCreate(Bundle savedInstanceState) {      super.onCreate(savedInstanceState);  ​      //將所有程式碼全部放到子執行緒中去      new Thread(){          @Override          public void run() {              super.run();   //封裝了操作              test();         }     }.start();  ​  }  ​  private void test() {      Observable.create(          // 自定義source          new ObservableOnSubscribe<String>() {              @Override              public void subscribe(ObservableEmitter<String> e) throws Exception {                  e.onNext("Derry");  ​                  Log.d(L.TAG, "自定義source: " + Thread.currentThread().getName());             }         })          //子執行緒.subscribe         .subscribe(          // 終點          new Observer<String>() {              @Override              public void onSubscribe(Disposable d) {                  Log.d(L.TAG, "onSubscribe: " +  Thread.currentThread().getName());             }  ​              @Override              public void onNext(String s) {                  Log.d(L.TAG, "onNext: " + Thread.currentThread().getName());             }  ​              @Override              public void onError(Throwable e) {  ​             }  ​              @Override              public void onComplete() {  ​             }         });  }

  • 執行結果:整塊程式碼都是在子執行緒

    result

  • 業務需求:終點處變成主執行緒

    • 新增程式碼即可解決

      .observeOn(AndroidSchedulers.mainThread())

為什麼添加了上面哪行程式碼就切到了主執行緒中去了?

  • 最終效果:

    • 拿到了一個百分百在主執行緒中執行的Handler 暴露了任務入口run ,我們只需要將任務交給Runnable 交給run ,那麼這部分程式碼就執行在主執行緒中了
  • 原始碼分析:

    • 整體流程:
    • image-20220704002947780

    • 細節分析:AndroidSchedulers.MainHolder

      • Handler 切到主執行緒

        image-20220704000832660

    • 細節分析:HandlerScheduler

      • createWorker最終執行到schedule

        image-20220704001630862

      • 為什麼會執行到Schedule?

        • Scheduler.createWoker是個抽象函式

          image-20220704001822622

        • 具體實現中會呼叫 w.schedule();

          image-20220704001921703

    • 細節分析:HandlerScheduler.HandlerWorker.schedule

      • 此時使用的就是主執行緒的 Handler

        image-20220704002127593

      • 通過run 執行到主執行緒中去

        image-20220704002233549

      • run 中包裹的程式碼就在主執行緒中執行

    • 細節分析:AndroidSchedulers.MAIN_THREAD

      • Hook 技術 + 使用Callable + 返回了MainHolder.DEFAULT;

        image-20220704000714009

原始碼分析:ObservableObserveOn

  • 畫圖:new Thread 執行的全部都在子執行緒中執行

    image-20220704003857647

  • 部分原始碼分析:ObservableObserveOn

    • 存放了一份主執行緒的Handler

    image-20220704003610697

    • 節省開銷:切換的目標執行緒是當前執行緒,那麼就不切換,直接呼叫

    image-20220704003624171

    • 打包:切換的目標執行緒不是當前執行緒

      image-20220704003717191

  • 原始碼詳細分析過程:

    • 事件終點處發起訂閱,訂閱中存在抽象subscribeActual(),跳轉到實現處

      • 訂閱函式的呼叫者:ObservableObserveOn
      • 這是由Rx 響應式程式設計決定的
    • 跳轉至ObservableObserveOn.subscribeActual(事件終點)

      • 封裝包裹一併向上傳遞:注意此時source 指的是再上面的一層

        image-20220704004431574

    • 跳轉至ObservableCreate.subscribeActual(),抽象函式在上層呼叫者中實現

      • 接收入參(包裹一) + 封裝包裹二

      圖片.png

    • 跳轉到自定義source:開始向下依次分發(呼叫各層的onNext() 方法)

      • 開始向下分發:包裹二.onNext()

        image-20220704005031347

    • 重點:包裹一.onNext(),此時需要將任務提交給Handler

      • 此時還是在子執行緒中,但執行了schedule 之後就交給了Handler,就切到主執行緒了

        image-20220704005420732

    • schedule 中幹了什麼:分析ObservableObserveOn中重寫的onNext()

      • 完成了執行緒的切換與任務的提交

      • 拿到worker

        image-20220704010745655

      • worker 是哪裡來的:分析ObservableObserveOn.subscribeActual()

        • scheduler 就是之前返回的主執行緒 Handler

          • 意味著scheduler 可以在Handler中執行主執行緒程式碼

        image-20220704010915936

      • 因為worker 是 Scheduler.Worker 型別

        image-20220704010422361

      • 呼叫抽象函式createWorker();

        image-20220704010240496

      • 實現類HandlerScheduler,返回HandlerWorker(handler)物件

        圖片.png

      • 最終呼叫schedule:ObservableObserveOn.schedule

        image-20220704011242358

      • 跳轉到HandlerScheduler 中重寫的schedule

        • 將任務以Runnable 形式傳進來

        image-20220704010135355

        • 提交給Handler

        image-20220704010152025

        • 此時意味著 run 函式就在主執行緒中了

          • 最後是由HandlerScheduler.run 函式執行的
          • run 函式中均屬於主執行緒(android.main)部分
          • 拿到事件終點後:事件終點.onNext()
    • 原始碼分析(ObservableObserveOn 重寫的run()):run 函式幹了什麼?

      • drainFused();

      • drainNormal();

        • 拿到事件流動終點

          image-20220704012334727

        • 終點.onNext():就切到主執行緒了

          image-20220704012422713

  • 補充:Handler切換主執行緒時,為什麼要加Looper.getMainLooper()

    • 到底為什麼?

      • 因為一個執行緒對應一個Loop,main 執行緒就只有一個Loop
    • 最正宗的寫法 (百分百在主執行緒) :傳入Looper.getMainLooper()

      new Handler(Looper.getMainLooper()) {      @Override      public void handleMessage(@NonNull Message msg) {          super.handleMessage(msg);  ​          // UI 操作 ...     }  };

    • 一般寫法:80% 可能性在主執行緒

      new Handler() {      @Override      public void handleMessage(@NonNull Message msg) {          super.handleMessage(msg);  ​          // UI 操作 ...     }  };

    • 最次寫法:20% 可能性在主執行緒

      new Thread(){      @Override      public void run() {          super.run();  ​          new Handler() {              @Override              public void handleMessage(@NonNull Message msg) {                  super.handleMessage(msg);  ​                  // UI 操作 ... 百分之百 Ui執行緒 沒有問題             }         };     }  }.start();

補充:自定義Hook 技術(RxJava 2.0)

  • RxJava 1.0:傳進來返回回去,只是預留了 2.0 的優化空間

  • 尋找 f 的賦值位置:只有一處

    image-20220703145031008

  • Hook 思路:我們只需要呼叫這個函式,對 f 進行賦值即可(後續在執行的時候,先執行Hook 程式碼),呈現出全域性監聽的效果(因為這個是靜態的)

    • 想怎麼玩就怎麼玩

    image-20220703145241918

  • Hook 程式碼:

    // TODO Hook IO 正式幹活的 Hook  RxJavaPlugins.setIoSchedulerHandler(new Function<Scheduler, Scheduler>() {      @Override      public Scheduler apply(Scheduler scheduler) throws Exception {          Log.d(L.TAG, "apply: 全域性 監聽 scheduler:" +scheduler);          return scheduler;     }  });

非同步事件流程式設計:

  • 卡片式程式設計的執行緒分配:在上一張卡片切主線,下一張卡片切非同步執行緒

問題彙總:

  • 事件終點是什麼:流程的走向問題

    • 案例一:終點為1

      subscribeOn(1);  subscribeOn(2);  subscribeOn(3);

    • 案例二:終點為C

      observeOn(A);  observeOn(B);  observeOn(C);