日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
死磕java集合之DelayQueue源碼分析-創(chuàng)新互聯(lián)

問題

(1)DelayQueue是阻塞隊列嗎?

創(chuàng)新互聯(lián)長期為近千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為承留企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計、網(wǎng)站建設(shè),承留網(wǎng)站改版等技術(shù)服務(wù)。擁有10年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

(2)DelayQueue的實(shí)現(xiàn)方式?

(3)DelayQueue主要用于什么場景?

簡介

DelayQueue是java并發(fā)包下的延時阻塞隊列,常用于實(shí)現(xiàn)定時任務(wù)。

繼承體系

死磕 java集合之DelayQueue源碼分析

從繼承體系可以看到,DelayQueue實(shí)現(xiàn)了BlockingQueue,所以它是一個阻塞隊列。

另外,DelayQueue還組合了一個叫做Delayed的接口,DelayQueue中存儲的所有元素必須實(shí)現(xiàn)Delayed接口。

那么,Delayed是什么呢?

public interface Delayed extends Comparable {

    long getDelay(TimeUnit unit);
}

Delayed是一個繼承自Comparable的接口,并且定義了一個getDelay()方法,用于表示還有多少時間到期,到期了應(yīng)返回小于等于0的數(shù)值。

源碼分析

主要屬性

// 用于控制并發(fā)的鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 優(yōu)先級隊列
private final PriorityQueue q = new PriorityQueue();
// 用于標(biāo)記當(dāng)前是否有線程在排隊(僅用于取元素時)
private Thread leader = null;
// 條件,用于表示現(xiàn)在是否有可取的元素
private final Condition available = lock.newCondition();

從屬性我們可以知道,延時隊列主要使用優(yōu)先級隊列來實(shí)現(xiàn),并輔以重入鎖和條件來控制并發(fā)安全。

因?yàn)閮?yōu)先級隊列是×××的,所以這里只需要一個條件就可以了。

還記得優(yōu)先級隊列嗎?點(diǎn)擊鏈接直達(dá)【死磕 java集合之PriorityQueue源碼分析】

主要構(gòu)造方法

public DelayQueue() {}

public DelayQueue(Collection c) {
    this.addAll(c);
}

構(gòu)造方法比較簡單,一個默認(rèn)構(gòu)造方法,一個初始化添加集合c中所有元素的構(gòu)造方法。

入隊

因?yàn)镈elayQueue是阻塞隊列,且優(yōu)先級隊列是×××的,所以入隊不會阻塞不會超時,因此它的四個入隊方法是一樣的。

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

入隊方法比較簡單:

(1)加鎖;

(2)添加元素到優(yōu)先級隊列中;

(3)如果添加的元素是堆頂元素,就把leader置為空,并喚醒等待在條件available上的線程;

(4)解鎖;

出隊

因?yàn)镈elayQueue是阻塞隊列,所以它的出隊有四個不同的方法,有拋出異常的,有阻塞的,有不阻塞的,有超時的。

我們這里主要分析兩個,poll()和take()方法。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

poll()方法比較簡單:

(1)加鎖;

(2)檢查第一個元素,如果為空或者還沒到期,就返回null;

(3)如果第一個元素到期了就調(diào)用優(yōu)先級隊列的poll()彈出第一個元素;

(4)解鎖。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 堆頂元素
            E first = q.peek();
            // 如果堆頂元素為空,說明隊列中還沒有元素,直接阻塞等待
            if (first == null)
                available.await();
            else {
                // 堆頂元素的到期時間
                long delay = first.getDelay(NANOSECONDS);
                // 如果小于0說明已到期,直接調(diào)用poll()方法彈出堆頂元素
                if (delay <= 0)
                    return q.poll();

                // 如果delay大于0 ,則下面要阻塞了

                // 將first置為空方便gc,因?yàn)橛锌赡芷渌貜棾隽诉@個元素
                // 這里還持有著引用不會被清理
                first = null; // don't retain ref while waiting
                // 如果前面有其它線程在等待,直接進(jìn)入等待
                if (leader != null)
                    available.await();
                else {
                    // 如果leader為null,把當(dāng)前線程賦值給它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待delay時間后自動醒過來
                        // 醒過來后把leader置空并重新進(jìn)入循環(huán)判斷堆頂元素是否到期
                        // 這里即使醒過來后也不一定能獲取到元素
                        // 因?yàn)橛锌赡芷渌€程先一步獲取了鎖并彈出了堆頂元素
                        // 條件鎖的喚醒分成兩步,先從Condition的隊列里出隊
                        // 再入隊到AQS的隊列中,當(dāng)其它線程調(diào)用LockSupport.unpark(t)的時候才會真正喚醒
                        // 關(guān)于AQS我們后面會講的^^
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader還是當(dāng)前線程就把它置為空,讓其它線程有機(jī)會獲取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出隊后,如果leader為空且堆頂還有元素,就喚醒下一個等待的線程
        if (leader == null && q.peek() != null)
            // signal()只是把等待的線程放到AQS的隊列里面,并不是真正的喚醒
            available.signal();
        // 解鎖,這才是真正的喚醒
        lock.unlock();
    }
}

take()方法稍微要復(fù)雜一些:

(1)加鎖;

(2)判斷堆頂元素是否為空,為空的話直接阻塞等待;

(3)判斷堆頂元素是否到期,到期了直接調(diào)用優(yōu)先級隊列的poll()彈出元素;

(4)沒到期,再判斷前面是否有其它線程在等待,有則直接等待;

(5)前面沒有其它線程在等待,則把自己當(dāng)作第一個線程等待delay時間后喚醒,再嘗試獲取元素;

(6)獲取到元素之后再喚醒下一個等待的線程;

(7)解鎖;

使用方法

說了那么多,是不是還是不知道怎么用呢?那怎么能行,請看下面的案例:

public class DelayQueueTest {
    public static void main(String[] args) {
        DelayQueue queue = new DelayQueue<>();

        long now = System.currentTimeMillis();

        // 啟動一個線程從隊列中取元素
        new Thread(()->{
           while (true) {
                try {
                    // 將依次打印1000,2000,5000,7000,8000
                    System.out.println(queue.take().deadline - now);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 添加5個元素到隊列中
        queue.add(new Message(now + 5000));
        queue.add(new Message(now + 8000));
        queue.add(new Message(now + 2000));
        queue.add(new Message(now + 1000));
        queue.add(new Message(now + 7000));
    }
}

class Message implements Delayed {
    long deadline;

    public Message(long deadline) {
        this.deadline = deadline;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return deadline - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return String.valueOf(deadline);
    }
}

是不是很簡單,越早到期的元素越先出隊。

總結(jié)

(1)DelayQueue是阻塞隊列;

(2)DelayQueue內(nèi)部存儲結(jié)構(gòu)使用優(yōu)先級隊列;

(3)DelayQueue使用重入鎖和條件來控制并發(fā)安全;

(4)DelayQueue常用于定時任務(wù);

彩蛋

java中的線程池實(shí)現(xiàn)定時任務(wù)是直接用的DelayQueue嗎?

當(dāng)然不是,ScheduledThreadPoolExecutor中使用的是它自己定義的內(nèi)部類DelayedWorkQueue,其實(shí)里面的實(shí)現(xiàn)邏輯基本都是一樣的,只不過DelayedWorkQueue里面沒有使用現(xiàn)成的PriorityQueue,而是使用數(shù)組又實(shí)現(xiàn)了一遍優(yōu)先級隊列,本質(zhì)上沒有什么區(qū)別。


歡迎關(guān)注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

死磕 java集合之DelayQueue源碼分析

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。


文章標(biāo)題:死磕java集合之DelayQueue源碼分析-創(chuàng)新互聯(lián)
URL分享:http://www.dlmjj.cn/article/dcddce.html