面試官:RxJava背壓機制有了解麼?

語言: CN / TW / HK

前言

RxJava已經出現很多個年頭了,但是依然被很多公司使用,如果現在還對RxJava瞭解的不夠透徹, 可以看這個系列對它的分析:相信看完後你對它會有個更全面的認識。 這個系列主要從下面幾個方面來講解: RxJava基本操作符使用 RxJava響應式程式設計是如何實現的 RxJava的背壓機制及Flowable是如何實現背壓的 RxJava的執行緒切換原理 關於RxJava的其他系列文章,可以點選下方連結

面試官:RxJava背壓機制有了解麼?

面試官:RxJava是如何做到響應式程式設計的?

使用rxjava建立一個rxbus事件處理框架

Rxjava操作符詳解--看看你還記得多少

簡介:

說道rxjava,大家並不陌生,對它的操作符應該也都可以信手拈來。 也都知道rxjava採用的是觀察者模式的響應式程式設計,也可以說是Steam的程式設計模式 Steam的程式設計模式: 我們在使用IO流的過程中經常使用這種方式讀取流和寫入流:

FileInputStream fileInputStream = new FileInputStream(new File("fileDir/filepath")); DataInputStream dis = new DataInputStream(fileInputStream); BufferedInputStream bis = new BufferedInputStream(dis);

  • 上面就是一種Stream資料流模式的處理方式,使用的是裝飾器模式,每次裝飾後,如果資料流流到某個點都會做一些額外處理 將處理後的資料流繼續分發到下一級,這就是流模式的處理方法

  • 在同步方式下,資料依次向下流動,每個資料流執行完畢後在執行下個數據流,是有順序的執行,這種模式的缺點就是無法執行併發狀態下的資料流

  • 那麼就需要使用到非同步方式: 非同步的方式,可以大大提高我們處理資料流的執行效率,同一時間會有多個數據流進行處理,效率提高的同時會引起一些其他的問題, 比如資料處理的不同步,又比如當上遊事件傳送的速度快於下游處理的速度,這個時候我們就說出現了背壓

    早期rxjava對這種情況的處理是使用一個無大小限制的佇列將積壓的事件儲存起來,這種情況有個問題是,如果資料流太多一直得不到處理或者處理了一半出現異常退出 那麼就會出現OOM的情況 下面我們使用一個例子來看下這個情況:

PublishProcessor<Integer> so = PublishProcessor.create(); so.observeOn(Schedulers.computation()).subscribe(v->compute(v),Throwable::printStackTrace); int count = 100; for (int i = 0; i < count; i++){ so.onNext(i); }

列印結果: computing : 0 computing : 1 computing : 2 computing : 3 ... ... computing : 96 computing : 97 computing : 98 computing : 99

我們將count設定為1000

列印結果: computing : 0 computing : 1 io.reactivex.exceptions.MissingBackpressureException: Could not emit value due to lack of requests at io.reactivex.processors.PublishProcessor$PublishSubscription.onNext(PublishProcessor.java:315)

可以看出上面是使用非同步的方式,同時傳送100個和1000個事件,前者正常而後者報了MissingBackpressureException異常 這就是因為我們的PublishProcessor預設最大支援儲存128個併發數,如果超過這個數就會報異常。

```

我們在每次onNext後延遲一秒來看看: PublishProcessor so = PublishProcessor.create(); so.observeOn(Schedulers.computation()).subscribe(v->compute(v),Throwable::printStackTrace); int count = 100; for (int i = 0; i < count; i++){ so.onNext(i); sleep(1);//秒 } 列印結果: computing : 0 computing : 1 computing : 2 computing : 3 ... ... computing : 96 computing : 97 computing : 98 computing : 99 看出在延遲一秒後,都正常列印了,說明確實是積壓數超過最大值128導致 ```

從上面例子我們知道,我們的積壓數MAX在背壓處理過程中起著關鍵作用 我們嘗試將這個值改大:

使用so.onBackpressureBuffer(1001).observeOn..改變大小

列印結果:

computing : 0 computing : 1 computing : 2 ... ... computing : 997 computing : 998 computing : 999

  • 可以看到在1001這個值內都是正常列印的 超過這個值後: 報:io.reactivex.exceptions.MissingBackpressureException: Buffer is full 快取已滿

既然這個值這麼重要那麼我們來從原始碼分析下這個值

原始碼分析

老規矩我們先把原始碼分層: 1.建立PublishProcessor 2.呼叫observerOn建立一個任務執行切換執行緒的觀察者 3.執行任務

1.建立PublishProcessor

```

PublishProcessor.create()
public static <T> PublishProcessor<T> create() {
    return new PublishProcessor<T>();//直接返回一個PublishProcessor
}
//構造方法:
PublishProcessor() {
    subscribers = new AtomicReference<PublishSubscription<T>[]>(EMPTY);
}
//這裡建立了一個PublishSubscription[]型別的AtomicReference格式的物件:預設值EMPTY = new PublishSubscription[0],這個預設值很關鍵,後面會對這個值進行判斷,看是否有改寫

```

2.呼叫observerOn建立一個任務執行切換執行緒的觀察者

通過之前我們對觀察者訂閱操作的分析, 訂閱回撥到最上游的:PublishProcessor的subscribeActual操作:

```

protected void subscribeActual(Subscriber<? super T> t) {
    PublishSubscription<T> ps = new PublishSubscription<T>(t, this);
    t.onSubscribe(ps);
    ...
}

```

subscribeActual中會呼叫t.onSubscribe(ps)訂閱dispose方法:t是下游ObservableObserveOn傳過來的FlowableObserveOn FlowableObserveOn.java

``` public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.s, s)) { this.s = s;

    if (s instanceof QueueSubscription) { 
        @SuppressWarnings("unchecked")
        QueueSubscription<T> f = (QueueSubscription<T>) s;

        int m = f.requestFusion(ANY | BOUNDARY);

        if (m == SYNC) {//1
            sourceMode = SYNC;
            queue = f;
            done = true;

            actual.onSubscribe(this);
            return;
        } else
        if (m == ASYNC) {
            sourceMode = ASYNC;
            queue = f;

            actual.onSubscribe(this);

            s.request(prefetch);//2

            return;
        }
    }

    queue = new SpscArrayQueue<T>(prefetch);

    actual.onSubscribe(this);

    s.request(prefetch);//2
}

} ```

在1處這裡是對非同步操作進行判斷,如果是actual.onSubscribe(this); 如果是非同步操作需要呼叫2處的s.request操作,這裡的s = PublishSubscription[0]物件

PublishSubscription.java

public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.addCancel(this, n); } } BackpressureHelper.java public static long addCancel(AtomicLong requested, long n) { ... long u = addCap(r, n); if (requested.compareAndSet(r, u)) { return r; } .. }

看到在這裡對requested的值進行了更改

通過以上步驟可以看出可以修改FlowableObserveOn中的prefetch值可以改變大小,這個就是快取大小

那是不是說改的越大越好呢,當我們把快取改的太大會發生什麼呢? 首先快取太大會有記憶體溢位甚至OOM的風險,設定的值最好根據自己的需求來設定

這裡我總結了幾個比較常見的背壓優化方法:

背壓優化方案:

1.使用改變快取大小的方式

1.1:observeOn(Schedulers.computation(),false,1000),第三個引數1000即時預設大小 根據FlowableObserveOn的構造方法:

```

        public FlowableObserveOn(
                Flowable<T> source,
                Scheduler scheduler,
                boolean delayError,
                int prefetch) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.prefetch = prefetch;
        }

```

可知可以從外部傳入值即可:observeOn(Schedulers.computation(),false,1000),第三個引數1000即時預設大小

1.2:也可以通過在observerOn前面呼叫onBackpressureBuffer 這個方法在onSubscribe方法中: BackpressureBufferSubscriber.java public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.s, s)) { this.s = s; actual.onSubscribe(this); s.request(Long.MAX_VALUE); }\ } 這裡設定了s.request(Long.MAX_VALUE);將快取設定為最大值

2.使用策略模式對背壓進行處理

onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy) { overflowStrategy:可以選擇 ERROR:直接報錯 DROP_LATEST:丟棄最新的資料,只儲存舊的資料,使用場景如對資料精度要求不高的情況,舊的資料也可以正常使用 DROP_OLDEST:丟棄舊的資料,儲存新的資料,使用場景如定位情況,可以把舊的丟棄,儲存新的定位資料就可以

3.其他方式

onBackpressureDrop(Consumer<? super T> onDrop) :來不及處理就丟棄,不快取任何資料 onBackpressureLatest():會快取一個數據,當正在執行某個任務的時候有新的資料過來,會把它快取起來,如果又有新的資料過來,那就把之前的替換掉,快取裡面的總是最新的

這裡我們再來看下Flowable中的背壓機制

我們根據除錯進入Flowable原始碼看看: 在FlowableObserveOn.java中: 方法request會在訂閱的時候呼叫:t.request(Long.MAX_VALUE);

``` public final void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(requested, n); trySchedule(); }

} ```

內部呼叫了BackpressureHelper.add(requested, n)

public static long add(AtomicLong requested, long n) { for (;;) { long r = requested.get(); if (r == Long.MAX_VALUE) { return Long.MAX_VALUE; } long u = addCap(r, n); if (requested.compareAndSet(r, u)) { return r; } } }

這裡呼叫了requested.compareAndSet(r, u)將快取設定為了Long.MAX_VALUE,所以說Flowable是支援背壓的 Flowable對不同的操作符有不同給的背壓處理方式,可以自己去閱讀原始碼看看,我這只是對一個拋磚引玉的效果。

總結

以上就是背壓機制的一些內容,以及我們介紹了 Flowable 中的幾個背壓相關的方法。實際上,RxJava 的官方文件也有說明—— Flowable 適用於資料量比較大的情景,因為它的一些建立方法本身就使用了背壓機制。 這部分方法我們就不再一一進行說明,因為,它們的方法簽名和 Observable 基本一致,只是多了一層背壓機制。

其他系列文章

面試官:RxJava背壓機制有了解麼?

面試官:RxJava是如何做到響應式程式設計的?

[使用rxjava建立一個rxbus事件處理框架 Rxjava操作符詳解--看看你還記得多少