RX響應式程式設計之Rxjava2的使用原理解析

語言: CN / TW / HK

開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第19天,點選檢視活動詳情

Rxjava由於其基於事件流的鏈式呼叫、邏輯簡潔 & 使用簡單的特點,深受各大 Android開發者的歡迎。

img

定義

RxJava在GitHub的介紹:

RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM // 翻譯:RxJava 是一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫

RxJava是一個 基於事件流、實現非同步操作的庫


作用

實現非同步操作

類似於 Android中的 AsyncTaskHandler作用


特點

由於 RxJava的使用方式是:基於事件流的鏈式呼叫,所以使得 RxJava

  • 邏輯簡潔
  • 實現優雅
  • 使用簡單

原理流程總覽

在為RxJava刪繁就簡的能力驚歎之餘,好奇的我們肯定控制不住探索其原理的慾望。“為通過鏈式操作符就可以一路走到底?為啥執行緒可以鏈式切換呢?等等......”

我最近春節在家,終於有機會可以好好系統探索下RxJava的原理原始碼,下載了RxJava 1.x的原始碼仔細斟酌一番,撥開程式碼的重重迷霧,慢慢地抓住了它簡約而不簡單的設計原理。

為什麼說是簡約而不簡單呢?簡約是因為它的原理並不複雜,不高深,不簡單是因為它能把一切複雜都隱藏在流暢的鏈式中。

話不多說,進入主題。

經過分析,我覺得用手機包裝流水線來形容RxJava總的工作流程還是比較恰當的(大家覺得有不妥的歡迎指出討論~):

1.先搭建生產流水線

2.啟動流水線包裝

3.使用者一層層拆開包裝最後拿到手機(最後的結果)。

這裡各個比喻對應的程式碼物件:

Observable: 流水線的某一道工序

OnSubscribe:一道工序中的工人

OnSubscribe的call方法: 包裝Subscriber

subscribe方法: 啟動流水線

Subscriber: 一層包裝盒

Subscriber的onNext: 使用者拆開包裝

具體闡述下以上比喻的意思:

1. 先搭建生產流水線

其實大部分操作符,都是新建一個Observable物件,然後將上游的Observable物件包裝起來,傳入一個新的OnSubscribe,比如:

publicfinal<R> Observable<R> map(Func1<? superT, ? extends R> func) { ​ returnunsafeCreate( newOnSubscribeMap<T, R>( this, func)); ​ } ​ publicfinalObservable<T> filter(Func1<? superT, Boolean> predicate) { ​ returnunsafeCreate( newOnSubscribeFilter<T>( this, predicate)); ​ } ​ publicfinal<R> Observable<R> lift( finalOperator<? extends R, ? superT> operator) { ​ returnunsafeCreate( newOnSubscribeLift<T, R>(onSubscribe, operator)); ​ } ​ publicfinalObservable<T> subscribeOn(Scheduler scheduler, booleanrequestOn) { ​ if( thisinstanceofScalarSynchronousObservable) { ​ return((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler); ​ } ​ returnunsafeCreate( newOperatorSubscribeOn<T>( this, scheduler, requestOn)); ​ }

最後都是呼叫了create方法建立Observable, 把當前Observable傳入給新的Observable持有,以保持鏈式(有點類似連結串列持有上一個節點的指標) 。為什麼要這樣呢, 因為RxJava是的鏈式是基於代理模式做的,也就是說基於一層一層Observable的包裝

那包裝的是什麼呢? 就是OnSubscribe,那OnSubscribe包裝的意義是什麼呢?其實就是包裝如何包裝Subscriber的邏輯。

比如map,傳入的是OnSubscribeMap

(OnSubscribe的基類),它的call程式碼如下,

@Override ​ publicvoidcall( finalSubscriber<? superR> o) { ​ MapSubscriber<T, R> parent = newMapSubscriber<T, R>(o, transformer); ​ o.add(parent); ​ source.unsafeSubscribe(parent); ​ }

當map這個操作符創建出來的Observable被呼叫subscribe被呼叫的時候,就會該OnSubscribeMap的call方法,可以看程式碼發現這裡建立了一個MapSubscriber物件,然後呼叫上游的Observable的unsafeSubscribe方法,傳入該MapSubscriber物件作為引數。

所以當你開心地用RxJava一個個操作符把鏈寫得老長的時候,裡面的邏輯就是不斷一層層包裝Observable,每個Observable持有一個自己的OnSubscribe,具體型別由對應的操作符確定。

這就是我說的第一個流程 搭建流水線,總的來說就是從上往下不斷建立Observable,並連成鏈,即後一個Observable持有上游Observable的引用。

Observable之所以說是流水線的某一道工序,是因為它是這條鏈最基本的串聯元素,而OnSubscribe之所以說是一道工序中的工人,是因為它決定了Subscriber是如何被包裝的。

2. 啟動流水線包裝

啟動的開關正是鏈尾的subscribe方法。看下Observable的subscribe方法:

publicfinalSubion subscribe(Subscriber<? superT> subscriber) { ​ returnObservable.subscribe(subscriber, this); ​ }

subscribe(Subscriber<? super T> subscriber, Observable observable)方法,方法比較長,最重要的就是

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

RxJava 1.x中,RxJavaHooks.onObservableStart其實沒有做什麼操作,返回的就是原來的observable物件的onSubscribe,所以這裡就是呼叫observable物件的onSubscribe的call方法,傳入subscriber物件。

publicinterfaceOnSubscribe< T> extendsAction1< Subscriber<? superT>> { ​ // cover for generics insanity ​ }

OnSubscribe是一個繼承Action1的介面,Action1是一個只有call方法的介面,所以這裡call的邏輯由具體的OnSubscribe物件確定。

還記得還是那個面說的map操作生成的OnSubscribeMap物件的call方法邏輯麼?它的call方法中建立了一個MapSubscriber物件,然後呼叫上游的Observable的unsafeSubscribe方法,並傳入該MapSubscriber物件作為引數。

這裡要注意的是,在建立了一個MapSubscriber物件的時候, 會傳入當前Observable呼叫的subscribe方法的引數Subscriber物件,儲存該物件的引用actual ,以保持鏈式:

publicMapSubscriber(Subscriber<? superR> actual, Func1<? superT, ? extends R> mapper) { ​ this.actual = actual; ​ this.mapper = mapper; ​ }

所以假如鏈尾的Observable是map操作符建立的,則subscribe執行的時候,會執行Observable物件中的OnSubscribeMap物件的call方法,生成一個MapSubscriber物件並持有我們程式碼中鏈的最後傳入的 Subscriber物件,然後讓上游的Observable物件呼叫subscribe方法,並傳入這個MapSubscriber物件。

所以這裡就是從下往上遞迴呼叫Observable物件的subscribe方法,從而生成一條Subscriber物件的鏈(也可以理解為一層層包裝)。

在這裡,經過subscribe方法的啟動,已經開始加工包裝,最後生產出了一條Subscriber物件的鏈,即我們的手機包裝盒。

3. 使用者拆開手機包裝盒

這個流程,可以用楊宗緯的《洋蔥》一段經典歌詞來闡述:“一層一層一層地剝開我的心~~。”

上一步操作從下到上生成了Subscriber物件的鏈,鏈的尾部就是最上游的Observable中的:

Observable.create(object : OnSubscribe<Int> { ​ override fun call(t: Subscriber<in Int>){ ​ t.onStart ​ t.onNext( 1) ​ } ​ })

這裡第二行的t: Subscriber,現在第四行呼叫了 t.onNext(1),又以之前的map操作符生成的MapSubscriber物件為例:

publicvoidonNext(T t){ ​ R result; ​ try{ ​ result = mapper.call(t); ​ } catch(Throwable ex) { ​ Exceptions.throwIfFatal(ex); ​ unsubscribe; ​ (Throwable.addValueAsLastCause(ex, t)); ​ return; ​ } ​ actual.onNext(result); ​ }

這裡先使用mapper.call(t)對傳進去的Subscriber物件進行了變換,即map操作中指定的變換方法,這個下一節再談。

先注意這裡最後呼叫了 actual.onNext(result);, 而actual就是Subscriber鏈的下一個Subscriber物件,而除了map以外,大部分的Subscriber物件的onNext方法也有這樣的邏輯。

所以可以知道,這裡Subscriber鏈在遞迴呼叫,也可以看作一層一層一層地開啟,就彷彿是拆開手機包裝盒。

流程總結

看前面的敘述各位可能還是有點霧裡看花,總結一下前面三個小節各對應一個流程,從RxJava呼叫程式碼來說 :

就是先從上到下把各個變換的Observable連成鏈(拼裝流水線),然後在最後subscribe的時候,又從下到上通過每個Observable的OnSubscribe從最下的Subscriber物件開始連成鏈(流水線開始工作包裝Subscriber),直到頂端,當頂端的Subscriber物件呼叫了onNext方法的時候,又從上往下呼叫Subscriber鏈的onNext(使用者一層層拆開包裝盒),裡面執行了每個操作的變換邏輯。

舉個例子進一步說明以上流程:

Observable.create(object : OnSubscribe<Int> { ​ override fun call(t: Subscriber<in Int>){ ​ t.onStart ​ t.onNext( 1) ​ } ​ }) ​ .map(object : Func1<Int, String> { ​ override fun call(t: Int): String { ​ Log.d(TAG, Thread.currentThread.name) ​ returnt.toString ​ } ​ }) ​ .map(object : Func1<String, Book> { ​ override fun call(t: String): Book { ​ Log.d(TAG, Thread.currentThread.name) ​ returnBook(t) ​ } ​ }) ​ .subscribe(object : Subscriber<Book> { ​ override fun onStart{ ​ } ​ override fun onNext(t: Book){ ​ Log.d(TAG, Thread.currentThread.name) ​ Log.d(TAG, t.toString) ​ } ​ override fun onComplete{ ​ } ​ override fun (t: Throwable){ ​ Log.d(TAG, t.message) ​ } ​ })

為了簡單,這裡只使用了map操作符。以下是一個簡單的流程圖:

img

/ 操作符原理解析 /

如果上面的總流程分析能理解的話,那麼下面的操作符的理解就不難了。

普通的變換操作符

這裡舉map的例子。這裡的變換處於從上往下遞迴執行Subscriber鏈onNext階段(使用者拆手機包裝盒)

前面提到map中生成的MapSubscriber物件的onNext方法:

publicvoidonNext(T t){ ​ R result; ​ try{ ​ result = mapper.call(t); ​ } catch(Throwable ex) { ​ Exceptions.throwIfFatal(ex); ​ unsubscribe; ​ (Throwable.addValueAsLastCause(ex, t)); ​ return; ​ } ​ //呼叫下游的Subscriber的onNext方法 ​ actual.onNext(result); ​ }

注意到第四行程式碼 result = mapper.call(t);,這裡的mapper其實就是我們寫的map操作的變換方法:

.map(object : Func1<String, Book> { ​ override fun call(t: String): Book { ​ Log.d(TAG, Thread.currentThread.name) ​ returnBook(t) ​ } ​ })

這裡面的Func1回撥介面,所以經過這樣call方法的呼叫,就實現了map的操作變換,然後執行 actual.onNext(result);,即將變換後的結果交給下游的Subscriber的onNext方法。

如果理解了上面的流程圖,是不是理解map易如反掌呢?

執行緒切換操作符

執行緒切換主要兩個操作符: subscribeOn和observeOn

執行緒切換是我覺得RxJava最牛逼的地方,不過了解了原理之後覺得也不復雜高深,主要還是在上面的總流程中的對應節點使用了常見的切換執行緒方式。

subscribeOn

作用:將subscribe Observer的執行放在對應的執行緒。

subscribeOn最終會執行到:

publicfinalObservable<T> subscribeOn(Scheduler scheduler, booleanrequestOn) { ​ if( thisinstanceofScalarSynchronousObservable) { ​ return((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler); ​ } ​ returnunsafeCreate( newOperatorSubscribeOn<T>( this, scheduler, requestOn)); ​ }

注意最後執行了:

returnunsafeCreate( newOperatorSubscribeOn<T>( this, scheduler, requestOn));

根據前面的分析,這裡就是建立新的Observable物件,並傳入一個OnSubscribe例項物件,這裡是OperatorSubscribeOn物件。

根據上面的說明,這裡要看call方法:

publicvoidcall( finalSubscriber<? superT> subscriber) { ​ finalWorker inner = scheduler.createWorker; ​ SubscribeOnSubscriber<T> parent = newSubscribeOnSubscriber<T>(subscriber, requestOn, inner, source); ​ subscriber.add(parent); ​ subscriber.add(inner); ​ inner.schedule(parent); ​ }

可以看到第四行SubscribeOnSubscriber parent = new SubscribeOnSubscriber(subscriber, requestOn, inner, source);,所以它建立的就是Subscriber物件就是SubscribeOnSubscriber,注意這裡第二行final Worker inner = scheduler.createWorker;和最後一行 inner.schedule(parent);,

這裡的方法呼叫棧比較長就不贅述,直接說下,這裡worker裡面就是執行執行緒切換的,裡面封裝執行緒池或者Handler,通過schedule方法就可以將SubscribeOnSubscriber包裝成一個Runnable放入執行緒池中執行,執行的方法是SubscribeOnSubscriber的call方法。

而SubscribeOnSubscriber的call方法:

publicvoidcall{ ​ Observable<T> src = source; ​ source = null; ​ t = Thread.currentThread; ​ src.unsafeSubscribe( this); ​ }

和其他的Subscriber一樣,也是傳入上游Observable的subscribe方法中。

回憶上面講的總流程, 在第二個流程從下往上包裝Subscriber鏈(加工包裝)的時候,subscribeOn就是將從它當前這個節點開始將後面的一系列的Subscriber的成鏈以及從上往下執行各個Subscriber物件的onNext放到指定的執行緒執行。

常見的一種描述subscribeOn作用的說法: “將該subscribeOn語句的上游放在對應的執行緒中。”其實並不準確 ,因為如果只使用了subscribeOn而沒有使用observeOn的話, 整條鏈的變換過程都會執行在subscribeOn指定的執行緒的。**RxJava官方的解釋才是準確的:

Asynchronously subscribes Observers to thisObservable on the specified { @linkScheduler}. ​ 在剛才的示例程式碼中加入subscribeOn: ​ Observable.create(object : OnSubscribe<Int> { ​ override fun call(t: Subscriber<in Int>){ ​ t.onStart ​ t.onNext( 1) ​ } ​ }) ​ .map(object : Func1<Int, String> { ​ override fun call(t: Int): String { ​ Log.d(TAG, Thread.currentThread.name) ​ returnt.toString ​ } ​ }) ​ //這裡切換執行緒 ​ .subscribeOn(Schedulers.io) ​ .map(object : Func1<String, Book> { ​ override fun call(t: String): Book { ​ Log.d(TAG, Thread.currentThread.name) ​ returnBook(t) ​ } ​ }) ​ .subscribe(object : Subscriber<Book> { ​ override fun onStart{ ​ } ​ override fun onNext(t: Book){ ​ Log.d(TAG, Thread.currentThread.name) ​ Log.d(TAG, t.toString) ​ } ​ override fun onComplete{ ​ } ​ override fun (t: Throwable){ ​ Log.d(TAG, t.message) ​ } ​ })

用剛才的流程圖來表示的話,subscribeOn切換執行緒差不多是這樣子的:

img

紅色部分為放入指定執行緒的邏輯。

observeOn

observeOn最終會走到:

publicfinalObservable<T> observeOn(Scheduler scheduler, booleandelayError, intbufferSize) { ​ if( thisinstanceofScalarSynchronousObservable) { ​ return((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler); ​ } ​ returnlift( newOperatorObserveOn<T>(scheduler, delayError, bufferSize)); ​ }

這裡使用了lift方法:

publicfinal<R> Observable<R> lift( finalOperator<? extends R, ? superT> operator) { ​ returnunsafeCreate( newOnSubscribeLift<T, R>(onSubscribe, operator)); ​ }

這裡和map本質還是一樣的,建立一個新的額Observable並傳入一個新的OnSubscribe物件(OnSubscribeLift),那主要就是要看這個OnSubscribeLift的call做了什麼:

Subscriber<? superT> st = RxJavaHooks.onObservableLift(operator).call(o); ​ try{ ​ // new Subscriber created and being subscribed with so 'onStart' it ​ st.onStart; ​ parent.call(st);

call最主要的就是這幾行,和map基本差不多,就是使用operator對傳入從下游傳入的Subscribeder進行轉換,所以關鍵看OperatorObserveOn的call做了什麼轉換:

ObserveOnSubscriber<T> parent = newObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); ​ parent.init; ​ returnparent;

主要看這裡,OperatorObserveOn中建立了一個ObserveOnSubscriber,最後返回。

注意這裡和OperatorSubscribeOn的不同,OperatorSubscribeOn是在call方法就把新建的Subscriber物件包裝為Runnbale放入執行緒池中執行, 將上游Observable對他的subscribe呼叫放到了指定執行緒。

而OperatorObserveOn是將ObserveOnSubscriber物件作為引數傳入了上游的OnSubscribe的call方法,然後整個從下往上的包裝Subscribe還是在原來的執行緒中執行,那這裡關鍵點就是看ObserveOnSubscriber的onNext做了什麼操作:

if(!queue.offer(NotificationLite.next(t))) { ​ ( newMissingBackpressureException); ​ return; ​ } ​ schedule;

重點就是這幾行。第一行的t是onNext返回的Subscriber物件,NotificationLite.next這裡正常情況下返回的還是t,而queue是一個佇列,這裡將t入列, 然後執行了schedule,該方法是將當前的ObserveOnSubscriber物件包裝為Runnable,放入執行緒池中,然後在指定執行緒執行其call方法主要程式碼如下

... ​ finalQueue<Object> q = this.queue; ​ finalSubscriber<? superT> localChild = this.child; ​ ... ​ for(;;) { ​ ... ​ Object v = q.poll; ​ booleanempty = v == null; ​ if(checkTerminated(done, empty, localChild, q)) { ​ return; ​ } ​ if(empty) { ​ break; ​ } ​ localChild.onNext(NotificationLite.<T>getValue(v)); ​ ... ​ }

可以看到這裡就是迴圈從佇列中取出元素,然後再傳入下游的Subscriber的onNext方法。

總結obs erveOn的操作:**

在從下往上包裝Subscriber鏈的時候(使用者拆開手機包裝盒),在呼叫observeOn的地方插入一個ObserveOnSubscriber物件,該物件可以在之後從上往下遞迴呼叫Subscriber鏈的時候,將該ObserveOnSubscriber物件之後的所有的onNext方法放到指定執行緒中執行。

現在在例項中新增observeOn:

Observable.create(object : OnSubscribe<Int> { ​ override fun call(t: Subscriber<in Int>){ ​ t.onStart ​ t.onNext( 1) ​ } ​ }) ​ .map(object : Func1<Int, String> { ​ override fun call(t: Int): String { ​ Log.d(TAG, Thread.currentThread.name) ​ returnt.toString ​ } ​ }) ​ .subscribeOn(Schedulers.io) ​ //將當前呼叫之後的onNext放入指定執行緒 ​ .observeOn(Schedulers.main) ​ .map(object : Func1<String, Book> { ​ override fun call(t: String): Book { ​ Log.d(TAG, Thread.currentThread.name) ​ returnBook(t) ​ } ​ }) ​ .subscribe(object : Subscriber<Book> { ​ override fun onStart{ ​ } ​ override fun onNext(t: Book){ ​ Log.d(TAG, Thread.currentThread.name) ​ Log.d(TAG, t.toString) ​ } ​ override fun onComplete{ ​ } ​ override fun (t: Throwable){ ​ Log.d(TAG, t.message) ​ } ​ }) ​

用流程圖來表示:

img

紅色為subscribeOn指定執行緒執行部分,綠色為observeOn指定執行緒執行部分。

文末

除了滿足自己的探索慾望之外,通過學習RxJava原始碼,就可以學習到如何運用RxJava的設計思想,通過封裝程式碼編寫自己的響應式程式設計框架,以後我們就可以寫出自己的RxPay、RxLogin之類的了。