新聞中心
這篇文章主要介紹“怎么快速了解Java線程池”,在日常操作中,相信很多人在怎么快速了解Java線程池問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么快速了解Java線程池”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
成都創(chuàng)新互聯(lián)專注于成都網(wǎng)站制作、網(wǎng)站設(shè)計、網(wǎng)頁設(shè)計、網(wǎng)站制作、網(wǎng)站開發(fā)。公司秉持“客戶至上,用心服務(wù)”的宗旨,從客戶的利益和觀點出發(fā),讓客戶在網(wǎng)絡(luò)營銷中找到自己的駐足之地。尊重和關(guān)懷每一位客戶,用嚴謹?shù)膽B(tài)度對待客戶,用專業(yè)的服務(wù)創(chuàng)造價值,成為客戶值得信賴的朋友,為客戶解除后顧之憂。
老王 是個深耕在帝都的一線碼農(nóng),辛苦一年掙了點錢,想把錢存儲到銀行卡里,錢銀行卡辦理遇到了如下的遭遇
老王 銀行門口取號后發(fā)現(xiàn)有柜臺營業(yè)但是沒人辦理業(yè)務(wù) 直接辦理
了。老王 取號后發(fā)現(xiàn)柜臺都有人在辦理,等待席有空地,去 坐著等
辦理去了。老王 取號后發(fā)現(xiàn)柜臺都有人辦理,等待席也人坐滿了,這個時候銀行經(jīng)理看到小麥是老實人本著關(guān)愛老實人的態(tài)度,新開一個 臨時窗口給他辦理了。 老王 取號后發(fā)現(xiàn)柜臺都滿了,等待座位席也滿了, 臨時窗口也人滿了。這個時候銀行經(jīng)理給出了若干 解決策略
。
直接告知人太多不給你辦理了。 看到 老王 就來氣,也不給不辦理也不讓他走。 經(jīng)理讓 老王 取嘗試跟座位席中最前面的人聊一聊看是否可以加塞,可以就辦理,不可以還是被踢走。 經(jīng)理直接跟 老王 說誰讓你來的你找誰去我這辦理不了。

上面的這個流程幾乎就跟JDK線程池的大致流程類似,
營業(yè)中的3個窗口對應(yīng)核心線程池數(shù):corePoolSize 銀行總的營業(yè)窗口數(shù)對應(yīng):maximumPoolSize 打開的臨時窗口在多少時間內(nèi)無人辦理則關(guān)閉對應(yīng):unit 銀行里的等待座椅就是等待隊列:workQueue 無法辦理的時候銀行給出的解決方法對應(yīng):RejectedExecutionHandler threadFactory 該參數(shù)在JDK中是 線程工廠,用來創(chuàng)建線程對象,一般不會動。
5分鐘線程池的核心工作流程講解完畢,更細節(jié)的知識看下面。
什么是線程池
簡單理解就是 預(yù)先創(chuàng)建好一定數(shù)量的線程對象,存入緩沖池中,需要用的時候直接從緩沖池中取出,用完之后不要銷毀,還回到緩沖池中。
線程池存在必要性
提高線程的利用率,降低資源的消耗。 提高響應(yīng)速度,線程的創(chuàng)建時間為T1,執(zhí)行時間T2,銷毀時間T3,用線程池可以免去T1和T3的時間。 便于統(tǒng)一管理線程對象 可控制最大并發(fā)數(shù)
手動實現(xiàn)
如果先不看線程池源碼讓我們自己手動實現(xiàn)一個線程池你可以考慮到幾個重要點?
有若干個初始化好的線程數(shù)組來充當(dāng)線程池。 線程池要去一個 等待的任務(wù)隊列 中去拿任務(wù)。
簡單來說就是初始化N個線程充當(dāng)線程池然后一起去阻塞隊列中進行阻塞take,新添加的任務(wù)都通過put將任務(wù)追加到任務(wù)隊列,關(guān)于任務(wù)隊列的講解看這blog
核心類
public class MyThreadPool2 {
// 線程池中默認線程的個數(shù)為5
private static int WORK_NUM = 5;
// 隊列默認任務(wù)個數(shù)為100 來不及保存任務(wù)
private static int TASK_COUNT = 100;
// 工作線程組
private WorkThread[] workThreads;
// 任務(wù)隊列,作為一個緩沖
private final BlockingQueue taskQueue;
//用戶在構(gòu)造這個池,希望的啟動的線程數(shù)
private final int worker_num;
// 創(chuàng)建具有默認線程個數(shù)的線程池
public MyThreadPool2() {
this(WORK_NUM, TASK_COUNT);
}
// 創(chuàng)建線程池,worker_num為線程池中工作線程的個數(shù)
public MyThreadPool2(int worker_num, int taskCount) {
if (worker_num <= 0) worker_num = WORK_NUM;
if (taskCount <= 0) taskCount = TASK_COUNT;
this.worker_num = worker_num;
taskQueue = new ArrayBlockingQueue<>(taskCount);
workThreads = new WorkThread[worker_num];
for (int i = 0; i < worker_num; i++) {
workThreads[i] = new WorkThread();
workThreads[i].start();
}
Runtime.getRuntime().availableProcessors();
}
// 執(zhí)行任務(wù),其實只是把任務(wù)加入任務(wù)隊列,什么時候執(zhí)行有線程池管理器決定
public void execute(Runnable task) {
try {
taskQueue.put(task);// 阻塞 放置任務(wù)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 銷毀線程池,該方法保證在所有任務(wù)都完成的情況下才銷毀所有線程,否則等待任務(wù)完成才銷毀
public void destroy() {
// 工作線程停止工作,且置為null
System.out.println("準備關(guān)閉線程池");
for (int i = 0; i < worker_num; i++) {
workThreads[i].stopWorker();
workThreads[i] = null;//help gc
}
taskQueue.clear();// 清空任務(wù)隊列
}
// 覆蓋toString方法,返回線程池信息:工作線程個數(shù)和已完成任務(wù)個數(shù)
@Override
public String toString() {
return "線程池大小 :" + worker_num + " 等待執(zhí)行任務(wù)個數(shù):" + taskQueue.size();
}
//內(nèi)部類,工作線程
private class WorkThread extends Thread {
@Override
public void run() {
Runnable r = null;
try {
while (!isInterrupted()) {
r = taskQueue.take();//阻塞獲得任務(wù)
if (r != null) {
System.out.println(getId() + " 準備執(zhí)行 :" + r);
r.run();
}
r = null; //help gc;
}
} catch (Exception e) {
// TODO: handle exception
}
}
public void stopWorker() {
interrupt();
}
}
}
測試類
public class TestMyThreadPool {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建3個線程的線程池
MyThreadPool2 t = new MyThreadPool2(3, 5);
t.execute(new MyTask("testA"));
t.execute(new MyTask("testB"));
t.execute(new MyTask("testC"));
t.execute(new MyTask("testD"));
t.execute(new MyTask("testE"));
System.out.println(t);
Thread.sleep(10000);
t.destroy();// 所有線程都執(zhí)行完成 才destory
System.out.println(t);
}
// 任務(wù)類
static class MyTask implements Runnable {
private String name;
private Random r = new Random();
public MyTask(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public void run() {// 執(zhí)行任務(wù)
try {
Thread.sleep(r.nextInt(1000) + 2000); //隨機休眠
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getId() + " 被打斷:"
+ Thread.currentThread().isInterrupted());
}
System.out.println("任務(wù) " + name + " 完成");
}
}
}
ThreadPoolExecutor
ThreadPoolExecutor
是JDK中所有線程池實現(xiàn)類的父類,構(gòu)造函數(shù)有多個入?yún)⑼ㄟ^靈活的組合來實現(xiàn)線程池的初始化,核心構(gòu)造函數(shù)如下。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
重要參數(shù)解析
corePoolSize
此值是用來初始化線程池中核心線程數(shù),當(dāng)線程池中線程池數(shù)<
corePoolSize
時,系統(tǒng)默認是添加一個任務(wù)才創(chuàng)建一個線程池??梢酝ㄟ^調(diào)用prestartAllCoreThreads
方法一次性的啟動corePoolSize
個數(shù)的線程。當(dāng)線程數(shù) = corePoolSize時,新任務(wù)會追加到workQueue中。
maximumPoolSize
maximumPoolSize
表示允許的最大線程數(shù) = (非核心線程數(shù)+核心線程數(shù)),當(dāng)BlockingQueue
也滿了,但線程池中總線程數(shù) <maximumPoolSize
時候就會再次創(chuàng)建新的線程。
keepAliveTime
非核心線程 =(maximumPoolSize - corePoolSize ) ,非核心線程閑置下來不干活最多存活時間。
unit
線程池中非核心線程保持存活的時間
TimeUnit.DAYS; 天 TimeUnit.HOURS; 小時 TimeUnit.MINUTES; 分鐘 TimeUnit.SECONDS; 秒 TimeUnit.MILLISECONDS; 毫秒 TimeUnit.MICROSECONDS; 微秒 TimeUnit.NANOSECONDS; 納秒
workQueue
線程池 等待隊列,維護著等待執(zhí)行的
Runnable
對象。當(dāng)運行當(dāng)線程數(shù)= corePoolSize時,新的任務(wù)會被添加到workQueue
中,如果workQueue
也滿了則嘗試用非核心線程執(zhí)行任務(wù),另外等待隊列盡量用有界的哦??!
threadFactory
創(chuàng)建一個新線程時使用的工廠,可以用來設(shè)定線程名、是否為daemon線程等等。
handler
corePoolSize
、workQueue
、maximumPoolSize
都不可用的時候執(zhí)行的 飽和策略。AbortPolicy :直接拋出異常,默認用此 CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù) DiscardOldestPolicy:丟棄阻塞隊列里最老的任務(wù),隊列里最靠前的任務(wù) DiscardPolicy :當(dāng)前任務(wù)直接丟棄 想實現(xiàn)自己的飽和策略,實現(xiàn)RejectedExecutionHandler接口即可
形象流程圖如下:
提交
execute 不需要返回
// 核心思想跟上面的流程圖類似
public void execute(Runnable command) {
if (command == null) //規(guī)范性檢查
throw new NullPointerException();
int c = ctl.get();//當(dāng)前工作的線程數(shù)跟線程狀態(tài) ctl = AtomicInteger CAS級別
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 如果當(dāng)前線程池中工作線程數(shù)小于核心線程數(shù),直接添加任務(wù) 然后return
return;
c = ctl.get();// 添加失敗了重新獲得線程池中工作線程數(shù)
}
if (isRunning(c) && workQueue.offer(command)) {
// 線程池狀態(tài)是否處于可用,可用就嘗試將線程添加到queue
int recheck = ctl.get();// 獲得線程池狀態(tài)
if (! isRunning(recheck) && remove(command))
reject(command);// 如果線程狀態(tài)不在運行中 則remove 該任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))// 嘗試將任務(wù)用非核心線程執(zhí)行,
reject(command);//失敗了則執(zhí)行失敗策略。
}
submit 需要返回值 ThreadPoolExecutor extends AbstractExecutorService
父類中存在一個submit方法,public
Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFutureftask = newTaskFor(task);
execute(ftask);
return ftask;
}
關(guān)閉線程池
注意線程之間是協(xié)作式的哦,所以的關(guān)閉只是發(fā)出關(guān)閉指令。
shutdown() 將線程池狀態(tài)置為shutdown,并不會立即停止:
停止接收外部submit的任務(wù) 內(nèi)部正在跑的任務(wù)和隊列里等待的任務(wù),會執(zhí)行完 等到第二步完成后,才真正停止
shutdownNow() 將線程池狀態(tài)置為stop。企圖立即停止,事實上不一定:
跟shutdown()一樣,先停止接收外部提交的任務(wù) 忽略隊列里等待的任務(wù) 嘗試將正在跑的任務(wù)interrupt中斷 返回未執(zhí)行的任務(wù)列表
shutdown 跟shutdownnow簡單來說區(qū)別如下:
shutdownNow()能立即停止線程池,正在跑的和正在等待的任務(wù)都停下了。這樣做立即生效,但是風(fēng)險也比較大。shutdown()只是關(guān)閉了提交通道,用submit()是無效的;而內(nèi)部該怎么跑還是怎么跑,跑完再停。
awaitTermination
pool.showdown()
boolean b = pool.awaitTermination(3, TimeUnit.SECONDS)
awaitTermination
有兩個參數(shù),一個是timeout即超時時間,另一個是unit即時間單位。這個方法會使線程等待timeout時長,當(dāng)超過timeout時間后,會監(jiān)測ExecutorService
是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用,調(diào)用后當(dāng)前線程會阻塞,直到
等所有已提交的任務(wù)(包括正在跑的和隊列中等待的)執(zhí)行完 或者等超時時間到 或者線程被中斷,拋出InterruptedException
總結(jié)
優(yōu)雅的關(guān)閉,用shutdown() 想立馬關(guān)閉,并得到未執(zhí)行任務(wù)列表,用shutdownNow() 優(yōu)雅的關(guān)閉,發(fā)出關(guān)閉指令后看下是否真的關(guān)閉了用awaitTermination()。
合理配置線程池
線程在Java中屬于稀缺資源,線程池不是越大越好也不是越小越好。任務(wù)分為計算密集型、IO密集型、混合型。
計算密集型:大部分都在用CPU跟內(nèi)存,加密,邏輯操作業(yè)務(wù)處理等。 IO密集型:數(shù)據(jù)庫鏈接,網(wǎng)絡(luò)通訊傳輸?shù)取?/section>
計算密集型一般推薦線程池不要過大,一般是CPU數(shù) + 1,+1是因為可能存在 頁缺失(就是可能存在有些數(shù)據(jù)在硬盤中需要多來一個線程將數(shù)據(jù)讀入內(nèi)存)。如果線程池數(shù)太大,可能會頻繁的 進行線程上下文切換跟任務(wù)調(diào)度。獲得當(dāng)前CPU核心數(shù)代碼如下:
Runtime.getRuntime().availableProcessors();
IO密集型:線程數(shù)適當(dāng)大一點,機器的Cpu核心數(shù)*2。 混合型:如果密集型站大頭則拆分的必要性不大,如果IO型占據(jù)不少有必要,Mark 下。
常見線程池
每個線程池都是一個實現(xiàn)了接口ExecutorService
并且繼承自ThreadPoolExecutor
的具體實現(xiàn)類,這些類的創(chuàng)建統(tǒng)一由一個工廠類Executors
來提供對外創(chuàng)建接口。Executors框架圖如下:
ThreadPoolExecutor
中一個線程就是一個Worker
對象,它與一個線程綁定,當(dāng)Worker
執(zhí)行完畢就是線程執(zhí)行完畢。而Worker帶了鎖AQS,根據(jù)我后面準備寫的讀寫鎖的例子,發(fā)現(xiàn)線程池是線程安全的??纯磮D二的類圖。下面簡單介紹幾個常用的線程池模式。
FixedThreadPool
定長的線程池,有核心線程,核心線程的即為最大的線程數(shù)量,沒有非核心線程。 使用的 無界的等待隊列是 LinkedBlockingQueue
。使用時候小心堵滿等待隊列。
SingleThreadPool
只有一條線程來執(zhí)行任務(wù),適用于有順序的任務(wù)的應(yīng)用場景,也是用的無界等待隊列
CachedThreadPool
可緩存的線程池,該線程池中沒有核心線程,非核心線程的數(shù)量為Integer.max_value,就是無限大,當(dāng)有需要時創(chuàng)建線程來執(zhí)行任務(wù),沒有需要時回收線程,適用于耗時少,任務(wù)量大的情況。任務(wù)隊列用的是SynchronousQueue如果生產(chǎn)多快消費慢,則會導(dǎo)致創(chuàng)建很多線程需注意。
WorkStealingPool
JDK7以后 基于ForkJoinPool實現(xiàn)。PS:其中
FixedThreadPool
、SingleThreadPool
、CachedThreadPool
都用的無界等待隊列,因此實際工作中都不建議這樣做的哦,阿里巴巴Java編程規(guī)范建議如下:最后來個簡單的線程使用demo:
public class UseThreadPool
{
// 工作線程
static class Worker implements Runnable
{
private String taskName;
private Random r = new Random();
public Worker(String taskName)
{
this.taskName = taskName;
}
public String getName()
{
return taskName;
}
@Override
public void run()
{
System.out.println(Thread.currentThread().getName() + " 當(dāng)前任務(wù): " + taskName);
try
{
TimeUnit.MILLISECONDS.sleep(r.nextInt(100) * 5);
} catch (Exception e)
{
e.printStackTrace();
}
}
}
static class CallWorker implements Callable
{
private String taskName;
private Random r = new Random();
public CallWorker(String taskName)
{
this.taskName = taskName;
}
public String getName()
{
return taskName;
}
@Override
public String call() throws Exception
{
System.out.println(Thread.currentThread().getName() + " 當(dāng)前任務(wù) : " + taskName);
return Thread.currentThread().getName() + ":" + r.nextInt(100) * 5;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException
{
ExecutorService pool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
// ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
{
Worker worker = new Worker("Runnable_" + i);
pool.execute(worker);
}
for (int i = 0; i < 5; i++)
{
CallWorker callWorker = new CallWorker("CallWorker_" + i);
Future result = pool.submit(callWorker);
System.out.println(result.get());
}
pool.shutdown();
}
}
ScheduledThreadPoolExecutor
周期性執(zhí)行任務(wù)的線程池,按照某種特定的計劃執(zhí)行線程中的任務(wù),有核心線程,但也有非核心線程,非核心線程的大小也為無限大。適用于執(zhí)行周期性的任務(wù)。看構(gòu)造函數(shù):調(diào)用的還是
ThreadPoolExecutor
構(gòu)造函數(shù),區(qū)別不同點在于任務(wù)隊列是用的DelayedWorkQueue,沒什么新奇的了。
核心函數(shù)講解:
schedule 只執(zhí)行一次,任務(wù)還可以延時執(zhí)行,傳入待執(zhí)行任務(wù)跟延時時間。 scheduleAtFixedRate 提交固定時間間隔的任務(wù),提交任務(wù),延時時間,已經(jīng)循環(huán)時間間隔時間。這個的含義是只是在固定的時間間隔嘗試運行該任務(wù)。 scheduleWithFixedDelay 提交固定延時間隔執(zhí)行的任務(wù)。上一個任務(wù)執(zhí)行完畢后等多久再執(zhí)行下個任務(wù),這個中間時間叫 FixedDelay
其中
scheduleAtFixedRate
跟scheduleWithFixedDelay
區(qū)別如下圖scheduleAtFixedRate任務(wù)超時狀態(tài),比如我們設(shè)定60s執(zhí)行一次,其中第一個任務(wù)時長 80s,第二個任務(wù)20s,第三個任務(wù) 50s。
第一個任務(wù)第0秒開始,第80s結(jié)束. 第二個任務(wù)第80s開始,在第100s結(jié)束. 第三個任務(wù)第120s秒開始,170s結(jié)束. 第四個任務(wù)從180s開始.
簡單Mark個循環(huán)任務(wù)demo:
class ScheduleWorker implements Runnable {
public final static int Normal = 0;//普通任務(wù)類型
public final static int HasException = -1;//會拋出異常的任務(wù)類型
public final static int ProcessException = 1;//拋出異常但會捕捉的任務(wù)類型
public static SimpleDateFormat formater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private int taskType;
public ScheduleWorker(int taskType) {
this.taskType = taskType;
}
@Override
public void run() {
if (taskType == HasException) {
System.out.println(formater.format(new Date()) + " 異常產(chǎn)生");
throw new RuntimeException("有異常");
} else if (taskType == ProcessException) {
try {
System.out.println(formater.format(new Date()) + " 異常產(chǎn)生被捕捉");
throw new RuntimeException("異常被捕捉");//異常導(dǎo)致下個任務(wù)無法執(zhí)行
} catch (Exception e) {
System.out.println(" 異常被主播");
}
} else {
System.out.println("正常" + formater.format(new Date()));
}
}
}
public class SchTest{
public static void main(String[] args) {
ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.HasException),
1000, 3000, TimeUnit.MILLISECONDS); // 任務(wù)在 1秒后執(zhí)行 周期3秒
schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.Normal),
1000, 3000, TimeUnit.MILLISECONDS);
}
}
CompletionService
JDK8中新添加的一個類,攝像一個場景你去詢問兩個商品價格然后將價格保存數(shù)據(jù)庫。
ExecutorService executor =Executors.newFixedThreadPool(2);
// 異步向電商 S1 詢價
Future f1 = executor.submit(()->getPriceByS1());
// 異步向電商 S2 詢價
Future f2 = executor.submit(()->getPriceByS2());
// 獲取電商 S1 報價并保存
r=f1.get();
executor.execute(()->save(r));
// 獲取電商 S2 報價并保存
r=f2.get();
executor.execute(()->save(r));
上面的這個方案本身沒有太大問題,但是有個地方的處理需要你注意,那就是如果獲取電商 S1 報價的耗時很長,那么即便獲取電商 S2 報價的耗時很短,也無法讓保存 S2 報價的操作先執(zhí)行,因為這個主線程都阻塞在了 f1.get(),那我們?nèi)绾谓鉀Q了?解決方法:結(jié)果都存入到一個阻塞隊列中去。
// 創(chuàng)建阻塞隊列
BlockingQueue bq =new LinkedBlockingQueue<>();
// 電商 S1 報價異步進入阻塞隊列
executor.execute(()->bq.put(f1.get()));
// 電商 S2 報價異步進入阻塞隊列
executor.execute(()->bq.put(f2.get()));
// 異步保存所有報價
for (int i=0; i<2; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}
在JDK8中不建議上面的工作都手動實現(xiàn),JDK提供了CompletionService
,它實現(xiàn)原理也是內(nèi)部維護了一個阻塞隊列,它的核心功效就是讓先執(zhí)行的任務(wù)先放到結(jié)果集。當(dāng)任務(wù)執(zhí)行結(jié)束就把任務(wù)的執(zhí)行結(jié)果加入到阻塞隊列中,不同的是CompletionService
是把任務(wù)執(zhí)行結(jié)果的 Future 對象加入到阻塞隊列中,而上面的示例代碼是把任務(wù)最終的執(zhí)行結(jié)果放入了阻塞隊列中。CompletionService
將Executor
和BlockingQueue
的功能融合在一起,CompletionService
內(nèi)部有個阻塞隊列。CompletionService
接口的實現(xiàn)類是 ExecutorCompletionService
,這個實現(xiàn)類的構(gòu)造方法有兩個,分別是:
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue)
這兩個構(gòu)造方法都需要傳入一個線程池,如果不指定 completionQueue,那么默認會使用無界的 LinkedBlockingQueue。任務(wù)執(zhí)行結(jié)果的 Future對象就是加入到 completionQueue 中。
// 創(chuàng)建線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 創(chuàng)建 CompletionService
CompletionService cs = new ExecutorCompletionService<>(executor);
// 異步向電商 S1 詢價
cs.submit(()->getPriceByS1());
// 異步向電商 S2 詢價
cs.submit(()->getPriceByS2());
// 將詢價結(jié)果異步保存到數(shù)據(jù)庫
for (int i=0; i<2; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}
來一個整體的demo加深印象:
// 任務(wù)類
class WorkTask implements Callable
{
private String name;
public WorkTask(String name)
{
this.name = name;
}
@Override
public Integer call()
{
int sleepTime = new Random().nextInt(1000);
try
{
Thread.sleep(sleepTime);
} catch (InterruptedException e)
{
e.printStackTrace();
}
return sleepTime;
}
}
public class CompletionCase
{
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors();
public void selfByQueue() throws Exception
{
long start = System.currentTimeMillis(); // 統(tǒng)計所有任務(wù)休眠的總時長
AtomicInteger count = new AtomicInteger(0);
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); // 創(chuàng)建線程池
BlockingQueue> queue = new LinkedBlockingQueue>();//容器存放提交給線程池的任務(wù),list,map,
for (int i = 0; i < TOTAL_TASK; i++)
{
Future future = pool.submit(new WorkTask("要執(zhí)行的第幾個任務(wù)" + i));
queue.add(future);//i=0 先進隊列,i=1的任務(wù)跟著進
}
for (int i = 0; i < TOTAL_TASK; i++)
{
int sleptTime = queue.take().get(); // 檢查線程池任務(wù)執(zhí)行結(jié)果 i=0先取到,i=1的后取到
System.out.println(" 休眠毫秒數(shù) = " + sleptTime + " ms ");
count.addAndGet(sleptTime);
}
pool.shutdown();
System.out.println("休眠時間" + count.get() + "ms,耗時時間" + (System.currentTimeMillis() - start) + " ms");
}
public void testByCompletion() throws Exception
{
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 創(chuàng)建線程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService cService = new ExecutorCompletionService<>(pool);
// 向里面扔任務(wù)
for (int i = 0; i < TOTAL_TASK; i++)
{
cService.submit(new WorkTask("執(zhí)行任務(wù)" + i));
}
// 檢查線程池任務(wù)執(zhí)行結(jié)果
for (int i = 0; i < TOTAL_TASK; i++)
{
int sleptTime = cService.take().get();
System.out.println("休眠毫秒數(shù) = " + sleptTime + " ms ...");
count.addAndGet(sleptTime);
}
pool.shutdown();
System.out.println("休眠時間 " + count.get() + "ms,耗時時間" + (System.currentTimeMillis() - start) + " ms");
}
public static void main(String[] args) throws Exception
{
CompletionCase t = new CompletionCase();
t.selfByQueue();
t.testByCompletion();
}
}

常見考題
為什么用線程池? 線程池的作用? 常用的線程池模版? 7大重要參數(shù)? 4大拒絕策略? 常見線程池任務(wù)隊列,如何理解有界跟無界?7.如何分配線程池個數(shù)? 單機線程池執(zhí)行一般斷電了如何考慮? 正在處理的實現(xiàn)事務(wù)功能,下次自動回滾,隊列實現(xiàn)持久化儲存,下次啟動自動載入。
設(shè)定一個線程池優(yōu)先級隊列,Runable類要實現(xiàn)可對比功能,任務(wù)隊列使用優(yōu)先級隊列
到此,關(guān)于“怎么快速了解Java線程池”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章題目:怎么快速了解Java線程池
標題網(wǎng)址:http://www.dlmjj.cn/article/pecpdi.html