你真的瞭解RxJava的執行緒切換嗎?

語言: CN / TW / HK

使用RxJava可以輕鬆地實現執行緒切換,所以在Android中常被用來替代AsyncTask、Handler等原生工具類。使用起來雖然簡單,但如果不瞭解其背後的基本原理,很可能因為使用不當而寫出bug。本文將帶大家簡單瞭解一下RxJava執行緒切換的實現原理以及開發中的注意事項

1. Basic Usage

  • Scheduler

If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators to operate on particular Schedulers.
通過Scheduler讓操作符跑在指定執行緒,從而實現多執行緒排程

  • observerOn

specify the Scheduler on which an observer will observe this Observable
指定一個觀察者在哪個排程器上觀察這個Observable

  • subscribeOn

specify the Scheduler on which an Observable will operate
指定Observable自身在哪個排程器上執行

RxJava呼叫鏈中每個操作符都會建立一個新的Observable,操作符產生的新Observable都會向上層的Observable註冊回撥。 subscribeOn和observeOn的實現原理一樣:

  • subscribeOn 在指定執行緒中向上遊訂閱(在指定執行緒中去調上游的subscribe方法)
  • observeOn 收到資料後在指定執行緒中呼叫下游的回撥方法(onNext/onError/onComplete等)

RxJava自下而上建立訂閱,而後自上而下發射資料,所以 subscribeOn 即使出現 observeOn 之後也能保證資料來源執行的執行緒,因為訂閱永遠發生在前。

2. subscribeOn

2.1 實現原理

通過原始碼瞭解一下subscribeOn實現執行緒切換的基本原理

//ObservableSubscribeOn.java
final class ObservableSubscribeOn extends Observable<T> {
    
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        // 沒有直接呼叫subscribe訂閱,而是先進行了執行緒變換(scheduler.scheduleDirect)
        parent.setDisposable(
            scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        @Override
        public void run() {
            // run()會在指定的scheduler呼叫,向上遊訂閱時執行緒已經發生了變化
            // 所以保證了上游所執行的執行緒
            source.subscribe(parent);
        }
    }
    
    static final
    class SubscribeOnObserver<T> implements Observer<T>, Disposable {
       
        @Override
        public void onNext(T t) {
            // 收到資料後不進行執行緒變換
            actual.onNext(t);
        }
    }
}
複製程式碼

2.2 subscribeOn只生效一次

subscribeOn通過切換訂閱執行緒,改變Observable.create所線上程,從而影響資料的發射執行緒。

由於訂閱過程自下而上,所以Observable.create只受最近一次subscribeOn影響,當呼叫鏈中有多個subscribeOn時只有第一個有效。其他subscibeOn仍然可以影響其上游的doOnSubscribe的執行執行緒。

@Test
fun test() {
    Observable.create<Unit> { emitter ->
        log("onSubscribe")
        emitter.onNext(Unit)
        emitter.onComplete()
    }.subscribeOn(namedScheduler("1 - subscribeOn"))
        .doOnSubscribe { log("1 - doOnSubscribe") }
        .subscribeOn(namedScheduler("2 - subscribeOn"))
        .doOnSubscribe { log("2 - doOnSubscribe") }
        .doOnNext { log("onNext") }
        .test().awaitTerminalEvent() // Wait until observable completes
 }
複製程式碼

2.3 正確理解subscribeOn的意義

Even though we added .subscribeOn() that is not enough. SubscribeOn operator only switches the subscribing process to the desired thread, but that doesn’t mean the items will be emitted on that thread.
subscribeOn用來決定訂閱執行緒,但這並不意味著上游資料一定來自此執行緒

@Test
fun test() {
    val observable = Observable.create<Int> { emitter ->
        log("onSubscribe")
        thread(name = "Main thread", isDaemon = false) {
            log("1 - emitting"); emitter.onNext(1)
            log("2 - emitting"); emitter.onNext(2)
            log("3 - emitting"); emitter.onNext(3)
            emitter.onComplete()
        }
    }
    
    observable
        .subscribeOn(Schedulers.computation())
        .doOnNext { log("$it - after subscribeOn") }
        .test().awaitTerminalEvent() // Wait until observable completes
}
複製程式碼

正確理解subscribeOn的含義有助於避免一些使用上的誤區:

對於PublishSubject無效

@Test
fun test() {
    val subject = PublishSubject.create<Int>()
    val observer1 = subject
        .subscribeOn(Schedulers.io())
        .doOnNext { log("$it - I want this happen on an IO thread") }
        .test()
    val observer2 = subject
        .subscribeOn(Schedulers.newThread())
        .doOnNext { log("$it - I want this happen on a new thread") }
        .test()
    
    sleep(10); 
    subject.onNext(1)
    subject.onNext(2)
    subject.onNext(3)
    subject.onComplete()
    
    observer1.awaitTerminalEvent()
    observer2.awaitTerminalEvent()
}
複製程式碼

對於PublishSubject來說,上游資料來自哪個執行緒是在onNext時決定的,所以對一個PublishSubject使用使用subscribeOn沒有意義。

對於Observable.just()無效

通常subcribeOn可以決定Observable.create {...} 的執行執行緒,因此很多初學者容易犯的一個錯誤是在Observable.just(...)裡做耗時任務,並誤認為會跑在subscribeOn的執行緒:

如上,readFromDb() 放在just中顯然是不合適的。just()在當前執行緒立即執行,因此不受subscribeOn影響,應該修改如下:

//Observable.defer
Observable.defer { Observable.just(readFromDb()) }
    .subscribeOn(Schedulers.io())
    .subscribe { ... }

//Observable.fromCallable
Observable.fromCallable { readFromDb() }
    .subscribeOn(Schedulers.io())
    .subscribe { ... }
複製程式碼

使用flatMap處理併發

subscribeOn決定的當前Observable的訂閱執行緒,因此對於flatMap的使用要特別留心

Observable.fromIterable(listOf("id1", "id2", "id3"))
    .flatMap { id -> loadData(id) }
    .subscribeOn(Schedulers.io())
    .observeOn(mainThread())
    .toList()
    .subscribe { result -> log(result) }
複製程式碼

如果我們希望多個loadData(id)併發執行,上述寫法是錯誤的。

subscribeOn決定了flatMap上游執行緒,flatMap返回多個Observable的訂閱都是發生在此執行緒,多個loadData只能執行在單一執行緒,無法實現並行。

想要達到並行執行效果,需要修改如下:

Observable.fromIterable(listOf("id1", "id2", "id3"))
    .flatMap { id ->
        loadData(id)
            .subscribeOn(Schedulers.io())
    }
    .observeOn(mainThread())
    .toList()
    .subscribe { result -> log(result) }
複製程式碼

3.observeOn

3.1 實現原理

通過原始碼瞭解一下observeOn實現執行緒切換的基本原理

//ObservableObserveOn.java
final class ObservableObserveOn extends Observable<T> {

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else { 
            Scheduler.Worker w = scheduler.createWorker();
            // 直接向上遊訂閱資料,不進行執行緒切換,切換操作在Observer中進行
            source.subscribe(
                new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> implements Observer<T>, Runnable {
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // 這裡選把資料放到佇列中,增加吞吐量,提高效能
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 在schedule方法裡進行執行緒切換並把資料迴圈取出
            // 回撥給下游,下游會在指定的執行緒中收到資料
            schedule();
        }
    
        void schedule() {
            if (this.getAndIncrement() == 0) {
                //切換執行緒
                this.worker.schedule(this);
            }
    
        }
    }
}
複製程式碼

3.2 observeOn多次生效

不同於subscribeOn,observeOn可以有多個而且每個都會生效

  • subscribeOn切換的執行緒可以通過doOnSubscribe監聽
  • observeOn切換的執行緒可以通過doOnNext監聽

3.3 連續發射多個item時能否保證序列?

observeOn使用Scheduler排程執行緒後,下游是執行在單執行緒中還是多個執行緒中?能否保證下游資料的有序性?

@Test
fun test() {
    Observable.create<Int> { emitter ->
        repeat(10) {
            emitter.onNext(it)
        }
        emitter.onComplete()
    }.observeOn(Schedulers.io())
        .subscribe {
            log(" - $it")
        }
}
複製程式碼

通過結果可以看到,即使經Scheduler排程之後,下游仍然執行在單一執行緒,可以保證資料在整個呼叫鏈上的有序性。

那麼為什麼經過Scheduler排程後都跑在單一執行緒呢?

4. Scheduler

4.1 實現原理

Scheculer並非直接排程Runnable,而是建立Worker,再由Worker來排程具體任務。

subscribeOn中的SubscribeTask以及observeOn中的ObserveOnObserver都實現了Runnable,所以最終都是在Worker中執行。

4.2 任務是由Worker排程的

一個Scheduler可以建立多個 Worker,一個Worker可以管理多個Task(Runnable)

Worker 的存在為了確保兩件事:

  • 同一個 Worker 建立的 Task 確保序列執行,且立即執行的任務符合先進先出原則。
  • Worker 綁定了呼叫了他的方法的 Runnable,當該 Worker 取消時,基於他的 Task 均被取消

4.3 Worker何保證序列?

非常簡單,每個Worker只有一個執行緒

現在可以解答疑問了:為什麼observeOn經過Scheduerl排程後,仍然跑在單一執行緒?

Scheduler為每個observeOn分配唯一Worker,因此observeOn的下游可以保證在單一執行緒序列執行。

//ObservableObserveOn.java
final class ObservableObserveOn extends Observable<T> {

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else { 
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(
                new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); //傳入worker
        }
    }
  
  ...
  
}
複製程式碼

如上,Worker作為ObserveOnObserver的成員變數被持有

4.4 預置Schedulers

如同Executors提供了多種ThreadPoolExecutor一樣,Schedulers提供了多種預設的Scheduler

No.Schedulers & Descriptions
1Schedulers.single()
全域性唯一執行緒,無論有多少個observables,都共享此唯一執行緒。
2Schedulers.io()
最常見的排程器之一,用於IO相關操作,比如網路請求和檔案操作。IO 排程器背後由執行緒池支撐。它首先建立一個工作執行緒,可以複用於其他操作,當這個工作執行緒(長時間任務的情況)不能被複用時,會建立一個新的執行緒來處理其他操作。
3Schedulers.computation()
和IO排程器非常相似,也是基於執行緒池實現的。其可用的執行緒數是固定的,與cpu核數目保持一致。當所有執行緒都處於忙碌狀態時,新任務只能處於等待狀態。因此,它不適合IO相關操作。適用於進行一些計算操作,單一計算任務不會長時間佔用執行緒。
4Schedulers.newThread()
每次呼叫都建立新執行緒
5Schedulers.trampoline()
在當前執行緒執行,不切換執行緒。
6Schedulers.from(java.util.concurrent.Executor executor)
更像是一種自定義的IO排程器。我們可以通過制定執行緒池的大小來建立一個自定義的執行緒池。適用於observables的數量對於IO排程器太多的場景使用,
//Sample of Schedulers.from
fun namedScheduler(name: String): Scheduler {
    return Schedulers.from(
        Executors.newCachedThreadPool { Thread(it, name) }
    )
}
複製程式碼

Thread-Safety

5.1 RxJava操作符是否執行緒安全?

@Test
fun test() {
    val numberOfThreads = 1000
    val publishSubject = PublishSubject.create<Int>()
    val actuallyReceived = AtomicInteger()

    publishSubject
        .take(300).subscribe {
            actuallyReceived.incrementAndGet()
        }

    val latch = CountDownLatch(numberOfThreads)
    var threads = listOf<Thread>()

    (0..numberOfThreads).forEach {
        threads += thread(start = false) {
            publishSubject.onNext(it)
            latch.countDown()
        }
    }

    threads.forEach { it.start() }
    latch.await()

    val sum = actuallyReceived.get()
    check(sum == 300) { "$sum != 300" }
}
複製程式碼

結果不符合預期,因為take不是執行緒安全的

看一下take的原始碼

public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;

    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }
    protected void subscribeActual(Observer<? super T> observer) {
        this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        boolean done;
        Disposable upstream;
        long remaining;

        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

        public void onNext(T t) {
            if (!this.done && this.remaining-- > 0L) {
                boolean stop = this.remaining == 0L;
                this.downstream.onNext(t);
                if (stop) {
                    this.onComplete();
                }
            }

        }
    }
}
複製程式碼

果然不出所料 remaining--沒有加鎖操作

5.2 observableOn的執行緒安全

那如果加上observableOn是不是就保證串行了呢,因為take可以跑在單一執行緒上了

@Test
fun test() {
    repeat(10000) {
        val numberOfThreads = 1000
        val publishSubject = PublishSubject.create<Int>()
        val actuallyReceived = AtomicInteger()
    
        publishSubject
            .observeOn(Schedulers.io())
            .take(300).subscribe {
                actuallyReceived.incrementAndGet()
            }
    
        val latch = CountDownLatch(numberOfThreads)
        var threads = listOf<Thread>()
    
        (0..numberOfThreads).forEach {
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }
    
        threads.forEach { it.start() }
        latch.await()
    
        check(actuallyReceived.get() == 300)
    }
}
複製程式碼

很遺憾,多次執行後發現依然有問題,因為observableOn本身也不是執行緒安全的,observableOn中使用的queue是一個非執行緒安全佇列。

5.3 The Observable Contract

Rx在對Observable的定義中已經明確告訴我們了:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
reactivex.io/documentati…

作為結論,RxJava的操作符預設並非執行緒安全的

但是對於接收多個Observable的操作符,例如 merge()、combineLatest()、zip()等 是執行緒安全的,所以即使多個Observable來自不執行緒時,也不需要考慮執行緒安全問題。