日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
JavaConcurrentHashMap高并發(fā)安全實(shí)現(xiàn)原理解析

 一、概述

ConcurrentHashMap (以下簡稱C13Map) 是并發(fā)編程出場率最高的數(shù)據(jù)結(jié)構(gòu)之一,大量的并發(fā)CASE背后都有C13Map的支持,同時也是JUC包中代碼量最大的組件(6000多行),自JDK8開始Oracle對其進(jìn)行了大量優(yōu)化工作。

本文從 HashMap 的基礎(chǔ)知識開始,嘗試逐一分析C13Map中各個組件的實(shí)現(xiàn)和安全性保證。

二、HashMap基礎(chǔ)知識 

分析C13MAP前,需要了解以下的HashMap知識或者約定:

  •  哈希表的長度永遠(yuǎn)都是2的冪次方,原因是hashcode%tableSize==hashcode&(tableSize-1),也就是哈希槽位的確定可以用一次與運(yùn)算來替代取余運(yùn)算。
  •  會對hashcode調(diào)用若干次擾動函數(shù),將高16位與低16位做異或運(yùn)算,因?yàn)楦?6位的隨機(jī)性更強(qiáng)。
  •  當(dāng)表中的元素總數(shù)超過tableSize * 0.75時,哈希表會發(fā)生擴(kuò)容操作,每次擴(kuò)容的tableSize是原先的兩倍。
  •  下文提到的槽位(bucket)、哈希分桶、BIN均表示同一個概念,即哈希table上的某一列。
  •  舊表在做搬運(yùn)時i槽位的node可以根據(jù)其哈希值的第tableSize位的bit決定在新表上的槽位是i還是i+tableSize。
  •  每個槽位上有可能會出現(xiàn)哈希沖突,在未達(dá)到某個閾值時它是一個鏈表結(jié)構(gòu),達(dá)到閾值后會升級到紅黑樹結(jié)構(gòu)。
  •  HashMap本身并非為多線程環(huán)境設(shè)計,永遠(yuǎn)不要嘗試在并發(fā)環(huán)境下直接使用HashMap,C13Map不存在這個安全問題。

三、C13Map的字段定義

C13Map的字段定義 

 
 
 
 
  1. //最大容量  
  2. private static final int MAXIMUM_CAPACITY = 1 << 30;   
  3. //默認(rèn)初始容量  
  4. private static final int DEFAULT_CAPACITY = 16;  
  5. //數(shù)組的最大容量,防止拋出OOM  
  6. static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;   
  7. //最大并行度,僅用于兼容JDK1.7以前版本  
  8. private static final int DEFAULT_CONCURRENCY_LEVEL = 16;  
  9. //擴(kuò)容因子  
  10. private static final float LOAD_FACTOR = 0.75f;  
  11. //鏈表轉(zhuǎn)紅黑樹的閾值  
  12. static final int TREEIFY_THRESHOLD = 8;  
  13. //紅黑樹退化閾值  
  14. static final int UNTREEIFY_THRESHOLD = 6;  
  15. //鏈表轉(zhuǎn)紅黑樹的最小總量  
  16. static final int MIN_TREEIFY_CAPACITY = 64;  
  17. //擴(kuò)容搬運(yùn)時批量搬運(yùn)的最小槽位數(shù)  
  18. private static final int MIN_TRANSFER_STRIDE = 16;  
  19. //當(dāng)前待擴(kuò)容table的郵戳位,通常是高16位  
  20. private static final int RESIZE_STAMP_BITS = 16;  
  21. //同時搬運(yùn)的線程數(shù)自增的最大值  
  22. private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;  
  23. //搬運(yùn)線程數(shù)的標(biāo)識位,通常是低16位  
  24. private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;  
  25. static final int MOVED     = -1; // 說明是forwardingNode  
  26. static final int TREEBIN   = -2; // 紅黑樹  
  27. static final int RESERVED  = -3; // 原子計算的占位Node  
  28. static final int HASH_BITS = 0x7fffffff; // 保證hashcode擾動計算結(jié)果為正數(shù)  
  29. //當(dāng)前哈希表  
  30. transient volatile Node[] table;  
  31. //下一個哈希表  
  32. private transient volatile Node[] nextTable;  
  33. //計數(shù)的基準(zhǔn)值  
  34. private transient volatile long baseCount;  
  35. //控制變量,不同場景有不同用途,參考下文  
  36. private transient volatile int sizeCtl;  
  37. //并發(fā)搬運(yùn)過程中CAS獲取區(qū)段的下限值  
  38. private transient volatile int transferIndex;  
  39. //計數(shù)cell初始化或者擴(kuò)容時基于此字段使用自旋鎖  
  40. private transient volatile int cellsBusy;  
  41. //加速多核CPU計數(shù)的cell數(shù)組  
  42. private transient volatile CounterCell[] counterCells; 

四、安全操作Node數(shù)組 

 
 
 
 
  1. static final  Node tabAt(Node[] tab, int i) {  
  2.     return (Node)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);  
  3. }  
  4. static final  boolean casTabAt(Node[] tab, int i,  
  5.                                     Node c, Node v) {  
  6.     return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);  
  7. }  
  8. static final  void setTabAt(Node[] tab, int i, Node v) {  
  9.     U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v); 

對Node[] 上任意一個index的讀取和寫入都使用了Unsafe輔助類,table本身是volatile類型的并不能保證其下的每個元素的內(nèi)存語義也是volatile類型;

需要借助于Unsafe來保證Node[]元素的“讀/寫/CAS”操作在多核并發(fā)環(huán)境下的原子或者可見性。

五、讀操作get為什么是線程安全的

首先需要明確的是,C13Map的讀操作一般是不加鎖的(TreeBin的讀寫鎖除外),而讀操作與寫操作有可能并行;可以保證的是,因?yàn)镃13Map的寫操作都要獲取bin頭部的syncronized互斥鎖,能保證最多只有一個線程在做更新,這其實(shí)是一個單線程寫、多線程讀的并發(fā)安全性的問題。

C13Map的get方法 

 
 
 
 
  1. public V get(Object key) {  
  2.     Node[] tab; Node e, p; int n, eh; K ek;  
  3.     //執(zhí)行擾動函數(shù)  
  4.     int h = spread(key.hashCode());  
  5.     if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {  
  6.         if ((eeh = e.hash) == h) {  
  7.             if ((eek = e.key) == key || (ek != null && key.equals(ek)))  
  8.                 return e.val;  
  9.         } 
  10.          else if (eh < 0)  
  11.             return (p = e.find(h, key)) != null ? p.val : null;  
  12.         while ((ee = e.next) != null) {  
  13.             if (e.hash == h &&  
  14.                 ((eek = e.key) == key || (ek != null && key.equals(ek))))  
  15.                 return e.val; 
  16.          }  
  17.     }  
  18.     return null;  

1、如果當(dāng)前哈希表table為null

哈希表未初始化或者正在初始化未完成,直接返回null;雖然line5和line18之間其它線程可能經(jīng)歷了千山萬水,至少在判斷tab==null的時間點(diǎn)key肯定是不存在的,返回null符合某一時刻的客觀事實(shí)。

2、如果讀取的bin頭節(jié)點(diǎn)為null

說明該槽位尚未有節(jié)點(diǎn),直接返回null。

3、如果讀取的bin是一個鏈表

說明頭節(jié)點(diǎn)是個普通Node。

(1)如果正在發(fā)生鏈表向紅黑樹的treeify工作,因?yàn)閠reeify本身并不破壞舊的鏈表bin的結(jié)構(gòu),只是在全部treeify完成后將頭節(jié)點(diǎn)一次性替換為新創(chuàng)建的TreeBin,可以放心讀取。

(2)如果正在發(fā)生resize且當(dāng)前bin正在被transfer,因?yàn)閠ransfer本身并不破壞舊的鏈表bin的結(jié)構(gòu),只是在全部transfer完成后將頭節(jié)點(diǎn)一次性替換為ForwardingNode,可以放心讀取。

(3)如果其它線程正在操作鏈表,在當(dāng)前線程遍歷鏈表的任意一個時間點(diǎn),都有可能同時在發(fā)生add/replace/remove操作。

  •  如果是add操作,因?yàn)殒湵淼墓?jié)點(diǎn)新增從JDK8以后都采用了后入式,無非是多遍歷或者少遍歷一個tailNode。
  •  如果是remove操作,存在遍歷到某個Node時,正好有其它線程將其remove,導(dǎo)致其孤立于整個鏈表之外;但因?yàn)槠鋘ext引用未發(fā)生變更,整個鏈表并沒有斷開,還是可以照常遍歷鏈表直到tailNode。
  •  如果是replace操作,鏈表的結(jié)構(gòu)未變,只是某個Node的value發(fā)生了變化,沒有安全問題。

結(jié)論:對于鏈表這種線性數(shù)據(jù)結(jié)構(gòu),單線程寫且插入操作保證是后入式的前提下,并發(fā)讀取是安全的;不會存在誤讀、鏈表斷開導(dǎo)致的漏讀、讀到環(huán)狀鏈表等問題。

4、如果讀取的bin是一個紅黑樹

說明頭節(jié)點(diǎn)是個TreeBin節(jié)點(diǎn)。

(1)如果正在發(fā)生紅黑樹向鏈表的untreeify操作,因?yàn)閡ntreeify本身并不破壞舊的紅黑樹結(jié)構(gòu),只是在全部untreeify完成后將頭節(jié)點(diǎn)一次性替換為新創(chuàng)建的普通Node,可以放心讀取。

(2)如果正在發(fā)生resize且當(dāng)前bin正在被transfer,因?yàn)閠ransfer本身并不破壞舊的紅黑樹結(jié)構(gòu),只是在全部transfer完成后將頭節(jié)點(diǎn)一次性替換為ForwardingNode,可以放心讀取。

(3)如果其他線程在操作紅黑樹,在當(dāng)前線程遍歷紅黑樹的任意一個時間點(diǎn),都可能有單個的其它線程發(fā)生add/replace/remove/紅黑樹的翻轉(zhuǎn)等操作,參考下面的紅黑樹的讀寫鎖實(shí)現(xiàn)。

TreeBin中的讀寫鎖實(shí)現(xiàn) 

 
 
 
 
  1.  TreeNode root;  
  2.     volatile TreeNode first;  
  3.     volatile Thread waiter;  
  4.     volatile int lockState;  
  5.     // values for lockState  
  6.     static final int WRITER = 1; // set while holding write lock  
  7.     static final int WAITER = 2; // set when waiting for write lock  
  8.     static final int READER = 4; // increment value for setting read lock  
  9.     private final void lockRoot() {  
  10.         //如果一次性獲取寫鎖失敗,進(jìn)入contendedLock循環(huán)體,循環(huán)獲取寫鎖或者休眠等待  
  11.         if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))  
  12.             contendedLock(); // offload to separate method  
  13.     }  
  14.     private final void unlockRoot() {  
  15.         lockState = 0; 
  16.     }  
  17.     //對紅黑樹加互斥鎖,也就是寫鎖  
  18.     private final void contendedLock() {  
  19.         boolean waiting = false;  
  20.         for (int s;;) {  
  21.             //如果lockState除了第二位外其它位上都為0,表示紅黑樹當(dāng)前既沒有上讀鎖,又沒有上寫鎖,僅有可能存在waiter,可以嘗試直接獲取寫鎖  
  22.             if (((s = lockState) & ~WAITER) == 0) {  
  23.                 if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) { 
  24.                      if (waiting)  
  25.                         waiter = null;  
  26.                     return; 
  27.                 }  
  28.             }  
  29.             //如果lockState第二位是0,表示當(dāng)前沒有線程在等待寫鎖  
  30.             else if ((s & WAITER) == 0) {  
  31.                 //將lockState的第二位設(shè)置為1,相當(dāng)于打上了waiter的標(biāo)記,表示有線程在等待寫鎖  
  32.                 if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {  
  33.                     waiting = true;  
  34.                     waiter = Thread.currentThread();  
  35.                 }  
  36.             }  
  37.             //休眠當(dāng)前線程  
  38.             else if (waiting)  
  39.                 LockSupport.park(this);  
  40.         }  
  41.     } 
  42.      //查找紅黑樹中的某個節(jié)點(diǎn)  
  43.     final Node find(int h, Object k) {  
  44.         if (k != null) {  
  45.             for (Node e = first; e != null; ) {  
  46.                 int s; K ek;  
  47.                 //如果當(dāng)前有waiter或者有寫鎖,走線性檢索,因?yàn)榧t黑樹雖然替代了鏈表,但其內(nèi)部依然保留了鏈表的結(jié)構(gòu),雖然鏈表的查詢性能一般,但根據(jù)先前的分析其讀取的安全性有保證。  
  48.                 //發(fā)現(xiàn)有寫鎖改走線性檢索,是為了避免等待寫鎖釋放花去太久時間; 而發(fā)現(xiàn)有waiter改走線性檢索,是為了避免讀鎖疊加的太多,導(dǎo)致寫鎖線程需要等待太長的時間; 本質(zhì)上都是為了減少讀寫碰撞 
  49.                  //線性遍歷的過程中,每遍歷到下一個節(jié)點(diǎn)都做一次判斷,一旦發(fā)現(xiàn)鎖競爭的可能性減少就改走tree檢索以提高性能 
  50.                  if (((s = lockState) & (WAITER|WRITER)) != 0) {  
  51.                     if (e.hash == h &&  
  52.                         ((eek = e.key) == k || (ek != null && k.equals(ek))))  
  53.                         return e;  
  54.                     ee = e.next;  
  55.                 }  
  56.                 //對紅黑樹加共享鎖,也就是讀鎖,CAS一次性增加4,也就是增加的只是3~32位  
  57.                 else if (U.compareAndSetInt(this, LOCKSTATE, s, 
  58.                                               s + READER)) {  
  59.                     TreeNode r, p;  
  60.                     try {  
  61.                         p = ((r = root) == null ? null :  
  62.                              r.findTreeNode(h, k, null));  
  63.                     } finally {  
  64.                         Thread w;  
  65.                         //釋放讀鎖,如果釋放完畢且有waiter,則將其喚醒  
  66.                         if (U.getAndAddInt(this, LOCKSTATE, -READER) ==  
  67.                             (READER|WAITER) && (w = waiter) != null)  
  68.                             LockSupport.unpark(w);  
  69.                     }  
  70.                     return p;  
  71.                 }  
  72.             }  
  73.         }  
  74.         return null;  
  75.     }  
  76.     //更新紅黑樹中的某個節(jié)點(diǎn)  
  77.     final TreeNode putTreeVal(int h, K k, V v) {  
  78.         Class kc = null;  
  79.         boolean searched = false;  
  80.         for (TreeNode p = root;;) {  
  81.             int dir, ph; K pk;  
  82.             //...省略處理紅黑樹數(shù)據(jù)結(jié)構(gòu)的代碼若干        
  83.                  else {  
  84.                     //寫操作前加互斥鎖  
  85.                     lockRoot();  
  86.                     try {  
  87.                         root = balanceInsertion(root, x);  
  88.                     } finally {  
  89.                         //釋放互斥鎖 
  90.                          unlockRoot();  
  91.                     }  
  92.                 }  
  93.                 break;  
  94.             }  
  95.         }  
  96.         assert checkInvariants(root);  
  97.         return null;  
  98.     }  

紅黑樹內(nèi)置了一套讀寫鎖的邏輯,其內(nèi)部定義了32位的int型變量lockState,第1位是寫鎖標(biāo)志位,第2位是寫鎖等待標(biāo)志位,從3~32位則是共享鎖標(biāo)志位。

讀寫操作是互斥的,允許多個線程同時讀取,但不允許讀寫操作并行,同一時刻只允許一個線程進(jìn)行寫操作;這樣任意時間點(diǎn)讀取的都是一個合法的紅黑樹,整體上是安全的。

有的同學(xué)會產(chǎn)生疑惑,寫鎖釋放時為何沒有將waiter喚醒的操作呢?是否有可能A線程進(jìn)入了等待區(qū),B線程獲取了寫鎖,釋放寫鎖時僅做了lockState=0的操作。

那么A線程是否就沒有機(jī)會被喚醒了,只有等待下一個讀鎖釋放時的喚醒了呢 ?

顯然這種情況違背常理,C13Map不會出現(xiàn)這樣的疏漏,再進(jìn)一步觀察,紅黑樹的變更操作的外圍,也就是在putValue/replaceNode那一層,都是對BIN的頭節(jié)點(diǎn)加了synchornized互斥鎖的,同一時刻只能有一個寫線程進(jìn)入TreeBin的方法范圍內(nèi),當(dāng)寫線程發(fā)現(xiàn)當(dāng)前waiter不為空,其實(shí)此waiter只能是當(dāng)前線程自己,可以放心的獲取寫鎖,不用擔(dān)心無法被喚醒的問題。

TreeBin在find讀操作檢索時,在linearSearch(線性檢索)和treeSearch(樹檢索)間做了折衷,前者性能差但并發(fā)安全,后者性能佳但要做并發(fā)控制,可能導(dǎo)致鎖競爭;設(shè)計者使用線性檢索來盡量避免讀寫碰撞導(dǎo)致的鎖競爭,但評估到race condition已消失時,又立即趨向于改用樹檢索來提高性能,在安全和性能之間做到了極佳的平衡。具體的折衷策略請參考find方法及注釋。

由于有線性檢索這樣一個抄底方案,以及入口處bin頭節(jié)點(diǎn)的synchornized機(jī)制,保證了進(jìn)入到TreeBin整體代碼塊的寫線程只有一個;TreeBin中讀寫鎖的整體設(shè)計與ReentrantReadWriteLock相比還是簡單了不少,比如并未定義用于存放待喚醒線程的threadQueue,以及讀線程僅會自旋而不會阻塞等等, 可以看做是特定條件下ReadWriteLock的簡化版本。

5、如果讀取的bin是一個ForwardingNode

說明當(dāng)前bin已遷移,調(diào)用其find方法到nextTable讀取數(shù)據(jù)。

forwardingNode的find方法 

 
 
 
 
  1. static final class ForwardingNode extends Node {  
  2.     final Node[] nextTable;  
  3.     ForwardingNode(Node[] tab) {  
  4.         super(MOVED, null, null);  
  5.         this.nextTable = tab;  
  6.     }  
  7.      //遞歸檢索哈希表鏈  
  8.     Node find(int h, Object k) {  
  9.         // loop to avoid arbitrarily deep recursion on forwarding nodes  
  10.         outer: for (Node[] tab = nextTable;;) {  
  11.             Node e; int n;  
  12.             if (k == null || tab == null || (n = tab.length) == 0 ||  
  13.                 (e = tabAt(tab, (n - 1) & h)) == null)  
  14.                 return null;  
  15.             for (;;) {  
  16.                 int eh; K ek; 
  17.                  if ((eeh = e.hash) == h &&  
  18.                     ((eek = e.key) == k || (ek != null && k.equals(ek))))  
  19.                     return e;  
  20.                 if (eh < 0) {  
  21.                     if (e instanceof ForwardingNode) {  
  22.                         tab = ((ForwardingNode)e).nextTable;  
  23.                         continue outer;  
  24.                     }  
  25.                     else  
  26.                         return e.find(h, k);  
  27.                 }  
  28.                 if ((ee = e.next) == null)  
  29.                     return null;  
  30.             }  
  31.         }  
  32.     }  

ForwardingNode中保存了nextTable的引用,會轉(zhuǎn)向下一個哈希表進(jìn)行檢索,但并不能保證nextTable就一定是currentTable,因?yàn)樵诟卟l(fā)插入的情況下,極短時間內(nèi)就可以導(dǎo)致哈希表的多次擴(kuò)容,內(nèi)存中極有可能駐留一條哈希表鏈,彼此以bin的頭節(jié)點(diǎn)上的ForwardingNode相連,線程剛讀取時拿到的是table1,遍歷時卻有可能經(jīng)歷了哈希表的鏈條。

eh<0有三種情況:

  •  如果是ForwardingNode繼續(xù)遍歷下一個哈希表。
  •  如果是TreeBin,調(diào)用其find方法進(jìn)入TreeBin讀寫鎖的保護(hù)區(qū)讀取數(shù)據(jù)。
  •  如果是ReserveNode,說明當(dāng)前有compute計算中,整條bin還是一個空結(jié)構(gòu),直接返回null。

6、如果讀取的bin是一個ReserveNode

ReserveNode用于compute/computeIfAbsent原子計算的方法,在BIN的頭節(jié)點(diǎn)為null且計算尚未完成時,先在bin的頭節(jié)點(diǎn)打上一個ReserveNode的占位標(biāo)記。

讀操作發(fā)現(xiàn)ReserveNode直接返回null,寫操作會因?yàn)闋帄ZReserveNode的互斥鎖而進(jìn)入阻塞態(tài),在compute完成后被喚醒后循環(huán)重試。

六、寫操作putValue/replaceNode為什么是線程安全的

典型的編程范式如下:

C13Map的putValue方法 

 
 
 
 
  1. Node[] tab = table;  //將堆中的table變量賦給線程堆棧中的局部變量  
  2. Node f = tabAt(tab, i );  
  3. if(f==null){  
  4.  //當(dāng)前槽位沒有頭節(jié)點(diǎn),直接CAS寫入  
  5.  if (casTabAt(tab, i, null, new Node(hash, key, value)))  
  6.     break;  
  7. }else if(f.hash == MOVED){  
  8.  //加入?yún)f(xié)助搬運(yùn)行列  
  9.  helpTransfer(tab,f);  
  10. }  
  11. //不是forwardingNode  
  12. else if(f.hash != MOVED){  
  13.     //先鎖住I槽位上的頭節(jié)點(diǎn)  
  14.     synchronized (f) {  
  15.     //再doubleCheck看此槽位上的頭節(jié)點(diǎn)是否還是f  
  16.     if (tabAt(tab, i) == f) {  
  17.        ...各種寫操作  
  18.     }  
  19.   }  

1、當(dāng)前槽位如果頭節(jié)點(diǎn)為null時,直接CAS寫入

有人也許會質(zhì)疑,如果寫入時resize操作已完成,發(fā)生了table向nextTable的轉(zhuǎn)變,是否會存在寫入的是舊表的bin導(dǎo)致數(shù)據(jù)丟失的可能 ? 

這種可能性是不存在的,因?yàn)橐粋€table在resize完成后所有的BIN都會被打上ForwardingNode的標(biāo)記,可以形象的理解為所有槽位上都插滿了紅旗,而此處在CAS時的compare的變量null,能夠保證至少在CAS原子操作發(fā)生的時間點(diǎn)table并未發(fā)生變更。

2、當(dāng)前槽位如果頭節(jié)點(diǎn)不為null

這里采用了一個小技巧:先鎖住I槽位上的頭節(jié)點(diǎn),進(jìn)入同步代碼塊后,再doubleCheck看此槽位上的頭節(jié)點(diǎn)是否有變化。

進(jìn)入同步塊后還需要doubleCheck的原因:雖然一開始獲取到的頭節(jié)點(diǎn)f并非ForwardingNode,但在獲取到f的同步鎖之前,可能有其它線程提前獲取了f的同步鎖并完成了transfer工作,并將I槽位上的頭節(jié)點(diǎn)標(biāo)記為ForwardingNode,此時的f就成了一個過時的bin的頭節(jié)點(diǎn)。

然而因?yàn)闃?biāo)記操作與transfer作為一個整體在同步的代碼塊中執(zhí)行,如果doubleCheck的結(jié)果是此槽位上的頭節(jié)點(diǎn)還是f,則表明至少在當(dāng)前時間點(diǎn)該槽位還沒有被transfer到新表(假如當(dāng)前有transfer in progress的話),可以放心的對該bin進(jìn)行put/remove/replace等寫操作。

只要未發(fā)生transfer或者treeify操作,鏈表的新增操作都是采取后入式,頭節(jié)點(diǎn)一旦確定不會輕易改變,這種后入式的更新方式保證了鎖定頭節(jié)點(diǎn)就等于鎖住了整個bin。

如果不作doubleCheck判斷,則有可能當(dāng)前槽位已被transfer,寫入的還是舊表的BIN,從而導(dǎo)致寫入數(shù)據(jù)的丟失;也有可能在獲取到f的同步鎖之前,其它線程對該BIN做了treeify操作,并將頭節(jié)點(diǎn)替換成了TreeBin, 導(dǎo)致寫入的是舊的鏈表,而非新的紅黑樹;

3、doubleCheck是否有ABA問題

也許有人會質(zhì)疑,如果有其它線程提前對當(dāng)前bin進(jìn)行了的remove/put的操作,引入了新的頭節(jié)點(diǎn),并且恰好發(fā)生了JVM的內(nèi)存釋放和重新分配,導(dǎo)致新的Node的引用地址恰好跟舊的相同,也就是存在所謂的ABA問題。

這個可以通過反證法來推翻,在帶有GC機(jī)制的語言環(huán)境下通常不會發(fā)生ABA問題,因?yàn)楫?dāng)前線程包含了對頭節(jié)點(diǎn)f的引用,當(dāng)前線程并未消亡,不可能存在f節(jié)點(diǎn)的內(nèi)存被GC回收的可能性。

還有人會質(zhì)疑,如果在寫入過程中主哈希表發(fā)生了變化,是否可能寫入的是舊表的bin導(dǎo)致數(shù)據(jù)丟失,這個也可以通過反證法來推翻,因?yàn)閠able向nextTable的轉(zhuǎn)化(也就是將resize后的新哈希表正式commit)只有在所有的槽位都已經(jīng)transfer成功后才會進(jìn)行,只要有一個bin未transfer成功,則說明當(dāng)前的table未發(fā)生變化,在當(dāng)前的時間點(diǎn)可以放心的向table的bin內(nèi)寫入數(shù)據(jù)。

4、如何操作才安全

可以總結(jié)出規(guī)律,在對table的槽位成功進(jìn)行了CAS操作且compare值為null,或者對槽位的非forwardingNode的頭節(jié)點(diǎn)加鎖后,doubleCheck頭節(jié)點(diǎn)未發(fā)生變化,對bin的寫操作都是安全的。

七、原子計算相關(guān)方法

原子計算主要包括:computeIfAbsent、computeIfPresent、compute、merge四個方法。 

1、幾個方法的比較 

主要區(qū)別如下:

(1)computeIfAbsent只會在判斷到key不存在時才會插入,判空與插入是一個原子操作,提供的FunctionalInterface是一個二元的Function, 接受key參數(shù),返回value結(jié)果;如果計算結(jié)果為null則不做插入。

(2)computeIfPresent只會在判讀單到Key非空時才會做更新,判斷非空與插入是一個原子操作,提供的FunctionalInterface是一個三元的BiFunction,接受key,value兩個參數(shù),返回新的value結(jié)果;如果新的value為null則刪除key對應(yīng)節(jié)點(diǎn)。

(3)compute則不加key是否存在的限制,提供的FunctionalInterface是一個三元的BiFunction,接受key,value兩個參數(shù),返回新的value結(jié)果;如果舊的value不存在則以null替代進(jìn)行計算;如果新的value為null則保證key對應(yīng)節(jié)點(diǎn)不會存在。

(4)merge不加key是否存在的限制,提供的FunctionalInterface是一個三元的BiFunction,接受oldValue, newVALUE兩個參數(shù),返回merge后的value;如果舊的value不存在,直接以newVALUE作為最終結(jié)果,存在則返回merge后的結(jié)果;如果最終結(jié)果為null,則保證key對應(yīng)節(jié)點(diǎn)不會存在。

2、何時會使用ReserveNode占位

如果目標(biāo)bin的頭節(jié)點(diǎn)為null,需要寫入的話有兩種手段:一種是生成好新的節(jié)點(diǎn)r后使用casTabAt(tab, i, null, r)原子操作,因?yàn)閏ompare的值為null可以保證并發(fā)的安全;

另外一種方式是創(chuàng)建一個占位的ReserveNode,鎖住該節(jié)點(diǎn)并將其CAS設(shè)置到bin的頭節(jié)點(diǎn),再進(jìn)行進(jìn)一步的原子計算操作;這兩種辦法都有可能在CAS的時候失敗,需要自旋反復(fù)嘗試。

(1)為什么只有computeIfAbsent/compute方法使用占位符的方式

computeIfPresent只有在BIN結(jié)構(gòu)非空的情況下才會展開原子計算,自然不存在需要ReserveNode占位的情況;鎖住已有的頭節(jié)點(diǎn)即可。

computeIfAbsent/compute方法在BIN結(jié)構(gòu)為空時,需要展開Function或者BiFunction的運(yùn)算,這個操作是外部引入的需要耗時多久無法準(zhǔn)確評估;這種情況下如果采用先計算,再casTabAt(tab, i, null, r)的方式,如果有其它線程提前更新了這個BIN,那么就需要重新鎖定新加入的頭節(jié)點(diǎn),并重復(fù)一次原子計算(C13Map無法幫你緩存上次計算的結(jié)果,因?yàn)橛嬎愕娜雲(yún)⒂锌赡軙兓?,這個開銷是比較大的。

而使用ReserveNode占位的方式無需等到原子計算出結(jié)果,可以第一時間先搶占BIN的所有權(quán),使其他并發(fā)的寫線程阻塞。

(2)merge方法為何不需要占位

原因是如果BIN結(jié)構(gòu)為空時,根據(jù)merge的處理策略,老的value為空則直接使用新的value替代,這樣就省去了BiFunction中新老value進(jìn)行merge的計算,這個消耗幾乎是沒有的;因此可以使用casTabAt(tab, i, null, r)的方式直接修改,避免了使用ReserveNode占位,鎖定該占位ReserveNode后再進(jìn)行CAS修改的兩次CAS無謂的開銷。

C13Map的compute方法 

 
 
 
 
  1. public V compute(K key,  
  2.                  BiFunction remappingFunction) {  
  3.     if (key == null || remappingFunction == null)  
  4.         throw new nullPointerException();  
  5.     int h = spread(key.hashCode());  
  6.     V val = null;  
  7.     int delta = 0;  
  8.     int binCount = 0;  
  9.     for (Node[] tab = table; ; ) {  
  10.         Node f;  
  11.         int n, i, fh;  
  12.         if (tab == null || (n = tab.length) == 0)  
  13.             tab = initTable();  
  14.         else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {  
  15.             //創(chuàng)建占位Node  
  16.             Node r = new ReservationNode();  
  17.            //先鎖定該占位Node  
  18.             synchronized (r) {  
  19.                 //將其設(shè)置到BIN的頭節(jié)點(diǎn)  
  20.                 if (casTabAt(tab, i, null, r)) {  
  21.                     binCount = 1; 
  22.                      Node node = null;  
  23.                     try {  
  24.                         //開始原子計算  
  25.                         if ((val = remappingFunction.apply(key, null)) != null) {  
  26.                             delta = 1; 
  27.                              node = new Node(h, key, val, null);  
  28.                         }  
  29.                     } finally {  
  30.                         //設(shè)置計算后的最終節(jié)點(diǎn)  
  31.                         setTabAt(tab, i, node);  
  32.                     }  
  33.                 }  
  34.             } 
  35.              if (binCount != 0)  
  36.                 break;  
  37.         } else if ((ffh = f.hash) == MOVED) 
    網(wǎng)站名稱:JavaConcurrentHashMap高并發(fā)安全實(shí)現(xiàn)原理解析
    文章URL:http://www.dlmjj.cn/article/dhgdddj.html