Future
自Java 5開(kāi)始添加了Future,用來(lái)描述一個(gè)異步計算的結果。獲取一個(gè)結果時(shí)方法較少,要么通過(guò)輪詢(xún)isDone,確認完成后調用get()獲取值,要么調用get()設置一個(gè)超時(shí)時(shí)間。但是get()方法會(huì )阻塞調用線(xiàn)程,這種阻塞的方式顯然和我們的異步編程的初衷相違背。如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Test public void testFuture() throws InterruptedException { ExecutorService es = Executors.newSingleThreadExecutor(); Future<Integer> f = es.submit(() -> { // 長(cháng)時(shí)間的異步計算 Thread.sleep(2000L); System.out.println("長(cháng)時(shí)間的異步計算"); return 100; }); while (true) { System.out.println("阻斷"); if (f.isDone()) { try { System.out.println(f.get()); es.shutdown(); break; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } Thread.sleep(100L); } }
|
雖然Future以及相關(guān)使用方法提供了異步執行任務(wù)的能力,但是對于結果的獲取卻是很不方便,只能通過(guò)阻塞或者輪詢(xún)的方式得到任務(wù)的結果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢(xún)的方式又會(huì )耗費無(wú)謂的CPU資源,而且也不能及時(shí)地得到計算結果,為什么不能用觀(guān)察者設計模式當計算結果完成及時(shí)通知監聽(tīng)者呢?如Netty擴展Future的ChannelFuture接口,Node.js采用回調的方式實(shí)現異步編程。
為了解決這個(gè)問(wèn)題,自Java 8開(kāi)始,吸收了guava的設計思想,加入了Future的諸多擴展功能形成了CompletableFuture。
當一個(gè)Future可能需要顯示地完成時(shí),使用CompletionStage接口去支持完成時(shí)觸發(fā)的函數和操作。
當兩個(gè)及以上線(xiàn)程同時(shí)嘗試完成、異常完成、取消一個(gè)CompletableFuture時(shí),只有一個(gè)能成功。
CompletableFuture實(shí)現了CompletionStage接口的如下策略:
- 為了完成當前的CompletableFuture接口或者其他完成方法的回調函數的線(xiàn)程,提供了非異步的完成操作。
- 沒(méi)有顯式入參Executor的所有async方法都使用ForkJoinPool.commonPool()為了簡(jiǎn)化監視、調試和跟蹤,所有生成的異步任務(wù)都是標記接口A(yíng)synchronousCompletionTask的實(shí)例。
- 所有的CompletionStage方法都是獨立于其他共有方法實(shí)現的,因此一個(gè)方法的行為不會(huì )受到子類(lèi)中其他方法的覆蓋。
CompletableFuture實(shí)現了Future接口的如下策略:
- CompletableFuture無(wú)法直接控制完成,所以cancel操作被視為是另一種異常完成形式。方法
isCompletedExceptionally可以用來(lái)確定一個(gè)CompletableFuture是否以任何異常的方式完成。 - 方法get()和get(long,TimeUnit)拋出一個(gè)ExecutionException,對應CompletionException。為了在大多數上下文中簡(jiǎn)化用法,這個(gè)類(lèi)還定義了方法
join()和getNow(如果結果已經(jīng)計算完則返回結果或者拋出異常,否則返回給定的valueIfAbsent值),而不是直接在這些情況中直接拋出CompletionException。
CompletableFuture
CompletableFuture類(lèi)實(shí)現了CompletionStage和Future接口,所以你還是可以像以前一樣通過(guò)阻塞或者輪詢(xún)的方式獲得結果,盡管這種方式不推薦使用。
1 2 3
| public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { //... }
|
創(chuàng )建CompletableFuture對象
在該類(lèi)中提供了四個(gè)靜態(tài)方法創(chuàng )建CompletableFuture對象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
|
以Async結尾并且沒(méi)有指定Executor的方法會(huì )使用ForkJoinPool.commonPool()作為線(xiàn)程池執行異步代碼。
這些線(xiàn)程都是Daemon線(xiàn)程,主線(xiàn)程結束Daemon線(xiàn)程不結束,只有JVM關(guān)閉時(shí),生命周期終止。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Test public void testForCreate() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設置日期格式 String result = CompletableFuture.supplyAsync(() -> { return df.format(new Date()); }).thenApply(s -> "當前時(shí)間為: " + s).join(); System.out.println(result); CompletableFuture.runAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("sleep for 1s :" + df.format(new Date()));// new Date()為獲取當前系統時(shí)間 }).join(); }
|
計算結果完成時(shí)的處理
當CompletableFuture的計算結果完成,或者拋出異常的時(shí)候,有如下四個(gè)方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); } public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); } public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); } public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) { return uniExceptionallyStage(fn); }
|
可以看到Action的類(lèi)型是BiConsumer<? super T,? super Throwable>它可以處理正常的計算結果,或者異常情況。
方法不以Async結尾,意味著(zhù)Action使用相同的線(xiàn)程執行,而Async可能會(huì )使用其他線(xiàn)程執行(如果是使用相同的線(xiàn)程池,也可能會(huì )被同一個(gè)線(xiàn)程選中執行)
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Test public void testComplete() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設置日期格式 CompletableFuture.runAsync(() -> { System.out.println("當前時(shí)間為:" + df.format(new Date())); throw new ArithmeticException("illegal exception!"); }).exceptionally(e -> { System.out.println("異常為: "+e.getMessage()); return null; }).whenComplete((v, e) -> System.out.println("complete")).join(); }
|
exceptionally方法返回一個(gè)新的CompletableFuture,當原始的CompletableFuture拋出異常的時(shí)候,就會(huì )觸發(fā)這個(gè)CompletableFuture的計算,調用function計算值,否則如果原始的CompletableFuture正常計算完后,這個(gè)新的CompletableFuture也計算完成,它的值和原始的CompletableFuture的計算的值相同。也就是這個(gè)exceptionally方法用來(lái)處理異常的情況。
除了上述四個(gè)方法之外,一組handle方法也可用于處理計算結果。當原先的CompletableFuture的值計算完成或者拋出異常的時(shí)候,會(huì )觸發(fā)這個(gè)CompletableFuture對象的計算,結果由BiFunction參數計算而得。因此這組方法兼有whenComplete和轉換的兩個(gè)功能。
1 2 3 4 5
| public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn); public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn); public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor);
|
我們將上面的例子進(jìn)行修改為使用handle實(shí)現的例子如下:
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testHandle() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//設置日期格式 String f = CompletableFuture.supplyAsync(() -> { System.out.println("當前時(shí)間為:" + df.format(new Date())); return "normal"; // throw new ArithmeticException("illegal exception!"); }).handleAsync((v, e) -> "value is: " + v + " && exception is: " + e).join(); System.out.println(f); }
|
從結果可以看出,handle實(shí)現了whenComplete和轉換的兩個(gè)功能。
進(jìn)行轉換
我們可以將操作串聯(lián)起來(lái),或者將CompletableFuture組合起來(lái)。關(guān)鍵的入參只有一個(gè)Function,它是函數式接口,所以使用Lambda表示起來(lái)會(huì )更加優(yōu)雅。它的入參是上一個(gè)階段計算后的結果,返回值是經(jīng)過(guò)轉化后結果。
1 2 3 4 5
| public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
|
函數的功能是當原來(lái)的CompletableFuture計算完后,將結果傳遞給函數fn,將fn的結果作為新的CompletableFuture計算結果。因此它的功能相當于將CompletableFuture<T>轉換成CompletableFuture<U>。
1 2 3 4 5 6
| @Test public void testFConvert() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join(); System.out.println(f); }
|
需要注意的是,這些轉換并不是馬上執行的,也不會(huì )阻塞,而是在前一個(gè)stage完成后繼續執行。
它們與handle方法的區別在于handle方法會(huì )處理正常計算值和異常,因此它可以屏蔽異常,避免異常繼續拋出。而thenApply方法只是用來(lái)處理正常值,因此一旦有異常就會(huì )拋出。
消費
上面的方法是當計算完成的時(shí)候,會(huì )生成新的計算結果(thenApply, handle),或者返回同樣的計算結果whenComplete。我們可以在每個(gè)CompletableFuture 上注冊一個(gè)操作,該操作會(huì )在 CompletableFuture 完成執行后調用它。
1 2 3 4 5
| public CompletableFuture<Void> thenAccept(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action); public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
|
CompletableFuture 通過(guò) thenAccept 方法提供了這一功能,它接收CompletableFuture 執行完畢后的返回值做參數,只對結果執行Action,而不返回新的計算值,因此計算值為空:
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testAccept() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }).thenAccept(System.out::println); }
|
thenAcceptBoth以及相關(guān)方法提供了類(lèi)似的功能,當兩個(gè)CompletionStage都正常完成計算的時(shí)候,就會(huì )執行提供的action,它用來(lái)組合另外一個(gè)異步的結果。
runAfterBoth是當兩個(gè)CompletionStage都正常完成計算的時(shí)候,執行一個(gè)Runnable,這個(gè)Runnable并不使用計算的結果。
1 2 3 4 5 6 7
| public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action); public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor); public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
|
如下的實(shí)現中,將會(huì )在兩個(gè)CompletionStage都正常完成后,輸出這兩個(gè)計算的結果:
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testAcceptBoth() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } return "first"; }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join(); }
|
下面一組方法當計算完成的時(shí)候會(huì )執行一個(gè)Runnable,與thenAccept不同,Runnable并不使用CompletableFuture計算的結果。
1 2 3 4 5
| public CompletableFuture<Void> thenRun(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action); public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
|
我們進(jìn)行如下的應用:
1 2 3 4 5 6 7
| @Test public void testRun() { CompletableFuture.supplyAsync(() -> { System.out.println("執行CompletableFuture"); return "first"; }).thenRun(() -> System.out.println("finished")).join(); }
|
先前的CompletableFuture計算的結果被忽略了,這個(gè)方法返回CompletableFuture<Void>類(lèi)型的對象。
參考文檔
- Java8 Doc
- Java CompletableFuture 詳解
- CompletableFuture 詳解