新聞中心
前言
大家好,一直以來(lái)我都本著用最通俗的話理解核心的知識(shí)點(diǎn), 我認(rèn)為所有的難點(diǎn)都離不開 「基礎(chǔ)知識(shí)」 的鋪墊。

- 有一定的Java基礎(chǔ)
- 想學(xué)習(xí)或了解多線程開發(fā)
- 想提高自己的同學(xué)
背景
之前給大家講了一些框架的使用,這些都屬于業(yè)務(wù)層面的東西,你需要熟練掌握它并在項(xiàng)目中會(huì)運(yùn)用它即可,但這些對(duì)自身技術(shù)的積累是遠(yuǎn)遠(yuǎn)不夠的,如果你想要提高自己,對(duì)于語(yǔ)言本身你需要花更多的時(shí)間去挖掘而不是局限于框架的使用,所以之前為什么跟大家一直強(qiáng)調(diào)基礎(chǔ)的重要性,框架可以千變?nèi)f化,層出不窮,但是基礎(chǔ)它是不變的,不管是學(xué)java還是前端或者是其它語(yǔ)言, 這一點(diǎn)大家還是需要認(rèn)清的。
情景回顧
上期帶大家學(xué)習(xí)了什么是進(jìn)階學(xué)習(xí)了Thread以及分析了它的一些源碼,本期帶大家學(xué)習(xí)Callable、Future與FutureTask的用法以及源碼分析, 內(nèi)容較多, 我們一起來(lái)看一下吧~
Callable & Future
之前我們通過(guò)Runnable,Thread就可以創(chuàng)建一個(gè)線程,但是它也有一個(gè)局限,就是沒(méi)有返回值,有時(shí)候我們的需求需要結(jié)合多任務(wù)處理后的數(shù)據(jù)做一些事情,所以通過(guò)上邊的方法就不好解決了。
下面我們看一下Callable。
public interface Callable{
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
首先它是一個(gè)接口,且還提供了泛型的支持,call方法有返回值, 那怎么使用它呢,肯定是要實(shí)現(xiàn)它。
public class CallableTest {
public static class CallableDemo implements Callable {
@Override
public String call() throws Exception {
return "hello";
}
}
public static void main(String[] args) throws Exception {
CallableDemo demo = new CallableDemo();
String result = demo.call();
System.out.println(result);
System.out.println("main");
}
} 運(yùn)行一下實(shí)際輸出:
hello
main
發(fā)現(xiàn)返回的結(jié)果輸出出去了,但是這里有個(gè)問(wèn)題,這個(gè)main輸出在hello之后,似乎好像沒(méi)有開啟一個(gè)線程,依然是同步執(zhí)行的,是這樣嗎,我們看一下call內(nèi)部的線程環(huán)境。
public String call() throws Exception {
System.out.println(Thread.currentThread());
Thread.sleep(3000);
return "hello";
}運(yùn)行一下實(shí)際輸出:
Thread[main,5,main]
hello
main
好家伙,還是main線程內(nèi)部,并且線程還被阻塞了,原來(lái)new是開啟不了線程的,只是單純的實(shí)現(xiàn)了一下它的接口,我們姿勢(shì)搞錯(cuò)了。其實(shí)它的源碼上加了注釋的,說(shuō)通常會(huì)借助Excutors類使用,這個(gè)類是用來(lái)創(chuàng)建線程池的,這個(gè)我們后邊講,這里給大家演示一下。
public static void main(String[] args) throws Exception {
CallableDemo demo = new CallableDemo();
// 創(chuàng)建線程池
ExecutorService executor = Executors.newCachedThreadPool();
// 提交任務(wù)
Future future = executor.submit(demo);
System.out.println("main");
} 實(shí)際輸出:
main
Thread[pool-1-thread-1,5,main]
發(fā)現(xiàn)是單獨(dú)線程執(zhí)行的,并且沒(méi)有阻塞線程。我們發(fā)現(xiàn)這里也用到了Future,這個(gè)翻譯過(guò)來(lái)時(shí)未來(lái)的意思,這里也就是結(jié)果發(fā)生在后邊,它是一個(gè)異步情況, 那么我們?nèi)绾潍@取到結(jié)果呢?
System.out.println(future.get());
System.out.println("main");
實(shí)際輸出:
Thread[pool-1-thread-1,5,main]
hello
main
發(fā)現(xiàn)結(jié)果拿到了,但是運(yùn)行的時(shí)候好像線程被阻塞了,我們可以發(fā)現(xiàn)get()會(huì)導(dǎo)致線程阻塞,舉一反三,我想不阻塞的情況下拿到返回值,可以嗎那有什么辦法呢?開啟單獨(dú)的線程不就好了,那么在單獨(dú)的線程可以拿到其它線程的值嗎,我們來(lái)試一下。
new Thread(() -> {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}).start();
System.out.println("main");實(shí)際運(yùn)行輸出:
Thread[pool-1-thread-1,5,main]
main
hello
發(fā)現(xiàn),這下就對(duì)了~
Future & FutureTask 源碼解析
端起小板凳,這部分好好聽,我們主要看下它的源碼實(shí)現(xiàn)。我們上文使用到了 Future,我們看一下它的定義,發(fā)現(xiàn)它也是一個(gè)接口。
public interface Future{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
還有一個(gè)接口叫做RunnableFuture,FutureTask是它的一個(gè)實(shí)現(xiàn)類,這個(gè)類幫我實(shí)現(xiàn)了很多好用的方法,因?yàn)槲覀冏约簩?shí)現(xiàn)的話是很麻煩的。
public interface RunnableFutureextends Runnable, Future {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
之前的例子也可以用FutureTask改寫成:
public static void main(String[] args) throws Exception {
CallableDemo demo = new CallableDemo();
ExecutorService executor = Executors.newCachedThreadPool();
FutureTask futureTask = new FutureTask<>(demo);
executor.submit(futureTask);
System.out.println(futureTask.get());
} 它繼承了 Runnable, Future接口,我們之前調(diào)用的get方法就是其中之一,來(lái)一起看一下這個(gè)get是如何拿到值的,該部分源碼來(lái)自FutureTask類實(shí)現(xiàn)。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}這個(gè)state線程的狀態(tài)值,這里很好理解,一個(gè)是阻塞方法awaitDone,一個(gè)是拋出結(jié)果report,我們重點(diǎn)看一下awaitDone的實(shí)現(xiàn):
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
首先它是一個(gè)內(nèi)部方法,timed指定是否定時(shí)等待,如果傳true的話需要指定時(shí)間nanos。
// 銷亡時(shí)間 System.nanoTime() 正在運(yùn)行的 Java 虛擬機(jī)的高分辨率時(shí)間源的當(dāng)前值,以納秒為單位
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;它是一個(gè)鏈表結(jié)構(gòu) volatile 被用來(lái)修飾會(huì)被不同線程訪問(wèn)和修改的變量, 后邊還會(huì)講到,此處先有個(gè)印象。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}for (;;) {...},這是一個(gè)死循環(huán),這里就是阻塞部分了,內(nèi)部先會(huì)判斷線程狀態(tài)。
// 判斷線程狀態(tài) 如果中斷,直接拋出異常,并且將```q```從節(jié)點(diǎn)中移除
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
這里為什么會(huì)移除呢,想想看,如果不移除,內(nèi)部積累太多,每次都要遍歷它,如果是有競(jìng)爭(zhēng)的情況下,是不是很浪費(fèi)。這里主要是避免不必要的高額開銷。
// 線程狀態(tài) 最先是 NEW
int s = state;
if (s > COMPLETING) {
// 如果線程完成狀態(tài) 移除q節(jié)點(diǎn) 并返回當(dāng)前線程狀態(tài) 最終通過(guò)report返回結(jié)果
if (q != null)
q.thread = null;
return s;
}
這里為什么移除?因?yàn)橥瓿闪?,我只要結(jié)果就好了,不需要在進(jìn)一步判斷了。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
如果處于COMPLETING,會(huì)讓出cpu時(shí)間。
else if (q == null)
q = new WaitNode();
這個(gè)很好理解,節(jié)點(diǎn)不存在就創(chuàng)建一個(gè)。
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
如果有新任務(wù)進(jìn)來(lái),會(huì)新建一個(gè)節(jié)點(diǎn),然后利用CAS操作放入waiter鏈表的頭部,這里是一個(gè)原子性操作,CAS的概念我們后邊給大家講,這里一切都是為了安全。
compareAndSwap是個(gè)原子方法,原理是CAS,即將內(nèi)存中的值與期望值進(jìn)行比較,如果相等,就將內(nèi)存中的值修改成新值并返回true。
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);這里判斷消亡時(shí)間,如果超時(shí)了,移除節(jié)點(diǎn),并返回線程狀態(tài),LockSupport使線程阻塞,有的同學(xué)可能會(huì)問(wèn),for不是已經(jīng)阻塞了嗎那為啥還調(diào)用LockSupport,這里其實(shí)是線程優(yōu)化,想想你一直for循環(huán)一直判斷是不是也會(huì)產(chǎn)生開銷,加上LockSupport避免不要的操作,其實(shí)for的整個(gè)過(guò)程是實(shí)現(xiàn)了自旋鎖的操作。
阻塞了不就沒(méi)法執(zhí)行了嗎,park加鎖方法還有一個(gè)對(duì)應(yīng)的unpark相當(dāng)于釋放鎖,但此處沒(méi)有看到這個(gè)方法,那么它在哪個(gè)地方呢我們大體應(yīng)該可以猜到,它應(yīng)該是在執(zhí)行階段,還記得RunnableFuture接口下的run方法嗎?下面我們看一下它的實(shí)現(xiàn)。
public void run() {
// 判斷線程狀態(tài) 如果不為NEW 或者 并判斷值是否一樣,如果不一樣就直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 這一步是執(zhí)行我們的任務(wù)
Callable c = callable;
// 如果任務(wù)存在 并且處于NEW狀態(tài)
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執(zhí)行任務(wù)
result = c.call();
ran = true;
} catch (Throwable ex) {
// 異常檢測(cè)
result = null;
ran = false;
setException(ex);
}
// 執(zhí)行成功,設(shè)置返回值
if (ran)
set(result);
}
} finally {
// 這里其實(shí)是釋放階段 防止并發(fā)調(diào)用
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 這一步其實(shí)是防止在中斷時(shí)提交任務(wù),內(nèi)部是調(diào)用了一個(gè)Thread.yield()
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
} 下面我們重點(diǎn)看一下這個(gè)set方法。
protected void set(V v) {
// 先比較是否相同
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// outcome 是返回的結(jié)果或者異常 setException這里是設(shè)置異常結(jié)果 異常賦值給outcome
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 設(shè)置最終狀態(tài)
finishCompletion();
}
}UNSAFE類是一個(gè)很特殊的類,它的內(nèi)部幾乎都是native方法,它可以使得我們能夠操作內(nèi)存空間來(lái)獲得更高的性能,但一般我們很少使用它,因?yàn)樗槐籫c控制,使用不當(dāng)jvm可能都會(huì)掛了。我們重點(diǎn)關(guān)注一下 finishCompletion這個(gè)方法。
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 遍歷節(jié)點(diǎn)釋放鎖
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 默認(rèn)下它是一個(gè)空方法,可以用于執(zhí)行完成的回調(diào)方法, 可以覆蓋實(shí)現(xiàn)
done();
callable = null; // to reduce footprint
}我們可以看到在這個(gè)內(nèi)部它是調(diào)了一個(gè)unpark方法的,可以看出之前awaitDone()方法內(nèi)部的線程阻塞在這個(gè)地方被喚醒了, 再回回過(guò)頭看awaitDone()方法,就明白為啥要調(diào)用park方法了,因?yàn)榫€程沒(méi)有達(dá)到大于COMPLETING狀態(tài),它會(huì)一直for。
最后一個(gè)就是report了,返回值。
private V report(int s) throws ExecutionException {
Object x = outcome;
// 在set的時(shí)候 我們可以看到有設(shè)置為這個(gè)狀態(tài)。 V就是傳入的類型
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}于是我們的get就拿到返回值了。
FutureTask 狀態(tài)
這里給大家補(bǔ)充一下FutureTask的狀態(tài)值。
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
state可能的狀態(tài)轉(zhuǎn)變路徑如下:
- NEW -> COMPLETING -> NORMAL。
- NEW -> COMPLETING -> EXCEPTIONAL。
- NEW -> CANCELLED。
- NEW -> INTERRUPTING -> INTERRUPTED。
結(jié)束語(yǔ)
本期到這里就結(jié)束了, 總結(jié)一下,本節(jié)主要講了Callable、Future與FutureTask的常用方法,以及從問(wèn)題觸發(fā),帶大家分析了一下FutureTask的源碼,這里大家要好好理解,不要去背,想要告訴大家的是學(xué)習(xí)要帶著問(wèn)題, 看源碼一定要大膽猜測(cè),冷靜分析 ~
本文標(biāo)題:Java多線程專題之Callable、Future與FutureTask
標(biāo)題路徑:http://www.dlmjj.cn/article/cooehid.html


咨詢
建站咨詢
