RxJava 系列四: doOnNext 的運用

語言: CN / TW / HK

doOnNext 的運用

本文概述

  • 本文以具體案例(需要頻繁在非同步執行緒與主執行緒之間切換),引出了doOnNext;記錄了編寫思路、編寫細節、完整程式碼等;向讀者展示了,doOnNext的獨特魅力;

業務需求

  • 需要頻繁的(主/非同步)切換執行緒

  • 示意圖:

    圖片.png

應用場景:

  • 例如銀行(會頻繁切換APP)

工程結構:

圖片.png

整體思路:

* Retrofit + RxJava  * 需求:  * 1.請求伺服器註冊操作  * 2.註冊完成之後,更新註冊UI  * 3.馬上去登入伺服器操作  * 4.登入完成之後,更新登入的UI

具體結構:

* wy.RxJava配合Retrofit。  * RxJava + Retrofit (請求網路OkHttp ---- Retorfit --- Observable)  *  * 1.OkHttp 請求網路 (Retorfit)  * 2.Retorfit 返回一個結果 (Retorfit) --- Observable  * 3.最終的結果 是RxJava中的 被觀察者 上游 Observable  * 4.一行程式碼寫完需求流程: 從上往下  *   1.請求伺服器,執行註冊操作(耗時)切換非同步執行緒  *   2.更新註冊後的所有 註冊相關UI - main 切換主執行緒  *   3.請求伺服器,執行登入操作(耗時)切換非同步執行緒  *   4.更新登入後的所有 登入相關UI - main 切換主執行緒  *  * 5.看RxJava另外一種的執行流程  *   初始點 開始點 訂閱  *   1.onSubscribe  *   2.registerAction(new RegisterRequest())  *   3..doOnNext 更新註冊後的 所有UI  *   4.flatMap執行登入的耗時操作  *   5.訂閱的觀察者 下游 onNext 方法,更新所有登入後的UI  *   6.progressDialog.dismiss()

環境準備:

  • 封裝操作:為呼叫處上面分配非同步執行緒,為呼叫處下面分配主執行緒

    //DownLoadActivity.rxud  public final static <UD> ObservableTransformer<UD, UD> rxud() {      return new ObservableTransformer<UD, UD>() {          @Override          public ObservableSource<UD> apply(Observable<UD> upstream) {              return  upstream.subscribeOn(Schedulers.io())     // 給上面程式碼分配非同步執行緒                 .observeOn(AndroidSchedulers.mainThread()) // 給下面程式碼分配主執行緒;                 .map(new Function<UD, UD>() {                      @Override                      public UD apply(UD ud) throws Exception {                          Log.d(TAG, "apply: 我監聽到你了,居然再執行");                          return ud;                     }                 });              // .....       ;         }     };  }

  • 封裝操作:封裝了Retrofit(返回一個Retrofit 例項)

    package com.xiangxue.rxjavademo.doOnNext.retrofit_okhttp_rxjava.retrofit_okhttp;  ​  import java.util.concurrent.TimeUnit;  ​  import butterknife.BuildConfig;  import okhttp3.OkHttpClient;  import okhttp3.logging.HttpLoggingInterceptor;  import retrofit2.Retrofit;  import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;  import retrofit2.converter.gson.GsonConverterFactory;  ​  public class MyRetrofit {  ​      // 把Retrofit給Build出來 Retrofit給創建出來      public static Retrofit createRetrofit() {  ​          OkHttpClient.Builder builder = new OkHttpClient.Builder();  ​          builder.readTimeout(10, TimeUnit.SECONDS);          builder.connectTimeout(9, TimeUnit.SECONDS);  ​          if (BuildConfig.DEBUG) {              HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();              interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);              builder.addInterceptor(interceptor);         }  ​          return new Retrofit.Builder().baseUrl("http://xxxxxxx")                 .client(builder.build())                 .addConverterFactory(GsonConverterFactory.create())                 .addCallAdapterFactory(RxJava2CallAdapterFactory.create())                 .build();     }  ​  }

業務程式碼:分開寫

// TODO 方式一 分開寫  @SuppressLint("CheckResult")  public void request(View view) {      // 1.請求伺服器註冊操作      // 2.註冊完成之後,更新註冊UI      MyRetrofit.createRetrofit().create(IReqeustNetwork.class)          //註冊操作:非同步操作,耗時         .registerAction(new RegisterRequest())          //為上面分配非同步執行緒,為下面分配主執行緒         .compose(DownloadActivity.rxud())          //RxJava 處理終點資訊         .subscribe(new Consumer<RegisterResponse>() {              @Override              public void accept(RegisterResponse registerResponse) throws Exception {                  // 更新註冊相關的所有UI                  // .....             }         });  ​  ​      // 3.馬上去登入伺服器操作      // 4.登入完成之後,更新登入的UI      MyRetrofit.createRetrofit().create(IReqeustNetwork.class)          //登入操作:非同步操作,耗時         .loginAction(new LoginReqeust())          //為上面分配非同步執行緒,為下面分配主執行緒         .compose(DownloadActivity.rxud())          //RxJava 處理終點資訊         .subscribe(new Consumer<LoginResponse>() {              @Override              public void accept(LoginResponse loginResponse) throws Exception {                  // 更新登入相關的所有UI                  // .....             }         });  }

業務程式碼:將整體業務邏輯合併

  • 思路:

    /**   * 一行程式碼 實現需求   * 需求:   *   還有彈出載入   * * 1.請求伺服器註冊操作   * * 2.註冊完成之後,更新註冊UI   * * 3.馬上去登入伺服器操作   * * 4.登入完成之後,更新登入的UI   */

  • 完整程式碼:

    private ProgressDialog progressDialog;  Disposable disposable;  ​  public void request2(View view) {      MyRetrofit.createRetrofit().create(IReqeustNetwork.class)          // todo 1.請求伺服器註冊操作(應當在非同步執行緒)           .registerAction(new RegisterRequest())          //給上面分配非同步執行緒         .subscribeOn(Schedulers.io())          // 給下面分配主執行緒         .observeOn(AndroidSchedulers.mainThread())          //編寫細節一:這裡是不能寫.subScribe(new Comsumer)         .doOnNext(new Consumer<RegisterResponse>() { // todo 3              @Override              public void accept(RegisterResponse registerResponse) throws Exception {                  // todo 2.註冊完成之後,更新註冊UI             }         })          // todo 3.馬上去登入伺服器操作(切換到非同步執行緒)           // 給下面分配了非同步執行緒         .observeOn(Schedulers.io())          //使用flatMap解決巢狀導致程式碼結構不美觀         //編寫細節二:flatMap的第二個引數應該寫什麼?         .flatMap(new Function<RegisterResponse,                                                   ObservableSource<LoginResponse>>() {                           @Override                   public ObservableSource<LoginResponse> apply(RegisterResponse                      registerResponse) throws Exception {                       Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwork.class)                             .loginAction(new LoginReqeust());                       //返回的是登入響應的結果                      return loginResponseObservable;                 }             })         // 給下面 執行主執行緒:因為後面第四步的時候要更新UI             .observeOn(AndroidSchedulers.mainThread())                       .subscribe(new Observer<LoginResponse>() {                  // 一定是主執行緒,為什麼,因為 subscribe 馬上呼叫onSubscribe                  @Override                  public void onSubscribe(Disposable d) {                      // TODO 1                      progressDialog = new ProgressDialog(RequestActivity.this);                      progressDialog.show();                      // UI 操作                      disposable = d;                 }  ​                  @Override                  public void onNext(LoginResponse loginResponse) { // todo 5                      // TODO 4.登入完成之後,更新登入的UI                 }  ​                  @Override                  public void onError(Throwable e) {  ​                 }  ​                  // todo 6                  @Override                  public void onComplete() {                      // 殺青了                      if (progressDialog != null) {                          progressDialog.dismiss();                     }                 }             });  ​  }

編寫細節:

  • 編寫細節一:在第一次切換到main之後是不能寫subScribe的,要寫doOnNext

    • 寫subScribe:返回Disposable(此時流程會終止掉了)

    圖片.png

    • 寫doOnNext:返回Observable

      • 此時流程不會終止,整體業務上是不斷新增卡片進行事件的攔截(進行操作)而不是將流程給斷掉了

    圖片.png

  • 編寫細節二:flatMap的第二個引數應該寫什麼?

    • flatMap的第一個引數:此卡片接收的事件型別

      • 註冊響應
    • flatMap的第一個引數:此卡片向後分發的事件型別

      • 登入響應
  • 編寫細節三:執行緒的切換

    • 給上面分配非同步執行緒

      .subscribeOn(Schedulers.io())

    • 給下面分配主執行緒

      .observeOn(AndroidSchedulers.mainThread())

    • 給下面分配非同步執行緒

      .observeOn(Schedulers.io())

  • 實際的業務走向流程

    • 第一步:彈出載入框

      progressDialog = new ProgressDialog(RequestActivity.this);  progressDialog.show();

    • 第二步:請求伺服器進行註冊操作

      .registerAction(new RegisterRequest())

    • 第三步:更新註冊後的UI

      .doOnNext(new Consumer<RegisterResponse>() { // todo 3      @Override      public void accept(RegisterResponse registerResponse) throws Exception {          // todo 2.註冊完成之後,更新註冊UI     }  })

    • 第四步:請求伺服器進行登入操作

      .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {      @Override      public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {          Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwork.class)             .loginAction(new LoginReqeust());          return loginResponseObservable;     }  })

    • 第五步:更新登入UI

      @Override  public void onNext(LoginResponse loginResponse) { // todo 5      // TODO 4.登入完成之後,更新登入的UI  }

    • 第六步:整個鏈條結束

      // todo 6  @Override  public void onComplete() {      // 殺青了      if (progressDialog != null) {          progressDialog.dismiss();     }  }

  • 細節四:RxJava起點到終點是單向的,從原始碼中來的

  • 細節五:Disposable記憶體洩漏問題

    • 基礎:Rx 在每次事件分發前內部會判斷Disposable 是否被中斷,沒有被中斷才分發,中斷了就不會分發;
    • 具體場景:在Activity最網路請求,但網絡卡,但是這個時候使用者殺掉了Activity;此時,在事件分發的時候就需要去檢測中斷標記,此時終止事件流動(避免了記憶體洩漏)

    //宣告 Disposable  Disposable disposable;  ​  //在訂閱處給 Disposable賦值  .subscribe(new Observer<LoginResponse>() {  ​      // 一定是主執行緒,為什麼,因為 subscribe 馬上呼叫onSubscribe      @Override      public void onSubscribe(Disposable d) {          // TODO 第一步:訂閱後彈出載入框          progressDialog = new ProgressDialog(RequestActivity.this);          progressDialog.show();  ​          // UI 操作  ​          disposable = d;     }  ​      //在Activity回收的時候,就要把這個 Disposable回收掉,不然有可能出現記憶體洩漏      @Override      protected void onDestroy() {          super.onDestroy();          // 必須這樣寫,最起碼的標準          if (disposable != null)              if (!disposable.isDisposed())                  disposable.dispose();     }

  • 細節六:onSubscribe一定執行在主執行緒

    • 因為請求函式是由UI控制元件觸發---》request是主執行緒

    • 訂閱(subscribe)也是主執行緒,一訂閱後會立馬觸發onSubscribe

      • 因此,onSubscribe也是主執行緒
      • 這個裡面可以寫UI操作