新聞中心
前言
本節(jié)以ArrayBlockingQueue為例, 帶大家看下阻塞隊列是如何實現(xiàn),一起來看下吧!

創(chuàng)新互聯(lián)是一家網(wǎng)站設(shè)計公司,集創(chuàng)意、互聯(lián)網(wǎng)應(yīng)用、軟件技術(shù)為一體的創(chuàng)意網(wǎng)站建設(shè)服務(wù)商,主營產(chǎn)品:響應(yīng)式網(wǎng)站設(shè)計、成都品牌網(wǎng)站建設(shè)、營銷型網(wǎng)站。我們專注企業(yè)品牌在網(wǎng)站中的整體樹立,網(wǎng)絡(luò)互動的體驗,以及在手機等移動端的優(yōu)質(zhì)呈現(xiàn)。網(wǎng)站制作、做網(wǎng)站、移動互聯(lián)產(chǎn)品、網(wǎng)絡(luò)運營、VI設(shè)計、云產(chǎn)品.運維為核心業(yè)務(wù)。為用戶提供一站式解決方案,我們深知市場的競爭激烈,認真對待每位客戶,為客戶提供賞析悅目的作品,網(wǎng)站的價值服務(wù)。
ArrayBlockingQueue 源碼分析
構(gòu)造函數(shù)
同樣的,我們先從它的構(gòu)造函數(shù)看起。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}- capacity 固定容量大小。
- false,這個字段名稱其實是fair默認下它是false,非公平鎖。
??上節(jié)??我們使用的就是它的默認用法,公平鎖和非公平鎖我們之前講過,可以查看以往文章(ReentrantLock源碼分析)。下面我們接著看:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}從上面的代碼來看,可知capacity > 0,第一個構(gòu)造函數(shù)的this()其實就是調(diào)的這個構(gòu)造函數(shù),我們可以通過它來指定容量和訪問策略(fair 和 nofair)的ArrayBlockingQueue。
再接著看最后一個構(gòu)造函數(shù)。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 鎖用于可見性
lock.lock();
try {
int i = 0;
try {
// 迭代集合
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
// 捕獲異常 越界
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
從代碼來看,對比上一個多了個Collection,這是干嘛的呢?它允許我們在創(chuàng)建的時候初始化一個集合進去,按迭代順序添加到容器,從它的內(nèi)部我們也可以看出來。
內(nèi)部變量
// 隊列的元素
final Object[] items;
// 獲取下一個元素時的索引
int takeIndex;
// 下一個添加元素時的索引
int putIndex;
// 隊列的元素數(shù)量
int count;
// 全局鎖
final ReentrantLock lock;
// 等待條件
private final Condition notEmpty;
private final Condition notFull;
// 迭代器的共享狀態(tài)
transient Itrs itrs = null;
內(nèi)部方法
add() & offer()
我們看下add方法,這個方法用于向隊列中添加元素。
public boolean add(E e) {
return super.add(e);
}內(nèi)部調(diào)用了父類的方法,它繼承了AbstractQueue。
public class ArrayBlockingQueueextends AbstractQueue
implements BlockingQueue, java.io.Serializable {....}
接著看AbstractQueue的add()。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}可以看到內(nèi)部調(diào)用了offer(), 如果添加成功就返回true,失敗就拋出異常, 這符合我們上節(jié)使用時它的特點。
但是,我們發(fā)現(xiàn)在它的內(nèi)部并沒有offer方法,所以實現(xiàn)不在AbstractQueue,實現(xiàn)還是在ArrayBlockingQueue。
來看下ArrayBlockingQueue的offer()方法。
public boolean offer(E e) {
// 判斷元素 e 是否為空,空拋出 NullPointerException 異常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 需要持鎖
lock.lock();
try {
// 如果元素已滿 返回false, 對標 add 就會拋出異常
if (count == items.length)
return false;
else {
// 添加到隊列中
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}看下enqueue:
private void enqueue(E x) {
final Object[] items = this.items;
// 將元素添加到預(yù)期的索引位置
items[putIndex] = x;
// 如果下個索引值等于容器數(shù)量值 將putIndex歸0
if (++putIndex == items.length)
putIndex = 0;
// 容器元素數(shù)量+1
count++;
// 喚醒等待的線程
notEmpty.signal();
}remove & poll
先看第一個 remove(), 同樣的這個方法存在于 AbstractQueue內(nèi)部,如果被移除的元素為null則拋出異常。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}poll()的實現(xiàn)在ArrayBlockingQueue,內(nèi)部實現(xiàn)方式跟add很像。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}private E dequeue() {
final Object[] items = this.items;
// 這個注解用于忽略一些警告 這不是重點
@SuppressWarnings("unchecked")
// 取出元素 takeIndex 按照 FIFO
E x = (E) items[takeIndex];
// 元素取出時 置為空
items[takeIndex] = null;
// 判斷下一個元素的位置
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 喚醒等待的線程
notFull.signal();
// 返回元素
return x;
}第二個remove(e), 這個實現(xiàn)在ArrayBlockingQueue的內(nèi)部,可以移除指定元素。
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
// 遍歷移除指定元素
if (o.equals(items[i])) {
// 移除指定元素 并更新對應(yīng)的索引位置
removeAt(i);
return true;
}
// 防止越界
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}take
take會造成線程阻塞下面我看下它的內(nèi)部實現(xiàn)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獲取鎖,當線程中斷會直接返回
lock.lockInterruptibly();
try {
// 如果元素內(nèi)部為空 會進入阻塞,意思是沒有元素可拿了,進入等待
while (count == 0)
// 使當前線程等待
notEmpty.await();
// 否則出列
return dequeue();
} finally {
lock.unlock();
}
}put
該方法實現(xiàn)跟 take類似, 也會阻塞線程。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果元素滿了 就阻塞
while (count == items.length)
notFull.await();
// 否則入列
enqueue(e);
} finally {
lock.unlock();
}
}結(jié)束語
本節(jié)主要給大家講了ArrayBlockingQueue的源碼實現(xiàn),它的源碼相對簡單一些, 大家可以根據(jù)本節(jié)看下BlockingQueue其它的實現(xiàn)類。
網(wǎng)頁標題:面試官:阻塞隊列的底層實現(xiàn)有了解過嗎?
網(wǎng)頁路徑:http://www.dlmjj.cn/article/copehdj.html


咨詢
建站咨詢
