如何避免寫重複程式碼:善用抽象和組合

語言: CN / TW / HK

通過抽象和組合,我們可以編寫出更加簡潔、易於理解和穩定的程式碼;類似於金字塔的建築過程,我們總是可以在一層抽象之上再疊加一層,從而達到自己的目標。但是在日常的開發工作中,我們如何進行實踐呢?本文將以筆者在Akka專案中的一段社群貢獻作為引子分享筆者的一點心得。

場景

通常,為了簡化我們對資料流的處理,我們可能會使用 Java8 中首次引入的 Stream 、或者是 Kotlin、Scala 等程式語言中提供的更加豐富的集合庫,亦或者使用反應式流的相關三方庫來簡化工作。雖然這些類庫已經提供了豐富的操作符,但是我們依然會工作中遇到其對某些場景未提供合適操作符的情況。比如:

  1. 在直播場景下,需要對某些型別的訊息進行緩衝和聚合,一段時間內的多個點贊合併為1個點贊,並且在處理了 N 個訊息的時候進行整體傳送,保障整體的擴散量級維持在一個平穩的水平。
  2. 在 IOT 場景中,接收來自終端裝置上報的資料,並返回當前的資料和前值,或者最近 3個值,從而計算其中的變化趨勢。此時我們可能會使用反應式流庫中提供的:zipWithNextzipWithPreviouszipWithPreviousAndNext,或者是 sliding 。
  3. 在建立一個聊天室的時候,如果使用者輸入bye,則讓使用者斷開連線,離開聊天室,那麼這個時候我們可能會使用 takeWhile
  4. 假設我們有一組SQL,我們需要按照順序執行,併合並他們的結果,並在處理完成後關閉對應的資料庫連線,這時我們可能會用 mapWithResourceusing(資源安全)。
  5. 當處理檔案、寫入資料庫等使用資源的時候,我們需要開啟一個檔案或獲取一個數據庫連線,將資料寫入,然後在處理完成後關閉對應的資源,這時我們可能會使用 foldResource(資源安全)
  6. 假設需要對資料進行分批,每 3個元素一批,進行打包,這個時候我們可能會使用 batch(3)
  7. 假設我們需要將元素和每個元素的下標結合在一起,這個時候我們可能需要使用 zipWithIndex
  8. 假設我們需要快取元素,並在指定條件滿足前一直快取,我們可能需要 bufferUntil(predicate)bufferWhile(predicate)●
  9. 假設我們需要快取元素,直到資料變更,把相同項合併在一起,我們可能需要 bufferUtilChanged
  10. 假設我們需要對所有的元素進行去重,或者去掉連續的重複元素,我們可能會需要用到 distinctdistinctUntilChanged
  11. 假設我們只需要返回前 N 個元素,我們可能需要使用 limit(N)take(N), 或者按照條件 takeWhiletakeUntil假設我們需要跳過前N個元素,我們可能需要使用 skip(N)、drop(N), 或者按照條件 dropWhile、dropUntil
  12. 等等...

我們可以看到,上面這些操作符,每個都擁有具體的語義,雖然看起來只是一個簡單的方法,但是如果需我們完全自主實現,定然也有不小的難度,比如 zipWithNextzipWithPreviouszipWithPreviousAndNext 在 Reactor-core 目前的發行版本中就沒有直接提供,而和資源相關的, Reactor-core 中則只有一個 using 。

分析

作為程式設計師,第一件事情,肯定就是 Ctrl + C ,第二件事就是 Ctrl + V, 第三件事就是 Commit & Push。然而,事情並沒有這麼簡單。

難點有:

  1. 反應式流操作符需要完整實現反應式流的規範、並通過預設的測試套件的驗證。
  2. 操作符需要儘可能的抽象和可組合。
  3. 無論是單執行緒還是併發場景下都擁有正確的行為和語義、並有完整單元測試覆蓋。
  4. 操作符的實現需要儘可能的具備最高效的效能。

比如,以 zipWithIndex舉例,在 Reactor-core 中有 FluxIndexFuseable(370行程式碼)和 FluxIndex (296行程式碼)兩個實現。而且清晰的處理了各種情況。而其他操作符也有類似:release 3.4.23

  1. FluxBuffer—— 575行程式碼
  2. FluxBufferPredicate—— 464 行程式碼
  3. FluxDistinct—— 609行程式碼
  4. FluxDistinctFuseable—— 70行程式碼
  5. FluxDistinctUntilChanged—— 337 行程式碼
  6. FluxUsing—— 583 行程式碼

如果要實現一個zipWithNext 自定義操作符 ,應該也有接近的工作量。這樣的工作強度,個人認為無論是在程式碼審查還是後期的維護都是一個大問題。

為此,我認為需要一個新的抽象,來對上面的這些操作進行進一步的抽象。然後再這個之上,通過使用和組合其他的操作,從而更簡單的實現自定義操作符;

下面我們思考一下如何實現這些操作符吧~~

解法

所有上面的這些都可以抽象為:

  1. 帶有狀態,且執行緒安全
  2. 狀態可變,且根據狀態的不同,對輸入應用不同的操作,產生不同的值
  3. 可以提前結束、或者對不滿足條件的值進行選擇性丟棄
  4. 有完整的生命週期
  5. 在結束時可以根據內部狀態而產生可選的值,而不會丟失內部狀態

經過分析,這裡可以表達為 : 狀態 + 輸入 -(應用行為)-> 新的狀態 + 輸出, 這樣再加上 onCraeteonComplete生命週期函式,就可以完整表達。而提前結束等行為,則可以通過組合takeWhile
實現。我們將方法命名為:statefulMap,宣告如下:

``` public statefulMap( java.util.function.Supplier create, java.util.function.BiFunction> f, java.util.function.Function> onComplete){...} ````

讓我們看一下如何通過這個方法來實現 zipWithIndex吧:

實現zipWithIndex (indexed)

圖片

``` Source.from(Arrays.asList("A", "B", "C", "D"))

.statefulMap(
    () -> 0L,
    (index, element) -> Pair.create(index + 1, Pair.create(element, index)),
    indexOnComplete -> Optional.empty())


.runForeach(System.out::println, system);

// prints // Pair(A,0) // Pair(B,1) // Pair(C,2) // Pair(D,3) ````

也可以實現 zipWithNextzipWithPreviousAndNext我們再看看如何實現較為複雜的 bufferUntilChanged吧。

實現 bufferUntilChanged

圖片

``` Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))

.statefulMap(
    () -> (List<String>) new LinkedList<String>(),
    (buffer, element) -> {
      if (buffer.size() > 0 && (!buffer.get(0).equals(element))) {
        return Pair.create(
            new LinkedList<>(Collections.singletonList(element)),
            Collections.unmodifiableList(buffer));
      } else {
        buffer.add(element);
        return Pair.create(buffer, Collections.<String>emptyList());
      }
    },
    Optional::ofNullable)
.filterNot(List::isEmpty)

.runForeach(System.out::println, system);

// prints // [A] // [B, B] // [C, C, C] // [D] ````

舉一反三,如何實現 distinctUntilChanged呢 ?

實現 distinctUntilChanged

圖片

``` Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))

.statefulMap(
    Optional::<String>empty,
    (lastElement, element) -> {
      if (lastElement.isPresent() && lastElement.get().equals(element)) {
        return Pair.create(lastElement, Optional.<String>empty());
      } else {
        return Pair.create(Optional.of(element), Optional.of(element));
      }
    },
    listOnComplete -> Optional.empty())
.via(Flow.flattenOptional())

.runForeach(System.out::println, system);

// prints // A // B // C // D ``` 如果要實現聚合buffer`呢?

實現 buffer

圖片

``` Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10))

.statefulMap(        () -> new ArrayList<Integer>(3),        (list, element) -> {          list.add(element);          if (list.size() == 3) {            return Pair.create(new ArrayList<Integer>(3), Collections.unmodifiableList(list));          } else {            return Pair.create(list, Collections.<Integer>emptyList());          }        },        listOnComplete -> Optional.ofNullable(listOnComplete))    .filterNot(List::isEmpty)

.runForeach(System.out::println, system);// printsList(1, 2, 3)List(4, 5, 6)List(7, 8, 9)List(10)

````

更復雜的例子:處理資源

在前面看了如何實現 zipWithIndex 、bufferUntilChanged 之後,讓我們進一步看看如何優雅和安全地處理資源。在任何的程式語言和框架中,資源的處理都是非常基礎但是又很棘手的事項。在 Java 7 中首次引入了 try-with-resources 語法,對資源處理進行了一定程度的簡化,而在反應式流中,我們又應該如何的操作呢?這裡我們可以分為兩種情況:

  1. 針對流中的每個元素都建立一個新的資源,使用這個資源,關閉這個資源。
  2. 針對整個流建立一個資源,並在處理流中的每個元素時使用這個資源,並在流的生命週期結束後,關閉這個資源。

因為資源通常開銷較大且需要妥善管理,所以在開發過程中,我們更容易遇到的是 第2種情況,即資源的建立和銷燬和流的生命週期進行了繫結。反應式流中的資源管理,還有更多的細節需要考慮:

  1. 資源的初始化和關閉需要支援併發安全;反應式流可以被多次物化,被多個下游訂閱者訂閱和處理,並且以任意的順序進行取消訂閱,需要在各種情況下(上游完成、下游取消、處理異常等)等情況下妥善的建立和銷燬資源。
  2. 在流生命週期的各個階段安全地建立和銷燬資源;比如:即使在建立資源或者銷燬資源的時觸發了異常,也不會對同一個資源關閉多次。
  3. 支援非同步從而提高資源使用的效率。
  4. 感知流的生命週期,支援在關閉資源時提供可選的值給到下游以標識流的結束,比如處理檔案時,使用一個特殊的識別符號標識檔案的結尾。

綜合上面的這些訴求,對應的程式碼就會變得很複雜,大家可以給自己一點時間思考一下:如果是自己獨立實現類似的操作需要做出那些努力呢?而在現實的開發過程中,我們遇到的述求很多時候並非一起提出,而時隨著迭代接踵而至,那麼如果當初的程式碼編寫的不是很易於擴充套件,擁有良好的測試,則可能按下葫蘆浮起瓢。比如在 reactor-core中就有如下的using 操作符:

``` public static Mono using( Callable<? extends D> resourceSupplier, Function<? super D, ? extends Mono<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup) {...} ````

resourceSupplier 針對每個訂閱者,建立一個資源

sourceSupplier 結合建立的資源,產生對應的元素

resourceCleanup 取消訂閱或者流完成時,清理對應的資源

在 reactor-core 中,對應的底層實現為 MonoUsing 共 360 行程式碼,而要實現我們想要的邏輯,我們還需要和另一個流進行合併,即這裡的 using 類似於 unfoldResource 。那麼有沒有可能使用更加簡單的方案來進行實現呢?答案是肯定的,和前面的幾個操作符一樣,我們可以使用 statefulMap 來實現mapWithResource ,思維過程如下:

  1. using / mapWithResource 的生命週期管理 和 statefulMap 的 create 和  onComplete 方法對應,針對資源,onComplete 方法可以被命名為更加貼切的 release / close / cleanUp

  2. 在流中使用的資源,我們可以認為是一個狀態,只不過這個狀態在流的整個生命週期中不再變化,一直是 create 方法中返回的 Resource 。

  3. 在關閉資源時,我們可以通過返回一個 Optional<Out> 來返回一個可選的值。

  4. 對併發資源的非同步處理,則可以通過返回一個 CompletionStage<Out> 而非 Out 來實現,在 using 方法中,我們返回的是一個 Mono<T>

經過上面的思維過程,我們不難得出這個流上的方法的宣告可以為:

``` public mapWithResource( Supplier<? extends R> create, BiFunction<? super R, ? super In, ? extends Out> function, Function<? super R, ? extends Optional<? extends Out> close) {...} ````

resourceSupplier 針對每個訂閱者/每次物化,建立一個資源

function 使用create 中建立的資源處理流中的每個元素

close 在流關閉的同時關閉資源,並再向下遊提供一個可選的值

具體的的實現這裡留空,感興趣的小夥伴可以結合前面的例子進行實現。下面我們看一下如何使用這個 mapWithResource 方法,從而加深大家的理解。

使用mapWithResource

假設我們有一組 SQL 需要進行處理,我們需要從資料庫中的多個表中查詢對應的結果,並將最終結果進行合併和輸出到控制檯。在mapWithResource 的幫助下,我們可以極大的簡化我們的程式碼:

``` Source.from( Arrays.asList( "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;", "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;")) .mapWithResource( () -> dbDriver.create(url, userName, password), (connection, query) -> db.doQuery(connection, query).toList(), connection -> { connection.close(); return Optional.empty(); }) .mapConcat(elems -> elems) .runForeach(System.out::println, system); ````

在上面的例子中:我們有一組預先定義好的 SQL,分別從多個表中讀取最新的 100 條資料,通過使用mapWithResource ,我們優雅地為每個流建立了 db 相關的連線,並進行對應的查詢操作,併合並查詢結果,在流處理完成後,關閉對應的資源。上面的程式碼通過複用我們前面編寫的 mapWithResource 將複雜資源和生命週期管理進行了簡化,作為對比,大家可以思考一下如果我們不使用已有抽象所需要付出的努力。

總結

在上面的例子中,我們通過 statefulMap以及和其他的操作符相互組合,實現了很多和狀態、生命週期相關的操作符,而程式碼量則大大減少。基於一個經過考驗的操作符來編寫自定義操作符,也能進一步降低出錯的概率,以及程式碼審查的難度。而相關的操作符都是通過一個底層的 statefulMap 來實現。對映到我們的工作中則是儘可能地抽象、提煉,對系統的核心模型、核心功能進行打磨,從而每個應用都有一個精巧的核心,並和其他的應用構成豐富的生態。而非上來就 複製、貼上,重複造輪子;避免最終陷入複製、貼上的泥潭中。雖然有時我們可能沒有足夠的時間來進一步抽象,而是業務先行。但是我依然建議,在後續的實踐中,進行不斷回顧和提煉,在保障系統穩定可靠、在有測試手段保障的情況下,進行逐步的重構,使得系統更加容易理解、維護和穩固。

筆者相信:磨刀不誤砍柴工,在設計、方案review、測試和不斷重構、精煉的過程中所花費的時間,一定會在將來多倍的回報。