日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
Java 8 ConcurrentHashMap源碼中竟然隱藏著兩個(gè)Bug

 Java 7的ConcurrenHashMap的源碼我建議大家都看看,那個(gè)版本的源碼就是Java多線程編程的教科書(shū)。在Java 7的源碼中,作者對(duì)悲觀鎖的使用非常謹(jǐn)慎,大多都轉(zhuǎn)換為自旋鎖加volatile獲得相同的語(yǔ)義,即使最后迫不得已要用,作者也會(huì)通過(guò)各種技巧減少鎖的臨界區(qū)。在上一篇文章中我們也有講到,自旋鎖在臨界區(qū)比較小的時(shí)候是一個(gè)較優(yōu)的選擇是因?yàn)樗苊饬司€程由于阻塞而切換上下文,但本質(zhì)上它也是個(gè)鎖,在自旋等待期間只有一個(gè)線程能進(jìn)入臨界區(qū),其他線程只會(huì)自旋消耗CPU的時(shí)間片。Java 8中ConcurrentHashMap的實(shí)現(xiàn)通過(guò)一些巧妙的設(shè)計(jì)和技巧,避開(kāi)了自旋鎖的局限,提供了更高的并發(fā)性能。如果說(shuō)Java 7版本的源碼是在教我們?nèi)绾螌⒈^鎖轉(zhuǎn)換為自旋鎖,那么在Java 8中我們甚至可以看到如何將自旋鎖轉(zhuǎn)換為無(wú)鎖的方法和技巧。

新區(qū)網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)建站!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、響應(yīng)式網(wǎng)站設(shè)計(jì)等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。創(chuàng)新互聯(lián)建站成立于2013年到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)建站。

把書(shū)讀薄

image

圖片來(lái)源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/

在開(kāi)始本文之前,大家首先在心里還是要有這樣的一張圖,如果有同學(xué)對(duì)HashMap比較熟悉,那這張圖也應(yīng)該不會(huì)陌生。事實(shí)上在整體的數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)上Java 8的ConcurrentHashMap和HashMap基本上是一致的。

Java 7中ConcurrentHashMap為了提升性能使用了很多的編程技巧,但是引入Segment的設(shè)計(jì)還是有很大的改進(jìn)空間的,Java 7中ConcurrrentHashMap的設(shè)計(jì)有下面這幾個(gè)可以改進(jìn)的點(diǎn):

    1.  Segment在擴(kuò)容的時(shí)候非擴(kuò)容線程對(duì)本Segment的寫操作時(shí)都要掛起等待的

    2.  對(duì)ConcurrentHashMap的讀操作需要做兩次哈希尋址,在讀多寫少的情況下其實(shí)是有額外的性能損失的

    3.  盡管size()方法的實(shí)現(xiàn)中先嘗試無(wú)鎖讀,但是如果在這個(gè)過(guò)程中有別的線程做寫入操作,那調(diào)用size()的這個(gè)線程就會(huì)給整個(gè)ConcurrentHashMap加鎖,這是整個(gè)ConcurrrentHashMap唯一一個(gè)全局鎖,這點(diǎn)對(duì)底層的組件來(lái)說(shuō)還是有性能隱患的

    4.  極端情況下(比如客戶端實(shí)現(xiàn)了一個(gè)性能很差的哈希函數(shù))get()方法的復(fù)雜度會(huì)退化到O(n)。

針對(duì)1和2,在Java 8的設(shè)計(jì)是廢棄了Segment的使用,將悲觀鎖的粒度降低至桶維度,因此調(diào)用get的時(shí)候也不需要再做兩次哈希了。size()的設(shè)計(jì)是Java 8版本中最大的亮點(diǎn),我們?cè)诤竺娴奈恼轮袝?huì)詳細(xì)說(shuō)明。至于紅黑樹(shù),這篇文章仍然不做過(guò)多闡述。接下來(lái)的篇幅會(huì)深挖細(xì)節(jié),把書(shū)讀厚,涉及到的模塊有:初始化,put方法, 擴(kuò)容方法transfer以及size()方法,而其他模塊,比如hash函數(shù)等改變較小,故不再深究。

準(zhǔn)備知識(shí)

ForwardingNode

 
 
 
 
  1. static final class ForwardingNode extends Node {  
  2.     final Node[] nextTable;  
  3.     ForwardingNode(Node[] tab) {  
  4.         // MOVED = -1,F(xiàn)orwardingNode的哈希值為-1  
  5.         super(MOVED, null, null, null);  
  6.         this.nextTable = tab;  
  7.     }  

除了普通的Node和TreeNode之外,ConcurrentHashMap還引入了一個(gè)新的數(shù)據(jù)類型ForwardingNode,我們這里只展示他的構(gòu)造方法,F(xiàn)orwardingNode的作用有兩個(gè):

  •  在動(dòng)態(tài)擴(kuò)容的過(guò)程中標(biāo)志某個(gè)桶已經(jīng)被復(fù)制到了新的桶數(shù)組中
  •  如果在動(dòng)態(tài)擴(kuò)容的時(shí)候有g(shù)et方法的調(diào)用,則ForwardingNode將會(huì)把請(qǐng)求轉(zhuǎn)發(fā)到新的桶數(shù)組中,以避免阻塞get方法的調(diào)用,F(xiàn)orwardingNode在構(gòu)造的時(shí)候會(huì)將擴(kuò)容后的桶數(shù)組nextTable保存下來(lái)。

UNSAFE.compareAndSwap***

這是在Java 8版本的ConcurrentHashMap實(shí)現(xiàn)CAS的工具,以int類型為例其方法定義如下:

 
 
 
 
  1. /**  
  2. * Atomically update Java variable to x if it is currently  
  3. * holding expected.  
  4. * @return true if successful  
  5. */  
  6. public final native boolean compareAndSwapInt(Object o, long offset,  
  7.                                               int expected,  
  8.                                               int x); 

相應(yīng)的語(yǔ)義為:

如果對(duì)象o起始地址偏移量為offset的值等于expected,則將該值設(shè)為x,并返回true表明更新成功,否則返回false,表明CAS失敗

初始化 

 
 
 
 
  1. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {  
  2.     if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 檢查參數(shù)  
  3.         throw new IllegalArgumentException();  
  4.     if (initialCapacity < concurrencyLevel)  
  5.         initialCapacity = concurrencyLevel;  
  6.     long size = (long)(1.0 + (long)initialCapacity / loadFactor);  
  7.     int cap = (size >= (long)MAXIMUM_CAPACITY) ?  
  8.         MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中說(shuō)過(guò)  
  9.     this.sizeCtl = cap;   

即使是最復(fù)雜的一個(gè)初始化方法代碼也是比較簡(jiǎn)單的,這里我們只需要注意兩個(gè)點(diǎn):

  •  concurrencyLevel在Java 7中是Segment數(shù)組的長(zhǎng)度,由于在Java 8中已經(jīng)廢棄了Segment,因此concurrencyLevel只是一個(gè)保留字段,無(wú)實(shí)際意義
  •  sizeCtl這個(gè)值第一次出現(xiàn),這個(gè)值如果等于-1則表明系統(tǒng)正在初始化,如果是其他負(fù)數(shù)則表明系統(tǒng)正在擴(kuò)容,在擴(kuò)容時(shí)sizeCtl二進(jìn)制的低十六位等于擴(kuò)容的線程數(shù)加一,高十六位(除符號(hào)位之外)包含桶數(shù)組的大小信息

put方法 

 
 
 
 
  1. public V put(K key, V value) {  
  2.     return putVal(key, value, false);  

put方法將調(diào)用轉(zhuǎn)發(fā)到putVal方法:

 
 
 
 
  1. final V putVal(K key, V value, boolean onlyIfAbsent) {  
  2.     if (key == null || value == null) throw new NullPointerException();  
  3.     int hash = spread(key.hashCode());  
  4.     int binCount = 0;  
  5.     for (Node[] tab = table;;) {  
  6.         Node f; int n, i, fh;  
  7.         // 【A】延遲初始化  
  8.         if (tab == null || (n = tab.length) == 0)  
  9.             tab = initTable();  
  10.         // 【B】當(dāng)前桶是空的,直接更新  
  11.         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
  12.             if (casTabAt(tab, i, null, 
  13.                              new Node(hash, key, value, null)))  
  14.                 break;                   // no lock when adding to empty bin  
  15.         } 
  16.         // 【C】如果當(dāng)前的桶的第一個(gè)元素是一個(gè)ForwardingNode節(jié)點(diǎn),則該線程嘗試加入擴(kuò)容  
  17.         else if ((ffh = f.hash) == MOVED)  
  18.             tab = helpTransfer(tab, f);  
  19.         // 【D】否則遍歷桶內(nèi)的鏈表或樹(shù),并插入 
  20.          else {  
  21.             // 暫時(shí)折疊起來(lái),后面詳細(xì)看  
  22.         }  
  23.     }  
  24.     // 【F】流程走到此處,說(shuō)明已經(jīng)put成功,map的記錄總數(shù)加一  
  25.     addCount(1L, binCount);  
  26.     return null;  

從整個(gè)代碼結(jié)構(gòu)上來(lái)看流程還是比較清楚的,我用括號(hào)加字母的方式標(biāo)注了幾個(gè)非常重要的步驟,put方法依然牽扯出很多的知識(shí)點(diǎn)

桶數(shù)組的初始化 

 
 
 
 
  1. private final Node[] initTable() {  
  2.     Node[] tab; int sc;  
  3.     while ((tab = table) == null || tab.length == 0) {  
  4.         if ((sc = sizeCtl) < 0)  
  5.             // 說(shuō)明已經(jīng)有線程在初始化了,本線程開(kāi)始自旋  
  6.             Thread.yield(); // lost initialization race; just spin  
  7.         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  
  8.             // CAS保證只有一個(gè)線程能走到這個(gè)分支  
  9.             try {  
  10.                 if ((tab = table) == null || tab.length == 0) {  
  11.                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;  
  12.                     @SuppressWarnings("unchecked")  
  13.                     Node[] nt = (Node[])new Node[n];  
  14.                     tabtable = tab = nt;  
  15.                     // sc = n - n/4 = 0.75n  
  16.                     sc = n - (n >>> 2);  
  17.                 }  
  18.             } finally {  
  19.                 // 恢復(fù)sizeCtl > 0相當(dāng)于釋放鎖  
  20.                 sizeCtl = sc;  
  21.             }  
  22.             break;  
  23.         }  
  24.     }  
  25.     return tab;  

在初始化桶數(shù)組的過(guò)程中,系統(tǒng)如何保證不會(huì)出現(xiàn)并發(fā)問(wèn)題呢,關(guān)鍵點(diǎn)在于自旋鎖的使用,當(dāng)有多個(gè)線程都執(zhí)行initTable方法的時(shí)候,CAS可以保證只有一個(gè)線程能夠進(jìn)入到真正的初始化分支,其他線程都是自旋等待。這段代碼中我們關(guān)注三點(diǎn)即可:

  •  依照前文所述,當(dāng)有線程開(kāi)始初始化桶數(shù)組時(shí),會(huì)通過(guò)CAS將sizeCtl置為-1,其他線程以此為標(biāo)志開(kāi)始自旋等待
  •  當(dāng)桶數(shù)組初始化結(jié)束后將sizeCtl的值恢復(fù)為正數(shù),其值等于0.75倍的桶數(shù)組長(zhǎng)度,這個(gè)值的含義和之前HashMap中的THRESHOLD一致,是系統(tǒng)觸發(fā)擴(kuò)容的臨界點(diǎn)
  •  在finally語(yǔ)句中對(duì)sizeCtl的操作并沒(méi)有使用CAS是因?yàn)镃AS保證只有一個(gè)線程能夠執(zhí)行到這個(gè)地方

添加桶數(shù)組第一個(gè)元素 

 
 
 
 
  1. static final  Node tabAt(Node[] tab, int i) {  
  2.     return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
  3. }  
  4. static final  boolean casTabAt(Node[] tab, int i,  
  5.                                     Node c, Node v) {  
  6.     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  

put方法的第二個(gè)分支會(huì)用tabAt判斷當(dāng)前桶是否是空的,如果是則會(huì)通過(guò)CAS寫入,tabAt通過(guò)UNSAFE接口會(huì)拿到桶中的最新元素,casTabAt通過(guò)CAS保證不會(huì)有并發(fā)問(wèn)題,如果CAS失敗,則通過(guò)循環(huán)再進(jìn)入其他分支

判斷是否需要新增線程擴(kuò)容 

 
 
 
 
  1. final Node[] helpTransfer(Node[] tab, Node f) {  
  2.     Node[] nextTab; int sc;  
  3.     if (tab != null && (f instanceof ForwardingNode) &&  
  4.         (nextTab = ((ForwardingNode)f).nextTable) != null) {  
  5.         int rs = resizeStamp(tab.length);  
  6.         while (nextTab == nextTable && table == tab &&  
  7.                 (sc = sizeCtl) < 0) {  
  8.             // RESIZE_STAMP_SHIFT = 16  
  9.             if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||  
  10.                 sc == rs + MAX_RESIZERS || transferIndex <= 0)  
  11.                 break;  
  12.             // 這里將sizeCtl的值自增1,表明參與擴(kuò)容的線程數(shù)量+1  
  13.             if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {  
  14.                 transfer(tab, nextTab);  
  15.                 break;  
  16.             }  
  17.         }  
  18.         return nextTab;  
  19.     }  
  20.     return table;  

在這個(gè)地方我們就要詳細(xì)說(shuō)下sizeCtl這個(gè)標(biāo)志位了,臨時(shí)變量rs由resizeStamp這個(gè)方法返回

 
 
 
 
  1. static final int resizeStamp(int n) {  
  2.     // RESIZE_STAMP_BITS = 16  
  3.     return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));  

因?yàn)槿雲(yún)是一個(gè)int類型的值,所有Integer.numberOfLeadingZeros(n)的返回值介于0到32之間,如果轉(zhuǎn)換成二進(jìn)制

  •  Integer.numberOfLeadingZeros(n)的最大值是:00000000 00000000 00000000 00100000
  •  Integer.numberOfLeadingZeros(n)的最小值是:00000000 00000000 00000000 00000000

因此resizeStampd的返回值也就介于00000000 00000000 10000000 00000000到00000000 00000000 10000000 00100000之間,從這個(gè)返回值的范圍可以看出來(lái)resizeStamp的返回值高16位全都是0,是不包含任何信息的。因此在ConcurrrentHashMap中,會(huì)把resizeStamp的返回值左移16位拼到sizeCtl中,這就是為什么sizeCtl的高16位包含整個(gè)Map大小的原理。有了這個(gè)分析,這段代碼中比較長(zhǎng)的if判斷也就能看懂了

 
 
 
 
  1. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||  
  2.     sc == rs + MAX_RESIZERS || transferIndex <= 0)  
  3.     break;  
  4.     (sc >>> RESIZE_STAMP_SHIFT) != rs保證所有線程要基于同一個(gè)舊的桶數(shù)組擴(kuò)容  
  5.     transferIndex <= 0已經(jīng)有線程完成擴(kuò)容任務(wù)了 

至于sc == rs + 1 || sc == rs + MAX_RESIZERS這兩個(gè)判斷條件如果是細(xì)心的同學(xué)一定會(huì)覺(jué)得難以理解,這個(gè)地方確實(shí)是JDK的一個(gè)BUG,這個(gè)BUG已經(jīng)在JDK 12中修復(fù),詳細(xì)情況可以參考一下Oracle的官網(wǎng):https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,這兩個(gè)判斷條件應(yīng)該寫成這樣:sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,因?yàn)橹苯颖容^rs和sc是沒(méi)有意義的,必須要有移位操作。它表達(dá)的含義是

  •  sc == (rs << RESIZE_STAMP_SHIFT) + 1當(dāng)前擴(kuò)容的線程數(shù)為0,即已經(jīng)擴(kuò)容完成了,就不需要再新增線程擴(kuò)容
  •  sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS參與擴(kuò)容的線程數(shù)已經(jīng)到了最大,就不需要再新增線程擴(kuò)容

真正擴(kuò)容的邏輯在transfer方法中,我們后面會(huì)詳細(xì)看,不過(guò)有個(gè)小細(xì)節(jié)可以提前注意,如果nextTable已經(jīng)初始化了,transfer會(huì)返回nextTable的的引用,后續(xù)可以直接操作新的桶數(shù)組。

插入新值

如果桶數(shù)組已經(jīng)初始化好了,該擴(kuò)容的也擴(kuò)容了,并且根據(jù)哈希定位到的桶中已經(jīng)有元素了,那流程就跟普通的HashMap一樣了,唯一一點(diǎn)不同的就是,這時(shí)候要給當(dāng)前的桶加鎖,且看代碼:

 
 
 
 
  1. final V putVal(K key, V value, boolean onlyIfAbsent) {  
  2.     if (key == null || value == null) throw new NullPointerException();  
  3.     int hash = spread(key.hashCode());  
  4.     int binCount = 0;  
  5.     for (Node[] tab = table;;) {  
  6.         Node f; int n, i, fh;  
  7.         if (tab == null || (n = tab.length) == 0)// 折疊  
  8.         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折疊}  
  9.         else if ((ffh = f.hash) == MOVED)// 折疊  
  10.         else {  
  11.             V oldVal = null;  
  12.             synchronized (f) {  
  13.                 // 要注意這里這個(gè)不起眼的判斷條件  
  14.                 if (tabAt(tab, i) == f) {  
  15.                     if (fh >= 0) { // fh>=0的節(jié)點(diǎn)是鏈表,否則是樹(shù)節(jié)點(diǎn)或者ForwardingNode  
  16.                         binCount = 1;  
  17.                         for (Node e = f;; ++binCount) {  
  18.                             K ek;  
  19.                             if (e.hash == hash &&  
  20.                                 ((eek = e.key) == key ||  
  21.                                     (ek != null && key.equals(ek)))) {  
  22.                                 oldVal = e.val; // 如果鏈表中有值了,直接更新  
  23.                                 if (!onlyIfAbsent)  
  24.                                     e.val = value;  
  25.                                 break;  
  26.                             }  
  27.                             Node pred = e;  
  28.                             if ((ee = e.next) == null) {  
  29.                                 // 如果流程走到這里,則說(shuō)明鏈表中還沒(méi)值,直接連接到鏈表尾部  
  30.                                 pred.next = new Node(hash, key, value, null);  
  31.                                 break;  
  32.                             }  
  33.                         }  
  34.                     }  
  35.                     // 紅黑樹(shù)的操作先略過(guò)  
  36.                 }  
  37.             }  
  38.         }  
  39.     }  
  40.     // put成功,map的元素個(gè)數(shù)+1  
  41.     addCount(1L, binCount);  
  42.     return null;  

這段代碼中要特備注意一個(gè)不起眼的判斷條件(上下文在源碼上已經(jīng)標(biāo)注出來(lái)了):tabAt(tab, i) == f,這個(gè)判斷的目的是為了處理調(diào)用put方法的線程和擴(kuò)容線程的競(jìng)爭(zhēng)。因?yàn)閟ynchronized是阻塞鎖,如果調(diào)用put方法的線程恰好和擴(kuò)容線程同時(shí)操作同一個(gè)桶,且調(diào)用put方法的線程競(jìng)爭(zhēng)鎖失敗,等到該線程重新獲取到鎖的時(shí)候,當(dāng)前桶中的元素就會(huì)變成一個(gè)ForwardingNode,那就會(huì)出現(xiàn)tabAt(tab, i) != f的情況。

多線程動(dòng)態(tài)擴(kuò)容 

 
 
 
 
  1. private final void transfer(Node[] tab, Node[] nextTab) {  
  2.     int n = tab.length, stride;  
  3.     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  
  4.         stride = MIN_TRANSFER_STRIDE; // subdivide range  
  5.     if (nextTab == null) {            // 初始化新的桶數(shù)組  
  6.         try {  
  7.             @SuppressWarnings("unchecked")  
  8.             Node[] nt = (Node[])new Node[n << 1];  
  9.             nextTab = nt;  
  10.         } catch (Throwable ex) {      // try to cope with OOME  
  11.             sizeCtl = Integer.MAX_VALUE;  
  12.             return; 
  13.         }  
  14.         nextTabnextTable = nextTab;  
  15.         transferIndex = n;  
  16.     }  
  17.     int nextn = nextTab.length;  
  18.     ForwardingNode fwd = new ForwardingNode(nextTab);  
  19.     boolean advance = true;  
  20.     boolean finishing = false; // to ensure sweep before committing nextTab  
  21.     for (int i = 0, bound = 0;;) {  
  22.         Node f; int fh;  
  23.         while (advance) {  
  24.             int nextIndex, nextBound; 
  25.             if (--i >= bound || finishing)  
  26.                 advance = false;  
  27.             else if ((nextIndex = transferIndex) <= 0) {  
  28.                 i = -1;  
  29.                 advance = false;  
  30.             }  
  31.             else if (U.compareAndSwapInt  
  32.                         (this, TRANSFERINDEX, nextIndex,  
  33.                         nextBound = (nextIndex > stride ?  
  34.                                     nextIndex - stride : 0))) {  
  35.                 bound = nextBound;  
  36.                 i = nextIndex - 1;  
  37.                 advance = false;  
  38.             }  
  39.         }  
  40.         if (i < 0 || i >= n || i + n >= nextn) {  
  41.             int sc;  
  42.             if (finishing) {  
  43.                 nextTable = null;  
  44.                 table = nextTab;  
  45.                 sizeCtl = (n << 1) - (n >>> 1);  
  46.                 return;  
  47.             }  
  48.             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  
  49.                 // 判斷是會(huì)否是最后一個(gè)擴(kuò)容線程  
  50.                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
  51.                     return;  
  52.                 finishing = advance = true;  
  53.                 i = n; // recheck before commit  
  54.             }  
  55.         }  
  56.         else if ((f = tabAt(tab, i)) == null)  
  57.             advance = casTabAt(tab, i, null, fwd);  
  58.         else if ((ffh = f.hash) == MOVED) // 只有最后一個(gè)擴(kuò)容線程才有機(jī)會(huì)執(zhí)行這個(gè)分支  
  59.             advance = true; // already processed  
  60.         else { // 復(fù)制過(guò)程與HashMap類似,這里不再贅述  
  61.             synchronized (f) {  
  62.                // 折疊  
  63.             }  
  64.         }  
  65.     }  

在深入到源碼細(xì)節(jié)之前我們先根據(jù)下圖看一下在Java 8中ConcurrentHashMap擴(kuò)容的幾個(gè)特點(diǎn):

  •  新的桶數(shù)組nextTable是原先桶數(shù)組長(zhǎng)度的2倍,這與之前HashMap一致
  •  參與擴(kuò)容的線程也是分段將table中的元素復(fù)制到新的桶數(shù)組nextTable中
  •  桶一個(gè)桶數(shù)組中的元素在新的桶數(shù)組中均勻的分布在兩個(gè)桶中,桶下標(biāo)相差n(舊的桶數(shù)組的長(zhǎng)度),這一點(diǎn)依然與HashMap保持一致

image-20210424202636495

各個(gè)線程之間如何通力協(xié)作

先看一個(gè)關(guān)鍵的變量transferIndex,這是一個(gè)被volatile修飾的變量,這一點(diǎn)可以保證所有線程讀到的一定是最新的值。

 
 
 
 
  1. private transient volatile int transferIndex; 

這個(gè)值會(huì)被第一個(gè)參與擴(kuò)容的線程初始化,因?yàn)橹挥械谝粋€(gè)參與擴(kuò)容的線程才滿足條件nextTab == null

 
 
 
 
  1. if (nextTab == null) {            // initiating  
  2.     try {  
  3.         @SuppressWarnings("unchecked")  
  4.         Node[] nt = (Node[])new Node[n << 1];  
  5.         nextTab = nt;  
  6.     } catch (Throwable ex) {      // try to cope with OOME  
  7.         sizeCtl = Integer.MAX_VALUE;  
  8.         return; 
  9.     }  
  10.     nextTabnextTable = nextTab;  
  11.     transferIndex = n;  

在了解了transferIndex屬性的基礎(chǔ)上,上面的這個(gè)循環(huán)就好理解了

 
 
 
 
  1. while (advance) {  
  2.     int nextIndex, nextBound;  
  3.       // 當(dāng)bound <= i <= transferIndex的時(shí)候i自減跳出這個(gè)循環(huán)繼續(xù)干活  
  4.     if (--i >= bound || finishing)  
  5.         advance = false;  
  6.     // 擴(kuò)容的所有任務(wù)已經(jīng)被認(rèn)領(lǐng)完畢,本線程結(jié)束干活  
  7.     else if ((nextIndex = transferIndex) <= 0) {  
  8.         i = -1;  
  9.         advance = false; 
  10.      }  
  11.     // 否則認(rèn)領(lǐng)新的一段復(fù)制任務(wù),并通過(guò)`CAS`更新transferIndex的值  
  12.     else if (U.compareAndSwapInt 
  13.                  (this, TRANSFERINDEX, nextIndex,  
  14.                 nextBound = (nextIndex > stride ?  
  15.                             nextIndex - stride : 0))) {  
  16.         bound = nextBound;  
  17.         i = nextIndex - 1;  
  18.         advance = false;  
  19.     }  

transferIndex就像是一個(gè)游標(biāo),每個(gè)線程認(rèn)領(lǐng)一段復(fù)制任務(wù)的時(shí)候都會(huì)通過(guò)CAS將其更新為transferIndex - stride, CAS可以保證transferIndex可以按照stride這個(gè)步長(zhǎng)降到0。

最后一個(gè)擴(kuò)容線程需要二次確認(rèn)?

對(duì)于每一個(gè)擴(kuò)容線程,for循環(huán)的變量i代表要復(fù)制的桶的在桶數(shù)組中的下標(biāo),這個(gè)值的上限和下限通過(guò)游標(biāo)transferIndex和步長(zhǎng)stride計(jì)算得來(lái),當(dāng)i減小為負(fù)數(shù),則說(shuō)明當(dāng)前擴(kuò)容線程完成了擴(kuò)容任務(wù),這時(shí)候流程會(huì)走到這個(gè)分支:

 
 
 
 
  1. // i >= n || i + n >= nextn現(xiàn)在看來(lái)取不到  
  2. if (i < 0 || i >= n || i + n >= nextn) {  
  3.     int sc;  
  4.     if (finishing) { // 【A】完成整個(gè)擴(kuò)容過(guò)程  
  5.         nextTable = null;  
  6.         table = nextTab;  
  7.      
    網(wǎng)站欄目:Java 8 ConcurrentHashMap源碼中竟然隱藏著兩個(gè)Bug
    URL標(biāo)題:http://www.dlmjj.cn/article/djoppch.html