日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
Java8異步編程之CompletableFuture源碼解讀

【稿件】

一、引言

一說(shuō)到異步任務(wù),很多人上來(lái)咔咔新建個(gè)線程池。為了防止線程數(shù)量肆虐,一般還會(huì)考慮使用單例模式創(chuàng)建線程池,具體使用方法大都如下面的代碼所示: 

 
 
 
 
  1. @Test  
  2. publicvoiddemo1() throwsExecutionException, InterruptedException {  
  3. ExecutorServiceexecutorService=Executors.newFixedThreadPool(5);  
  4. Futurefuture1=executorService.submit(newCallable() {  
  5. @Override  
  6. publicObjectcall() throwsException {  
  7. returnThread.currentThread().getName();
      }    
  8.  
  9. }); 
  10.  
  11. System.out.println(future1.get()); 
  12.  
  13. executorService.execute(newRunnable() { 
  14.  
  15. @Overridepublicvoidrun() { 
  16.  
  17. System.out.println(Thread.currentThread().getName());        
  18.  
  19. }   
  20.  
  21. }); 
  22.  
  23. }  
  24. 經(jīng)常使用 JavaScript 的同學(xué)相信對(duì)于異步回調(diào)的用法相當(dāng)熟悉了,畢竟 JavaScript 擁有“回調(diào)地獄”的美譽(yù)。

    我們大 Java 又開啟了新一輪模仿之旅。

    java.util.concurrent 包新增了 CompletableFuture 類可以實(shí)現(xiàn)類似 JavaScript 的連續(xù)回調(diào)。

    二、兩種基本用法

    先來(lái)看下 CompletableFuture 的兩種基本?法,代碼如下: 

     
     
     
     
    1. @Test 
    2.  
    3. public void index1() throws ExecutionException, InterruptedException { 
    4.  
    5.   CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()); 
    6.  
    7.    CompletableFuture completableFuture2 = CompletableFuture.runAsync(() -> Thread.currentThread().getName()); 
    8.  
    9.    System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); 
    10.  
    11. }  

    打印輸出: 

     
     
     
     
    1. ForkJoinPool.commonPool-worker-1 
    2. null 

    初看代碼,第一反應(yīng)是代碼簡(jiǎn)潔。直接調(diào)用 CompletableFuture 類的靜態(tài)方法,提交任務(wù)方法就完事了。但是,隨之而來(lái)的疑問就是,異步任務(wù)執(zhí)行的背后是一套什么邏輯呢?是一對(duì)一使用newThread()還是依賴線程池去執(zhí)行的呢。

    三、探索線程池原理

    翻閱 CompletableFuture 類的源碼,我們找到答案。關(guān)鍵代碼如下:

     
     
     
     
    1. private static final boolean useCommonPool = 
    2.  (ForkJoinPool.getCommonPoolParallelism() > 1); 
    3. /** 
    4. * Default executor -- ForkJoinPool.commonPool() unless it cannot 
    5. * support parallelism. 
    6. */ 
    7. private static final Executor asyncPool = useCommonPool ? 
    8.  ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 

    可以看到 CompletableFuture 類默認(rèn)使?的是 ForkJoinPool.commonPool() ?法返回的線程池。當(dāng) 然啦,前提是 ForkJoinPool 線程池的數(shù)量?于 1 。否則,則使? CompletableFuture 類?定義的 ThreadPerTaskExecutor 線程池。 ThreadPerTaskExecutor 線程池的實(shí)現(xiàn)邏輯?常簡(jiǎn)單,??代碼簡(jiǎn)單實(shí)現(xiàn)了 Executor 接?,內(nèi)部執(zhí)? 邏輯是?條任務(wù)對(duì)應(yīng)?條線程。代碼如下:

     
     
     
     
    1. /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 
    2. static final class ThreadPerTaskExecutor implements Executor { 
    3.  public void execute(Runnable r) { new Thread(r).start(); } 

    四、兩種異步接?

    之前我們使?線程池執(zhí)?異步任務(wù)時(shí),當(dāng)不需要任務(wù)執(zhí)?完畢后返回結(jié)果的,我們都是實(shí)現(xiàn) Runnable 接?。?當(dāng)需要實(shí)現(xiàn)返回值時(shí),我們使?的則是 Callable 接?。 同理,使? CompletableFuture 類的靜態(tài)?法執(zhí)?異步任務(wù)時(shí),不需要返回結(jié)果的也是實(shí)現(xiàn) Runnable 接?。?當(dāng)需要實(shí)現(xiàn)返回值時(shí),我們使?的則是 Supplier 接?。其實(shí),Callable 接?和 Supplier 接? 并沒有什么區(qū)別。 接下來(lái),我們來(lái)分析?下 CompletableFuture 是如何實(shí)現(xiàn)異步任務(wù)執(zhí)?的。

    runAsync

    CompletableFuture 執(zhí)??返回值任務(wù)的是 runAsync() ?法。該?法的關(guān)鍵執(zhí)?代碼如下:

     
     
     
     
    1. static CompletableFuture asyncRunStage(Executor e, Runnable f) { 
    2.  if (f == null) throw new NullPointerException(); 
    3.  CompletableFuture d = new CompletableFuture(); 
    4.  e.execute(new AsyncRun(d, f)); 
    5.  return d; 

    可以看到,該?法將 Runnable 實(shí)例作為參數(shù)封裝? AsyncRun 類。實(shí)際上, AsyncRun 類是對(duì) Runnable 接?的進(jìn)?步封裝。實(shí)際上,AsyncRun 類也是實(shí)現(xiàn)了 Runnable 接?。觀察下? AsyncRun 類的源碼,可以看到 AsyncRun 類的 run() ?法中調(diào)?了 Runnable 參數(shù)的 run() ?法。

     
     
     
     
    1. public void run() { 
    2.  CompletableFuture d; Runnable f; 
    3.  if ((d = dep) != null && (f = fn) != null) { 
    4.  dep = null; fn = null; 
    5.  if (d.result == null) { 
    6.  try { 
    7.  f.run(); 
    8.  d.completeNull(); 
    9.  } catch (Throwable ex) { 
    10.  d.completeThrowable(ex); 
    11.  } 
    12.  } 
    13.  d.postComplete(); 
    14.  } 

    當(dāng)提交的任務(wù)執(zhí)?完畢后,即 f.run() ?法執(zhí)?完畢。調(diào)? d.completeNull() ?法設(shè)置任務(wù)執(zhí)?結(jié) 果為空。代碼如下:

     
     
     
     
    1. /** The encoding of the null value. */ 
    2. static final AltResult NIL = new AltResult(null); 
    3. /** Completes with the null value, unless already completed. */ 
    4. final boolean completeNull() { 
    5.  return UNSAFE.compareAndSwapObject(this, RESULT, null, 
    6.  NIL); 

    可以看到,對(duì)于任務(wù)返回值為 null 的執(zhí)?結(jié)果,被封裝為 new AltResult(null) 對(duì)象。?且,還是 調(diào)?的 CAS 本地?法實(shí)現(xiàn)了原?操作。 為什么需要對(duì) null 值進(jìn)?單獨(dú)封裝呢?觀察 get() ?法的源碼:

     
     
     
     
    1. public T get() throws InterruptedException, ExecutionException { 
    2.  Object r; 
    3.  return reportGet((r = result) == null ? waitingGet(true) : r); 

    原來(lái)原因是便于使? null 值區(qū)分異步任務(wù)是否執(zhí)?完畢。 如果你對(duì) CAS 不太了解的話,可以查閱 compareAndSwapObject ?法的四個(gè)參數(shù)的含義。該?法的參 數(shù) RESULT 是什么呢?查看代碼如下:

     
     
     
     
    1. RESULT = u.objectFieldOffset(k.getDeclaredField("result")); 

    原來(lái),RESULT 是獲取 CompletableFuture 對(duì)象中 result 字段的偏移地址。這個(gè) result 字段?是啥 呢?就是任務(wù)執(zhí)?完畢后的結(jié)果值。代碼如下:

     
     
     
     
    1. // Either the result or boxed AltResult 
    2. volatile Object result;  

    supplyAsync

    CompletableFuture 執(zhí)?有返回值任務(wù)的是 supplyAsync() ?法。該?法的關(guān)鍵執(zhí)?代碼如下:

     
     
     
     
    1. static  CompletableFuture asyncSupplyStage(Executor e, 
    2.  Supplier f) { 
    3.  if (f == null) throw new NullPointerException(); 
    4.  CompletableFuture d = new CompletableFuture(); 
    5.  e.execute(new AsyncSupply(d, f)); 
    6.  return d; 

    與 AsyncRun 類對(duì) Runnable 接?的封裝相同的是,AsyncSupply 類也是對(duì) Runnable 接?的 run() ? 法進(jìn)?了?層封裝。代碼如下:

     
     
     
     
    1. public void run() { 
    2.  CompletableFuture d; Supplier f; 
    3.  if ((d = dep) != null && (f = fn) != null) { 
    4.  dep = null; fn = null; 
    5.  if (d.result == null) { 
    6.  try { 
    7.  d.completeValue(f.get()); 
    8.  } catch (Throwable ex) { 
    9.  d.completeThrowable(ex); 
    10.  } 
    11.  } 
    12.  d.postComplete(); 
    13.  } 

    當(dāng)異步任務(wù)執(zhí)?完畢后,返回結(jié)果會(huì)經(jīng) d.completeValue() ?法進(jìn)?封裝。與 d.completeNull() ? 法不同的是,該?法具有?個(gè)參數(shù)。代碼如下:

     
     
     
     
    1. /** Completes with a non-exceptional result, unless already completed. */ 
    2. final boolean completeValue(T t) { 
    3.  return UNSAFE.compareAndSwapObject(this, RESULT, null, 
    4.  (t == null) ? NIL : t); 

    ?論是類 AsyncRun 還是類 AsyncSupply ,run() ?法都會(huì)在執(zhí)?結(jié)束之際調(diào)? CompletableFuture 對(duì)象的 postComplete() ?法。顧名思義,該?法將通知后續(xù)回調(diào)函數(shù)的執(zhí)?。

    五、探究回調(diào)函數(shù)原理

    前?我們提到了 CompletableFuture 具有連續(xù)回調(diào)的特性。舉個(gè)例?:

     
     
     
     
    1. @Test 
    2. public void demo2() throws ExecutionException, InterruptedException { 
    3.  CompletableFuture completableFuture = 
    4. CompletableFuture.supplyAsync(() -> { 
    5.  System.out.println(Thread.currentThread().getName()); 
    6.  return new ArrayList(); 
    7.  }) 
    8.  .whenCompleteAsync((list, throwable) -> { 
    9.  System.out.println(Thread.currentThread().getName()); 
    10.  list.add(1); 
    11.  }) 
    12.  .whenCompleteAsync((list, throwable) -> { 
    13.  System.out.println(Thread.currentThread().getName()); 
    14.  list.add(2); 
    15.  }) 
    16.  .whenCompleteAsync((list, throwable) -> { 
    17. System.out.println(Thread.currentThread().getName()); 
    18.  list.add(3); 
    19.  }); 
    20.  System.out.println(completableFuture.get()); 

    打印輸出:

     
     
     
     
    1. ForkJoinPool.commonPool-worker-1 
    2. ForkJoinPool.commonPool-worker-1 
    3. ForkJoinPool.commonPool-worker-1 
    4. ForkJoinPool.commonPool-worker-1 
    5. [1, 2, 3] 

    上?的測(cè)試?法中,通過(guò) supplyAsync ?法提交異步任務(wù),當(dāng)異步任務(wù)運(yùn)?結(jié)束,對(duì)結(jié)果值添加三個(gè)回 調(diào)函數(shù)進(jìn)?步處理。 觀察打印輸出,可以初步得出如下結(jié)論:

    1. 異步任務(wù)與回調(diào)函數(shù)均運(yùn)?在同?個(gè)線程中。
    2. 回調(diào)函數(shù)的調(diào)?順序與添加回調(diào)函數(shù)的順序?致。

    那么問題來(lái)了,CompletableFuture 內(nèi)部是如何處理連續(xù)回調(diào)函數(shù)的呢?

    AsyncSupply

    當(dāng)我們提交異步任務(wù)時(shí),等價(jià)于向線程池提交 AsyncSupply 對(duì)象或者 AsyncRun 對(duì)象。觀察這兩個(gè)類 的唯?構(gòu)造?法都是相同的,代碼如下:

     
     
     
     
    1. AsyncSupply(CompletableFuture dep, Supplier fn) { 
    2.  this.dep = dep; this.fn = fn; 

    這就將 AsyncSupply 異步任務(wù)與返回給?戶的 CompletableFuture 對(duì)象進(jìn)?綁定,?于在執(zhí)?結(jié)束后 回填結(jié)果到 CompletableFuture 對(duì)象,以及通知后續(xù)回調(diào)函數(shù)的運(yùn)?。

    Completion

    回調(diào)函數(shù)均是 Completion 類的?類,抽取 Completion 類與?類的關(guān)鍵代碼:

     
     
     
     
    1. Completion next; 
    2. CompletableFuture dep; 
    3. CompletableFuture src; 
    4. Function fn; 

    Completion 類含有 next 字段,很明顯是?個(gè)鏈表。 Completion 的?類含有兩個(gè) CompletableFuture 類型的參數(shù),dep 是新建的、?于下?步的 CompletableFuture 對(duì)象,src 則是引?它的 CompletableFuture 對(duì)象。

    當(dāng) Completion 執(zhí)?完回調(diào)?法后,?般會(huì)返回 dep 對(duì)象,?于迭代遍歷。

    CompletableFuture

    觀察源碼,CompletableFuture 主要包含下?兩個(gè)參數(shù):

     
     
     
     
    1. volatile Object result; //結(jié)果 
    2. volatile Completion stack; //回調(diào)?法棧 

    Completion 類型封裝了回調(diào)?法,但為什么要起名為 stack (棧)呢? 因?yàn)?CompletableFuture 借助 Completion 的鏈表結(jié)構(gòu)實(shí)現(xiàn)了棧。每當(dāng)調(diào)? CompletableFuture 對(duì) 象的 whenCompleteAsync() 或其它回調(diào)?法時(shí),都會(huì)新建?個(gè) Completion 對(duì)象,并壓到棧頂。代碼 如下:

     
     
     
     
    1. final boolean tryPushStack(Completion c) { 
    2.  Completion h = stack; 
    3.  lazySetNext(c, h); 
    4.  return UNSAFE.compareAndSwapObject(this, STACK, h, c); 

    postComplete

    回顧上?兩種異步任務(wù)類的實(shí)現(xiàn),當(dāng)異步任務(wù)執(zhí)?完畢之后,都會(huì)調(diào)? postComplete() ?法通知回調(diào) ?法的執(zhí)?。代碼如下:

     
     
     
     
    1. final void postComplete() { 
    2.  CompletableFuture f = this; Completion h; 
    3.  while ((h = f.stack) != null || 
    4.  (f != this && (h = (f = this).stack) != null)) { 
    5.  CompletableFuture d; Completion t; 
    6.  if (f.casStack(h, t = h.next)) { 
    7.  if (t != null) { 
    8.  if (f != this) { 
    9.  pushStack(h); 
    10.  continue; 
    11.  } 
    12.  h.next = null; // detach 
    13.  } 
    14.  f = (d = h.tryFire(NESTED)) == null ? this : d; 
    15.  } 
    16.  } 

    這段代碼是本?的核?部分,?致邏輯如下:

    當(dāng)異步任務(wù)執(zhí)?結(jié)束后,CompletableFuture 會(huì)查看?身是否含有回調(diào)?法棧,如果含有,會(huì)通過(guò) casStack() ?法拿出棧頂元素 h ,此時(shí)的棧頂是原來(lái)?xiàng)5牡?位元素 t。如果 t 等于 null,那么直接 執(zhí)?回調(diào)?法 h,并返回下?個(gè) CompletableFuture 對(duì)象。然后?直迭代這個(gè)過(guò)程。 簡(jiǎn)化上述思路,我更想稱其為通過(guò) Completion 對(duì)象實(shí)現(xiàn)橋接的 CompletableFuture 鏈表,流程圖如 下:

    上?的過(guò)程是屬于正常情況下的,也就是?個(gè) CompletableFuture 對(duì)象只提交?個(gè)回調(diào)?法的情況。 如果我們使?同?個(gè) CompletableFuture 對(duì)象連續(xù)調(diào)?多次回調(diào)?法,那么就會(huì)形成 Completion 棧。

    你以為 Completion 棧內(nèi)元素會(huì)依次調(diào)?,不會(huì)的。從代碼中來(lái)看,當(dāng)回調(diào)?法 t 不等于 null,有兩種 情況:

    情況 1:如果當(dāng)前迭代到的 CompletableFuture 對(duì)象是 this (也就是 CompletableFuture 鏈表頭), 會(huì)令 h.next = null ,因?yàn)?h.next 也就是 t 通過(guò) CAS 的?式壓到了 this 對(duì)象的 stack 棧頂。

    情況 2:如果當(dāng)前迭代到的 CompletableFuture 對(duì)象 f 不是 this (不是鏈表頭)的話,會(huì)將回調(diào)函數(shù) h 壓? this (鏈表頭)的 stack 中。然后從鏈表頭再次迭代遍歷。這樣下去,對(duì)象 f 中的回調(diào)?法棧假設(shè) 為 3-2-1,從 f 的棧頂推出再壓? this 的棧頂,順序就變?yōu)榱?1-2-3。這時(shí)候,情況就變成了第 1 種。

    這樣,當(dāng)回調(diào)?法 t = h.next 等于 null 或者 f 等于 this 時(shí),都會(huì)對(duì)棧頂?shù)幕卣{(diào)?法進(jìn)?調(diào)?。

    簡(jiǎn)單來(lái)說(shuō),就是將擁有多個(gè)回調(diào)?法的 CompletableFuture 對(duì)象的多余的回調(diào)?法移到到 this 對(duì)象的 棧內(nèi)。

    回調(diào)?法執(zhí)?結(jié)束要么返回下?個(gè) CompletableFuture 對(duì)象,要么返回 null 然后?動(dòng)設(shè)置為 f = this, 再次從頭遍歷。

    Async

    回調(diào)函數(shù)的執(zhí)?其實(shí)分為兩種,區(qū)別在于帶不帶 Async 后綴。例如:

     
     
     
     
    1. @Test 
    2. public void demo3() throws ExecutionException, InterruptedException { 
    3.  CompletableFuture completableFuture = 
    4. CompletableFuture.supplyAsync(() -> { 
    5.  System.out.println(Thread.currentThread().getName()); 
    6.  return new ArrayList(); 
    7.  }) 
    8.  .whenComplete((arrayList, throwable) -> { 
    9.  System.out.println(Thread.currentThread().getName()); 
    10.  arrayList.add(1); 
    11.  }).whenCompleteAsync((arrayList, throwable) -> { 
    12.  System.out.println(Thread.currentThread().getName()); 
    13.  arrayList.add(2); 
    14.  }); 
    15.  System.out.println(completableFuture.get()); 

    打印輸出:

     
     
     
     
    1. ForkJoinPool.commonPool-worker-1 
    2. main 
    3. ForkJoinPool.commonPool-worker-1 
    4. [1, 2] 

    whenComplete() 和 whenCompleteAsync() ?法的區(qū)別在于是否在?即執(zhí)?。源碼如下:

     
     
     
     
    1. private CompletableFuture uniWhenCompleteStage( 
    2.  Executor e, BiConsumer f) { 
    3.  if (f == null) throw new NullPointerException(); 
    4.  CompletableFuture d = new CompletableFuture(); 
    5.  if (e != null || !d.uniWhenComplete(this, f, null)) { 
    6.  UniWhenComplete c = new UniWhenComplete(e, d, this, f); 
    7.  push(c); 
    8.  c.tryFire(SYNC); 
    9.  } 
    10.  return d; 

    兩個(gè)?法都是調(diào)?的 uniWhenCompleteStage() ,區(qū)別在于參數(shù) Executor e 是否為 null。從?控制是 否調(diào)? d.uniWhenComplete() ?法,該?法會(huì)判斷 result 是否為 null,從?嘗試是否?即執(zhí)?該回調(diào) ?法。若是 supplyAsync() ?法提交的異步任務(wù)耗時(shí)相對(duì)??些,那么就不建議使? whenComplete() ?法了。此時(shí)由 whenComplete() 和 whenCompleteAsync() ?法提交的異步任務(wù)都會(huì)由線程池執(zhí)?。

    本章小結(jié)

    通過(guò)本章節(jié)的源碼分析,我們明白了 Completion 之所以將自身設(shè)置為鏈表結(jié)構(gòu),是因?yàn)?CompletableFuture 需要借助 Completion 的鏈表結(jié)構(gòu)實(shí)現(xiàn)棧。也明白了同一個(gè) CompletableFuture 對(duì)象如果多次調(diào)用回調(diào)方法時(shí)執(zhí)行順序會(huì)與調(diào)用的順序不符合。換言之,一個(gè) CompletableFuture 對(duì)象只調(diào)用一個(gè)回調(diào)方法才是 CompletableFuture 設(shè)計(jì)的初衷,我們?cè)诰幊讨幸部梢岳眠@一特性來(lái)保證回調(diào)方法的調(diào)用順序。

    因篇幅有限,本文并沒有分析更多的 CompletableFuture 源碼,感興趣的小伙伴可以自行查看。

    六、用法集錦

    異常處理

    方法:

     
     
     
     
    1. public CompletableFuture     exceptionally(Function fn) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index2() throws ExecutionException, InterruptedException { 
    3.    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 2 / 0) 
    4.           .exceptionally((e) -> { 
    5.                System.out.println(e.getMessage()); 
    6.                return 0; 
    7.           }); 
    8.    System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     
    1. java.lang.ArithmeticException: / by zero 

    任務(wù)完成后對(duì)結(jié)果的處理

    方法:

     
     
     
     
    1. public CompletableFuture   whenComplete(BiConsumer action) 
    2. public CompletableFuture  whenCompleteAsync(BiConsumer action) 
    3. public CompletableFuture  whenCompleteAsync(BiConsumer action, Executor executor) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index3() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> new HashMap()) 
    4.             .whenComplete((map, throwable) -> { 
    5.                 map.put("key1", "value1"); 
    6.             }); 
    7.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     
    1. {key=value} 

    任務(wù)完成后對(duì)結(jié)果的轉(zhuǎn)換

    方法:

     
     
     
     
    1. public  CompletableFuture   thenApply(Function fn) 
    2. public  CompletableFuture  thenApplyAsync(Function fn) 
    3. public  CompletableFuture  thenApplyAsync(Function fn, Executor executor) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index4() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 2) 
    4.             .thenApply((r) -> r + 1); 
    5.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     

    任務(wù)完成后對(duì)結(jié)果的消費(fèi)

    方法:

     
     
     
     
    1. public CompletableFuture    thenAccept(Consumer action) 
    2. public CompletableFuture   thenAcceptAsync(Consumer action) 
    3. public CompletableFuture   thenAcceptAsync(Consumer action, Executor executor) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index5() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 2) 
    4.             .thenAccept(System.out::println); 
    5.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     
    1. null 

    任務(wù)的組合(需等待上一個(gè)任務(wù)完成)

    方法:

     
     
     
     
    1. public  CompletableFuture   thenCompose(Function> fn) 
    2. public  CompletableFuture  thenComposeAsync(Function> fn) 
    3. public  CompletableFuture  thenComposeAsync(Function> fn, Executor executor) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index6() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 2) 
    4.             .thenCompose(integer -> CompletableFuture.supplyAsync(() -> integer + 1)); 
    5.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     

    任務(wù)的組合(不需等待上一步完成)

    方法:

     
     
     
     
    1. public  CompletableFuture   thenCombine(CompletionStage other, BiFunction fn) 
    2. public  CompletableFuture   thenCombineAsync(CompletionStage other, BiFunction fn) 
    3. public  CompletableFuture   thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor)  

    示例:

     
     
     
     
    1. @Test 
    2. public void index7() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 2) 
    4.             .thenCombine(CompletableFuture.supplyAsync(() -> 1), (x, y) -> x + y); 
    5.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     

    消費(fèi)最先執(zhí)行完畢的其中一個(gè)任務(wù),不返回結(jié)果

    方法:

     
     
     
     
    1. public CompletableFuture  acceptEither(CompletionStage other, Consumer action) 
    2. public CompletableFuture  acceptEitherAsync(CompletionStage other, Consumer action) 
    3. public CompletableFuture  acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index8() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { 
    4.         try { 
    5.             Thread.sleep(100); 
    6.         } catch (InterruptedException e) { 
    7.             e.printStackTrace(); 
    8.         } 
    9.         return 2; 
    10.     }) 
    11.             .acceptEither(CompletableFuture.supplyAsync(() -> 1), System.out::println); 
    12.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     
    1. null 

    消費(fèi)最先執(zhí)行完畢的其中一個(gè)任務(wù),并返回結(jié)果

    方法:

     
     
     
     
    1. public  CompletableFuture     applyToEither(CompletionStage other, Function fn) 
    2. public  CompletableFuture     applyToEitherAsync(CompletionStage other, Function fn) 
    3. public  CompletableFuture     applyToEitherAsync(CompletionStage other, Function fn, Executor executor) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index9() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { 
    4.         try { 
    5.             Thread.sleep(100); 
    6.         } catch (InterruptedException e) { 
    7.             e.printStackTrace(); 
    8.         } 
    9.         return 2; 
    10.     }) 
    11.             .applyToEither(CompletableFuture.supplyAsync(() -> 1), x -> x + 10); 
    12.     System.out.println(completableFuture.get()); 

    輸出:

     
     
     
     
    1. 11 

    等待所有任務(wù)完成

    方法:

     
     
     
     
    1. public static CompletableFuture allOf(CompletableFuture... cfs) 

    示例:

     
     
     
     
    1. @Test 
    2. public void index10() throws ExecutionException, InterruptedException { 
    3.     CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> { 
    4.         try { 
    5.             Thread.sleep(2000); 
    6.         } catch (InterruptedException e) { 
    7.             e.printStackTrace(); 
    8.         } 
    9.         return 1; 
    10.     }); 
    11.     CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> 2); 
    12.     CompletableFuture completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2); 
    13.     System.out.println("waiting all task finish.."); 
    14.     System.out.println(completableFuture.get()); 
    15.     System.out.println("all task finish"); 

    輸出:

     
     
     
     
    1. waiting all task finish.. 
    2. null 
    3. all task finish 

    返回最先完成的任務(wù)結(jié)果

    方法:

     
     
     
     
    1. public static CompletableFuture anyOf(CompletableFuture... cfs) 

      示例:

       
       
       
       
      1. @Test 
      2. public void index11() throws ExecutionException, InterruptedException { 
      3.     CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> { 
      4.         try { 
      5.             Thread.sleep(100); 
      6.         } catch (InterruptedException e) { 
      7.             e.printStackTrace(); 
      8.         } 
      9.         return 1; 
      10.     }); 
      11.     CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> 2); 
      12.     CompletableFuture completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2); 
      13.     System.out.println(completableFuture.get()); 
      14. 輸出:

         
         
         
         

        作者簡(jiǎn)介:

        薛勤,公眾號(hào)“代碼藝術(shù)”的作者,就職于阿里巴巴,熱衷于探索計(jì)算機(jī)世界的底層原理,個(gè)人在 Github@Ystcode 上擁有多個(gè)開源項(xiàng)目。

        【原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為.com】


        新聞標(biāo)題:Java8異步編程之CompletableFuture源碼解讀
        分享URL:http://www.dlmjj.cn/article/cdihijs.html