日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
阻塞隊(duì)列—DelayedWorkQueue源碼分析

 前言

潁上ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書(shū)未來(lái)市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)的ssl證書(shū)銷(xiāo)售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:028-86922220(備注:SSL證書(shū)合作)期待與您的合作!

線程池運(yùn)行時(shí),會(huì)不斷從任務(wù)隊(duì)列中獲取任務(wù),然后執(zhí)行任務(wù)。如果我們想實(shí)現(xiàn)延時(shí)或者定時(shí)執(zhí)行任務(wù),重要一點(diǎn)就是任務(wù)隊(duì)列會(huì)根據(jù)任務(wù)延時(shí)時(shí)間的不同進(jìn)行排序,延時(shí)時(shí)間越短地就排在隊(duì)列的前面,先被獲取執(zhí)行。

隊(duì)列是先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),就是先進(jìn)入隊(duì)列的數(shù)據(jù),先被獲取。但是有一種特殊的隊(duì)列叫做優(yōu)先級(jí)隊(duì)列,它會(huì)對(duì)插入的數(shù)據(jù)進(jìn)行優(yōu)先級(jí)排序,保證優(yōu)先級(jí)越高的數(shù)據(jù)首先被獲取,與數(shù)據(jù)的插入順序無(wú)關(guān)。

實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列高效常用的一種方式就是使用堆。關(guān)于堆的實(shí)現(xiàn)可以查看《堆和二叉堆的實(shí)現(xiàn)和特性》

ScheduledThreadPoolExecutor線程池

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以其內(nèi)部的數(shù)據(jù)結(jié)構(gòu)和ThreadPoolExecutor基本一樣,并在其基礎(chǔ)上增加了按時(shí)間調(diào)度執(zhí)行任務(wù)的功能,分為延遲執(zhí)行任務(wù)和周期性執(zhí)行任務(wù)。

ScheduledThreadPoolExecutor的構(gòu)造函數(shù)只能傳3個(gè)參數(shù)corePoolSize、ThreadFactory、RejectedExecutionHandler,默認(rèn)maximumPoolSize為Integer.MAX_VALUE。

工作隊(duì)列是高度定制化的延遲阻塞隊(duì)列DelayedWorkQueue,其實(shí)現(xiàn)原理和DelayQueue基本一樣,核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊(duì)列,隊(duì)列滿(mǎn)時(shí)會(huì)自動(dòng)擴(kuò)容,所以offer操作永遠(yuǎn)不會(huì)阻塞,maximumPoolSize也就用不上了,所以線程池中永遠(yuǎn)會(huì)保持至多有corePoolSize個(gè)工作線程正在運(yùn)行。

 
 
 
 
  1. public ScheduledThreadPoolExecutor(int corePoolSize,
  2.                                    ThreadFactory threadFactory,
  3.                                    RejectedExecutionHandler handler) {
  4.     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  5.           new DelayedWorkQueue(), threadFactory, handler);
  6. }

 DelayedWorkQueue延遲阻塞隊(duì)列

DelayedWorkQueue 也是一種設(shè)計(jì)為定時(shí)任務(wù)的延遲隊(duì)列,它的實(shí)現(xiàn)和DelayQueue一樣,不過(guò)是將優(yōu)先級(jí)隊(duì)列和DelayQueue的實(shí)現(xiàn)過(guò)程遷移到本身方法體中,從而可以在該過(guò)程當(dāng)中靈活的加入定時(shí)任務(wù)特有的方法調(diào)用。

工作原理

DelayedWorkQueue的實(shí)現(xiàn)原理中規(guī)中矩,內(nèi)部維護(hù)了一個(gè)以RunnableScheduledFuture類(lèi)型數(shù)組實(shí)現(xiàn)的最小二叉堆,初始容量是16,使用ReentrantLock和Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模式。

源碼分析

定義

DelayedWorkQueue 的類(lèi)繼承關(guān)系如下:

其包含的方法定義如下:

成員屬性

 
 
 
 
  1. // 初始時(shí),數(shù)組長(zhǎng)度大小。
  2. private static final int INITIAL_CAPACITY = 16;        
  3. // 使用數(shù)組來(lái)儲(chǔ)存隊(duì)列中的元素。
  4. private RunnableScheduledFuture[] queue =  new RunnableScheduledFuture[INITIAL_CAPACITY];        
  5. // 使用lock來(lái)保證多線程并發(fā)安全問(wèn)題。
  6. private final ReentrantLock lock = new ReentrantLock();        
  7. // 隊(duì)列中儲(chǔ)存元素的大小
  8. private int size = 0;        
  9. //特指隊(duì)列頭任務(wù)所在線程
  10. private Thread leader = null;        
  11. // 當(dāng)隊(duì)列頭的任務(wù)延時(shí)時(shí)間到了,或者有新的任務(wù)變成隊(duì)列頭時(shí),用來(lái)喚醒等待線程
  12. private final Condition available = lock.newCondition();

 DelayedWorkQueue是用數(shù)組來(lái)儲(chǔ)存隊(duì)列中的元素,核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊(duì)列,隊(duì)列滿(mǎn)時(shí)會(huì)自動(dòng)擴(kuò)容。

構(gòu)造函數(shù)

DelayedWorkQueue 是 ScheduledThreadPoolExecutor 的靜態(tài)類(lèi)部類(lèi),默認(rèn)只有一個(gè)無(wú)參構(gòu)造方法。

 
 
 
 
  1. static class DelayedWorkQueue extends AbstractQueue
  2.         implements BlockingQueue {
  3.  // ...
  4. }

 入隊(duì)方法

DelayedWorkQueue 提供了 put/add/offer(帶時(shí)間) 三個(gè)插入元素方法。我們發(fā)現(xiàn)與普通阻塞隊(duì)列相比,這三個(gè)添加方法都是調(diào)用offer方法。那是因?yàn)樗鼪](méi)有隊(duì)列已滿(mǎn)的條件,也就是說(shuō)可以不斷地向DelayedWorkQueue添加元素,當(dāng)元素個(gè)數(shù)超過(guò)數(shù)組長(zhǎng)度時(shí),會(huì)進(jìn)行數(shù)組擴(kuò)容。

 
 
 
 
  1. public void put(Runnable e) {
  2.  offer(e);
  3. }        
  4. public boolean add(Runnable e) {            
  5.     return offer(e);
  6. }        
  7. public boolean offer(Runnable e, long timeout, TimeUnit unit) {            
  8.     return offer(e);
  9. }

 offer添加元素

ScheduledThreadPoolExecutor提交任務(wù)時(shí)調(diào)用的是DelayedWorkQueue.add,而add、put等一些對(duì)外提供的添加元素的方法都調(diào)用了offer。

 
 
 
 
  1. public boolean offer(Runnable x) {            
  2.     if (x == null)                
  3.         throw new NullPointerException();
  4.     RunnableScheduledFuture e = (RunnableScheduledFuture)x;            
  5.     // 使用lock保證并發(fā)操作安全
  6.     final ReentrantLock lock = this.lock;
  7.     lock.lock();            
  8.     try {                
  9.         int i = size;                
  10.         // 如果要超過(guò)數(shù)組長(zhǎng)度,就要進(jìn)行數(shù)組擴(kuò)容
  11.         if (i >= queue.length)                    
  12.             // 數(shù)組擴(kuò)容
  13.             grow();                
  14.         // 將隊(duì)列中元素個(gè)數(shù)加一
  15.         size = i + 1;                
  16.         // 如果是第一個(gè)元素,那么就不需要排序,直接賦值就行了
  17.         if (i == 0) {
  18.          queue[0] = e;
  19.             setIndex(e, 0);
  20.         } else {                    
  21.             // 調(diào)用siftUp方法,使插入的元素變得有序。
  22.             siftUp(i, e);
  23.         }                
  24.         // 表示新插入的元素是隊(duì)列頭,更換了隊(duì)列頭,
  25.         // 那么就要喚醒正在等待獲取任務(wù)的線程。
  26.         if (queue[0] == e) {
  27.          leader = null;                    
  28.             // 喚醒正在等待等待獲取任務(wù)的線程
  29.             available.signal();
  30.         }
  31.     } finally {
  32.      lock.unlock();
  33.     }            
  34.     return true;
  35. }

 其基本流程如下:

  1. 其作為生產(chǎn)者的入口,首先獲取鎖。
  2. 判斷隊(duì)列是否要滿(mǎn)了(size >= queue.length),滿(mǎn)了就擴(kuò)容grow()。
  3. 隊(duì)列未滿(mǎn),size+1。
  4. 判斷添加的元素是否是第一個(gè),是則不需要堆化。
  5. 添加的元素不是第一個(gè),則需要堆化siftUp。
  6. 如果堆頂元素剛好是此時(shí)被添加的元素,則喚醒take線程消費(fèi)。
  7. 最終釋放鎖。

offer基本流程圖如下:

擴(kuò)容grow()

可以看到,當(dāng)隊(duì)列滿(mǎn)時(shí),不會(huì)阻塞等待,而是繼續(xù)擴(kuò)容。新容量newCapacity在舊容量oldCapacity的基礎(chǔ)上擴(kuò)容50%(oldCapacity >> 1相當(dāng)于oldCapacity /2)。最后Arrays.copyOf,先根據(jù)newCapacity創(chuàng)建一個(gè)新的空數(shù)組,然后將舊數(shù)組的數(shù)據(jù)復(fù)制到新數(shù)組中。

 
 
 
 
  1. private void grow() {            
  2.     int oldCapacity = queue.length;            
  3.     // 每次擴(kuò)容增加原來(lái)數(shù)組的一半數(shù)量。
  4.     // grow 50%
  5.     int newCapacity = oldCapacity + (oldCapacity >> 1); 
  6.     if (newCapacity < 0) // overflow
  7.      newCapacity = Integer.MAX_VALUE;            
  8.     // 使用Arrays.copyOf來(lái)復(fù)制一個(gè)新數(shù)組
  9.     queue = Arrays.copyOf(queue, newCapacity);
  10. }

 向上堆化siftUp

新添加的元素先會(huì)加到堆底,然后一步步和上面的父親節(jié)點(diǎn)比較,若小于父親節(jié)點(diǎn)則和父親節(jié)點(diǎn)互換位置,循環(huán)比較直至大于父親節(jié)點(diǎn)才結(jié)束循環(huán)。通過(guò)循環(huán),來(lái)查找元素key應(yīng)該插入在堆二叉樹(shù)那個(gè)節(jié)點(diǎn)位置,并交互父節(jié)點(diǎn)的位置。

向上堆化siftUp的詳細(xì)過(guò)程可以查看《堆和二叉堆的實(shí)現(xiàn)和特性》

 
 
 
 
  1. private void siftUp(int k, RunnableScheduledFuture key) {            
  2.     // 當(dāng)k==0時(shí),就到了堆二叉樹(shù)的根節(jié)點(diǎn)了,跳出循環(huán)
  3.     while (k > 0) {                
  4.         // 父節(jié)點(diǎn)位置坐標(biāo), 相當(dāng)于(k - 1) / 2
  5.         int parent = (k - 1) >>> 1;                
  6.         // 獲取父節(jié)點(diǎn)位置元素
  7.         RunnableScheduledFuture e = queue[parent];                
  8.         // 如果key元素大于父節(jié)點(diǎn)位置元素,滿(mǎn)足條件,那么跳出循環(huán)
  9.         // 因?yàn)槭菑男〉酱笈判虻摹?/li>
  10.         if (key.compareTo(e) >= 0)                    
  11.             break;                
  12.         // 否則就將父節(jié)點(diǎn)元素存放到k位置
  13.         queue[k] = e;                
  14.         // 這個(gè)只有當(dāng)元素是ScheduledFutureTask對(duì)象實(shí)例才有用,用來(lái)快速取消任務(wù)。
  15.         setIndex(e, k);                
  16.         // 重新賦值k,尋找元素key應(yīng)該插入到堆二叉樹(shù)的那個(gè)節(jié)點(diǎn)
  17.         k = parent;
  18.     }            
  19.     // 循環(huán)結(jié)束,k就是元素key應(yīng)該插入的節(jié)點(diǎn)位置
  20.     queue[k] = key;
  21.     setIndex(key, k);
  22. }

 出隊(duì)方法

DelayedWorkQueue 提供了以下幾個(gè)出隊(duì)方法

  • take(),等待獲取隊(duì)列頭元素
  • poll() ,立即獲取隊(duì)列頭元素
  • poll(long timeout, TimeUnit unit) ,超時(shí)等待獲取隊(duì)列頭元素

take消費(fèi)元素

Worker工作線程啟動(dòng)后就會(huì)循環(huán)消費(fèi)工作隊(duì)列中的元素,因?yàn)镾cheduledThreadPoolExecutor的keepAliveTime=0,所以消費(fèi)任務(wù)其只調(diào)用了DelayedWorkQueue.take。take基本流程如下:

  • 首先獲取可中斷鎖,判斷堆頂元素是否是空,空的則阻塞等待available.await()。
  • 堆頂元素不為空,則獲取其延遲執(zhí)行時(shí)間delay,delay <= 0說(shuō)明到了執(zhí)行時(shí)間,出隊(duì)列finishPoll。
  • delay > 0還沒(méi)到執(zhí)行時(shí)間,判斷l(xiāng)eader線程是否為空,不為空則說(shuō)明有其他take線程也在等待,當(dāng)前take將無(wú)限期阻塞等待。
  • leader線程為空,當(dāng)前take線程設(shè)置為leader,并阻塞等待delay時(shí)長(zhǎng)。
  • 當(dāng)前l(fā)eader線程等待delay時(shí)長(zhǎng)自動(dòng)喚醒或者被其他take線程喚醒,則最終將leader設(shè)置為null。
  • 再循環(huán)一次判斷delay <= 0出隊(duì)列。
  • 跳出循環(huán)后判斷l(xiāng)eader為空并且堆頂元素不為空,則喚醒其他take線程,最后是否鎖。
 
 
 
 
  1. public RunnableScheduledFuture take() throws InterruptedException {            
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lockInterruptibly();            
  4.     try {                
  5.         for (;;) {
  6.          RunnableScheduledFuture first = queue[0];                    
  7.             // 如果沒(méi)有任務(wù),就讓線程在available條件下等待。
  8.             if (first == null)
  9.              available.await();                    
  10.             else {                        
  11.                 // 獲取任務(wù)的剩余延時(shí)時(shí)間
  12.                 long delay = first.getDelay(NANOSECONDS);                        
  13.                 // 如果延時(shí)時(shí)間到了,就返回這個(gè)任務(wù),用來(lái)執(zhí)行。
  14.                 if (delay <= 0)                            
  15.                     return finishPoll(first);                        
  16.                 // 將first設(shè)置為null,當(dāng)線程等待時(shí),不持有first的引用
  17.                 first = null; // don't retain ref while waiting
  18.                 // 如果還是原來(lái)那個(gè)等待隊(duì)列頭任務(wù)的線程,
  19.                 // 說(shuō)明隊(duì)列頭任務(wù)的延時(shí)時(shí)間還沒(méi)有到,繼續(xù)等待。
  20.                 if (leader != null)
  21.                  available.await();                        
  22.                 else {                            
  23.                     // 記錄一下當(dāng)前等待隊(duì)列頭任務(wù)的線程
  24.                     Thread thisThread = Thread.currentThread();
  25.                     leader = thisThread;                            
  26.                     try {                                
  27.                         // 當(dāng)任務(wù)的延時(shí)時(shí)間到了時(shí),能夠自動(dòng)超時(shí)喚醒。
  28.                         available.awaitNanos(delay);
  29.                     } finally {                                
  30.                         if (leader == thisThread)
  31.                          leader = null;
  32.                     }
  33.                 }
  34.             }
  35.         }
  36.     } finally {                
  37.         if (leader == null && queue[0] != null)                    // 喚醒等待任務(wù)的線程
  38.          available.signal();
  39.         ock.unlock();
  40.     }
  41. }

 take基本流程圖如下:

take線程阻塞等待

可以看出這個(gè)生產(chǎn)者take線程會(huì)在兩種情況下阻塞等待:

  • 堆頂元素為空。
  • 堆頂元素的delay > 0 。

finishPoll出隊(duì)列

堆頂元素delay<=0,執(zhí)行時(shí)間到,出隊(duì)列就是一個(gè)向下堆化的過(guò)程siftDown。

 
 
 
 
  1. // 移除隊(duì)列頭元素
  2. private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {            
  3.     // 將隊(duì)列中元素個(gè)數(shù)減一
  4.     int s = --size;            
  5.     // 獲取隊(duì)列末尾元素x
  6.     RunnableScheduledFuture x = queue[s];            
  7.     // 原隊(duì)列末尾元素設(shè)置為null
  8.     queue[s] = null;            
  9.     if (s != 0)                
  10.         // 因?yàn)橐瞥岁?duì)列頭元素,所以進(jìn)行重新排序。
  11.         siftDown(0, x);
  12.     setIndex(f, -1);            
  13.     return f;
  14. }

堆的刪除方法主要分為三步:

  1. 先將隊(duì)列中元素個(gè)數(shù)減一;
  2. 將原隊(duì)列末尾元素設(shè)置成為隊(duì)列頭元素,再將隊(duì)列末尾元素設(shè)置為null;
  3. 調(diào)用setDown(O,x)方法,保證按照元素的優(yōu)先級(jí)排序。

向下堆化siftDown

由于堆頂元素出隊(duì)列后,就破壞了堆的結(jié)構(gòu),需要組織整理下,將堆尾元素移到堆頂,然后向下堆化:

  • 從堆頂開(kāi)始,父親節(jié)點(diǎn)與左右子節(jié)點(diǎn)中較小的孩子節(jié)點(diǎn)比較(左孩子不一定小于右孩子)。
  • 父親節(jié)點(diǎn)小于等于較小孩子節(jié)點(diǎn),則結(jié)束循環(huán),不需要交換位置。
  • 若父親節(jié)點(diǎn)大于較小孩子節(jié)點(diǎn),則交換位置。
  • 繼續(xù)向下循環(huán)判斷父親節(jié)點(diǎn)和孩子節(jié)點(diǎn)的關(guān)系,直到父親節(jié)點(diǎn)小于等于較小孩子節(jié)點(diǎn)才結(jié)束循環(huán)。

向下堆化siftDown的詳細(xì)過(guò)程可以查看《堆和二叉堆的實(shí)現(xiàn)和特性》

 
 
 
 
  1. private void siftDown(int k, RunnableScheduledFuture key) {     
  2.     // 無(wú)符號(hào)右移,相當(dāng)于size/2
  3.     int half = size >>> 1;            
  4.     // 通過(guò)循環(huán),保證父節(jié)點(diǎn)的值不能大于子節(jié)點(diǎn)。
  5.     while (k < half) {                
  6.         // 左子節(jié)點(diǎn), 相當(dāng)于 (k * 2) + 1
  7.         int child = (k << 1) + 1;                
  8.         // 左子節(jié)點(diǎn)位置元素
  9.         RunnableScheduledFuture c = queue[child];                
  10.         // 右子節(jié)點(diǎn), 相當(dāng)于 (k * 2) + 2
  11.         int right = child + 1;                
  12.         // 如果左子節(jié)點(diǎn)元素值大于右子節(jié)點(diǎn)元素值,那么右子節(jié)點(diǎn)才是較小值的子節(jié)點(diǎn)。
  13.         // 就要將c與child值重新賦值
  14.         if (right < size && c.compareTo(queue[right]) > 0)
  15.          c = queue[child = right];                
  16.         // 如果父節(jié)點(diǎn)元素值小于較小的子節(jié)點(diǎn)元素值,那么就跳出循環(huán)
  17.         if (key.compareTo(c) <= 0)                    
  18.             break;                
  19.         // 否則,父節(jié)點(diǎn)元素就要和子節(jié)點(diǎn)進(jìn)行交換
  20.         queue[k] = c;
  21.         setIndex(c, k);
  22.         k = child;
  23.     }            
  24.     queue[k] = key;
  25.     setIndex(key, k);
  26. }

 leader線程

leader線程的設(shè)計(jì),是Leader-Follower模式的變種,旨在于為了不必要的時(shí)間等待。當(dāng)一個(gè)take線程變成leader線程時(shí),只需要等待下一次的延遲時(shí)間,而不是leader線程的其他take線程則需要等leader線程出隊(duì)列了才喚醒其他take線程。

poll()

立即獲取隊(duì)列頭元素,當(dāng)隊(duì)列頭任務(wù)是null,或者任務(wù)延時(shí)時(shí)間沒(méi)有到,表示這個(gè)任務(wù)還不能返回,因此直接返回null。否則調(diào)用finishPoll方法,移除隊(duì)列頭元素并返回。

 
 
 
 
  1. public RunnableScheduledFuture poll() {            
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lock();            
  4.     try {
  5.      RunnableScheduledFuture first = queue[0];                
  6.         // 隊(duì)列頭任務(wù)是null,或者任務(wù)延時(shí)時(shí)間沒(méi)有到,都返回null
  7.         if (first == null || first.getDelay(NANOSECONDS) > 0)                    
  8.             return null;                
  9.         else
  10.          // 移除隊(duì)列頭元素
  11.             return finishPoll(first);
  12.     } finally {
  13.      lock.unlock();
  14.     }
  15. }

 poll(long timeout, TimeUnit unit)

超時(shí)等待獲取隊(duì)列頭元素,與take方法相比較,就要考慮設(shè)置的超時(shí)時(shí)間,如果超時(shí)時(shí)間到了,還沒(méi)有獲取到有用任務(wù),那么就返回null。其他的與take方法中邏輯一樣。

 
 
 
 
  1. public RunnableScheduledFuture poll(long timeout, TimeUnit unit)            
  2.     throws InterruptedException {            
  3.     long nanos = unit.toNanos(timeout);            
  4.     final ReentrantLock lock = this.lock;
  5.     lock.lockInterruptibly();            
  6.     try {                
  7.         for (;;) {
  8.          RunnableScheduledFuture first = queue[0];                    
  9.             // 如果沒(méi)有任務(wù)。
  10.             if (first == null) {                        
  11.              // 超時(shí)時(shí)間已到,那么就直接返回null
  12.                 if (nanos <= 0)                            
  13.                     return null;                        
  14.                 else
  15.                  // 否則就讓線程在available條件下等待nanos時(shí)間
  16.                     nanos = available.awaitNanos(nanos);
  17.             } else {                        
  18.                 // 獲取任務(wù)的剩余延時(shí)時(shí)間
  19.                 long delay = first.getDelay(NANOSECONDS);                        
  20.                 // 如果延時(shí)時(shí)間到了,就返回這個(gè)任務(wù),用來(lái)執(zhí)行。
  21.                 if (delay <= 0)                            
  22.                     return finishPoll(first);                        
  23.                 // 如果超時(shí)時(shí)間已到,那么就直接返回null
  24.                 if (nanos <= 0)                            
  25.                     return null;                        
  26.                 // 將first設(shè)置為null,當(dāng)線程等待時(shí),不持有first的引用
  27.                 first = null; // don't retain ref while waiting
  28.                 // 如果超時(shí)時(shí)間小于任務(wù)的剩余延時(shí)時(shí)間,那么就有可能獲取不到任務(wù)。
  29.                 // 在這里讓線程等待超時(shí)時(shí)間nanos
  30.                 if (nanos < delay || leader != null)
  31.                  nanos = available.awaitNanos(nanos);                        
  32.                 else {
  33.                     Thread thisThread = Thread.currentThread();
  34.                     leader = thisThread;                            
  35.                     try {                                
  36.                         // 當(dāng)任務(wù)的延時(shí)時(shí)間到了時(shí),能夠自動(dòng)超時(shí)喚醒。
  37.                         long timeLeft = available.awaitNanos(delay);                                
  38.                         // 計(jì)算剩余的超時(shí)時(shí)間
  39.                         nanos -= delay - timeLeft;
  40.                     } finally {                                
  41.                         if (leader == thisThread)
  42.                          leader = null;
  43.                     }
  44.                 }
  45.             }
  46.         }
  47.     } finally {                
  48.         if (leader == null && queue[0] != null)                    // 喚醒等待任務(wù)的線程
  49.          available.signal();
  50.         lock.unlock();
  51.     }
  52. }

 remove刪除指定元素

刪除指定元素一般用于取消任務(wù)時(shí),任務(wù)還在阻塞隊(duì)列中,則需要將其刪除。當(dāng)刪除的元素不是堆尾元素時(shí),需要做堆化處理。

 
 
 
 
  1. public boolean remove(Object x) {
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lock();
  4.     try {
  5.         int i = indexOf(x);
  6.         if (i < 0)
  7.             return false;
  8.         //維護(hù)heapIndex
  9.         setIndex(queue[i], -1);
  10.         int s = --size;
  11.         RunnableScheduledFuture replacement = queue[s];
  12.         queue[s] = null;
  13.         if (s != i) {
  14.             //刪除的不是堆尾元素,則需要堆化處理
  15.             //先向下堆化
  16.             siftDown(i, replacement);
  17.             if (queue[i] == replacement)
  18.                 //若向下堆化后,i位置的元素還是replacement,說(shuō)明四無(wú)需向下堆化的,
  19.                 //則需要向上堆化
  20.                 siftUp(i, replacement);
  21.         }
  22.         return true;
  23.     } finally {
  24.         lock.unlock();
  25.     }
  26. }

 總結(jié)

使用優(yōu)先級(jí)隊(duì)列DelayedWorkQueue,保證添加到隊(duì)列中的任務(wù),會(huì)按照任務(wù)的延時(shí)時(shí)間進(jìn)行排序,延時(shí)時(shí)間少的任務(wù)首先被獲取。

  1. DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu)是基于堆實(shí)現(xiàn)的;
  2. DelayedWorkQueue采用數(shù)組實(shí)現(xiàn)堆,根節(jié)點(diǎn)出隊(duì),用最后葉子節(jié)點(diǎn)替換,然后下推至滿(mǎn)足堆成立條件;最后葉子節(jié)點(diǎn)入隊(duì),然后向上推至滿(mǎn)足堆成立條件;
  3. DelayedWorkQueue添加元素滿(mǎn)了之后會(huì)自動(dòng)擴(kuò)容原來(lái)容量的1/2,即永遠(yuǎn)不會(huì)阻塞,最大擴(kuò)容可達(dá)Integer.MAX_VALUE,所以線程池中至多有corePoolSize個(gè)工作線程正在運(yùn)行;
  4. DelayedWorkQueue 消費(fèi)元素take,在堆頂元素為空和delay >0 時(shí),阻塞等待;
  5. DelayedWorkQueue 是一個(gè)生產(chǎn)永遠(yuǎn)不會(huì)阻塞,消費(fèi)可以阻塞的生產(chǎn)者消費(fèi)者模式;
  6. DelayedWorkQueue 有一個(gè)leader線程的變量,是Leader-Follower模式的變種。當(dāng)一個(gè)take線程變成leader線程時(shí),只需要等待下一次的延遲時(shí)間,而不是leader線程的其他take線程則需要等leader線程出隊(duì)列了才喚醒其他take線程。

網(wǎng)頁(yè)標(biāo)題:阻塞隊(duì)列—DelayedWorkQueue源碼分析
URL分享:http://www.dlmjj.cn/article/djsoepd.html