日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Java8異步編程CompletableFuture全解析

本文轉(zhuǎn)載自微信公眾號「KK架構(gòu)師」,作者wangkai 。轉(zhuǎn)載本文請聯(lián)系KK架構(gòu)師公眾號。   

創(chuàng)新互聯(lián)主營金堂縣網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app軟件開發(fā)公司,金堂縣h5重慶小程序開發(fā)搭建,金堂縣網(wǎng)站營銷推廣歡迎金堂縣等地區(qū)企業(yè)咨詢

本文大綱速看

 

一、異步編程

通常來說,程序都是順序執(zhí)行,同一時(shí)刻只會發(fā)生一件事情。如果一個函數(shù)依賴于另一個函數(shù)的結(jié)果,它只能等待那個函數(shù)結(jié)束才能繼續(xù)執(zhí)行,從用戶角度來說,整個程序才算執(zhí)行完畢。但現(xiàn)在的計(jì)算機(jī)普遍擁有多核 CPU,在那里干等著毫無意義,完全可以在另一個處理器內(nèi)核上干其他工作,耗時(shí)長的任務(wù)結(jié)束之后會主動通知你。這就是異步編程的出發(fā)點(diǎn):充分使用多核 CPU 的優(yōu)勢,最大程度提高程序性能。一句話來說:所謂異步編程,就是實(shí)現(xiàn)一個無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。

二、拋出一個問題:如何實(shí)現(xiàn)燒水泡茶的程序

 

最后我們會使用傳統(tǒng)方式和 Java8 異步編程方式分別實(shí)現(xiàn),來對比一下實(shí)現(xiàn)復(fù)雜度。

三、Java5 的 Future 實(shí)現(xiàn)的異步編程

Future 是 Java 5 添加的類,用來描述一個異步計(jì)算的結(jié)果。你可以使用 isDone() 方法檢查計(jì)算是否完成,或者使用 get() 方法阻塞住調(diào)用線程,直到計(jì)算完成返回結(jié)果,也可以使用 cancel() 方法停止任務(wù)的執(zhí)行。

 
 
 
 
  1. public static void main(String[] args) throws InterruptedException, ExecutionException { 
  2.         ExecutorService es = Executors.newFixedThreadPool(5); 
  3.         Future f = es.submit(() -> 100); 
  4.         System.out.println(f.get()); 
  5.         es.shutdown(); 
  6.     } 

雖然 Future 提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果。阻塞的方式顯然和我們異步編程的初衷相違背,輪詢的方式又會耗費(fèi)無謂的 CPU 資源,而且也不能及時(shí)的獲取結(jié)果。

當(dāng)然,很多其他的語言采用回調(diào)的方式來實(shí)現(xiàn)異步編程,比如 Node.js;Java 的一些框架,比如 Netty,Google Guava 也擴(kuò)展了 Future 接口,提供了很多回調(diào)的機(jī)制,封裝了工具類,輔助異步編程開發(fā)。

Java 作為老牌編程語言,自然也不會落伍。在 Java 8 中,新增了一個包含 50 多個方法的類:CompletableFuture,提供了非常強(qiáng)大的 Future 擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供函數(shù)式編程的能力。

四、CompletableFuture 類功能概覽

如下圖是 CompletableFuture 實(shí)現(xiàn)的接口:

 

它實(shí)現(xiàn)了 Future 接口,擁有 Future 所有的特性,比如可以使用 get() 方法獲取返回值等;還實(shí)現(xiàn)了 CompletionStage 接口,這個接口有超過 40 個方法,功能太豐富了,它主要是為了編排任務(wù)的工作流。

我們可以把工作流和工作流之間的關(guān)系分類為三種:串行關(guān)系,并行關(guān)系,匯聚關(guān)系。

串行關(guān)系

 

提供了如下的 api 來實(shí)現(xiàn)(先大致瀏覽一遍):

 
 
 
 
  1. CompletionStage thenApply(fn); 
  2. CompletionStage thenApplyAsync(fn); 
  3. CompletionStage thenAccept(consumer); 
  4. CompletionStage thenAcceptAsync(consumer); 
  5. CompletionStage thenRun(action); 
  6. CompletionStage thenRunAsync(action); 
  7. CompletionStage thenCompose(fn); 
  8. CompletionStage thenComposeAsync(fn); 

并行關(guān)系

多線程異步執(zhí)行就是并行關(guān)系

匯聚關(guān)系

 

匯聚關(guān)系,又分為 AND 匯聚關(guān)系和 OR 匯聚關(guān)系:

AND 匯聚關(guān)系,就是所有依賴的任務(wù)都完成之后再執(zhí)行;OR 匯聚關(guān)系,就是依賴的任務(wù)中有一個執(zhí)行完成,就開始執(zhí)行。

AND 匯聚關(guān)系由這些接口表達(dá):

 
 
 
 
  1. CompletionStage thenCombine(other, fn); 
  2. CompletionStage thenCombineAsync(other, fn); 
  3. CompletionStage thenAcceptBoth(other, consumer); 
  4. CompletionStage thenAcceptBothAsync(other, consumer); 
  5. CompletionStage runAfterBoth(other, action); 
  6. CompletionStage runAfterBothAsync(other, action); 

OR 匯聚關(guān)系由這些接口來表達(dá):

 
 
 
 
  1. CompletionStage applyToEither(other, fn); 
  2. CompletionStage applyToEitherAsync(other, fn); 
  3. CompletionStage acceptEither(other, consumer); 
  4. CompletionStage acceptEitherAsync(other, consumer); 
  5. CompletionStage runAfterEither(other, action); 
  6. CompletionStage runAfterEitherAsync(other, action); 

五、CompletableFuture 接口精講

1、提交執(zhí)行的靜態(tài)方法

方法名描述

方法名 描述
runAsync(Runnable runnable)執(zhí)行異步代碼,使用 ForkJoinPool.commonPool() 作為它的線程池
runAsync(Runnable runnable, Executor executor)執(zhí)行異步代碼,使用指定的線程池
supplyAsync(Supplier supplier)異步執(zhí)行代碼,有返回值,使用 ForkJoinPool.commonPool() 作為它的線程池
supplyAsync(Supplier supplier, Executor executor)異步執(zhí)行代碼,有返回值,使用指定的線程池執(zhí)行

上述四個方法,都是提交任務(wù)的,runAsync 方法需要傳入一個實(shí)現(xiàn)了 Runnable 接口的方法,supplyAsync 需要傳入一個實(shí)現(xiàn)了 Supplier 接口的方法,實(shí)現(xiàn) get 方法,返回一個值。

(1)run 和 supply 的區(qū)別

run 就是執(zhí)行一個方法,沒有返回值,supply 執(zhí)行一個方法,有返回值。

(2)一個參數(shù)和兩個參數(shù)的區(qū)別

第二個參數(shù)是線程池,如果沒有傳,則使用自帶的 ForkJoinPool.commonPool() 作為線程池,這個線程池默認(rèn)創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設(shè)置 ForkJoinPool 線程池的線程數(shù))

2、串行關(guān)系 api

這些 api 之間主要是能否獲得前一個任務(wù)的返回值與自己是否有返回值的區(qū)別。

api 是否可獲得前一個任務(wù)的返回值 是否有返回值
thenApply
thenAccept
thenRun不能
thenCompose

(1) thenApply 和 thenApplyAsync 使用

thenApply 和 thenApplyAsync 把兩個并行的任務(wù)串行化,另一個任務(wù)在獲得上一個任務(wù)的返回值之后,做一些加工和轉(zhuǎn)換。它也是有返回值的。

 
 
 
 
  1. public class BasicFuture4 { 
  2.  
  3.     @Data 
  4.     @AllArgsConstructor 
  5.     @ToString 
  6.     static class Student { 
  7.         private String name; 
  8.     } 
  9.      
  10.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  11.         CompletableFuture future = CompletableFuture.supplyAsync(() -> "Jack") 
  12.                 .thenApply(s -> s + " Smith") 
  13.                 .thenApply(String::toUpperCase) 
  14.                 .thenApplyAsync(Student::new); 
  15.         System.out.println(future.get()); 
  16.     } 
  17.  

結(jié)果可以看到,輸入是一個字符串,拼接了一個字符串,轉(zhuǎn)換成大寫,new 了一個 Student 對象返回。

 
 
 
 
  1. BasicFuture4.Student(name=JACK SMITH) 

和 thenApply 一起的還有 thenAccept 和 thenRun,thenAccept 能獲得到前一個任務(wù)的返回值,但是自身沒有返回值;thenRun 不能獲得前一個任務(wù)的返回值,自身也沒有返回值。

(2)thenApply 和 thenApplyAsync 的區(qū)別

這兩個方法的區(qū)別,在于誰去執(zhí)行任務(wù)。如果使用 thenApplyAsync,那么執(zhí)行的線程是從 ForkJoinPool.commonPool() 或者自己定義的線程池中取線程去執(zhí)行。如果使用 thenApply,又分兩種情況,如果 supplyAsync 方法執(zhí)行速度特別快,那么 thenApply 任務(wù)就使用主線程執(zhí)行,如果 supplyAsync 執(zhí)行速度特別慢,就是和 supplyAsync 執(zhí)行線程一樣。

可以使用下面的例子演示一下:

 
 
 
 
  1. package com.dsj361.future; 
  2.  
  3. import java.util.concurrent.CompletableFuture; 
  4. import java.util.concurrent.ExecutionException; 
  5.  
  6. /** 
  7.  * @Author wangkai 
  8.  */ 
  9. public class BasicFuture8 { 
  10.  
  11.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  12.         System.out.println("----------supplyAsync 執(zhí)行很快"); 
  13.         CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { 
  14.             System.out.println(Thread.currentThread().getName()); 
  15.             return "1"; 
  16.         }).thenApply(s -> { 
  17.             System.out.println(Thread.currentThread().getName()); 
  18.             return "2"; 
  19.         }); 
  20.         System.out.println(future1.get()); 
  21.  
  22.         System.out.println("----------supplyAsync 執(zhí)行很慢"); 
  23.         CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { 
  24.             try { 
  25.                 Thread.sleep(1000); 
  26.             } catch (InterruptedException e) { 
  27.             } 
  28.             System.out.println(Thread.currentThread().getName()); 
  29.             return "1"; 
  30.         }).thenApply(s -> { 
  31.             System.out.println(Thread.currentThread().getName()); 
  32.             return "2"; 
  33.         }); 
  34.         System.out.println(future2.get()); 
  35.     } 

執(zhí)行結(jié)果:

 
 
 
 
  1. ----------supplyAsync 執(zhí)行很快 
  2. ForkJoinPool.commonPool-worker-1 
  3. main 
  4. ----------supplyAsync 執(zhí)行很慢 
  5. ForkJoinPool.commonPool-worker-1 
  6. ForkJoinPool.commonPool-worker-1 

(3)thenCompose 的使用

假設(shè)有兩個異步任務(wù),第二個任務(wù)想要獲取第一個任務(wù)的返回值,并且做運(yùn)算,我們可以用 thenCompose。此時(shí)使用 thenApply 也可以實(shí)現(xiàn),看一段代碼發(fā)現(xiàn)他們的區(qū)別:

 
 
 
 
  1. public class BasicFuture9 { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture future = getLastOne().thenCompose(BasicFuture9::getLastTwo); 
  5.         System.out.println(future.get()); 
  6.  
  7.         CompletableFuture> future2 = getLastOne().thenApply(s -> getLastTwo(s)); 
  8.         System.out.println(future2.get().get()); 
  9.     } 
  10.  
  11.     public static CompletableFuture getLastOne(){ 
  12.         return CompletableFuture.supplyAsync(()-> "topOne"); 
  13.     } 
  14.  
  15.     public static CompletableFuture getLastTwo(String s){ 
  16.         return CompletableFuture.supplyAsync(()-> s + "  topTwo"); 
  17.     } 

可以看到使用 thenApply 的時(shí)候,需要使用兩個 get() 方法才能獲取到最終的返回值,使用 thenCompose 只要一個即可。

3、And 匯聚關(guān)系 Api

(1)thenCombine 的使用

加入我們要計(jì)算兩個異步方法返回值的和,就必須要等到兩個異步任務(wù)都計(jì)算完才能求和,此時(shí)可以用 thenCombine 來完成。

 
 
 
 
  1. public static void main(String[] args) throws ExecutionException, InterruptedException { 
  2.     CompletableFuture thenComposeOne = CompletableFuture.supplyAsync(() -> 192); 
  3.     CompletableFuture thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); 
  4.     CompletableFuture thenComposeCount = thenComposeOne 
  5.         .thenCombine(thenComposeTwo, (s, y) -> s + y); 
  6.  
  7.     thenComposeOne.thenAcceptBoth(thenComposeTwo,(s,y)-> System.out.println("thenAcceptBoth")); 
  8.     thenComposeOne.runAfterBoth(thenComposeTwo, () -> System.out.println("runAfterBoth")); 
  9.  
  10.     System.out.println(thenComposeCount.get()); 

可以看到 thenCombine 第二個參數(shù)是一個 Function 函數(shù),前面兩個異步任務(wù)都完成之后,使用這個函數(shù)來完成一些運(yùn)算。

(2)thenAcceptBoth

接收前面兩個異步任務(wù)的結(jié)果,執(zhí)行一個回調(diào)函數(shù),但是這個回調(diào)函數(shù)沒有返回值。

(3)runAfterBoth

接收前面兩個異步任務(wù)的結(jié)果,但是回調(diào)函數(shù),不接收參數(shù),也不返回值。

4、Or 匯聚關(guān)系 Api

 
 
 
 
  1. public class BasicFuture11 { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture thenComposeOne = CompletableFuture.supplyAsync(() -> 192); 
  5.         CompletableFuture thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); 
  6.         CompletableFuture thenComposeCount = thenComposeOne 
  7.                 .applyToEither(thenComposeTwo, s -> s + 1); 
  8.  
  9.         thenComposeOne.acceptEither(thenComposeTwo,s -> {}); 
  10.          
  11.         thenComposeOne.runAfterEither(thenComposeTwo,()->{}); 
  12.  
  13.         System.out.println(thenComposeCount.get()); 
  14.     } 

(1)applyToEither

任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法接收一個參數(shù),有返回值

(2)acceptEither

任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法接收一個參數(shù),無返回值

(3)runAfterEither

任何一個執(zhí)行完就執(zhí)行回調(diào)方法,回調(diào)方法不接收參數(shù),也無返回值

5、處理異常

上面我們講了如何把幾個異步任務(wù)編排起來,執(zhí)行一些串行或者匯聚操作。還有一個重要的地方,就是異常的處理。

先看下面的例子:

 
 
 
 
  1. public static void main(String[] args) throws ExecutionException, InterruptedException { 
  2.     CompletableFuture.supplyAsync(() -> { 
  3.         System.out.println("execute one "); 
  4.         return 100; 
  5.     }) 
  6.         .thenApply(s -> 10 / 0) 
  7.         .thenRun(() -> System.out.println("thenRun")) 
  8.         .thenAccept(s -> System.out.println("thenAccept")); 
  9.  
  10.     CompletableFuture.runAsync(() -> System.out.println("other")); 

結(jié)果:

 
 
 
 
  1. execute one  
  2. other 

可以發(fā)現(xiàn),只要鏈條上有一個任務(wù)發(fā)生了異常,這個鏈條下面的任務(wù)都不再執(zhí)行了。

但是 main 方法上的接下來的代碼還是會執(zhí)行的。

所以這個時(shí)候,需要合理的去處理異常來完成一些收尾的工作。

 
 
 
 
  1. public class BasicFuture12 { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture.supplyAsync(() -> { 
  5.             System.out.println("execute one "); 
  6.             return 100; 
  7.         }) 
  8.                 .thenApply(s -> 10 / 0) 
  9.                 .thenRun(() -> System.out.println("thenRun")) 
  10.                 .thenAccept(s -> System.out.println("thenAccept")) 
  11.                 .exceptionally(s -> { 
  12.                     System.out.println("異常處理"); 
  13.                     return null; 
  14.                 }); 
  15.  
  16.         CompletableFuture.runAsync(() -> System.out.println("other")); 
  17.     } 

可以使用 exceptionally 來處理異常。

使用 handle() 方法也可以處理異常。但是 handle() 方法的不同之處在于,即使沒有發(fā)生異常,也會執(zhí)行。

六、燒水泡茶程序的實(shí)現(xiàn)

1、使用 Thread 多線程和 CountDownLatch 來實(shí)現(xiàn)

 
 
 
 
  1. public class MakeTee { 
  2.  
  3.     private static CountDownLatch countDownLatch = new CountDownLatch(2); 
  4.  
  5.     static class HeatUpWater implements Runnable { 
  6.  
  7.         private CountDownLatch countDownLatch; 
  8.  
  9.         public HeatUpWater(CountDownLatch countDownLatch) { 
  10.             this.countDownLatch = countDownLatch; 
  11.         } 
  12.         @Override 
  13.         public void run() { 
  14.             try { 
  15.                 System.out.println("洗水壺"); 
  16.                 Thread.sleep(1000); 
  17.                 System.out.println("燒開水"); 
  18.                 Thread.sleep(5000); 
  19.                 countDownLatch.countDown(); 
  20.             } catch (InterruptedException e) { 
  21.             } 
  22.  
  23.         } 
  24.     } 
  25.  
  26.     static class PrepareTee implements Runnable { 
  27.         private CountDownLatch countDownLatch; 
  28.  
  29.         public PrepareTee(CountDownLatch countDownLatch) { 
  30.             this.countDownLatch = countDownLatch; 
  31.         } 
  32.  
  33.         @Override 
  34.         public void run() { 
  35.             try { 
  36.                 System.out.println("洗茶壺"); 
  37.                 Thread.sleep(1000); 
  38.                 System.out.println("洗茶杯"); 
  39.                 Thread.sleep(1000); 
  40.                 System.out.println("拿茶葉"); 
  41.                 Thread.sleep(1000); 
  42.                 countDownLatch.countDown(); 
  43.             } catch (InterruptedException e) { 
  44.             } 
  45.         } 
  46.     } 
  47.     public static void main(String[] args) throws InterruptedException { 
  48.         new Thread(new HeatUpWater(countDownLatch) ).start(); 
  49.         new Thread(new PrepareTee(countDownLatch)).start(); 
  50.         countDownLatch.await(); 
  51.         System.out.println("準(zhǔn)備就緒,開始泡茶"); 
  52.     } 

這里我們使用兩個線程,分別執(zhí)行燒水和泡茶的程序,使用 CountDownLatch 來協(xié)調(diào)兩個線程的進(jìn)度,等到他們都執(zhí)行完成之后,再執(zhí)行泡茶的動作。

可以看到這種方法,多了很多不必要的代碼,new Thread,人工維護(hù) CountDownLatch 的進(jìn)度。

2、使用 CompletableFuture 來實(shí)現(xiàn)

 
 
 
 
  1. public class MakeTeeFuture { 
  2.  
  3.     public static void main(String[] args) throws ExecutionException, InterruptedException { 
  4.         CompletableFuture future1 = CompletableFuture.runAsync(() -> { 
  5.             try { 
  6.                 System.out.println("洗水壺"); 
  7.                 Thread.sleep(1000); 
  8.                 System.out.println("燒開水"); 
  9.                 Thread.sleep(5000); 
  10.             } catch (InterruptedException e) { 
  11.                 e.printStackTrace(); 
  12.             } 
  13.         }); 
  14.         CompletableFuture future2 = CompletableFuture.runAsync(() -> { 
  15.             try { 
  16.                 System.out.println("洗茶壺"); 
  17.                 Thread.sleep(1000); 
  18.                 System.out.println("洗茶杯"); 
  19.                 Thread.sleep(1000); 
  20.                 System.out.println("拿茶葉"); 
  21.                 Thread.sleep(1000); 
  22.             } catch (InterruptedException e) { 
  23.                 e.printStackTrace(); 
  24.             } 
  25.         }); 
  26.         CompletableFuture finish = future1.runAfterBoth(future2, () -> { 
  27.             System.out.println("準(zhǔn)備完畢,開始泡茶"); 
  28.         }); 
  29.         System.out.println(finish.get()); 
  30.     } 

這個程序極度簡單,無需手工維護(hù)線程,給任務(wù)分配線程的工作也不需要關(guān)注。

同時(shí)語義也更加清晰,future1.runAfterBoth(future2,......) 能夠清晰的表述“任務(wù) 3 要等到任務(wù) 1 和任務(wù) 2 都完成之后才能繼續(xù)開始”

然后代碼更加簡練并且專注于業(yè)務(wù)邏輯,幾乎所有的代碼都是業(yè)務(wù)邏輯相關(guān)的。

七、總結(jié)

本文介紹了異步編程的概念,以及 Java8 的 CompletableFuture 是如何優(yōu)雅的處理多個異步任務(wù)之間的協(xié)調(diào)工作的。CompletableFuture 能夠極大簡化我們對于異步任務(wù)編排的工作,F(xiàn)link 在提交任務(wù)時(shí),也是使用這種異步任務(wù)的方式,去編排提交時(shí)和提交后對于任務(wù)狀態(tài)處理的一些工作的。


網(wǎng)站題目:Java8異步編程CompletableFuture全解析
文章出自:http://www.dlmjj.cn/article/djeepei.html