JAVA基於CompletableFuture的流水線並行處理深度實踐,滿滿乾貨
theme: healer-readable
大家好,又見面啦。
在專案開發中,後端服務對外提供API介面一般都會關注響應時長
。但是某些情況下,由於業務規劃邏輯的原因,我們的介面可能會是一個聚合資訊處理類的處理邏輯,比如我們從多個不同的地方獲取資料,然後彙總處理為最終的結果再返回給呼叫方,這種情況下,往往會導致我們的介面響應特別的慢。
而如果我們想要動手進行優化的時候呢,就會涉及到序列
處理改並行
處理的問題。在JAVA
中並行處理的能力支援已經相對完善,通過對CompletableFuture的合理利用,可以讓我們面對這種聚合類處理的場景會更加的得心應手。
好啦,話不多說,接下來就讓我們一起來品嚐下JAVA中組合式並行處理這道饕餮大餐吧。
前菜:先看個實際場景
在開始享用這頓大餐前,我們先來個前菜開開胃。
例如現在有這麼個需求:
需求描述: 實現一個全網比價服務,比如可以從某寶、某東、某夕夕去獲取某個商品的價格、優惠金額,並計算出實際付款金額,最終返回價格最優的平臺與價格資訊。
📢這裡假定每個平臺獲取原價格與優惠券的介面已經實現、且都是需要呼叫HTTP介面查詢的耗時操作,Mock介面每個耗時1s
左右。
根據最初的需求理解,我們可以很自然的寫出對應實現程式碼:
java
public PriceResult getCheapestPlatAndPrice(String product) {
PriceResult mouBaoPrice = computeRealPrice(HttpRequestMock.getMouBaoPrice(product), HttpRequestMock.getMouBaoDiscounts(product));
PriceResult mouDongPrice = computeRealPrice(HttpRequestMock.getMouDongPrice(product), HttpRequestMock.getMouDongDiscounts(product));
PriceResult mouXiXiPrice = computeRealPrice(HttpRequestMock.getMouXiXiPrice(product), HttpRequestMock.getMouXiXiDiscounts(product));
// 計算並選出實際價格最低的平臺
return Stream.of(mouBaoPrice, mouDongPrice, mouXiXiPrice). min(Comparator.comparingInt(PriceResult::getRealPrice)) .get();
}
一切順利成章,執行測試下:
05:24:54.779[main|1]獲取某寶上 Iphone13的價格完成: 5199
05:24:55.781[main|1]獲取某寶上 Iphone13的優惠完成: -200
05:24:55.781[main|1]某寶最終價格計算完成:4999
05:24:56.784[main|1]獲取某東上 Iphone13的價格完成: 5299
05:24:57.786[main|1]獲取某東上 Iphone13的優惠完成: -150
05:24:57.786[main|1]某東最終價格計算完成:5149
05:24:58.788[main|1]獲取某夕夕上 Iphone13的價格完成: 5399
05:24:59.791[main|1]獲取某夕夕上 Iphone13的優惠完成: -5300
05:24:59.791[main|1]某夕夕最終價格計算完成:99
獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】
-----執行耗時: 6122ms ------
結果符合預期,功能一切正常,就是耗時長了點。試想一下,假如你在某個APP操作查詢的時候,等待6s才返回結果,估計會直接把APP給解除安裝了吧?
梳理下前面程式碼的實現思路:
所有的環節都是序列
的,每個環節耗時加到一起,介面總耗時肯定很長。
但實際上,每個平臺之間的操作是互不干擾的,那我們自然而然的可以想到,可以通過多執行緒
的方式,同時去分別執行各個平臺的邏輯處理,最後將各個平臺的結果彙總到一起比對得到最低價格。
所以整個執行過程會變成如下的效果:
為了提升效能,我們採用執行緒池來負責多執行緒的處理操作,因為我們需要得到各個子執行緒處理的結果,所以我們需要使用 Future
來實現:
```java
public PriceResult getCheapestPlatAndPrice2(String product) {
Future
// 等待所有執行緒結果都處理完成,然後從結果中計算出最低價
return Stream.of(mouBaoFuture, mouDongFuture, mouXiXiFuture)
.map(priceResultFuture -> {
try {
return priceResultFuture.get(5L, TimeUnit.SECONDS);
} catch (Exception e) {
return null;
}
})
.filter(Objects::nonNull).min(Comparator.comparingInt(PriceResult::getRealPrice)).get();
} ```
上述程式碼中,將三個不同平臺對應的Callable
函式邏輯放入到ThreadPool
中去執行,返回Future
物件,然後再逐個通過Future.get()
介面阻塞獲取各自平臺的結果,最後經比較處理後返回最低價資訊。
執行程式碼,可以看到執行結果與過程如下:
05:42:25.291[pool-1-thread-2|13]獲取某東上 Iphone13的價格完成: 5299
05:42:25.291[pool-1-thread-3|14]獲取某夕夕上 Iphone13的價格完成: 5399
05:42:25.291[pool-1-thread-1|12]獲取某寶上 Iphone13的價格完成: 5199
05:42:26.294[pool-1-thread-2|13]獲取某東上 Iphone13的優惠完成: -150
05:42:26.294[pool-1-thread-3|14]獲取某夕夕上 Iphone13的優惠完成: -5300
05:42:26.294[pool-1-thread-1|12]獲取某寶上 Iphone13的優惠完成: -200
05:42:26.294[pool-1-thread-2|13]某東最終價格計算完成:5149
05:42:26.294[pool-1-thread-3|14]某夕夕最終價格計算完成:99
05:42:26.294[pool-1-thread-1|12]某寶最終價格計算完成:4999
獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】
-----執行耗時: 2119ms ------
結果與第一種實現方式一致,但是介面總耗時從6s
下降到了2s
,效果還是很顯著的。但是,是否還能再壓縮一些呢?
基於上面按照平臺拆分並行處理的思路繼續推進,我們可以看出每個平臺內的處理邏輯其實可以分為3個主要步驟:
- 獲取原始價格(耗時操作)
- 獲取折扣優惠(耗時操作)
- 得到原始價格和折扣優惠之後,計算實付價格
這3個步驟中,第1、2兩個耗時操作也是相對獨立的,如果也能並行處理的話,響應時長上應該又會縮短一些,即如下的處理流程:
我們當然可以繼續使用上面提到的執行緒池+Future
的方式,但Future
在應對並行結果組合以及後續處理等方面顯得力不從心,弊端明顯:
程式碼寫起來會非常拖沓:先封裝
Callable
函式放到執行緒池中去執行查詢操作,然後分三組阻塞等待
結果並計算出各自結果,最後再阻塞等待
價格計算完成後彙總得到最終結果。
說到這裡呢,就需要我們新的主人公CompletableFuture
登場了,通過它我們可以很輕鬆的來完成任務的並行處理,以及各個並行任務結果之間的組合再處理等操作。我們使用CompletableFuture
編寫實現程式碼如下:
```java
public PriceResult getCheapestPlatAndPrice3(String product) {
CompletableFuture
// 排序並獲取最低價格
return Stream.of(mouBao, mouDong, mouXiXi)
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(PriceResult::getRealPrice))
.findFirst()
.get();
} ```
看下執行結果符合預期,而介面耗時則降到了1s
(因為我們依賴的每一個查詢實際操作的介面耗時都是模擬的1s,所以這個結果已經算是此複合介面能達到的極限值了)。
06:01:13.354[ForkJoinPool.commonPool-worker-6|17]獲取某夕夕上 Iphone13的優惠完成: -5300
06:01:13.354[ForkJoinPool.commonPool-worker-13|16]獲取某夕夕上 Iphone13的價格完成: 5399
06:01:13.354[ForkJoinPool.commonPool-worker-4|15]獲取某東上 Iphone13的優惠完成: -150
06:01:13.354[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13的價格完成: 5199
06:01:13.354[ForkJoinPool.commonPool-worker-11|14]獲取某東上 Iphone13的價格完成: 5299
06:01:13.354[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13的優惠完成: -200
06:01:13.354[ForkJoinPool.commonPool-worker-13|16]某夕夕最終價格計算完成:99
06:01:13.354[ForkJoinPool.commonPool-worker-11|14]某東最終價格計算完成:5149
06:01:13.354[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999
獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】
-----執行耗時: 1095ms ------
好啦,通過餐前的前菜,大家應該能夠看出來序列與並行處理邏輯的區別、以及並行處理邏輯的實現策略了吧?這裡我們應該也可以看出CompletableFuture
在應對並行處理場景下的強大優勢。當然咯,上面也只是小小的窺視了下CompletableFuture
功能的冰上一角,下面就讓我們一起來深入瞭解下,享用並消化CompletableFuture
這道主菜吧!
主菜:CompletableFuture深入瞭解
好啦,下面該主菜上場了。
作為JAVA8
之後加入的新成員,CompletableFuture
的實現與使用上,也處處體現出了函式式非同步程式設計的味道。一個CompletableFuture
物件可以被一個環節接一個環節的處理、也可以對兩個或者多個CompletableFuture
進行組合處理或者等待結果完成。通過對CompletableFuture
各種方法的合理使用與組合搭配,可以讓我們在很多的場景都可以應付自如。
下面就來一起了解下這些方法以及對應的使用方式吧。
Future與CompletableFuture
首先,先來理一下Future與CompletableFuture之間的關係。
Future
如果接觸過多執行緒相關的概念,那Future
應該不會陌生,早在Java5中就已經存在了。
該如何理解Future
呢?舉個生活中的例子:
你去咖啡店點了一杯咖啡,然後服務員會給你一個訂單小票。 當服務員在後臺製作咖啡的時候,你並沒有在店裡等待,而是出門到隔壁甜品店又買了個麵包。 當面包買好之後,你回到咖啡店,拿著訂單小票去取咖啡。 取到咖啡後,你邊喝咖啡邊把麵包吃了……嗝~
是不是很熟悉的生活場景? 對比到我們多執行緒非同步程式設計的場景中,咖啡店的訂單小票其實就是Future,通過Future可以讓稍後適當的時候可以獲取到對應的非同步執行執行緒中的執行結果。
上面的場景,我們翻譯為程式碼實現邏輯:
java
public void buyCoffeeAndOthers() throws ExecutionException, InterruptedException {
goShopping();
// 子執行緒中去處理做咖啡這件事,返回future物件
Future<Coffee> coffeeTicket = threadPool.submit(this::makeCoffee);
// 主執行緒同步去做其他的事情
Bread bread = buySomeBread();
// 主執行緒其他事情並行處理完成,阻塞等待獲取子執行緒執行結果
Coffee coffee = coffeeTicket.get();
// 子執行緒結果獲取完成,主執行緒繼續執行
eatAndDrink(bread, coffee);
}
編碼源於生活、程式碼中的設計邏輯,很多時候都是與生活哲學匹配的。
CompletableFuture
Future在應對一些簡單且相互獨立的非同步執行場景很便捷,但是在一些複雜的場景,比如同時需要多個有依賴關係的非同步獨立處理的時候,或者是一些類似流水線的非同步處理場景時,就顯得力不從心了。比如:
- 同時執行多個並行任務,等待最快的一個完成之後就可以繼續往後處理
- 多個非同步任務,每個非同步任務都需要依賴前一個非同步任務執行的結果再去執行下一個非同步任務,最後只需要一個最終的結果
- 等待多個非同步任務全部執行完成後觸發下一個動作執行
- ...
所以呢, 在JAVA8開始引入了全新的CompletableFuture
類,它是Future介面的一個實現類。也就是在Future介面的基礎上,額外封裝提供了一些執行方法,用來解決Future使用場景中的一些不足,對流水線處理能力提供了支援。
下一節中,我們就來進一步的瞭解下CompletableFuture的具體使用場景與使用方式。
CompletableFuture使用方式
建立CompletableFuture並執行
當我們需要進行非同步處理的時候,我們可以通過CompletableFuture.supplyAsync
方法,傳入一個具體的要執行的處理邏輯函式,這樣就輕鬆的完成了CompletableFuture的建立與觸發執行。
| 方法名稱 | 作用描述 |
| --- | --- |
| supplyAsync | 靜態方法,用於構建一個CompletableFuture<T>
物件,並非同步執行傳入的函式,允許執行函式有返回值T
。 |
| runAsync | 靜態方法,用於構建一個CompletableFuture<Void>
物件,並非同步執行傳入函式,與supplyAsync的區別在於此方法傳入的是Callable型別,僅執行,沒有返回值。 |
使用示例:
java
public void testCreateFuture(String product) {
// supplyAsync, 執行邏輯有返回值PriceResult
CompletableFuture<PriceResult> supplyAsyncResult =
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product));
// runAsync, 執行邏輯沒有返回值
CompletableFuture<Void> runAsyncResult =
CompletableFuture.runAsync(() -> System.out.println(product));
}
特別補充:
supplyAsync
或者runAsync
建立後便會立即執行,無需手動呼叫觸發。
環環相扣處理
在流水線處理場景中,往往都是一個任務環節處理完成後,下一個任務環節接著上一環節處理結果繼續處理。CompletableFuture
用於這種流水線環節驅動類的方法有很多,相互之間主要是在返回值或者給到下一環節的入參上有些許差異,使用時需要注意區分:
具體的方法的描述歸納如下:
| 方法名稱 | 作用描述 |
| --- | --- |
| thenApply | 對CompletableFuture
的執行後的具體結果進行追加處理,並將當前的CompletableFuture
泛型物件更改為處理後新的物件型別,返回當前CompletableFuture
物件。 |
| thenCompose | 與thenApply
類似。區別點在於:此方法的入參函式返回一個CompletableFuture
型別物件。 |
| thenAccept | 與thenApply
方法類似,區別點在於thenAccept
返回void型別,沒有具體結果輸出,適合無需返回值的場景。 |
| thenRun | 與thenAccept
類似,區別點在於thenAccept
可以將前面CompletableFuture
執行的實際結果作為入參進行傳入並使用,但是thenRun
方法沒有任何入參,只能執行一個Runnable函式,並且返回void型別。 |
因為上述thenApply
、thenCompose
方法的輸出仍然都是一個CompletableFuture物件,所以各個方法是可以一環接一環的進行呼叫,形成流水線式的處理邏輯:
期望總是美好的,但是實際情況卻總不盡如人意。在我們編排流水線的時候,如果某一個環節執行丟擲異常了,會導致整個流水線後續的環節就沒法再繼續下去了,比如下面的例子:
java
public void testExceptionHandle() {
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("supplyAsync excetion occurred...");
}).thenApply(obj -> {
System.out.println("thenApply executed...");
return obj;
}).join();
}
執行之後會發現,supplyAsync丟擲異常後,後面的thenApply並沒有被執行。
那如果我們想要讓流水線的每個環節處理失敗之後都能讓流水線繼續往下面環節處理,讓後續環節可以拿到前面環節的結果或者是丟擲的異常並進行對應的應對處理,就需要用到handle
和whenCompletable
方法了。
先看下兩個方法的作用描述:
| 方法名稱 | 作用描述 |
| --- | --- |
| handle | 與thenApply
類似,區別點在於handle執行函式的入參有兩個,一個是CompletableFuture
執行的實際結果,一個是是Throwable物件,這樣如果前面執行出現異常的時候,可以通過handle獲取到異常並進行處理。 |
| whenComplete | 與handle
類似,區別點在於whenComplete
執行後無返回值。 |
我們對上面一段程式碼示例修改使用handle方法來處理:
java
public void testExceptionHandle() {
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("supplyAsync excetion occurred...");
}).handle((obj, e) -> {
if (e != null) {
System.out.println("thenApply executed, exception occurred...");
}
return obj;
}).join();
}
再執行可以發現,即使前面環節出現異常,後面環節也可以繼續處理,且可以拿到前一環節丟擲的異常資訊:
thenApply executed, exception occurred...
多個CompletableFuture組合操作
前面一直在介紹流水線式的處理場景,但是很多時候,流水線處理場景也不會是一個鏈路順序往下走的情況,很多時候為了提升並行效率,一些沒有依賴的環節我們會讓他們同時去執行,然後在某些環節需要依賴的時候,進行結果的依賴合併處理,類似如下圖的效果。
CompletableFuture
相比於Future
的一大優勢,就是可以方便的實現多個並行環節的合併處理。相關涉及方法介紹歸納如下:
| 方法名稱 | 作用描述 |
| --- | --- |
| thenCombine | 將兩個CompletableFuture
物件組合起來進行下一步處理,可以拿到兩個執行結果,並傳給自己的執行函式進行下一步處理,最後返回一個新的CompletableFuture
物件。 |
| thenAcceptBoth | 與thenCombine
類似,區別點在於thenAcceptBoth
傳入的執行函式沒有返回值,即thenAcceptBoth返回值為CompletableFuture<Void>
。 |
| runAfterBoth | 等待兩個CompletableFuture
都執行完成後再執行某個Runnable物件,再執行下一個的邏輯,類似thenRun。 |
| applyToEither | 兩個CompletableFuture
中任意一個完成的時候,繼續執行後面給定的新的函式處理。再執行後面給定函式的邏輯,類似thenApply。 |
| acceptEither | 兩個CompletableFuture
中任意一個完成的時候,繼續執行後面給定的新的函式處理。再執行後面給定函式的邏輯,類似thenAccept。 |
| runAfterEither | 等待兩個CompletableFuture
中任意一個執行完成後再執行某個Runnable物件,可以理解為thenRun
的升級版,注意與runAfterBoth
對比理解。 |
| allOf | 靜態方法,阻塞等待所有給定的CompletableFuture
執行結束後,返回一個CompletableFuture<Void>
結果。 |
| anyOf | 靜態方法,阻塞等待任意一個給定的CompletableFuture
物件執行結束後,返回一個CompletableFuture<Void>
結果。 |
結果等待與獲取
在執行執行緒中將任務放到工作執行緒中進行處理的時候,執行執行緒與工作執行緒之間是非同步執行的模式,如果執行執行緒需要獲取到共工作執行緒的執行結果,則可以通過get
或者join
方法,阻塞等待並從CompletableFuture
中獲取對應的值。
對get
和join
的方法功能含義說明歸納如下:
| 方法名稱 | 作用描述 |
| --- | --- |
| get() | 等待CompletableFuture
執行完成並獲取其具體執行結果,可能會丟擲異常,需要程式碼呼叫的地方手動try...catch
進行處理。 |
| get(long, TimeUnit) | 與get()相同,只是允許設定阻塞等待超時時間,如果等待超過設定時間,則會丟擲異常終止阻塞等待。 |
| join() | 等待CompletableFuture
執行完成並獲取其具體執行結果,可能會丟擲執行時異常,無需程式碼呼叫的地方手動try...catch進行處理。 |
從介紹上可以看出,兩者的區別就在於是否需要呼叫方顯式的進行try...catch處理邏輯,使用程式碼示例如下:
```java public void testGetAndJoin(String product) { // join無需顯式try...catch... PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product)) .join();
try {
// get顯式try...catch...
PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
.get(5L, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
} ```
CompletableFuture方法及其Async版本
我們在使用CompletableFuture的時候會發現,有很多的方法,都會同時有兩個以Async命名結尾的方法版本。以前面我們用的比較多的thenCombine
方法為例:
- thenCombine(CompletionStage, BiFunction)
- thenCombineAsync(CompletionStage, BiFunction)
- thenCombineAsync(CompletionStage, BiFunction, Executor)
從引數上看,區別並不大,僅第三個方法入參中多了執行緒池Executor物件。看下三個方法的原始碼實現,會發現其整體實現邏輯都是一致的,僅僅是使用執行緒池這個地方的邏輯有一點點的差異:
有興趣的可以去翻一下此部分的原始碼實現,這裡概括下三者的區別:
- thenCombine方法,沿用上一個執行任務所使用的執行緒池進行處理
- thenCombineAsync兩個入參的方法,使用預設的ForkJoinPool執行緒池中的工作執行緒進行處理
- themCombineAsync三個入參的方法,支援自定義執行緒池並指定使用自定義執行緒池中的執行緒作為工作執行緒去處理待執行任務。
為了更好的理解下上述的三個差異點,我們通過下面的程式碼來演示下:
- 用法1: 其中一個supplyAsync方法以及thenCombineAsync指定使用自定義執行緒池,另一個supplyAsync方法不指定執行緒池(使用預設執行緒池)
```java public PriceResult getCheapestPlatAndPrice4(String product) { // 構造自定義執行緒池 ExecutorService executor = Executors.newFixedThreadPool(5);
return
CompletableFuture.supplyAsync(
() -> HttpRequestMock.getMouXiXiPrice(product),
executor
).thenCombineAsync(
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
this::computeRealPrice,
executor
).join();
} ```
對上述程式碼實現策略的解讀,以及與執行結果的關係展示如下圖所示,可以看出,沒有指定自定義執行緒池的supplyAsync方法,其使用了預設的ForkJoinPool
工作執行緒來執行,而另外兩個指定了自定義執行緒池的方法,則使用了自定義執行緒池來執行。
- 用法2: 不指定自定義執行緒池,使用預設執行緒池策略,使用thenCombine方法
java
public PriceResult getCheapestPlatAndPrice5(String product) {
return
CompletableFuture.supplyAsync(
() -> HttpRequestMock.getMouXiXiPrice(product)
).thenCombine(
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
this::computeRealPrice
).join();
}
執行結果如下,可以看到執行執行緒名稱與用法1示例相比發生了變化。因為沒有指定執行緒池,所以兩個supplyAsync
方法都是用的預設的ForkJoinPool
執行緒池,而thenCombine
使用的是上一個任務所使用的執行緒池,所以也是用的ForkJoinPool
。
14:34:27.815[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格
14:34:27.815[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠
14:34:28.831[ForkJoinPool.commonPool-worker-2|13]獲取某夕夕上 Iphone13的優惠完成: -5300
14:34:28.831[ForkJoinPool.commonPool-worker-1|12]獲取某夕夕上 Iphone13的價格完成: 5399
14:34:28.831[ForkJoinPool.commonPool-worker-2|13]某夕夕最終價格計算完成:99
獲取最優價格資訊:【平臺:某夕夕, 原價:5399, 折扣:0, 實付價:99】
-----執行耗時: 1083ms ------
現在,我們知道了方法名稱帶有Async和不帶Async的實現策略上的差異點就在於使用哪個執行緒池來執行而已。那麼,對我們實際的指導意義是啥呢?實際使用的時候,我們怎麼判斷自己應該使用帶Async結尾的方法、還是不帶Async結尾的方法呢?
上面是Async結尾方法預設使用的ForkJoinPool建立的邏輯,這裡可以看出,預設的執行緒池中的工作執行緒數是CPU核數 - 1
,並且指定了預設的丟棄策略等,這就是一個主要關鍵點。
所以說,符合以下幾個條件的時候,可以考慮使用帶有Async字尾的方法,指定自定義執行緒池:
- 預設執行緒池的執行緒數滿足不了實際訴求
- 預設執行緒池的型別不符合自己業務訴求
- 預設執行緒池的佇列滿處理策略不滿足自己訴求
與Stream結合使用的注意點
在我前面的文件中,有細緻全面的介紹過Stream
流相關的使用方式(不清楚的同學速點👉👉《吃透JAVA的Stream流操作,多年實踐總結》瞭解下啦)。在涉及批量進行並行處理的時候,通過Stream
與CompletableFuture
結合使用,可以簡化我們的很多編碼邏輯。但是在使用細節方面需要注意下,避免達不到使用CompletableFuture
的預期效果。
需求場景: 在同一個平臺內,傳入多個商品,查詢不同商品對應的價格與優惠資訊,並選出實付價格最低的商品資訊。
結合前面的介紹分析,我們應該知道最佳的方式,就是同時並行的方式去各自請求資料,最後合併處理即可。所以我們規劃按照如下的策略來實現:
先看第一種編碼實現:
java
public PriceResult comparePriceInOnePlat(List<String> products) {
return products.stream()
.map(product ->
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)),
this::computeRealPrice))
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(PriceResult::getRealPrice))
.findFirst()
.get();
}
對於List的處理場景,這裡採用了Stream方式來進行遍歷與結果的收集、排序與返回。看似正常,但是執行的時候會發現,並沒有達到我們預期的效果:
07:37:15.408[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199
07:37:15.408[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200
07:37:15.408[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999
07:37:16.410[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13白色的價格完成: 5199
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的優惠完成: -200
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]某寶最終價格計算完成:4999
07:37:17.412[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13紅色的價格完成: 5199
07:37:17.412[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13紅色的優惠完成: -200
07:37:17.412[ForkJoinPool.commonPool-worker-9|12]某寶最終價格計算完成:4999
獲取最優價格資訊:【平臺:某寶, 原價:5199, 折扣:0, 實付價:4999】
-----執行耗時: 3132ms ------
從上述執行結果可以看出,其具體處理的時候,其實是按照下面的邏輯去處理了:
為什麼會出現這種實際與預期的差異呢?原因就在於我們使用的Stream上面!雖然Stream中使用兩個map
方法,但Stream處理的時候並不會分別遍歷兩遍,其實寫法等同於下面這種寫到1個
map中處理,改為下面這種寫法,其實大家也就更容易明白為啥會沒有達到我們預期的整體並行效果了:
java
public PriceResult comparePriceInOnePlat1(List<String> products) {
return products.stream()
.map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice).join())
.sorted(Comparator.comparingInt(PriceResult::getRealPrice))
.findFirst()
.get();
}
既然如此,這種場景是不是就不能使用Stream了呢?也不是,其實我們拆開成兩個Stream分步操作下其實就可以了。
再看下面的第二種實現程式碼:
java
public PriceResult comparePriceInOnePlat2(List<String> products) {
// 先觸發各自平臺的並行處理
List<CompletableFuture<PriceResult>> completableFutures = products.stream()
.map(product -> CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product)).thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)), this::computeRealPrice))
.collect(Collectors.toList());
// 在獨立的流中,等待所有並行處理結束,做最終結果處理
return completableFutures.stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(PriceResult::getRealPrice))
.findFirst()
.get();
}
執行結果:
07:39:16.072[ForkJoinPool.commonPool-worker-6|17]獲取某寶上 Iphone13紅色的價格完成: 5199
07:39:16.072[ForkJoinPool.commonPool-worker-9|12]獲取某寶上 Iphone13黑色的價格完成: 5199
07:39:16.072[ForkJoinPool.commonPool-worker-2|13]獲取某寶上 Iphone13黑色的優惠完成: -200
07:39:16.072[ForkJoinPool.commonPool-worker-11|14]獲取某寶上 Iphone13白色的價格完成: 5199
07:39:16.072[ForkJoinPool.commonPool-worker-4|15]獲取某寶上 Iphone13白色的優惠完成: -200
07:39:16.072[ForkJoinPool.commonPool-worker-13|16]獲取某寶上 Iphone13紅色的優惠完成: -200
07:39:16.072[ForkJoinPool.commonPool-worker-2|13]某寶最終價格計算完成:4999
07:39:16.072[ForkJoinPool.commonPool-worker-4|15]某寶最終價格計算完成:4999
07:39:16.072[ForkJoinPool.commonPool-worker-13|16]某寶最終價格計算完成:4999
獲取最優價格資訊:【平臺:某寶, 原價:5199, 折扣:0, 實付價:4999】
-----執行耗時: 1142ms ------
從執行結果可以看出,三個商品並行處理,整體處理耗時相比前面編碼方式有很大提升,達到了預期的效果。
📢歸納下:
因為Stream的操作具有延遲執行的特點,且只有遇到終止操作(比如collect方法)的時候才會真正的執行。所以遇到這種需要並行處理且需要合併多個並行處理流程的情況下,需要將並行流程與合併邏輯放到兩個Stream中,這樣分別觸發完成各自的處理邏輯,就可以了。
甜點:併發和並行的區別
對一個吃貨而言,主餐完畢,總得來點餐後甜點才夠滿足。
在前面的內容中呢,我們始終是在圍繞並行
處理這個話題在展開。實際工作的時候,我們對於併發這個詞肯定也不陌生,高併發
這個詞,就像高階人士酒杯中那八二年的拉菲一般,成了每一個開發人員簡歷上用來彰顯實力的一個標籤。
那麼,併發和並行到底啥區別?這裡我們也簡單的概括下。
併發
關於併發的詳細內容,可以參見我寫的另一篇內容,也即本篇文章的姊妹篇《不堆概念,換個角度聊多執行緒併發程式設計》,下面這裡簡單的介紹下併發的概念。
所謂併發,其關注的點是伺服器的吞吐量
情況,也就是伺服器可以在單位時間內同時處理多少個請求。併發是通過多執行緒
的方式來實現的,充分利用當前CPU多核能力,同時使用多個程序去處理業務,使得同一個機器在相同時間內可以處理更多的請求,提升吞吐量。
所有的操作在一個執行緒中序列推進,如果有多個執行緒同步處理,則同時有多個請求可以被處理。但是因為是序列處理,所以如果某個環節需要對外互動時,比如等待網路IO的操作,會使得當前執行緒處於
阻塞狀態
,直到資源可用時被喚醒繼續往後執行。
對於高併發場景,伺服器的執行緒資源是非常寶貴的。如果頻繁的處於阻塞則會導致浪費,且執行緒頻繁的阻塞、喚醒切換動作,也會加劇整體系統的效能損耗。所以併發這種多執行緒場景,更適合CPU密集型的操作。
並行
所謂並行,就是將同一個處理流程沒有相互依賴的部分放到多個執行緒中進行同時並行處理,以此來達到相對於序列模式更短的單流程處理耗時的效果,進而提升系統的整體響應時長與吞吐量。
基於非同步程式設計實現的並行操作也是藉助執行緒池的方式,通過多執行緒同時執行來實現效率提升的。與併發的區別在於:並行通過將任務切分為一個個可獨立處理的小任務塊,然後基於系統排程策略
,將需要執行的任務塊分配給空閒可用工作執行緒去處理,如果出現需要等待的場景(比如IO請求)則工作執行緒會將此任務先放下,繼續處理後續的任務,等之前的任務IO請求好了之後,系統重新分配可用的工作執行緒來處理。
根據上面的示意圖介紹可以看出,非同步並行程式設計,對於工作執行緒的利用率上升,不會出現工作執行緒阻塞的情況,但是因為任務拆分、工作執行緒間的切換排程等系統層面的開銷也會隨之加大。
如何選擇
前面介紹了下併發與並行兩種模式的特點、以及各自的優缺點。所以選擇採用併發還是並行方式來提升系統的處理效能,還需要結合實際專案場景來確定。
綜合而言
- 如果業務處理邏輯是CPU密集型的操作,優先使用基於執行緒池實現併發處理方案(可以避免執行緒間切換導致的系統性能浪費)。
- 如果業務處理邏輯中存在較多需要阻塞等待的耗時場景、且相互之間沒有依賴,比如本地IO操作、網路IO請求等等,這種情況優先選擇使用並行處理策略(可以避免寶貴的執行緒資源被阻塞等待)。
總結回顧
好啦,關於JAVA中CompletableFuture
的使用,以及並行程式設計相關的內容呢就介紹到這裡啦。看到這裡,相信您應該有所收穫吧?那麼你的專案裡有這種適合並行處理的場景嗎?你在處理並行場景的時候是怎麼做的呢?評論區一起討論下吧~~
補充:
本文中有提及CompletableFuture執行時所使用的預設執行緒池是ForkJoinPool
,早在JAVA7版本就已經被引入,但是很多人對ForkJoinPool
不是很瞭解,實際專案中使用的也比較少。其實對ForkJoinPool
的合理利用,可以讓我們在面對某些多執行緒場景時會更加的從容高效。在後面的文章中,我會針對ForkJoinPool
有關的內容進行專門的介紹與探討,如果有興趣,可以點個關注,及時獲取後續的內容。
此外
- 關於本文中涉及的演示程式碼的完整示例,我已經整理並提交到github中,如果您有需要,可以自取:http://github.com/veezean/JavaBasicSkills
我是悟道,聊技術、又不僅僅聊技術~
如果覺得有用,請點贊 + 關注讓我感受到您的支援。也可以關注下我的公眾號【架構悟道】,獲取更及時的更新。
期待與你一起探討,一起成長為更好的自己。
我正在參與掘金技術社群創作者簽約計劃招募活動,點選連結報名投稿。
- JAVA中使用最廣泛的本地快取?Ehcache的自信從何而來3 —— 解析Ehcache的各種叢集方案,本地快取如何變身分散式快取
- Redis快取何以一枝獨秀?——從百變應用場景與熱門面試題中感受下Redis的核心特性與使用注意點
- 聊一聊安全且正確使用快取的那些事 —— 關於快取可靠性、關乎資料一致性
- 聊一聊作為高併發系統基石之一的快取,會用很簡單,用好才是技術活
- JAVA基於CompletableFuture的流水線並行處理深度實踐,滿滿乾貨
- 講透JAVA Stream的collect用法與原理,遠比你想象的更強大
- 是時候優雅地和NullPointException說再見了
- 吃透JAVA的Stream流操作,多年實踐總結
- JAVA中計算兩個日期時間的差值竟然也有這麼多門道
- JAVA中簡單的for迴圈竟有這麼多坑,但願你沒踩過
- 我是如何將一個老系統的kafka消費者服務的效能提升近百倍的
- 搭建一個通用監控告警平臺,架構上需要有哪些設計
- 為什麼不建議使用自定義Object作為HashMap的key?
- 2022年中總結|桃李春風一杯酒,江湖夜雨十年燈
- Spring Data JPA系列3:JPA專案中核心場景與進階用法介紹
- Spring Data JPA系列2:SpringBoot整合JPA詳細教程,快速在專案中熟練使用JPA