如何避免寫重複程式碼:善用抽象和組合
通過抽象和組合,我們可以編寫出更加簡潔、易於理解和穩定的程式碼;類似於金字塔的建築過程,我們總是可以在一層抽象之上再疊加一層,從而達到自己的目標。但是在日常的開發工作中,我們如何進行實踐呢?本文將以筆者在Akka專案中的一段社群貢獻作為引子分享筆者的一點心得。
場景
通常,為了簡化我們對資料流的處理,我們可能會使用 Java8 中首次引入的 Stream 、或者是 Kotlin、Scala 等程式語言中提供的更加豐富的集合庫,亦或者使用反應式流的相關三方庫來簡化工作。雖然這些類庫已經提供了豐富的操作符,但是我們依然會工作中遇到其對某些場景未提供合適操作符的情況。比如:
- 在直播場景下,需要對某些型別的訊息進行緩衝和聚合,一段時間內的多個點贊合併為1個點贊,並且在處理了 N 個訊息的時候進行整體傳送,保障整體的擴散量級維持在一個平穩的水平。
- 在 IOT 場景中,接收來自終端裝置上報的資料,並返回當前的資料和前值,或者最近 3個值,從而計算其中的變化趨勢。此時我們可能會使用反應式流庫中提供的:
zipWithNext
、zipWithPrevious
、zipWithPreviousAndNext
,或者是sliding
。 - 在建立一個聊天室的時候,如果使用者輸入
bye
,則讓使用者斷開連線,離開聊天室,那麼這個時候我們可能會使用takeWhile
。 - 假設我們有一組SQL,我們需要按照順序執行,併合並他們的結果,並在處理完成後關閉對應的資料庫連線,這時我們可能會用
mapWithResource
,using
(資源安全)。 - 當處理檔案、寫入資料庫等使用資源的時候,我們需要開啟一個檔案或獲取一個數據庫連線,將資料寫入,然後在處理完成後關閉對應的資源,這時我們可能會使用
foldResource
(資源安全) - 假設需要對資料進行分批,每 3個元素一批,進行打包,這個時候我們可能會使用
batch(3)
- 假設我們需要將元素和每個元素的下標結合在一起,這個時候我們可能需要使用
zipWithIndex
- 假設我們需要快取元素,並在指定條件滿足前一直快取,我們可能需要
bufferUntil(predicate)
、bufferWhile(predicate)●
- 假設我們需要快取元素,直到資料變更,把相同項合併在一起,我們可能需要
bufferUtilChanged
- 假設我們需要對所有的元素進行去重,或者去掉連續的重複元素,我們可能會需要用到
distinct
、distinctUntilChanged
- 假設我們只需要返回前 N 個元素,我們可能需要使用
limit(N)
、take(N)
, 或者按照條件takeWhile
、takeUntil
假設我們需要跳過前N個元素,我們可能需要使用skip(N)、drop(N)
, 或者按照條件dropWhile、dropUntil
。 - 等等...
我們可以看到,上面這些操作符,每個都擁有具體的語義,雖然看起來只是一個簡單的方法,但是如果需我們完全自主實現,定然也有不小的難度,比如 zipWithNext
、zipWithPrevious
、zipWithPreviousAndNext
在 Reactor-core 目前的發行版本中就沒有直接提供,而和資源相關的, Reactor-core 中則只有一個 using
。
分析
作為程式設計師,第一件事情,肯定就是 Ctrl + C
,第二件事就是 Ctrl + V
, 第三件事就是 Commit & Push
。然而,事情並沒有這麼簡單。
難點有:
- 反應式流操作符需要完整實現反應式流的規範、並通過預設的測試套件的驗證。
- 操作符需要儘可能的抽象和可組合。
- 無論是單執行緒還是併發場景下都擁有正確的行為和語義、並有完整單元測試覆蓋。
- 操作符的實現需要儘可能的具備最高效的效能。
比如,以 zipWithIndex
舉例,在 Reactor-core 中有 FluxIndexFuseable(370行程式碼)
和 FluxIndex (296行程式碼)
兩個實現。而且清晰的處理了各種情況。而其他操作符也有類似:release 3.4.23
FluxBuffer
—— 575行程式碼FluxBufferPredicate
—— 464 行程式碼FluxDistinct
—— 609行程式碼FluxDistinctFuseable
—— 70行程式碼FluxDistinctUntilChanged
—— 337 行程式碼FluxUsing
—— 583 行程式碼
如果要實現一個zipWithNext
自定義操作符 ,應該也有接近的工作量。這樣的工作強度,個人認為無論是在程式碼審查還是後期的維護都是一個大問題。
為此,我認為需要一個新的抽象,來對上面的這些操作進行進一步的抽象。然後再這個之上,通過使用和組合其他的操作,從而更簡單的實現自定義操作符;
下面我們思考一下如何實現這些操作符吧~~
解法
所有上面的這些都可以抽象為:
- 帶有狀態,且執行緒安全
- 狀態可變,且根據狀態的不同,對輸入應用不同的操作,產生不同的值
- 可以提前結束、或者對不滿足條件的值進行選擇性丟棄
- 有完整的生命週期
- 在結束時可以根據內部狀態而產生可選的值,而不會丟失內部狀態
經過分析,這裡可以表達為 : 狀態 + 輸入 -(應用行為)-> 新的狀態 + 輸出
, 這樣再加上 onCraete
、onComplete
生命週期函式,就可以完整表達。而提前結束等行為,則可以通過組合takeWhile
實現。我們將方法命名為:statefulMap
,宣告如下:
```
public statefulMap(
java.util.function.Supplier create,
java.util.function.BiFunction> f,
java.util.function.Function> onComplete){...}
````
讓我們看一下如何通過這個方法來實現 zipWithIndex
吧:
實現zipWithIndex (indexed)
``` Source.from(Arrays.asList("A", "B", "C", "D"))
.statefulMap(
() -> 0L,
(index, element) -> Pair.create(index + 1, Pair.create(element, index)),
indexOnComplete -> Optional.empty())
.runForeach(System.out::println, system);
// prints // Pair(A,0) // Pair(B,1) // Pair(C,2) // Pair(D,3) ````
也可以實現 zipWithNext
、zipWithPreviousAndNext
我們再看看如何實現較為複雜的 bufferUntilChanged
吧。
實現 bufferUntilChanged
``` Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
.statefulMap(
() -> (List<String>) new LinkedList<String>(),
(buffer, element) -> {
if (buffer.size() > 0 && (!buffer.get(0).equals(element))) {
return Pair.create(
new LinkedList<>(Collections.singletonList(element)),
Collections.unmodifiableList(buffer));
} else {
buffer.add(element);
return Pair.create(buffer, Collections.<String>emptyList());
}
},
Optional::ofNullable)
.filterNot(List::isEmpty)
.runForeach(System.out::println, system);
// prints // [A] // [B, B] // [C, C, C] // [D] ````
舉一反三,如何實現 distinctUntilChanged
呢 ?
實現 distinctUntilChanged
``` Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
.statefulMap(
Optional::<String>empty,
(lastElement, element) -> {
if (lastElement.isPresent() && lastElement.get().equals(element)) {
return Pair.create(lastElement, Optional.<String>empty());
} else {
return Pair.create(Optional.of(element), Optional.of(element));
}
},
listOnComplete -> Optional.empty())
.via(Flow.flattenOptional())
.runForeach(System.out::println, system);
// prints
// A
// B
// C
// D
```
如果要實現聚合
buffer`呢?
實現 buffer
``` Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10))
.statefulMap( () -> new ArrayList<Integer>(3), (list, element) -> { list.add(element); if (list.size() == 3) { return Pair.create(new ArrayList<Integer>(3), Collections.unmodifiableList(list)); } else { return Pair.create(list, Collections.<Integer>emptyList()); } }, listOnComplete -> Optional.ofNullable(listOnComplete)) .filterNot(List::isEmpty)
.runForeach(System.out::println, system);// printsList(1, 2, 3)List(4, 5, 6)List(7, 8, 9)List(10)
````
更復雜的例子:處理資源
在前面看了如何實現 zipWithIndex
、bufferUntilChanged
之後,讓我們進一步看看如何優雅和安全地處理資源。在任何的程式語言和框架中,資源的處理都是非常基礎但是又很棘手的事項。在 Java 7 中首次引入了 try-with-resources
語法,對資源處理進行了一定程度的簡化,而在反應式流中,我們又應該如何的操作呢?這裡我們可以分為兩種情況:
- 針對流中的每個元素都建立一個新的資源,使用這個資源,關閉這個資源。
- 針對整個流建立一個資源,並在處理流中的每個元素時使用這個資源,並在流的生命週期結束後,關閉這個資源。
因為資源通常開銷較大且需要妥善管理,所以在開發過程中,我們更容易遇到的是 第2種情況,即資源的建立和銷燬和流的生命週期進行了繫結。反應式流中的資源管理,還有更多的細節需要考慮:
- 資源的初始化和關閉需要支援併發安全;反應式流可以被多次物化,被多個下游訂閱者訂閱和處理,並且以任意的順序進行取消訂閱,需要在各種情況下(上游完成、下游取消、處理異常等)等情況下妥善的建立和銷燬資源。
- 在流生命週期的各個階段安全地建立和銷燬資源;比如:即使在建立資源或者銷燬資源的時觸發了異常,也不會對同一個資源關閉多次。
- 支援非同步從而提高資源使用的效率。
- 感知流的生命週期,支援在關閉資源時提供可選的值給到下游以標識流的結束,比如處理檔案時,使用一個特殊的識別符號標識檔案的結尾。
綜合上面的這些訴求,對應的程式碼就會變得很複雜,大家可以給自己一點時間思考一下:如果是自己獨立實現類似的操作需要做出那些努力呢?而在現實的開發過程中,我們遇到的述求很多時候並非一起提出,而時隨著迭代接踵而至,那麼如果當初的程式碼編寫的不是很易於擴充套件,擁有良好的測試,則可能按下葫蘆浮起瓢。比如在 reactor-core中就有如下的using
操作符:
```
public static
resourceSupplier
針對每個訂閱者,建立一個資源
sourceSupplier
結合建立的資源,產生對應的元素
resourceCleanup
取消訂閱或者流完成時,清理對應的資源
在 reactor-core 中,對應的底層實現為 MonoUsing
共 360 行程式碼,而要實現我們想要的邏輯,我們還需要和另一個流進行合併,即這裡的 using
類似於 unfoldResource
。那麼有沒有可能使用更加簡單的方案來進行實現呢?答案是肯定的,和前面的幾個操作符一樣,我們可以使用 statefulMap
來實現mapWithResource
,思維過程如下:
-
using
/mapWithResource
的生命週期管理 和statefulMap
的create
和onComplete
方法對應,針對資源,onComplete
方法可以被命名為更加貼切的release
/close
/cleanUp
。 -
在流中使用的資源,我們可以認為是一個狀態,只不過這個狀態在流的整個生命週期中不再變化,一直是
create
方法中返回的Resource
。 -
在關閉資源時,我們可以通過返回一個
Optional<Out>
來返回一個可選的值。 -
對併發資源的非同步處理,則可以通過返回一個
CompletionStage<Out>
而非Out
來實現,在using
方法中,我們返回的是一個Mono<T>
經過上面的思維過程,我們不難得出這個流上的方法的宣告可以為:
```
public
resourceSupplier
針對每個訂閱者/每次物化,建立一個資源
function
使用create
中建立的資源處理流中的每個元素
close
在流關閉的同時關閉資源,並再向下遊提供一個可選的值
具體的的實現這裡留空,感興趣的小夥伴可以結合前面的例子進行實現。下面我們看一下如何使用這個 mapWithResource
方法,從而加深大家的理解。
使用mapWithResource
假設我們有一組 SQL 需要進行處理,我們需要從資料庫中的多個表中查詢對應的結果,並將最終結果進行合併和輸出到控制檯。在mapWithResource
的幫助下,我們可以極大的簡化我們的程式碼:
``` Source.from( Arrays.asList( "SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;", "SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;")) .mapWithResource( () -> dbDriver.create(url, userName, password), (connection, query) -> db.doQuery(connection, query).toList(), connection -> { connection.close(); return Optional.empty(); }) .mapConcat(elems -> elems) .runForeach(System.out::println, system); ````
在上面的例子中:我們有一組預先定義好的 SQL,分別從多個表中讀取最新的 100 條資料,通過使用mapWithResource
,我們優雅地為每個流建立了 db 相關的連線,並進行對應的查詢操作,併合並查詢結果,在流處理完成後,關閉對應的資源。上面的程式碼通過複用我們前面編寫的 mapWithResource
將複雜資源和生命週期管理進行了簡化,作為對比,大家可以思考一下如果我們不使用已有抽象所需要付出的努力。
總結
在上面的例子中,我們通過 statefulMap
以及和其他的操作符相互組合,實現了很多和狀態、生命週期相關的操作符,而程式碼量則大大減少。基於一個經過考驗的操作符來編寫自定義操作符,也能進一步降低出錯的概率,以及程式碼審查的難度。而相關的操作符都是通過一個底層的 statefulMap
來實現。對映到我們的工作中則是儘可能地抽象、提煉,對系統的核心模型、核心功能進行打磨,從而每個應用都有一個精巧的核心,並和其他的應用構成豐富的生態。而非上來就 複製、貼上,重複造輪子;避免最終陷入複製、貼上的泥潭中。雖然有時我們可能沒有足夠的時間來進一步抽象,而是業務先行。但是我依然建議,在後續的實踐中,進行不斷回顧和提煉,在保障系統穩定可靠、在有測試手段保障的情況下,進行逐步的重構,使得系統更加容易理解、維護和穩固。
筆者相信:磨刀不誤砍柴工,在設計、方案review、測試和不斷重構、精煉的過程中所花費的時間,一定會在將來多倍的回報。
- 第14個天貓雙11,技術創新帶來消費新體
- 如何避免寫重複程式碼:善用抽象和組合
- 淘寶PC改版!我們跟一位背後付出6年的男人聊了聊……
- 在阿里做前端程式設計師,我是這樣規劃的
- 一種可灰度的介面遷移方案
- 如何快速理解複雜業務,系統思考問題?
- 淘寶iOS掃一掃架構升級 - 設計模式的應用
- HTTP3 RFC標準正式釋出,QUIC會成為傳輸技術的新一代顛覆者嗎?
- 2022大淘寶技術工程師推薦書單
- 國際頂會OSDI首度收錄淘寶系統論文,端雲協同智慧獲大會主旨演講推薦
- 如何持續突破效能表現? | DX研發模式
- 列表容器&事件鏈如何幫業務提升發版迭代效率? | DX研發模式
- 2022淘寶天貓618背後——與你息息相關的技術祕密
- 淘寶Native研發模式的演進與思考 | DX研發模式
- CVPR2022 | 開源:基於間距自適應查詢表的實時影象增強方法
- 無線運維的起源與專案建設思考
- 淘寶購物車5年技術升級與沉澱
- 從標準到開源,阿里大淘寶技術的“創新擔當”
- 程式設計師如何在業餘時間提升自己?