JAVA基於CompletableFuture的流水線並行處理深度實踐,滿滿乾貨

語言: CN / TW / HK

theme: healer-readable

大家好,又見面啦。

在專案開發中,後端服務對外提供API介面一般都會關注響應時長。但是某些情況下,由於業務規劃邏輯的原因,我們的介面可能會是一個聚合資訊處理類的處理邏輯,比如我們從多個不同的地方獲取資料,然後彙總處理為最終的結果再返回給呼叫方,這種情況下,往往會導致我們的介面響應特別的慢。

而如果我們想要動手進行優化的時候呢,就會涉及到序列處理改並行處理的問題。在JAVA中並行處理的能力支援已經相對完善,通過對CompletableFuture的合理利用,可以讓我們面對這種聚合類處理的場景會更加的得心應手。

好啦,話不多說,接下來就讓我們一起來品嚐下JAVA中組合式並行處理這道饕餮大餐吧。

image.png

前菜:先看個實際場景

在開始享用這頓大餐前,我們先來個前菜開開胃。

例如現在有這麼個需求:

需求描述: 實現一個全網比價服務,比如可以從某寶、某東、某夕夕去獲取某個商品的價格、優惠金額,並計算出實際付款金額,最終返回價格最優的平臺與價格資訊。

📢這裡假定每個平臺獲取原價格與優惠券的介面已經實現、且都是需要呼叫HTTP介面查詢的耗時操作,Mock介面每個耗時1s左右。

根據最初的需求理解,我們可以很自然的寫出對應實現程式碼:

java public PriceResult getCheapestPlatAndPrice(String product) { PriceResult mouBaoPrice = computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product)); PriceResult mouDongPrice = computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product)); PriceResult mouXiXiPrice = computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product)); // 計算並選出實際價格最低的平臺 return Stream.of(mouBaoPrice, mouDongPrice, mouXiXiPrice). min(Comparator.comparingInt(PriceResult::getRealPrice)) .get(); }

一切順利成章,執行測試下:

05:24:54.779[main|1]獲取某寶上 Iphone13的價格完成: 5199 05:24:55.781[main|1]獲取某寶上 Iphone13的優惠完成: -200 05:24:55.781[main|1]某寶最終價格計算完成:4999 05:24:56.784[main|1]獲取某東上 Iphone13的價格完成: 5299 05:24:57.786[main|1]獲取某東上 Iphone13的優惠完成: -150 05:24:57.786[main|1]某東最終價格計算完成:5149 05:24:58.788[main|1]獲取某夕夕上 Iphone13的價格完成: 5399 05:24:59.791[main|1]獲取某夕夕上 Iphone13的優惠完成: -5300 05:24:59.791[main|1]某夕夕最終價格計算完成:99 獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 6122ms ------

結果符合預期,功能一切正常,就是耗時長了點。試想一下,假如你在某個APP操作查詢的時候,等待6s才返回結果,估計會直接把APP給解除安裝了吧

梳理下前面程式碼的實現思路:

image.png

所有的環節都是序列的,每個環節耗時加到一起,介面總耗時肯定很長。

但實際上,每個平臺之間的操作是互不干擾的,那我們自然而然的可以想到,可以通過多執行緒的方式,同時去分別執行各個平臺的邏輯處理,最後將各個平臺的結果彙總到一起比對得到最低價格。

所以整個執行過程會變成如下的效果:

image.png

為了提升效能,我們採用執行緒池來負責多執行緒的處理操作,因為我們需要得到各個子執行緒處理的結果,所以我們需要使用 Future來實現:

```java public PriceResult getCheapestPlatAndPrice2(String product) { Future mouBaoFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product))); Future mouDongFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product))); Future mouXiXiFuture = threadPool.submit(() -> computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product)));

// 等待所有執行緒結果都處理完成,然後從結果中計算出最低價
return Stream.of(mouBaoFuture, mouDongFuture, mouXiXiFuture)
    .map(priceResultFuture -> {
        try {
            return priceResultFuture.get(5L, TimeUnit.SECONDS);
        } catch (Exception e) {
            return null;
        }
    })
    .filter(Objects::nonNull).min(Comparator.comparingInt(PriceResult::getRealPrice)).get();

} ```

上述程式碼中,將三個不同平臺對應的Callable函式邏輯放入到ThreadPool中去執行,返回Future物件,然後再逐個通過Future.get()介面阻塞獲取各自平臺的結果,最後經比較處理後返回最低價資訊。

執行程式碼,可以看到執行結果與過程如下:

05:42:25.291[pool-1-thread-2|13]獲取某東上 Iphone13的價格完成: 5299 05:42:25.291[pool-1-thread-3|14]獲取某夕夕上 Iphone13的價格完成: 5399 05:42:25.291[pool-1-thread-1|12]獲取某寶上 Iphone13的價格完成: 5199 05:42:26.294[pool-1-thread-2|13]獲取某東上 Iphone13的優惠完成: -150 05:42:26.294[pool-1-thread-3|14]獲取某夕夕上 Iphone13的優惠完成: -5300 05:42:26.294[pool-1-thread-1|12]獲取某寶上 Iphone13的優惠完成: -200 05:42:26.294[pool-1-thread-2|13]某東最終價格計算完成:5149 05:42:26.294[pool-1-thread-3|14]某夕夕最終價格計算完成:99 05:42:26.294[pool-1-thread-1|12]某寶最終價格計算完成:4999 獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 2119ms ------

結果與第一種實現方式一致,但是介面總耗時從6s下降到了2s,效果還是很顯著的。但是,是否還能再壓縮一些呢?

基於上面按照平臺拆分並行處理的思路繼續推進,我們可以看出每個平臺內的處理邏輯其實可以分為3個主要步驟:

  1. 獲取原始價格(耗時操作)
  2. 獲取折扣優惠(耗時操作)
  3. 得到原始價格和折扣優惠之後,計算實付價格

這3個步驟中,第1、2兩個耗時操作也是相對獨立的,如果也能並行處理的話,響應時長上應該又會縮短一些,即如下的處理流程:

image.png

我們當然可以繼續使用上面提到的執行緒池+Future的方式,但Future在應對並行結果組合以及後續處理等方面顯得力不從心,弊端明顯:

程式碼寫起來會非常拖沓:先封裝Callable函式放到執行緒池中去執行查詢操作,然後分三組阻塞等待結果並計算出各自結果,最後再阻塞等待價格計算完成後彙總得到最終結果。

說到這裡呢,就需要我們新的主人公CompletableFuture登場了,通過它我們可以很輕鬆的來完成任務的並行處理,以及各個並行任務結果之間的組合再處理等操作。我們使用CompletableFuture編寫實現程式碼如下:

```java public PriceResult getCheapestPlatAndPrice3(String product) { CompletableFuture mouBao = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice); CompletableFuture mouDong = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongDiscounts(product)), this::computeRealPrice); CompletableFuture mouXiXi = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice);

// 排序並獲取最低價格
return Stream.of(mouBao, mouDong, mouXiXi)
        .map(CompletableFuture::join)
        .sorted(Comparator.comparingInt(PriceResult::getRealPrice))
        .findFirst()
        .get();

} ```

看下執行結果符合預期,而介面耗時則降到了1s(因為我們依賴的每一個查詢實際操作的介面耗時都是模擬的1s,所以這個結果已經算是此複合介面能達到的極限值了)。

06:01:13.354[ForkJoinPool.commonPool-worker-6|17]獲取某夕夕上 Iphone13的優惠完成: -5300 06:01:13.354[ForkJoinPool.commonPool-worker-13|16]獲取某夕夕上 Iphone13的價格完成: 5399 06:01:13.354[ForkJoinPool.commonPool-worker-4|15]獲取某東上 Iphone13的優惠完成: -150 06:01:13.354[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13的價格完成: 5199 06:01:13.354[ForkJoinPool.commonPool-worker-11|14]獲取某東上 Iphone13的價格完成: 5299 06:01:13.354[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13的優惠完成: -200 06:01:13.354[ForkJoinPool.commonPool-worker-13|16]某夕夕最終價格計算完成:99 06:01:13.354[ForkJoinPool.commonPool-worker-11|14]某東最終價格計算完成:5149 06:01:13.354[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 1095ms ------

好啦,通過餐前的前菜,大家應該能夠看出來序列並行處理邏輯的區別、以及並行處理邏輯的實現策略了吧?這裡我們應該也可以看出CompletableFuture在應對並行處理場景下的強大優勢。當然咯,上面也只是小小的窺視了下CompletableFuture功能的冰上一角,下面就讓我們一起來深入瞭解下,享用並消化CompletableFuture這道主菜吧!

主菜:CompletableFuture深入瞭解

好啦,下面該主菜上場了。

作為JAVA8之後加入的新成員,CompletableFuture的實現與使用上,也處處體現出了函式式非同步程式設計的味道。一個CompletableFuture物件可以被一個環節接一個環節的處理、也可以對兩個或者多個CompletableFuture進行組合處理或者等待結果完成。通過對CompletableFuture各種方法的合理使用與組合搭配,可以讓我們在很多的場景都可以應付自如。

下面就來一起了解下這些方法以及對應的使用方式吧。

Future與CompletableFuture

首先,先來理一下Future與CompletableFuture之間的關係。

Future

如果接觸過多執行緒相關的概念,那Future應該不會陌生,早在Java5中就已經存在了。

該如何理解Future呢?舉個生活中的例子:

你去咖啡店點了一杯咖啡,然後服務員會給你一個訂單小票。 當服務員在後臺製作咖啡的時候,你並沒有在店裡等待,而是出門到隔壁甜品店又買了個麵包。 當面包買好之後,你回到咖啡店,拿著訂單小票去取咖啡。 取到咖啡後,你邊喝咖啡邊把麵包吃了……嗝~

是不是很熟悉的生活場景? 對比到我們多執行緒非同步程式設計的場景中,咖啡店的訂單小票其實就是Future,通過Future可以讓稍後適當的時候可以獲取到對應的非同步執行執行緒中的執行結果。

上面的場景,我們翻譯為程式碼實現邏輯:

java public void buyCoffeeAndOthers() throws ExecutionException, InterruptedException { goShopping(); // 子執行緒中去處理做咖啡這件事,返回future物件 Future<Coffee> coffeeTicket = threadPool.submit(this::makeCoffee); // 主執行緒同步去做其他的事情 Bread bread = buySomeBread(); // 主執行緒其他事情並行處理完成,阻塞等待獲取子執行緒執行結果 Coffee coffee = coffeeTicket.get(); // 子執行緒結果獲取完成,主執行緒繼續執行 eatAndDrink(bread, coffee); }

image.png

編碼源於生活、程式碼中的設計邏輯,很多時候都是與生活哲學匹配的。

CompletableFuture

Future在應對一些簡單且相互獨立的非同步執行場景很便捷,但是在一些複雜的場景,比如同時需要多個有依賴關係的非同步獨立處理的時候,或者是一些類似流水線的非同步處理場景時,就顯得力不從心了。比如:

  • 同時執行多個並行任務,等待最快的一個完成之後就可以繼續往後處理
  • 多個非同步任務,每個非同步任務都需要依賴前一個非同步任務執行的結果再去執行下一個非同步任務,最後只需要一個最終的結果
  • 等待多個非同步任務全部執行完成後觸發下一個動作執行
  • ...

所以呢, 在JAVA8開始引入了全新的CompletableFuture類,它是Future介面的一個實現類。也就是在Future介面的基礎上,額外封裝提供了一些執行方法,用來解決Future使用場景中的一些不足,對流水線處理能力提供了支援。

image.png

下一節中,我們就來進一步的瞭解下CompletableFuture的具體使用場景與使用方式。

CompletableFuture使用方式

建立CompletableFuture並執行

當我們需要進行非同步處理的時候,我們可以通過CompletableFuture.supplyAsync方法,傳入一個具體的要執行的處理邏輯函式,這樣就輕鬆的完成了CompletableFuture的建立與觸發執行。

| 方法名稱 | 作用描述 | | --- | --- | | supplyAsync | 靜態方法,用於構建一個CompletableFuture<T>物件,並非同步執行傳入的函式,允許執行函式有返回值T。 | | runAsync | 靜態方法,用於構建一個CompletableFuture<Void>物件,並非同步執行傳入函式,與supplyAsync的區別在於此方法傳入的是Callable型別,僅執行,沒有返回值。 |

使用示例:

java public void testCreateFuture(String product) { // supplyAsync, 執行邏輯有返回值PriceResult CompletableFuture<PriceResult> supplyAsyncResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)); // runAsync, 執行邏輯沒有返回值 CompletableFuture<Void> runAsyncResult = CompletableFuture.runAsync(() -> System.out.println(product)); }

特別補充:

supplyAsync或者runAsync建立後便會立即執行,無需手動呼叫觸發。

環環相扣處理

在流水線處理場景中,往往都是一個任務環節處理完成後,下一個任務環節接著上一環節處理結果繼續處理。CompletableFuture用於這種流水線環節驅動類的方法有很多,相互之間主要是在返回值或者給到下一環節的入參上有些許差異,使用時需要注意區分:

image.png

具體的方法的描述歸納如下:

| 方法名稱 | 作用描述 | | --- | --- | | thenApply | 對CompletableFuture的執行後的具體結果進行追加處理,並將當前的CompletableFuture泛型物件更改為處理後新的物件型別,返回當前CompletableFuture物件。 | | thenCompose | 與thenApply類似。區別點在於:此方法的入參函式返回一個CompletableFuture型別物件。 | | thenAccept | 與thenApply方法類似,區別點在於thenAccept返回void型別,沒有具體結果輸出,適合無需返回值的場景。 | | thenRun | 與thenAccept類似,區別點在於thenAccept可以將前面CompletableFuture執行的實際結果作為入參進行傳入並使用,但是thenRun方法沒有任何入參,只能執行一個Runnable函式,並且返回void型別。 |

因為上述thenApplythenCompose方法的輸出仍然都是一個CompletableFuture物件,所以各個方法是可以一環接一環的進行呼叫,形成流水線式的處理邏輯:

image.png

期望總是美好的,但是實際情況卻總不盡如人意。在我們編排流水線的時候,如果某一個環節執行丟擲異常了,會導致整個流水線後續的環節就沒法再繼續下去了,比如下面的例子:

java public void testExceptionHandle() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("supplyAsync excetion occurred..."); }).thenApply(obj -> { System.out.println("thenApply executed..."); return obj; }).join(); }

執行之後會發現,supplyAsync丟擲異常後,後面的thenApply並沒有被執行。

那如果我們想要讓流水線的每個環節處理失敗之後都能讓流水線繼續往下面環節處理,讓後續環節可以拿到前面環節的結果或者是丟擲的異常並進行對應的應對處理,就需要用到handlewhenCompletable方法了。

先看下兩個方法的作用描述:

| 方法名稱 | 作用描述 | | --- | --- | | handle | 與thenApply類似,區別點在於handle執行函式的入參有兩個,一個是CompletableFuture執行的實際結果,一個是是Throwable物件,這樣如果前面執行出現異常的時候,可以通過handle獲取到異常並進行處理。 | | whenComplete | 與handle類似,區別點在於whenComplete執行後無返回值。 |

我們對上面一段程式碼示例修改使用handle方法來處理:

java public void testExceptionHandle() { CompletableFuture.supplyAsync(() -> { throw new RuntimeException("supplyAsync excetion occurred..."); }).handle((obj, e) -> { if (e != null) { System.out.println("thenApply executed, exception occurred..."); } return obj; }).join(); }

再執行可以發現,即使前面環節出現異常,後面環節也可以繼續處理,且可以拿到前一環節丟擲的異常資訊:

thenApply executed, exception occurred...

多個CompletableFuture組合操作

前面一直在介紹流水線式的處理場景,但是很多時候,流水線處理場景也不會是一個鏈路順序往下走的情況,很多時候為了提升並行效率,一些沒有依賴的環節我們會讓他們同時去執行,然後在某些環節需要依賴的時候,進行結果的依賴合併處理,類似如下圖的效果。

image.png

CompletableFuture相比於Future的一大優勢,就是可以方便的實現多個並行環節的合併處理。相關涉及方法介紹歸納如下:

| 方法名稱 | 作用描述 | | --- | --- | | thenCombine | 將兩個CompletableFuture物件組合起來進行下一步處理,可以拿到兩個執行結果,並傳給自己的執行函式進行下一步處理,最後返回一個新的CompletableFuture物件。 | | thenAcceptBoth | 與thenCombine類似,區別點在於thenAcceptBoth傳入的執行函式沒有返回值,即thenAcceptBoth返回值為CompletableFuture<Void>。 | | runAfterBoth | 等待兩個CompletableFuture都執行完成後再執行某個Runnable物件,再執行下一個的邏輯,類似thenRun。 | | applyToEither | 兩個CompletableFuture中任意一個完成的時候,繼續執行後面給定的新的函式處理。再執行後面給定函式的邏輯,類似thenApply。 | | acceptEither | 兩個CompletableFuture中任意一個完成的時候,繼續執行後面給定的新的函式處理。再執行後面給定函式的邏輯,類似thenAccept。 | | runAfterEither | 等待兩個CompletableFuture中任意一個執行完成後再執行某個Runnable物件,可以理解為thenRun的升級版,注意與runAfterBoth對比理解。 | | allOf | 靜態方法,阻塞等待所有給定的CompletableFuture執行結束後,返回一個CompletableFuture<Void>結果。 | | anyOf | 靜態方法,阻塞等待任意一個給定的CompletableFuture物件執行結束後,返回一個CompletableFuture<Void>結果。 |

結果等待與獲取

在執行執行緒中將任務放到工作執行緒中進行處理的時候,執行執行緒與工作執行緒之間是非同步執行的模式,如果執行執行緒需要獲取到共工作執行緒的執行結果,則可以通過get或者join方法,阻塞等待並從CompletableFuture中獲取對應的值。

image.png

getjoin的方法功能含義說明歸納如下:

| 方法名稱 | 作用描述 | | --- | --- | | get() | 等待CompletableFuture執行完成並獲取其具體執行結果,可能會丟擲異常,需要程式碼呼叫的地方手動try...catch進行處理。 | | get(long, TimeUnit) | 與get()相同,只是允許設定阻塞等待超時時間,如果等待超過設定時間,則會丟擲異常終止阻塞等待。 | | join() | 等待CompletableFuture執行完成並獲取其具體執行結果,可能會丟擲執行時異常,無需程式碼呼叫的地方手動try...catch進行處理。 |

從介紹上可以看出,兩者的區別就在於是否需要呼叫方顯式的進行try...catch處理邏輯,使用程式碼示例如下:

```java public void testGetAndJoin(String product) { // join無需顯式try...catch... PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)) .join();

try {
    // get顯式try...catch...
    PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
            .get(5L, TimeUnit.SECONDS);
} catch (Exception e) {
    e.printStackTrace();
}

} ```

CompletableFuture方法及其Async版本

我們在使用CompletableFuture的時候會發現,有很多的方法,都會同時有兩個以Async命名結尾的方法版本。以前面我們用的比較多的thenCombine方法為例:

  1. thenCombine(CompletionStage, BiFunction)
  2. thenCombineAsync(CompletionStage, BiFunction)
  3. thenCombineAsync(CompletionStage, BiFunction, Executor)

從引數上看,區別並不大,僅第三個方法入參中多了執行緒池Executor物件。看下三個方法的原始碼實現,會發現其整體實現邏輯都是一致的,僅僅是使用執行緒池這個地方的邏輯有一點點的差異:

image.png

有興趣的可以去翻一下此部分的原始碼實現,這裡概括下三者的區別:

  1. thenCombine方法,沿用上一個執行任務所使用的執行緒池進行處理
  2. thenCombineAsync兩個入參的方法,使用預設的ForkJoinPool執行緒池中的工作執行緒進行處理
  3. themCombineAsync三個入參的方法,支援自定義執行緒池並指定使用自定義執行緒池中的執行緒作為工作執行緒去處理待執行任務。

為了更好的理解下上述的三個差異點,我們通過下面的程式碼來演示下:

  • 用法1: 其中一個supplyAsync方法以及thenCombineAsync指定使用自定義執行緒池,另一個supplyAsync方法不指定執行緒池(使用預設執行緒池)

```java public PriceResult getCheapestPlatAndPrice4(String product) { // 構造自定義執行緒池 ExecutorService executor = Executors.newFixedThreadPool(5);

return
    CompletableFuture.supplyAsync(
        () -> HttpRequestMock.getMouXiXiPrice(product), 
        executor
    ).thenCombineAsync(
        CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
        this::computeRealPrice,
        executor
    ).join();

} ```

對上述程式碼實現策略的解讀,以及與執行結果的關係展示如下圖所示,可以看出,沒有指定自定義執行緒池的supplyAsync方法,其使用了預設的ForkJoinPool工作執行緒來執行,而另外兩個指定了自定義執行緒池的方法,則使用了自定義執行緒池來執行。

image.png

  • 用法2: 不指定自定義執行緒池,使用預設執行緒池策略,使用thenCombine方法

java public PriceResult getCheapestPlatAndPrice5(String product) { return CompletableFuture.supplyAsync( () -> HttpRequestMock.getMouXiXiPrice(product) ).thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)), this::computeRealPrice ).join(); }

執行結果如下,可以看到執行執行緒名稱與用法1示例相比發生了變化。因為沒有指定執行緒池,所以兩個supplyAsync方法都是用的預設的ForkJoinPool執行緒池,而thenCombine使用的是上一個任務所使用的執行緒池,所以也是用的ForkJoinPool

14:34:27.815[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格 14:34:27.815[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠完成: -5300 14:34:28.831[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格完成: 5399 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]某夕夕最終價格計算完成:99 獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】 -----執行耗時: 1083ms ------

現在,我們知道了方法名稱帶有Async和不帶Async的實現策略上的差異點就在於使用哪個執行緒池來執行而已。那麼,對我們實際的指導意義是啥呢?實際使用的時候,我們怎麼判斷自己應該使用帶Async結尾的方法、還是不帶Async結尾的方法呢?

image.png

上面是Async結尾方法預設使用的ForkJoinPool建立的邏輯,這裡可以看出,預設的執行緒池中的工作執行緒數是CPU核數 - 1,並且指定了預設的丟棄策略等,這就是一個主要關鍵點。

所以說,符合以下幾個條件的時候,可以考慮使用帶有Async字尾的方法,指定自定義執行緒池:

  • 預設執行緒池的執行緒數滿足不了實際訴求
  • 預設執行緒池的型別不符合自己業務訴求
  • 預設執行緒池的佇列滿處理策略不滿足自己訴求

與Stream結合使用的注意點

在我前面的文件中,有細緻全面的介紹過Stream流相關的使用方式(不清楚的同學速點👉👉《吃透JAVA的Stream流操作,多年實踐總結》瞭解下啦)。在涉及批量進行並行處理的時候,通過StreamCompletableFuture結合使用,可以簡化我們的很多編碼邏輯。但是在使用細節方面需要注意下,避免達不到使用CompletableFuture的預期效果。

需求場景: 在同一個平臺內,傳入多個商品,查詢不同商品對應的價格與優惠資訊,並選出實付價格最低的商品資訊。

結合前面的介紹分析,我們應該知道最佳的方式,就是同時並行的方式去各自請求資料,最後合併處理即可。所以我們規劃按照如下的策略來實現:

image.png

先看第一種編碼實現:

java public PriceResult comparePriceInOnePlat(List<String> products) { return products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice)) .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }

對於List的處理場景,這裡採用了Stream方式來進行遍歷與結果的收集、排序與返回。看似正常,但是執行的時候會發現,並沒有達到我們預期的效果:

07:37:15.408[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199 07:37:15.408[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200 07:37:15.408[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 07:37:16.410[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13白色的價格完成: 5199 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的優惠完成: -200 07:37:16.410[ForkJoinPool.commonPool-worker-11|14]某寶最終價格計算完成:4999 07:37:17.412[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13紅色的價格完成: 5199 07:37:17.412[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13紅色的優惠完成: -200 07:37:17.412[ForkJoinPool.commonPool-worker-9|12]某寶最終價格計算完成:4999 獲取最優價格資訊:【平臺:某寶, 原價:5199, 折扣:0, 實付價:4999】 -----執行耗時: 3132ms ------

從上述執行結果可以看出,其具體處理的時候,其實是按照下面的邏輯去處理了:

image.png

為什麼會出現這種實際與預期的差異呢?原因就在於我們使用的Stream上面!雖然Stream中使用兩個map方法,但Stream處理的時候並不會分別遍歷兩遍,其實寫法等同於下面這種寫到1個map中處理,改為下面這種寫法,其實大家也就更容易明白為啥會沒有達到我們預期的整體並行效果了:

java public PriceResult comparePriceInOnePlat1(List<String> products) { return products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice).join()) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }

既然如此,這種場景是不是就不能使用Stream了呢?也不是,其實我們拆開成兩個Stream分步操作下其實就可以了。

再看下面的第二種實現程式碼:

java public PriceResult comparePriceInOnePlat2(List<String> products) { // 先觸發各自平臺的並行處理 List<CompletableFuture<PriceResult>> completableFutures = products.stream() .map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice)) .collect(Collectors.toList()); // 在獨立的流中,等待所有並行處理結束,做最終結果處理 return completableFutures.stream() .map(CompletableFuture::join) .sorted(Comparator.comparingInt(PriceResult::getRealPrice)) .findFirst() .get(); }

執行結果:

07:39:16.072[ForkJoinPool.commonPool-worker-6|17]獲取某寶上 Iphone13紅色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的價格完成: 5199 07:39:16.072[ForkJoinPool.commonPool-worker-4|15]獲取某寶上 Iphone13白色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-13|16]獲取某寶上 Iphone13紅色的優惠完成: -200 07:39:16.072[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999 07:39:16.072[ForkJoinPool.commonPool-worker-4|15]某寶最終價格計算完成:4999 07:39:16.072[ForkJoinPool.commonPool-worker-13|16]某寶最終價格計算完成:4999 獲取最優價格資訊:【平臺:某寶, 原價:5199, 折扣:0, 實付價:4999】 -----執行耗時: 1142ms ------

從執行結果可以看出,三個商品並行處理,整體處理耗時相比前面編碼方式有很大提升,達到了預期的效果。

📢歸納下

因為Stream的操作具有延遲執行的特點,且只有遇到終止操作(比如collect方法)的時候才會真正的執行。所以遇到這種需要並行處理且需要合併多個並行處理流程的情況下,需要將並行流程與合併邏輯放到兩個Stream中,這樣分別觸發完成各自的處理邏輯,就可以了。

甜點:併發和並行的區別

對一個吃貨而言,主餐完畢,總得來點餐後甜點才夠滿足。

在前面的內容中呢,我們始終是在圍繞並行處理這個話題在展開。實際工作的時候,我們對於併發這個詞肯定也不陌生,高併發這個詞,就像高階人士酒杯中那八二年的拉菲一般,成了每一個開發人員簡歷上用來彰顯實力的一個標籤。

那麼,併發並行到底啥區別?這裡我們也簡單的概括下。

併發

關於併發的詳細內容,可以參見我寫的另一篇內容,也即本篇文章的姊妹篇《不堆概念,換個角度聊多執行緒併發程式設計》,下面這裡簡單的介紹下併發的概念。

所謂併發,其關注的點是伺服器的吞吐量情況,也就是伺服器可以在單位時間內同時處理多少個請求。併發是通過多執行緒的方式來實現的,充分利用當前CPU多核能力,同時使用多個程序去處理業務,使得同一個機器在相同時間內可以處理更多的請求,提升吞吐量。

image.png 所有的操作在一個執行緒中序列推進,如果有多個執行緒同步處理,則同時有多個請求可以被處理。但是因為是序列處理,所以如果某個環節需要對外互動時,比如等待網路IO的操作,會使得當前執行緒處於阻塞狀態,直到資源可用時被喚醒繼續往後執行。

image.png

對於高併發場景,伺服器的執行緒資源是非常寶貴的。如果頻繁的處於阻塞則會導致浪費,且執行緒頻繁的阻塞、喚醒切換動作,也會加劇整體系統的效能損耗。所以併發這種多執行緒場景,更適合CPU密集型的操作。

並行

所謂並行,就是將同一個處理流程沒有相互依賴的部分放到多個執行緒中進行同時並行處理,以此來達到相對於序列模式更短的單流程處理耗時的效果,進而提升系統的整體響應時長吞吐量

image.png

基於非同步程式設計實現的並行操作也是藉助執行緒池的方式,通過多執行緒同時執行來實現效率提升的。與併發的區別在於:並行通過將任務切分為一個個可獨立處理的小任務塊,然後基於系統排程策略,將需要執行的任務塊分配給空閒可用工作執行緒去處理,如果出現需要等待的場景(比如IO請求)則工作執行緒會將此任務先放下,繼續處理後續的任務,等之前的任務IO請求好了之後,系統重新分配可用的工作執行緒來處理。

image.png

根據上面的示意圖介紹可以看出,非同步並行程式設計,對於工作執行緒的利用率上升,不會出現工作執行緒阻塞的情況,但是因為任務拆分、工作執行緒間的切換排程等系統層面的開銷也會隨之加大。

如何選擇

前面介紹了下併發與並行兩種模式的特點、以及各自的優缺點。所以選擇採用併發還是並行方式來提升系統的處理效能,還需要結合實際專案場景來確定。

綜合而言

  1. 如果業務處理邏輯是CPU密集型的操作,優先使用基於執行緒池實現併發處理方案(可以避免執行緒間切換導致的系統性能浪費)。
  2. 如果業務處理邏輯中存在較多需要阻塞等待的耗時場景、且相互之間沒有依賴,比如本地IO操作、網路IO請求等等,這種情況優先選擇使用並行處理策略(可以避免寶貴的執行緒資源被阻塞等待)。

總結回顧

好啦,關於JAVA中CompletableFuture的使用,以及並行程式設計相關的內容呢就介紹到這裡啦。看到這裡,相信您應該有所收穫吧?那麼你的專案裡有這種適合並行處理的場景嗎?你在處理並行場景的時候是怎麼做的呢?評論區一起討論下吧~~

補充:

本文中有提及CompletableFuture執行時所使用的預設執行緒池是ForkJoinPool,早在JAVA7版本就已經被引入,但是很多人對ForkJoinPool不是很瞭解,實際專案中使用的也比較少。其實對ForkJoinPool的合理利用,可以讓我們在面對某些多執行緒場景時會更加的從容高效。在後面的文章中,我會針對ForkJoinPool有關的內容進行專門的介紹與探討,如果有興趣,可以點個關注,及時獲取後續的內容。

此外

我是悟道,聊技術、又不僅僅聊技術~

如果覺得有用,請點贊 + 關注讓我感受到您的支援。也可以關注下我的公眾號【架構悟道】,獲取更及時的更新。

期待與你一起探討,一起成長為更好的自己。


我正在參與掘金技術社群創作者簽約計劃招募活動,點選連結報名投稿