Flux和Mono的常用API原始碼分析

語言: CN / TW / HK

q這篇文章來分析Project-Reactor的兩大類FLux和Mono的常用Api以及其原始碼。

Flux

Flux定義了一個普通的響應式流,它可以產生零個,一個或多個元素,乃至無限個元素。

我們就先來研究一下Flux產生元素的程式碼

Flux.just 產生一個或者多個元素

```java Flux just = Flux.just("1", "2", "3");

public static Flux just(T... data) { return fromArray(data); }

public static Flux fromArray(T[] array) { if (array.length == 0) { return empty(); } if (array.length == 1) { return just(array[0]); } return onAssembly(new FluxArray<>(array)); }

``` Flux.just其實是返回了一個FluxArray物件。那麼我們來看看FluxArray的原始碼是如何。看的關鍵點是構造方法和subscribe方法

image.png

從原始碼中就可以看出,1個或多個的資料原理就是因為內部有一個數組,這個陣列長度可能是1個或多個,然後儲存起來通過一個 ArraySubscription 傳遞給消費者。消費者的程式碼邏輯可以參照上一篇說的 LambdaSubscriber . 我們繼續來看ArraySubscription的邏輯,關鍵點就是消費者呼叫 subscription的request之後 subscription是如何把 FluxArray傳遞給消費者的

image.png

Flux.empty() 產生0個元素

java public static <T> Flux<T> empty() { return FluxEmpty.instance(); } 可以看到呼叫empty返回的是一個 FluxEmpty例項 java @Override public void subscribe(CoreSubscriber<? super Object> actual) { Operators.complete(actual); } 在收到消費者註冊訊號的時候呼叫了 Operators的complete方法 java public static void complete(Subscriber<?> s) { s.onSubscribe(EmptySubscription.INSTANCE); s.onComplete(); } 可以看到在收到註冊的時候,給消費者註冊了一個EmptySubscription然後馬上呼叫了complete方法

image.png 然後在subscription內部什麼都沒有做。 所以總結起來就是一個0個元素的流,訂閱之後會馬上收到 complete訊號,其他的什麼都沒有。

repeat() 產生無限流

我們看了1到多個,也看了0個流,現在來看看無限數量的流是如何實現的。

```java public final Flux repeat() { return repeat(ALWAYS_BOOLEAN_SUPPLIER); }

public final Flux repeat(BooleanSupplier predicate) { return onAssembly(new FluxRepeatPredicate<>(this, predicate)); } ``` 可以看到是建立了一個 FluxRepeatPredicate 物件 這個物件是一個 操作符

image.png 原始碼也不難懂,就是結束的時候重新subscribe了Publisher

doOnSignal

```java Flux.just("1", "2", "3") .doOnNext(data->{

    })
    .doOnComplete(()->{

    });

``` 我們再來看一個常用的Api,很多時候我們不用真正的消費或者是轉變資料,就像Java8中流式程式設計中的peek一樣。Reactor也提供了peek的機制

image.png 直接去看原始碼,一樣這個 FluxPeekFuseable 也是一個操作符。我們就看一個peek onNext的程式碼就知道個大概

image.png

Mono

Mono表示要麼就有一個元素,要麼就只會產生完成和錯誤訊號的Publisher

then

image.png then是一個在框架中非常常用的一個Api,這裡有5個過載方法,我們依次來看 1. then() ```java public final Mono then() { return empty(this); }

static Mono empty(Publisher source) { @SuppressWarnings("unchecked") Mono then = (Mono)ignoreElements(source); return then; }

public static Mono ignoreElements(Publisher source) { return onAssembly(new MonoIgnorePublisher<>(source)); }

``` 一路看下來我們發現其實是建立了一個 MonoIgnorePublisher 這個物件是一個 操作符

image.png 通過原始碼我們可以看到 MonoIgnorePublisher 把真正的監聽者封裝了一個IgnoreElementsSubscriber 然後讓事件源監聽。我們來看看這個類

image.png 又是Subscription還是Subscriber

image.png 用影象表示then的情況就是

image.png 總結一下就是then方法會丟棄前面的onNext資料,只會傳遞給下游完成和錯誤的訊號

then(Mono other)

```java

Mono.just("1").then(Mono.just("haha")).subscribe();

public final Mono then(Mono other) { return onAssembly(new MonoIgnoreThen<>(new Publisher[] { this }, other)); } ``` 可以看到內部是讓 MonoIgnoreThen 成為Publisher

image.png 其內部有建立了一個 ThenIgnoreMain 關鍵注意後面呼叫的方法subscribeNext

image.png

可以看到then(Mono other) 的邏輯其實就是忽略之前的資料,然後全部執行結束之後開始下一個流的消費

Create

Mono和Flux都有Create的方法,用於建立一個對應的序列。我們就來研究mono的create方法 ```java Mono.create(sink->{ try { int i = 1 / 0; sink.success(i); }catch (Exception e){ sink.error(e); } }).subscribe();

public static Mono create(Consumer> callback) { return onAssembly(new MonoCreate<>(callback)); }

可以看到create其實是建立一個**MonoCreate**物件,裡面傳入一個帶有**MonoSink** 的一個comsumer 我們來看看**MonoSink**的APIjava public interface MonoSink {

void success();

void success(@Nullable T value);

void error(Throwable e);

@Deprecated Context currentContext();

default ContextView contextView() { return this.currentContext(); }

MonoSink onRequest(LongConsumer consumer);

MonoSink onCancel(Disposable d);

MonoSink onDispose(Disposable d); } ``` 其實就是定義了一些操作這個流的一些API。繼續看MonoCreate物件的內容

image.png

image.png