你知道Java8併發新特性CompletableFuture嗎?

語言: CN / TW / HK

theme: orange highlight: a11y-dark


持續創作,加速成長!這是我參與「掘金日新計劃 · 6 月更文挑戰」的第1天,點選檢視活動詳情

1.CompletableFuture是什麼?

各位小夥伴是否有一種感覺,整天大量時間沉迷於業務開發的同時,缺少對於一些技術更新的關注,忽略掉了很多實用又簡單的方法,以往我們做非同步任務的時候都習慣於使用Callable或者Runnable介面去實現,今天我們就來聊聊與之不同的CompletableFuture類。

CompletableFuture針對Future介面做了改進,相比Callable/Runnable介面它支援多工進行鏈式呼叫、組合、多工併發處理。很多時候我們在設計過程中會想在一個非同步任務執行完成後,直接獲取它的結果傳遞給下一個任務繼續執行後續的流程,這時候CompletableFuture的作用就來了。 - CompletableFuture類關係圖 從以下類圖可以看到,CompletableFuture實現了Future和CompletionStage兩個介面,Future提供了獲取任務執行結果和任務執行狀態的功能。 CompletionStage表示一個任務的執行階段,提供了諸多方法支援了多工的聚合功能。

CompletableFuture.jpg

2.CompletableFuture的方法使用說明

2.1 CompletableFuture類提供幾個靜態方法來進行非同步操作:

supplyAsync與runAsync主要用於構建非同步事件。 - supplyAsync帶有返回值的非同步任務,支援在預設執行緒池ForkJoinPool.commonPool()中完成非同步任務,也可以使用自定義執行緒池執行非同步任務,結果返回一個新的CompletableFuture,返回結果型別U。最終的任務執行結果可通過返回CompletableFuture物件的 get()/join() 方法獲取返回值。 ``` // 使用預設執行緒池 public static CompletableFuture supplyAsync(Supplier supplier) {...} // 使用自定義執行緒池Executor public static CompletableFuture supplyAsync(Supplier supplier, Executor executor) {...}

// ====================================demo華麗分割線============================================ CompletableFuture supplyAsyncFuture = CompletableFuture.supplyAsync(() -> { log.info("executing supplyAsync task ..."); return "this is supplyAsync"; }); // 進入阻塞獲取非同步任務結果 log.info(supplyAsyncFuture.get()); // 輸出結果:this is supplyAsync - **runAsync**:**不帶返回值**的非同步任務,支援在預設執行緒池ForkJoinPool.commonPool()中完成非同步任務,也可以使用自定義執行緒池執行非同步任務,結果返回一個新的CompletableFuture,返回結果型別為Void,也就是無返回值。 public static CompletableFuture runAsync(Runnable runnable) {...} public static CompletableFuture runAsync(Runnable runnable, Executor executor) {...}

// ====================================demo華麗分割線============================================ CompletableFuture runAsyncFuture = CompletableFuture.runAsync(() -> { log.info("executing runAsync task ..."); }); runAsyncFuture.get(); - **allOf**:多個CompletableFuture任務併發執行,所有CompletableFuture任務完成時,返回一個新的CompletableFuture物件,其返回值為Void,也就是無返回值。 public static CompletableFuture allOf(CompletableFuture<?>... cfs) {...}

// ====================================demo華麗分割線============================================ // allOf,可傳遞返回值不同型別的future,最終結果按自己設計預期處理即可 CompletableFuture cf11 = CompletableFuture.supplyAsync(() -> { log.info("executing supplyAsync task cf11 ..."); return "this is supplyAsync"; }); CompletableFuture cf12 = CompletableFuture.supplyAsync(() -> { log.info("executing supplyAsync task cf12 ..."); return "this is supplyAsync"; }); CompletableFuture allOfFuture = CompletableFuture.allOf(cf11, cf12); allOfFuture.get(); - **anyOf**:多個CompletableFuture任務併發執行,只要有一個CompletableFuture任務完成時,就會返回一個新的CompletableFuture物件,並返回該CompletableFuture執行完成任務的返回值。 public static CompletableFuture anyOf(CompletableFuture<?>... cfs) {...}

// ====================================demo華麗分割線============================================ CompletableFuture cf21 = CompletableFuture.supplyAsync(() -> { log.info("executing supplyAsync task cf21 ..."); return "this is supplyAsync cf21"; }); CompletableFuture cf22 = CompletableFuture.supplyAsync(() -> { log.info("executing supplyAsync task cf22 ..."); return "this is supplyAsync cf22"; }); CompletableFuture anyOfFuture = CompletableFuture.anyOf(cf21, cf22); log.info("{}", anyOfFuture.get()); // 輸出結果:this is supplyAsync cf21或cf22 ```

2.2 獲取非同步任務執行結果的方法 get()/join()

join()和get()方法都是CompletableFuture物件基於阻塞的方式來獲取非同步任務執行結果。

  • get方法會丟擲顯示異常必須捕獲處理,任務允許被中斷丟擲InterruptedException異常,通過帶有超時時間的阻塞方式獲取非同步任務執行結果,超時等待無結果則中斷任務丟擲TimeoutException異常。
  • join方法會丟擲未檢查異常,與get()方法不同的是join()方法不允許被中斷。 ``` // 可中斷,可設定超時時間 public T get() throws InterruptedException, ExecutionException {...} public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {...} /**
  • 不可中斷 */ public T join() {...} ```

3.CompletionStage的方法使用說明

CompletionStage表示一個任務的執行階段,每個任務都會返回一個CompletionStage物件,可以對多個CompletionStage物件進行序列、並行或者聚合的方式來進行下階段的操作,也就是說實現非同步任務的回撥功能。CompletionStage總共提供了38個方法來實現多個CompletionStage任務的各種操作, 接下來我們就針對這些方法分類來了解一下。

以下型別均有三種使用方式:

  • thenAccept:方法名不帶Async的使用主執行緒同步執行回撥函式,不做非同步處理
  • thenAcceptAsync:方法名帶Async,但是無executor引數的,使用預設執行緒池ForkJoinPool.commonPool非同步執行任務
  • thenAcceptAsync:方法名帶Async,有executor引數的,使用自定義執行緒池非同步執行任務

3.1 純消費型別

  • 依賴單個任務完成(thenAccept):由上一個CompletionStage任務執行完成的結果傳遞到action進行回撥處理,即僅僅消費了上一個CompletionStage任務的返回值,回撥處理結果無返回值。 ``` // 不使用執行緒池,僅依賴當前執行緒執行,不做非同步 public CompletionStage thenAccept(Consumer<? super T> action); // 使用預設執行緒池ForkJoinPool.commonPool執行任務 public CompletionStage thenAcceptAsync(Consumer<? super T> action); // 使用自定義執行緒池執行任務 public CompletionStage thenAcceptAsync(Consumer<? super T> action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture.supplyAsync(() -> "this is supplyAsync") .thenAcceptAsync((result) -> { log.info("{} thenAcceptAsync", result); }).join();

// 輸出結果:this is supplyAsync thenAcceptAsync ```

  • 依賴兩個任務都完成(thenAcceptBoth):兩個CompletionStage任務併發執行,必須都完成了才執行action回撥處理,即僅僅消費了兩個CompletionStage任務的返回值,回撥處理結果無返回值。 ``` /**
  • 額外多了CompletionStage引數表示CompletionStage任務依賴的另一個CompletionStage任務
  • action接收兩個引數,分別表示兩個CompletionStage任務的返回值 */ public CompletionStage thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); // 原理同上,使用預設執行緒池執行非同步任務 public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); // 原理同上,使用自定義執行緒池執行非同步任務 public CompletionStage thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture cf311 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf311"); CompletableFuture cf312 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf312"); cf311.thenAcceptBothAsync(cf312, (r1, r2) -> { log.info("{} and {}", r1, r2); }).join(); // 輸出結果:this is supplyAsync cf311 and this is supplyAsync cf312 ```

  • 依賴兩個任務中的任何一個完成(acceptEither):兩個CompletionStage任務併發執行,只要其中一個先完成了就攜帶返回值執行action回撥處理,即僅僅消費了優先完成的CompletionStage任務的返回值,回撥處理結果無返回值。 ``` /**
  • 類似thenAcceptBothAsync,只不過acceptEither只需兩個任務中的其中一個完成即可回撥action
  • action中的值為兩個任務中先執行完任務的返回值 */ public CompletionStage acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action); public CompletionStage acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture cf311 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf311"); CompletableFuture cf312 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf312"); cf311.acceptEitherAsync(cf312, (r) -> { log.info(r); // 輸出結果:this is supplyAsync cf311或cf312 }).join(); ```

3.2 有返回值型別

  • 依賴單個任務完成(thenApply):由上一個CompletionStage任務執行完成的結果傳遞到action進行回撥處理,即不止消費了上一個CompletaionStage任務的返回值,同時回撥處理結果也有返回值 ``` public CompletionStage thenApply(Function<? super T,? extends U> fn); public CompletionStage thenApplyAsync(Function<? super T,? extends U> fn); public CompletionStage thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture cf32 = CompletableFuture.supplyAsync(() -> "this is supplyAsync") .thenApplyAsync(result -> result + " and thenApplyAsync"); log.info(cf32.join()); // 輸出結果:this is supplyAsync and thenApplyAsync ```

  • 依賴兩個任務都完成(thenCombine):兩個CompletionStage任務併發執行,必須都完成了才執行action回撥處理,即不止消費了兩個CompletaionStage任務的返回值,同時回撥處理結果也有返回值。 ``` public CompletionStage thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public CompletionStage thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public CompletionStage thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture cf321 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf321"); CompletableFuture cf322 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf322"); CompletableFuture thenCombineFuture = cf321.thenCombineAsync(cf322, (r1, r2) -> { return r1 + " and " + r2; }); log.info(thenCombineFuture.join()); // 輸出結果:this is supplyAsync cf321 and this is supplyAsync cf322 ```

  • 依賴兩個任務中的任何一個完成(applyToEither):兩個CompletionStage任務併發執行,只要其中一個任務執行完成就會action回撥處理,即不止消費了優先完成的CompletionStage的返回值,同時回撥處理結果也有返回值。 ``` // 原理同3.1的acceptEither,只不過applyToEither任務執行完成會返回一個帶有返回值的CompletionStage public CompletionStage applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn); public CompletionStage applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);

// ====================================demo華麗分割線============================================

CompletableFuture cf321 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf321"); CompletableFuture cf322 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf322"); CompletableFuture thenCombineFuture = cf321.applyToEitherAsync(cf322, (r) -> { return r; }); log.info(thenCombineFuture.join()); // 輸出結果:this is supplyAsync cf321或cf322 ```

3.3 不消費也不返回型別

  • 依賴單個任務完成(thenRun):單個CompletionStage任務執行完成回撥action處理,即執行action回撥方法無引數,回撥處理結果也無返回值。 ``` // 上一個CompletionStage任務執行完成後直接回調action處理,無返回值 public CompletionStage thenRun(Runnable action); // 同上,使用預設執行緒池執行action處理 public CompletionStage thenRunAsync(Runnable action); // 同上,使用自定義執行緒池執行action處理 public CompletionStage thenRunAsync(Runnable action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture.runAsync(() -> { // TODO }).thenRunAsync(() -> { log.info("this is thenRunAsync"); // 輸出結果:this is thenRunAsync }).join(); ```

  • 依賴兩個任務都完成(runAfterBoth):兩個CompletionStage任務併發執行,必須兩個任務都完成才執行action回撥處理,即執行action回撥方法無引數,回撥處理結果也無返回值。 ``` // 原理同3.1的thenAcceptBoth,只不過runAfterBoth的action回撥處理不接收引數且任務執行完成無返回值 public CompletionStage runAfterBoth(CompletionStage<?> other, Runnable action); // 同上,使用預設執行緒池執行action處理 public CompletionStage runAfterBothAsync(CompletionStage<?> other, Runnable action); // 同上,使用自定義執行緒池執行action處理 public CompletionStage runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture cf331 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf331"); CompletableFuture cf332 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf332"); cf331.runAfterBoth(cf332, () -> { log.info("this is runAfterBoth"); }).join(); // 輸出結果:this is runAfterBoth ```

  • 依賴兩個任務中的任何一個完成(runAfterEither):兩個CompletionStage任務併發執行,只需其中任何一個任務完成即可回撥action處理,即執行action回撥方法無引數,回撥處理結果也無返回值。 ``` public CompletionStage runAfterEither(CompletionStage<?> other, Runnable action); public CompletionStage runAfterEitherAsync(CompletionStage<?> other, Runnable action); public CompletionStage runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture cf331 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf331"); CompletableFuture cf332 = CompletableFuture.supplyAsync(() -> "this is supplyAsync cf332"); cf331.runAfterEitherAsync(cf332, () -> { log.info("this is runAfterEitherAsync"); }).join(); // 輸出結果:this is runAfterEitherAsync ```

3.4 組合型別

  • thenCompose:存在先後關係的兩個任務進行序列組合,由第一個CompletionStage任務執行結果作為引數傳遞給第二個CompletionStage任務,最終返回第二個CompletionStage。 ``` public CompletionStage thenCompose(Function<? super T, ? extends CompletionStage> fn); public CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn); public CompletionStage thenComposeAsync(Function<? super T, ? extends CompletionStage> fn, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture supplyFuture = CompletableFuture.supplyAsync(() -> { return "this is supplyAsync"; }); CompletableFuture thenComposeFuture = supplyFuture.thenComposeAsync((r) -> { return CompletableFuture.supplyAsync(() -> { return r + " and this is thenComposeAsync"; }); }); log.info(thenComposeFuture.join()); // 輸出結果:this is supplyAsync and this is thenComposeAsync ```

3.5 任務事件型別

CompletionStage介面也支援類似我們常用的try-catch-finally中的finally的作用,無論這個任務的執行結果是正常還是出現異常的情況,都必須要去執行的一個程式碼塊。在CompletionStage介面提供了以下兩種介面回撥的形式(whenComplete、handle),並支援主執行緒同步執行同時也支援使用預設執行緒池,或者使用自定義執行緒池去非同步執行最終的回撥處理。例如我們一個事務操作,無論這段程式碼執行是否成功,我們都必須要去關閉事務。

  • 任務完成事件(whenComplete):結果無返回值,若出現異常執行完whenComplete回撥處理完成後將中斷主執行緒的執行。 ``` // 1.whenComplete回撥函式中Throwable物件不對空代表出現異常,為空則表示無異常 public CompletionStage whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture whenCompleteFufute = CompletableFuture.supplyAsync(() -> { int a = 0; int b = 100 / a; return "this is supplyAsync normal"; }).whenCompleteAsync((r, th) -> { if (th != null) { log.error("this is whenCompleteAsync error"); } else { log.info("this is whenCompleteAsync success"); } }); log.info(whenCompleteFufute.join()); // 輸出結果:this is whenCompleteAsync error ```

  • 任務完成回撥事件(handle):結果有返回值,若出現異常執行完handle回撥處理完成後將繼續執行主執行緒的後續操作,不中斷主執行緒執行

``` // 2.handle回撥函式中Throwable物件不對空代表出現異常,為空則表示無異常 public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn); public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public CompletionStage handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

// ====================================demo華麗分割線============================================ CompletableFuture whenCompleteFufute = CompletableFuture.supplyAsync(() -> { int a = 0; int b = 100 / a; return "this is supplyAsync normal"; }).handleAsync((r, th) -> { if (th != null) { return "this is handleAsync error"; } else { return "this is handleAsync success"; } }); log.info(whenCompleteFufute.join()); // 輸出結果:this is handleAsync error log.info("main thread is running"); // 輸出結果:main thread is running ```

4.CompletionStage異常處理方法

  • exceptionally:只要是個程式,就會有異常出現的情況,例如一個CompletionStage任務,如果執行過程中出現異常,我們為了保證異常情況下程式能夠正常處理業務邏輯,那麼在這裡我們就可以使用exceptionally進行異常回調處理。當CompletionStage任務出現異常時就會觸發回撥exceptionally,否則CompletionStage任務正常執行業務不進行異常回調處理。 ``` public CompletionStage exceptionally(Function fn);

// ====================================demo華麗分割線============================================ CompletableFuture exceptionallyFuture = CompletableFuture.supplyAsync(() -> { int a = 0; int b = 10 / a; // 除數為0將拋異常 return "this is supplyAsync normal"; }).exceptionally(th -> { log.error("exception:{}", th.getMessage()); return "this is exceptionally"; }); log.info(exceptionallyFuture.join()); // 輸出結果:this is exceptionally ```

:以下這兩種情況可能大家在實際開發過程中會比較少見,但還是得在這裡做個提醒,以免到最後準備不充分出現設計上的缺陷。 - 當whenCompleteAsync與exceptionally同時使用時,若出現異常情況,由於exceptionally有返回值,所以優先執行whenCompleteAsync,後執行exceptionally。 - 當handleAsync與exceptionally同時出現時,由於handleAsync已經包含了exceptionally的所有操作,即handleAsync回撥有返回值,且有Throwable異常物件能夠進行異常處理,所以這兩者同時出現時exceptionally將失效。

5.方法型別總結

根據以上的方法我們可以總結出這些任務其實就分為三大類,相當於通過CompletionStage的回撥機制來實現多個任務串、多個任務並行、多個任務聚合的操作,因此CompletableFuture對於非同步任務的處理提供了更加強大的程式設計模型。所以說java8提供的CompletableFuture類相對原來的Future介面來說提供了一些鏈式的程式設計,省去了我們很多非同步任務回撥操作複雜的步驟,讓我們這些碼農們能夠有更高的效率輸出產品。