JAVA基於CompletableFuture的流水線並行處理深度實踐,滿滿乾貨

語言: CN / TW / HK

theme: healer-readable

大家好,又見面啦。

在項目開發中,後端服務對外提供API接口一般都會關注響應時長。但是某些情況下,由於業務規劃邏輯的原因,我們的接口可能會是一個聚合信息處理類的處理邏輯,比如我們從多個不同的地方獲取數據,然後彙總處理為最終的結果再返回給調用方,這種情況下,往往會導致我們的接口響應特別的慢。

而如果我們想要動手進行優化的時候呢,就會涉及到串行處理改並行處理的問題。在JAVA中並行處理的能力支持已經相對完善,通過對CompletableFuture的合理利用,可以讓我們面對這種聚合類處理的場景會更加的得心應手。

好啦,話不多説,接下來就讓我們一起來品嚐下JAVA中組合式並行處理這道饕餮大餐吧。

image.png

前菜:先看個實際場景

在開始享用這頓大餐前,我們先來個前菜開開胃。

例如現在有這麼個需求:

需求描述: 實現一個全網比價服務,比如可以從某寶、某東、某夕夕去獲取某個商品的價格、優惠金額,並計算出實際付款金額,最終返回價格最優的平台與價格信息。

📢這裏假定每個平台獲取原價格與優惠券的接口已經實現、且都是需要調用HTTP接口查詢的耗時操作,Mock接口每個耗時1s左右。

根據最初的需求理解,我們可以很自然的寫出對應實現代碼:

java public PriceResult getCheapestPlatAndPrice(String product) { PriceResult mouBaoPrice = computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product)); PriceResult mouDongPrice = computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product)); PriceResult mouXiXiPrice = computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product)); // 計算並選出實際價格最低的平台 return Stream.of(mouBaoPrice, mouDongPrice, mouXiXiPrice). min(Comparator.comparingInt(PriceResult::getRealPrice)) .get(); }

一切順利成章,運行測試下:

05:24:54.779[main|1]獲取某寶上 Iphone13的價格完成: 5199 05:24:55.781[main|1]獲取某寶上 Iphone13的優惠完成: -200 05:24:55.781[main|1]某寶最終價格計算完成:4999 05:24:56.784[main|1]獲取某東上 Iphone13的價格完成: 5299 05:24:57.786[main|1]獲取某東上 Iphone13的優惠完成: -150 05:24:57.786[main|1]某東最終價格計算完成:5149 05:24:58.788[main|1]獲取某夕夕上 Iphone13的價格完成: 5399 05:24:59.791[main|1]獲取某夕夕上 Iphone13的優惠完成: -5300 05:24:59.791[main|1]某夕夕最終價格計算完成:99 獲取最優價格信息:【平台:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 6122ms ------

結果符合預期,功能一切正常,就是耗時長了點。試想一下,假如你在某個APP操作查詢的時候,等待6s才返回結果,估計會直接把APP給卸載了吧

梳理下前面代碼的實現思路:

image.png

所有的環節都是串行的,每個環節耗時加到一起,接口總耗時肯定很長。

但實際上,每個平台之間的操作是互不干擾的,那我們自然而然的可以想到,可以通過多線程的方式,同時去分別執行各個平台的邏輯處理,最後將各個平台的結果彙總到一起比對得到最低價格。

所以整個執行過程會變成如下的效果:

image.png

為了提升性能,我們採用線程池來負責多線程的處理操作,因為我們需要得到各個子線程處理的結果,所以我們需要使用 Future來實現:

```java public PriceResult getCheapestPlatAndPrice2(String product) { Future mouBaoFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product))); Future mouDongFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product))); Future mouXiXiFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product)));

// 等待所有線程結果都處理完成,然後從結果中計算出最低價
return Stream.of(mouBaoFuture, mouDongFuture, mouXiXiFuture)
    .map(priceResultFuture -> {
        try {
            return priceResultFuture.get(5L, TimeUnit.SECONDS);
        } catch (Exception e) {
            return null;
        }
    })
    .filter(Objects::nonNull).min(Comparator.comparingInt(PriceResult::getRealPrice)).get();

} ```

上述代碼中,將三個不同平台對應的Callable函數邏輯放入到ThreadPool中去執行,返回Future對象,然後再逐個通過Future.get()接口阻塞獲取各自平台的結果,最後經比較處理後返回最低價信息。

執行代碼,可以看到執行結果與過程如下:

05:42:25.291[pool-1-thread-2|13]獲取某東上 Iphone13的價格完成: 5299 05:42:25.291[pool-1-thread-3|14]獲取某夕夕上 Iphone13的價格完成: 5399 05:42:25.291[pool-1-thread-1|12]獲取某寶上 Iphone13的價格完成: 5199 05:42:26.294[pool-1-thread-2|13]獲取某東上 Iphone13的優惠完成: -150 05:42:26.294[pool-1-thread-3|14]獲取某夕夕上 Iphone13的優惠完成: -5300 05:42:26.294[pool-1-thread-1|12]獲取某寶上 Iphone13的優惠完成: -200 05:42:26.294[pool-1-thread-2|13]某東最終價格計算完成:5149 05:42:26.294[pool-1-thread-3|14]某夕夕最終價格計算完成:99 05:42:26.294[pool-1-thread-1|12]某寶最終價格計算完成:4999 獲取最優價格信息:【平台:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 2119ms ------

結果與第一種實現方式一致,但是接口總耗時從6s下降到了2s,效果還是很顯著的。但是,是否還能再壓縮一些呢?

基於上面按照平台拆分並行處理的思路繼續推進,我們可以看出每個平台內的處理邏輯其實可以分為3個主要步驟:

  1. 獲取原始價格(耗時操作)
  2. 獲取折扣優惠(耗時操作)
  3. 得到原始價格和折扣優惠之後,計算實付價格

這3個步驟中,第1、2兩個耗時操作也是相對獨立的,如果也能並行處理的話,響應時長上應該又會縮短一些,即如下的處理流程:

image.png

我們當然可以繼續使用上面提到的線程池+Future的方式,但Future在應對並行結果組合以及後續處理等方面顯得力不從心,弊端明顯:

代碼寫起來會非常拖沓:先封裝Callable函數放到線程池中去執行查詢操作,然後分三組阻塞等待結果並計算出各自結果,最後再阻塞等待價格計算完成後彙總得到最終結果。

説到這裏呢,就需要我們新的主人公CompletableFuture登場了,通過它我們可以很輕鬆的來完成任務的並行處理,以及各個並行任務結果之間的組合再處理等操作。我們使用CompletableFuture編寫實現代碼如下:

```java public PriceResult getCheapestPlatAndPrice3(String product) { CompletableFuture mouBao = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice); CompletableFuture mouDong = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongDiscounts(product)), this::computeRealPrice); CompletableFuture mouXiXi = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice);

// 排序並獲取最低價格
return Stream.of(mouBao, mouDong, mouXiXi)
        .map(CompletableFuture::join)
        .sorted(Comparator.comparingInt(PriceResult::getRealPrice))
        .findFirst()
        .get();

} ```

看下執行結果符合預期,而接口耗時則降到了1s(因為我們依賴的每一個查詢實際操作的接口耗時都是模擬的1s,所以這個結果已經算是此複合接口能達到的極限值了)。

06:01:13.354[ForkJoinPool.commonPool-worker-6|17]獲取某夕夕上 Iphone13的優惠完成: -5300 06:01:13.354[ForkJoinPool.commonPool-worker-13|16]獲取某夕夕上 Iphone13的價格完成: 5399 06:01:13.354[ForkJoinPool.commonPool-worker-4|15]獲取某東上 Iphone13的優惠完成: -150 06:01:13.354[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13的價格完成: 5199 06:01:13.354[ForkJoinPool.commonPool-worker-11|14]獲取某東上 Iphone13的價格完成: 5299 06:01:13.354[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13的優惠完成: -200 06:01:13.354[ForkJoinPool.commonPool-worker-13|16]某夕夕最終價格計算完成:99 06:01:13.354[ForkJoinPool.commonPool-worker-11|14]某東最終價格計算完成:5149 06:01:13.354[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 獲取最優價格信息:【平台:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 1095ms ------

好啦,通過餐前的前菜,大家應該能夠看出來串行並行處理邏輯的區別、以及並行處理邏輯的實現策略了吧?這裏我們應該也可以看出CompletableFuture在應對並行處理場景下的強大優勢。當然咯,上面也只是小小的窺視了下CompletableFuture功能的冰上一角,下面就讓我們一起來深入瞭解下,享用並消化CompletableFuture這道主菜吧!

主菜:CompletableFuture深入瞭解

好啦,下面該主菜上場了。

作為JAVA8之後加入的新成員,CompletableFuture的實現與使用上,也處處體現出了函數式異步編程的味道。一個CompletableFuture對象可以被一個環節接一個環節的處理、也可以對兩個或者多個CompletableFuture進行組合處理或者等待結果完成。通過對CompletableFuture各種方法的合理使用與組合搭配,可以讓我們在很多的場景都可以應付自如。

下面就來一起了解下這些方法以及對應的使用方式吧。

Future與CompletableFuture

首先,先來理一下Future與CompletableFuture之間的關係。

Future

如果接觸過多線程相關的概念,那Future應該不會陌生,早在Java5中就已經存在了。

該如何理解Future呢?舉個生活中的例子:

你去咖啡店點了一杯咖啡,然後服務員會給你一個訂單小票。 當服務員在後台製作咖啡的時候,你並沒有在店裏等待,而是出門到隔壁甜品店又買了個麪包。 當面包買好之後,你回到咖啡店,拿着訂單小票去取咖啡。 取到咖啡後,你邊喝咖啡邊把麪包吃了……嗝~

是不是很熟悉的生活場景? 對比到我們多線程異步編程的場景中,咖啡店的訂單小票其實就是Future,通過Future可以讓稍後適當的時候可以獲取到對應的異步執行線程中的執行結果。

上面的場景,我們翻譯為代碼實現邏輯:

java public void buyCoffeeAndOthers() throws ExecutionException, InterruptedException { goShopping(); // 子線程中去處理做咖啡這件事,返回future對象 Future<Coffee> coffeeTicket = threadPool.submit(this::makeCoffee); // 主線程同步去做其他的事情 Bread bread = buySomeBread(); // 主線程其他事情並行處理完成,阻塞等待獲取子線程執行結果 Coffee coffee = coffeeTicket.get(); // 子線程結果獲取完成,主線程繼續執行 eatAndDrink(bread, coffee); }

image.png

編碼源於生活、代碼中的設計邏輯,很多時候都是與生活哲學匹配的。

CompletableFuture

Future在應對一些簡單且相互獨立的異步執行場景很便捷,但是在一些複雜的場景,比如同時需要多個有依賴關係的異步獨立處理的時候,或者是一些類似流水線的異步處理場景時,就顯得力不從心了。比如:

  • 同時執行多個並行任務,等待最快的一個完成之後就可以繼續往後處理
  • 多個異步任務,每個異步任務都需要依賴前一個異步任務執行的結果再去執行下一個異步任務,最後只需要一個最終的結果
  • 等待多個異步任務全部執行完成後觸發下一個動作執行
  • ...

所以呢, 在JAVA8開始引入了全新的CompletableFuture類,它是Future接口的一個實現類。也就是在Future接口的基礎上,額外封裝提供了一些執行方法,用來解決Future使用場景中的一些不足,對流水線處理能力提供了支持。

image.png

下一節中,我們就來進一步的瞭解下CompletableFuture的具體使用場景與使用方式。

CompletableFuture使用方式

創建CompletableFuture並執行

當我們需要進行異步處理的時候,我們可以通過CompletableFuture.supplyAsync方法,傳入一個具體的要執行的處理邏輯函數,這樣就輕鬆的完成了CompletableFuture的創建與觸發執行。

| 方法名稱 | 作用描述 | | --- | --- | | supplyAsync | 靜態方法,用於構建一個CompletableFuture<T>對象,並異步執行傳入的函數,允許執行函數有返回值T。 | | runAsync | 靜態方法,用於構建一個CompletableFuture<Void>對象,並異步執行傳入函數,與supplyAsync的區別在於此方法傳入的是Callable類型,僅執行,沒有返回值。 |

使用示例:

java public void testCreateFuture(String product) { // supplyAsync, 執行邏輯有返回值PriceResult CompletableFuture<PriceResult> supplyAsyncResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)); // runAsync, 執行邏輯沒有返回值 CompletableFuture<Void> runAsyncResult = CompletableFuture.runAsync(() -> System.out.println(product)); }

特別補充:

supplyAsync或者runAsync創建後便會立即執行,無需手動調用觸發。

環環相扣處理

在流水線處理場景中,往往都是一個任務環節處理完成後,下一個任務環節接着上一環節處理結果繼續處理。CompletableFuture用於這種流水線環節驅動類的方法有很多,相互之間主要是在返回值或者給到下一環節的入參上有些許差異,使用時需要注意區分:

image.png

具體的方法的描述歸納如下:

| 方法名稱 | 作用描述 | | --- | --- | | thenApply | 對CompletableFuture的執行後的具體結果進行追加處理,並將當前的CompletableFuture泛型對象更改為處理後新的對象類型,返回當前CompletableFuture對象。 | | thenCompose | 與thenApply類似。區別點在於:此方法的入參函數返回一個CompletableFuture類型對象。 | | thenAccept | 與thenApply方法類似,區別點在於thenAccept返回void類型,沒有具體結果輸出,適合無需返回值的場景。 | | thenRun | 與thenAccept類似,區別點在於thenAccept可以將前面CompletableFuture執行的實際結果作為入參進行傳入並使用,但是thenRun方法沒有任何入參,只能執行一個Runnable函數,並且返回void類型。 |

因為上述thenApplythenCompose方法的輸出仍然都是一個CompletableFuture對象,所以各個方法是可以一環接一環的進行調用,形成流水線式的處理邏輯:

image.png

期望總是美好的,但是實際情況卻總不盡如人意。在我們編排流水線的時候,如果某一個環節執行拋出異常了,會導致整個流水線後續的環節就沒法再繼續下去了,比如下面的例子:

java public void testExceptionHandle() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("supplyAsync excetion occurred..."); }).thenApply(obj -> { System.out.println("thenApply executed..."); return obj; }).join(); }

執行之後會發現,supplyAsync拋出異常後,後面的thenApply並沒有被執行。

那如果我們想要讓流水線的每個環節處理失敗之後都能讓流水線繼續往下面環節處理,讓後續環節可以拿到前面環節的結果或者是拋出的異常並進行對應的應對處理,就需要用到handlewhenCompletable方法了。

先看下兩個方法的作用描述:

| 方法名稱 | 作用描述 | | --- | --- | | handle | 與thenApply類似,區別點在於handle執行函數的入參有兩個,一個是CompletableFuture執行的實際結果,一個是是Throwable對象,這樣如果前面執行出現異常的時候,可以通過handle獲取到異常並進行處理。 | | whenComplete | 與handle類似,區別點在於whenComplete執行後無返回值。 |

我們對上面一段代碼示例修改使用handle方法來處理:

java public void testExceptionHandle() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("supplyAsync excetion occurred..."); }).handle((obj, e) -> { if (e != null) { System.out.println("thenApply executed, exception occurred..."); } return obj; }).join(); }

再執行可以發現,即使前面環節出現異常,後面環節也可以繼續處理,且可以拿到前一環節拋出的異常信息:

thenApply executed, exception occurred...

多個CompletableFuture組合操作

前面一直在介紹流水線式的處理場景,但是很多時候,流水線處理場景也不會是一個鏈路順序往下走的情況,很多時候為了提升並行效率,一些沒有依賴的環節我們會讓他們同時去執行,然後在某些環節需要依賴的時候,進行結果的依賴合併處理,類似如下圖的效果。

image.png

CompletableFuture相比於Future的一大優勢,就是可以方便的實現多個並行環節的合併處理。相關涉及方法介紹歸納如下:

| 方法名稱 | 作用描述 | | --- | --- | | thenCombine | 將兩個CompletableFuture對象組合起來進行下一步處理,可以拿到兩個執行結果,並傳給自己的執行函數進行下一步處理,最後返回一個新的CompletableFuture對象。 | | thenAcceptBoth | 與thenCombine類似,區別點在於thenAcceptBoth傳入的執行函數沒有返回值,即thenAcceptBoth返回值為CompletableFuture<Void>。 | | runAfterBoth | 等待兩個CompletableFuture都執行完成後再執行某個Runnable對象,再執行下一個的邏輯,類似thenRun。 | | applyToEither | 兩個CompletableFuture中任意一個完成的時候,繼續執行後面給定的新的函數處理。再執行後面給定函數的邏輯,類似thenApply。 | | acceptEither | 兩個CompletableFuture中任意一個完成的時候,繼續執行後面給定的新的函數處理。再執行後面給定函數的邏輯,類似thenAccept。 | | runAfterEither | 等待兩個CompletableFuture中任意一個執行完成後再執行某個Runnable對象,可以理解為thenRun的升級版,注意與runAfterBoth對比理解。 | | allOf | 靜態方法,阻塞等待所有給定的CompletableFuture執行結束後,返回一個CompletableFuture<Void>結果。 | | anyOf | 靜態方法,阻塞等待任意一個給定的CompletableFuture對象執行結束後,返回一個CompletableFuture<Void>結果。 |

結果等待與獲取

在執行線程中將任務放到工作線程中進行處理的時候,執行線程與工作線程之間是異步執行的模式,如果執行線程需要獲取到共工作線程的執行結果,則可以通過get或者join方法,阻塞等待並從CompletableFuture中獲取對應的值。

image.png

getjoin的方法功能含義説明歸納如下:

| 方法名稱 | 作用描述 | | --- | --- | | get() | 等待CompletableFuture執行完成並獲取其具體執行結果,可能會拋出異常,需要代碼調用的地方手動try...catch進行處理。 | | get(long, TimeUnit) | 與get()相同,只是允許設定阻塞等待超時時間,如果等待超過設定時間,則會拋出異常終止阻塞等待。 | | join() | 等待CompletableFuture執行完成並獲取其具體執行結果,可能會拋出運行時異常,無需代碼調用的地方手動try...catch進行處理。 |

從介紹上可以看出,兩者的區別就在於是否需要調用方顯式的進行try...catch處理邏輯,使用代碼示例如下:

```java public void testGetAndJoin(String product) { // join無需顯式try...catch... PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)) .join();

try {
    // get顯式try...catch...
    PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
            .get(5L, TimeUnit.SECONDS);
} catch (Exception e) {
    e.printStackTrace();
}

} ```

CompletableFuture方法及其Async版本

我們在使用CompletableFuture的時候會發現,有很多的方法,都會同時有兩個以Async命名結尾的方法版本。以前面我們用的比較多的thenCombine方法為例:

  1. thenCombine(CompletionStage, BiFunction)
  2. thenCombineAsync(CompletionStage, BiFunction)
  3. thenCombineAsync(CompletionStage, BiFunction, Executor)

從參數上看,區別並不大,僅第三個方法入參中多了線程池Executor對象。看下三個方法的源碼實現,會發現其整體實現邏輯都是一致的,僅僅是使用線程池這個地方的邏輯有一點點的差異:

image.png

有興趣的可以去翻一下此部分的源碼實現,這裏概括下三者的區別:

  1. thenCombine方法,沿用上一個執行任務所使用的線程池進行處理
  2. thenCombineAsync兩個入參的方法,使用默認的ForkJoinPool線程池中的工作線程進行處理
  3. themCombineAsync三個入參的方法,支持自定義線程池並指定使用自定義線程池中的線程作為工作線程去處理待執行任務。

為了更好的理解下上述的三個差異點,我們通過下面的代碼來演示下:

  • 用法1: 其中一個supplyAsync方法以及thenCombineAsync指定使用自定義線程池,另一個supplyAsync方法不指定線程池(使用默認線程池)

```java public PriceResult getCheapestPlatAndPrice4(String product) { // 構造自定義線程池 ExecutorService executor = Executors.newFixedThreadPool(5);

return
    CompletableFuture.supplyAsync(
        () -> HttpRequestMock.getMouXiXiPrice(product), 
        executor
    ).thenCombineAsync(
        CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
        this::computeRealPrice,
        executor
    ).join();

} ```

對上述代碼實現策略的解讀,以及與執行結果的關係展示如下圖所示,可以看出,沒有指定自定義線程池的supplyAsync方法,其使用了默認的ForkJoinPool工作線程來運行,而另外兩個指定了自定義線程池的方法,則使用了自定義線程池來執行。

image.png

  • 用法2: 不指定自定義線程池,使用默認線程池策略,使用thenCombine方法

java public PriceResult getCheapestPlatAndPrice5(String product) { return CompletableFuture.supplyAsync( () -> HttpRequestMock.getMouXiXiPrice(product) ).thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice ).join(); }

執行結果如下,可以看到執行線程名稱與用法1示例相比發生了變化。因為沒有指定線程池,所以兩個supplyAsync方法都是用的默認的ForkJoinPool線程池,而thenCombine使用的是上一個任務所使用的線程池,所以也是用的ForkJoinPool

14:34:27.815[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格 14:34:27.815[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠完成: -5300 14:34:28.831[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格完成: 5399 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]某夕夕最終價格計算完成:99 獲取最優價格信息:【平台:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 1083ms ------

現在,我們知道了方法名稱帶有Async和不帶Async的實現策略上的差異點就在於使用哪個線程池來執行而已。那麼,對我們實際的指導意義是啥呢?實際使用的時候,我們怎麼判斷自己應該使用帶Async結尾的方法、還是不帶Async結尾的方法呢?

image.png

上面是Async結尾方法默認使用的ForkJoinPool創建的邏輯,這裏可以看出,默認的線程池中的工作線程數是CPU核數 - 1,並且指定了默認的丟棄策略等,這就是一個主要關鍵點。

所以説,符合以下幾個條件的時候,可以考慮使用帶有Async後綴的方法,指定自定義線程池:

  • 默認線程池的線程數滿足不了實際訴求
  • 默認線程池的類型不符合自己業務訴求
  • 默認線程池的隊列滿處理策略不滿足自己訴求

與Stream結合使用的注意點

在我前面的文檔中,有細緻全面的介紹過Stream流相關的使用方式(不清楚的同學速點👉👉《吃透JAVA的Stream流操作,多年實踐總結》瞭解下啦)。在涉及批量進行並行處理的時候,通過StreamCompletableFuture結合使用,可以簡化我們的很多編碼邏輯。但是在使用細節方面需要注意下,避免達不到使用CompletableFuture的預期效果。

需求場景: 在同一個平台內,傳入多個商品,查詢不同商品對應的價格與優惠信息,並選出實付價格最低的商品信息。

結合前面的介紹分析,我們應該知道最佳的方式,就是同時並行的方式去各自請求數據,最後合併處理即可。所以我們規劃按照如下的策略來實現:

image.png

先看第一種編碼實現:

java public PriceResult comparePriceInOnePlat(List<String> products) { return products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice)) .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }

對於List的處理場景,這裏採用了Stream方式來進行遍歷與結果的收集、排序與返回。看似正常,但是執行的時候會發現,並沒有達到我們預期的效果:

07:37:15.408[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199 07:37:15.408[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200 07:37:15.408[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 07:37:16.410[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13白色的價格完成: 5199 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的優惠完成: -200 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]某寶最終價格計算完成:4999 07:37:17.412[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13紅色的價格完成: 5199 07:37:17.412[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13紅色的優惠完成: -200 07:37:17.412[ForkJoinPool.commonPool-worker-9|12]某寶最終價格計算完成:4999 獲取最優價格信息:【平台:某寶, 原價:5199, 折扣:0, 實付價:4999】 -----執行耗時: 3132ms ------

從上述執行結果可以看出,其具體處理的時候,其實是按照下面的邏輯去處理了:

image.png

為什麼會出現這種實際與預期的差異呢?原因就在於我們使用的Stream上面!雖然Stream中使用兩個map方法,但Stream處理的時候並不會分別遍歷兩遍,其實寫法等同於下面這種寫到1個map中處理,改為下面這種寫法,其實大家也就更容易明白為啥會沒有達到我們預期的整體並行效果了:

java public PriceResult comparePriceInOnePlat1(List<String> products) { return products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice).join()) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }

既然如此,這種場景是不是就不能使用Stream了呢?也不是,其實我們拆開成兩個Stream分步操作下其實就可以了。

再看下面的第二種實現代碼:

java public PriceResult comparePriceInOnePlat2(List<String> products) { // 先觸發各自平台的並行處理 List<CompletableFuture<PriceResult>> completableFutures = products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice)) .collect(Collectors.toList()); // 在獨立的流中,等待所有並行處理結束,做最終結果處理 return completableFutures.stream() .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }

執行結果:

07:39:16.072[ForkJoinPool.commonPool-worker-6|17]獲取某寶上 Iphone13紅色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-4|15]獲取某寶上 Iphone13白色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-13|16]獲取某寶上 Iphone13紅色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 07:39:16.072[ForkJoinPool.commonPool-worker-4|15]某寶最終價格計算完成:4999 07:39:16.072[ForkJoinPool.commonPool-worker-13|16]某寶最終價格計算完成:4999 獲取最優價格信息:【平台:某寶, 原價:5199, 折扣:0, 實付價:4999】 -----執行耗時: 1142ms ------

從執行結果可以看出,三個商品並行處理,整體處理耗時相比前面編碼方式有很大提升,達到了預期的效果。

📢歸納下

因為Stream的操作具有延遲執行的特點,且只有遇到終止操作(比如collect方法)的時候才會真正的執行。所以遇到這種需要並行處理且需要合併多個並行處理流程的情況下,需要將並行流程與合併邏輯放到兩個Stream中,這樣分別觸發完成各自的處理邏輯,就可以了。

甜點:併發和並行的區別

對一個吃貨而言,主餐完畢,總得來點餐後甜點才夠滿足。

在前面的內容中呢,我們始終是在圍繞並行處理這個話題在展開。實際工作的時候,我們對於併發這個詞肯定也不陌生,高併發這個詞,就像高端人士酒杯中那八二年的拉菲一般,成了每一個開發人員簡歷上用來彰顯實力的一個標籤。

那麼,併發並行到底啥區別?這裏我們也簡單的概括下。

併發

關於併發的詳細內容,可以參見我寫的另一篇內容,也即本篇文章的姊妹篇《不堆概念,換個角度聊多線程併發編程》,下面這裏簡單的介紹下併發的概念。

所謂併發,其關注的點是服務器的吞吐量情況,也就是服務器可以在單位時間內同時處理多少個請求。併發是通過多線程的方式來實現的,充分利用當前CPU多核能力,同時使用多個進程去處理業務,使得同一個機器在相同時間內可以處理更多的請求,提升吞吐量。

image.png 所有的操作在一個線程中串行推進,如果有多個線程同步處理,則同時有多個請求可以被處理。但是因為是串行處理,所以如果某個環節需要對外交互時,比如等待網絡IO的操作,會使得當前線程處於阻塞狀態,直到資源可用時被喚醒繼續往後執行。

image.png

對於高併發場景,服務器的線程資源是非常寶貴的。如果頻繁的處於阻塞則會導致浪費,且線程頻繁的阻塞、喚醒切換動作,也會加劇整體系統的性能損耗。所以併發這種多線程場景,更適合CPU密集型的操作。

並行

所謂並行,就是將同一個處理流程沒有相互依賴的部分放到多個線程中進行同時並行處理,以此來達到相對於串行模式更短的單流程處理耗時的效果,進而提升系統的整體響應時長吞吐量

image.png

基於異步編程實現的並行操作也是藉助線程池的方式,通過多線程同時執行來實現效率提升的。與併發的區別在於:並行通過將任務切分為一個個可獨立處理的小任務塊,然後基於系統調度策略,將需要執行的任務塊分配給空閒可用工作線程去處理,如果出現需要等待的場景(比如IO請求)則工作線程會將此任務先放下,繼續處理後續的任務,等之前的任務IO請求好了之後,系統重新分配可用的工作線程來處理。

image.png

根據上面的示意圖介紹可以看出,異步並行編程,對於工作線程的利用率上升,不會出現工作線程阻塞的情況,但是因為任務拆分、工作線程間的切換調度等系統層面的開銷也會隨之加大。

如何選擇

前面介紹了下併發與並行兩種模式的特點、以及各自的優缺點。所以選擇採用併發還是並行方式來提升系統的處理性能,還需要結合實際項目場景來確定。

綜合而言

  1. 如果業務處理邏輯是CPU密集型的操作,優先使用基於線程池實現併發處理方案(可以避免線程間切換導致的系統性能浪費)。
  2. 如果業務處理邏輯中存在較多需要阻塞等待的耗時場景、且相互之間沒有依賴,比如本地IO操作、網絡IO請求等等,這種情況優先選擇使用並行處理策略(可以避免寶貴的線程資源被阻塞等待)。

總結回顧

好啦,關於JAVA中CompletableFuture的使用,以及並行編程相關的內容呢就介紹到這裏啦。看到這裏,相信您應該有所收穫吧?那麼你的項目裏有這種適合並行處理的場景嗎?你在處理並行場景的時候是怎麼做的呢?評論區一起討論下吧~~

補充:

本文中有提及CompletableFuture執行時所使用的默認線程池是ForkJoinPool,早在JAVA7版本就已經被引入,但是很多人對ForkJoinPool不是很瞭解,實際項目中使用的也比較少。其實對ForkJoinPool的合理利用,可以讓我們在面對某些多線程場景時會更加的從容高效。在後面的文章中,我會針對ForkJoinPool有關的內容進行專門的介紹與探討,如果有興趣,可以點個關注,及時獲取後續的內容。

此外

我是悟道,聊技術、又不僅僅聊技術~

如果覺得有用,請點贊 + 關注讓我感受到您的支持。也可以關注下我的公眾號【架構悟道】,獲取更及時的更新。

期待與你一起探討,一起成長為更好的自己。


我正在參與掘金技術社區創作者簽約計劃招募活動,點擊鏈接報名投稿