日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
Java并發(fā):jucExecutor框架詳解

Executor 框架是 juc 里提供的線程池的實(shí)現(xiàn)。前兩天看了下 Executor 框架的一些源碼,做個(gè)簡(jiǎn)單的總結(jié)。

10年積累的成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有山東免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。

線程池大概的思路是維護(hù)一個(gè)的線程池用于執(zhí)行提交的任務(wù)。我理解池的技術(shù)的主要意義有兩個(gè):

1. 資源的控制,如并發(fā)量限制。像連接池這種是對(duì)數(shù)據(jù)庫(kù)資源的保護(hù)。

2. 資源的有效利用,如線程復(fù)用,避免頻繁創(chuàng)建線程和線程上下文切換。

那么想象中設(shè)計(jì)一個(gè)線程池就需要有線程池大小、線程生命周期管理、等待隊(duì)列等等功能,下面結(jié)合代碼看看原理。

Excutor 整體結(jié)構(gòu)如下:

Executor 接口定義了最基本的 execute 方法,用于接收用戶提交任務(wù)。 ExecutorService 定義了線程池終止和創(chuàng)建及提交 futureTask 任務(wù)支持的方法。

AbstractExecutorService 是抽象類,主要實(shí)現(xiàn)了 ExecutorService 和 futureTask 相關(guān)的一些任務(wù)創(chuàng)建和提交的方法。

ThreadPoolExecutor 是最核心的一個(gè)類,是線程池的內(nèi)部實(shí)現(xiàn)。線程池的功能都在這里實(shí)現(xiàn)了,平時(shí)用的最多的基本就是這個(gè)了。其源碼很精練,遠(yuǎn)沒(méi)當(dāng)時(shí)想象的多。

ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基礎(chǔ)上提供了支持定時(shí)調(diào)度的功能。線程任務(wù)可以在一定延時(shí)時(shí)間后才被觸發(fā)執(zhí)行。

1、ThreadPoolExecutor 原理

1.1 ThreadPoolExecutor內(nèi)部的幾個(gè)重要屬性

1.線程池本身的狀態(tài)

 
 
 
 
  1. volatile int runState; 
  2. static final int RUNNING = 0; 
  3. static final int SHUTDOWN = 1; 
  4. static final int STOP = 2; 
  5. static final int TERMINATED = 3; 

2.等待任務(wù)隊(duì)列和工作集

 
 
 
 
  1. private final BlockingQueue workQueue; //等待被執(zhí)行的Runnable任務(wù) 
  2. private final HashSet workers = new HashSet(); //正在被執(zhí)行的Worker任務(wù)集 

 3.線程池的主要狀態(tài)鎖。線程池內(nèi)部的狀態(tài)變化 ( 如線程大小 ) 都需要基于此鎖。

 
 
 
 
  1. private final ReentrantLock mainLock = new ReentrantLock();

4.線程的存活時(shí)間和大小

 
 
 
 
  1. private volatile long keepAliveTime;// 線程存活時(shí)間 
  2. private volatile boolean allowCoreThreadTimeOut;// 是否允許核心線程存活 
  3. private volatile int corePoolSize;// 核心池大小 
  4. private volatile int maximumPoolSize; // 最大池大小 
  5. private volatile int poolSize; //當(dāng)前池大小 
  6. private int largestPoolSize; //最大池大小,區(qū)別于maximumPoolSize,是用于記錄線程池曾經(jīng)達(dá)到過(guò)的最大并發(fā),理論上小于等于maximumPoolSize。 

5.線程工廠和拒絕策略

 
 
 
 
  1. private volatile RejectedExecutionHandler handler;// 拒絕策略,用于當(dāng)線程池?zé)o法承載新線程是的處理策略。
  2.  private volatile ThreadFactory threadFactory;// 線程工廠,用于在線程池需要新創(chuàng)建線程的時(shí)候創(chuàng)建線程

6.線程池完成任務(wù)數(shù)

 
 
 
 
  1. private long completedTaskCount;//線程池運(yùn)行到當(dāng)前完成的任務(wù)數(shù)總和

1.2 ThreadPoolExecutor 的內(nèi)部工作原理

有了以上定義好的數(shù)據(jù),下面來(lái)看看內(nèi)部是如何實(shí)現(xiàn)的 。 Doug Lea 的整個(gè)思路總結(jié)起來(lái)就是 5 句話:

  1. 如果當(dāng)前池大小 poolSize 小于 corePoolSize ,則創(chuàng)建新線程執(zhí)行任務(wù)。
  2. 如果當(dāng)前池大小 poolSize 大于 corePoolSize ,且等待隊(duì)列未滿,則進(jìn)入等待隊(duì)列
  3. 如果當(dāng)前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待隊(duì)列已滿,則創(chuàng)建新線程執(zhí)行任務(wù)。
  4. 如果當(dāng)前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待隊(duì)列已滿,則調(diào)用拒絕策略來(lái)處理該任務(wù)。
  5. 線程池里的每個(gè)線程執(zhí)行完任務(wù)后不會(huì)立刻退出,而是會(huì)去檢查下等待隊(duì)列里是否還有線程任務(wù)需要執(zhí)行,如果在 keepAliveTime 里等不到新的任務(wù)了,那么線程就會(huì)退出。

下面看看代碼實(shí)現(xiàn):

線程池最重要的方法是由 Executor 接口定義的 execute 方法 , 是任務(wù)提交的入口。

我們看看 ThreadPoolExecutor.execute(Runnable cmd) 的實(shí)現(xiàn):

 
 
 
 
  1. public void execute(Runnable command) {
  2.         if (command == null)
  3.             throw new NullPointerException();
  4.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
  5.             if (runState == RUNNING && workQueue.offer(command)) {
  6.                 if (runState != RUNNING || poolSize == 0)
  7.                     ensureQueuedTaskHandled(command);
  8.             }
  9.             else if (!addIfUnderMaximumPoolSize(command))
  10.                 reject(command); // is shutdown or saturated
  11.         }
  12. }

解釋如下:

當(dāng)提交一個(gè)新的 Runnable 任務(wù):

分支1 : 如果當(dāng)前池大小小于 corePoolSize, 執(zhí)行 addIfUnderCorePoolSize(command) , 如果線程池處于運(yùn)行狀態(tài)且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 會(huì)做如下事情,將 Runnable 任務(wù)封裝成 Worker 任務(wù) , 創(chuàng)建新的 Thread ,執(zhí)行 Worker 任務(wù)。如果不滿足條件,則返回 false 。

代碼如下:

 
 
 
 
  1.  private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  2.         Thread t = null;
  3.         final ReentrantLock mainLock = this.mainLock;
  4.         mainLock.lock();
  5.         try {
  6.             if (poolSize < corePoolSize && runState == RUNNING)
  7.                 t = addThread(firstTask);
  8.         } finally {
  9.             mainLock.unlock();
  10.         }
  11.         if (t == null)
  12.             return false;
  13.         t.start();
  14.         return true;
  15. }

分支2 : 如果大于 corePoolSize 或 1 失敗失敗,則:

  • 如果等待隊(duì)列未滿,把 Runnable 任務(wù)加入到 workQueue 等待隊(duì)列
    workQueue .offer(command)
  • 如多等待隊(duì)列已經(jīng)滿了,調(diào)用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本類似,只不過(guò)判斷條件是 poolSize < maximumPoolSize 。如果大于 maximumPoolSize ,則把 Runnable 任務(wù)交由 RejectedExecutionHandler 來(lái)處理。

問(wèn)題:如何實(shí)現(xiàn)線程的復(fù)用?

Doug Lea 的實(shí)現(xiàn)思路是 線程池里的每個(gè)線程執(zhí)行完任務(wù)后不立刻退出,而是去檢查下等待隊(duì)列里是否還有線程任務(wù)需要執(zhí)行,如果在 keepAliveTime 里等不到新的任務(wù)了,那么線程就會(huì)退出。這個(gè)功能的實(shí)現(xiàn) 關(guān)鍵在于 Worker 。線程池在執(zhí)行 Runnable 任務(wù)的時(shí)候,并不單純把 Runnable 任務(wù)交給創(chuàng)建一個(gè) Thread 。而是會(huì)把 Runnable 任務(wù)封裝成 Worker 任務(wù)。

下面看看 Worker 的實(shí)現(xiàn):

代碼很簡(jiǎn)單,可以看出, worker 里面包裝了 firstTask 屬性,在構(gòu)造worker 的時(shí)候傳進(jìn)來(lái)的那個(gè) Runnable 任務(wù)就是 firstTask 。 同時(shí)也實(shí)現(xiàn)了Runnable接口,所以是個(gè)代理模式,看看代理增加了哪些功能。 關(guān)鍵看 woker 的 run 方法:

 
 
 
 
  1. public void run() {
  2.            try {
  3.                Runnable task = firstTask;
  4.                firstTask = null;
  5.                while (task != null || (task = getTask()) != null) {
  6.                    runTask(task);
  7.                    task = null;
  8.                }
  9.            } finally {
  10.                workerDone(this);
  11.            }
  12.        }

可以看出 worker 的 run 方法是一個(gè)循環(huán),第一次循環(huán)運(yùn)行的必然是 firstTask ,在運(yùn)行完 firstTask 之后,并不會(huì)立刻結(jié)束,而是會(huì)調(diào)用 getTask 獲取新的任務(wù)( getTask 會(huì)從等待隊(duì)列里獲取等待中的任務(wù)),如果 keepAliveTime 時(shí)間內(nèi)得到新任務(wù)則繼續(xù)執(zhí)行,得不到新任務(wù)則那么線程才會(huì)退出。這樣就保證了多個(gè)任務(wù)可以復(fù)用一個(gè)線程,而不是每次都創(chuàng)建新任務(wù)。 keepAliveTime 的邏輯在哪里實(shí)現(xiàn)的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待。可看 getTask 的代碼段:

 
 
 
 
  1. if (state == SHUTDOWN)  // Help drain queue
  2.     r = workQueue.poll();
  3. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
  4.     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
  5. else
  6.     r = workQueue.take();

2.ThreadFactory 和R ejectedExecutionHandler

ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的兩個(gè)屬性,也 可以認(rèn)為是兩個(gè)簡(jiǎn)單的擴(kuò)展點(diǎn) . ThreadFactory 是創(chuàng)建線程的工廠。

默認(rèn)的線程工廠會(huì)創(chuàng)建一個(gè)帶有“ pool-poolNumber-thread-threadNumber ”為名字的線程,如果我們有特別的需要,如線程組命名、優(yōu)先級(jí)等,可以定制自己的 ThreadFactory 。

RejectedExecutionHandler 是拒絕的策略。常見(jiàn)有以下幾種:

AbortPolicy :不執(zhí)行,會(huì)拋出 RejectedExecutionException 異常。

CallerRunsPolicy :由調(diào)用者(調(diào)用線程池的主線程)執(zhí)行。

DiscardOldestPolicy :拋棄等待隊(duì)列中最老的。

DiscardPolicy: 不做任何處理,即拋棄當(dāng)前任務(wù)。

3.ScheduledThreadPoolExecutor

ScheduleThreadPoolExecutor 是對(duì)ThreadPoolExecutor的集成。增加了定時(shí)觸發(fā)線程任務(wù)的功能。需要注意

從內(nèi)部實(shí)現(xiàn)看, ScheduleThreadPoolExecutor 使用的是 corePoolSize 線程和一個(gè)無(wú)界隊(duì)列的固定大小的池,所以調(diào)整 maximumPoolSize 沒(méi)有效果。無(wú)界隊(duì)列是一個(gè)內(nèi)部自定義的 DelayedWorkQueue 。

ScheduleThreadPoolExecutor 線程池接收定時(shí)任務(wù)的方法是 schedule ,看看內(nèi)部實(shí)現(xiàn):

 
 
 
 
  1. public ScheduledFuture schedule(Runnable command,
  2.                                    long delay,
  3.                                    TimeUnit unit) {
  4.     if (command == null || unit == null)
  5.         throw new NullPointerException();
  6.     RunnableScheduledFuture t = decorateTask(command,
  7.         new ScheduledFutureTask(command, null,
  8.                                       triggerTime(delay, unit)));
  9.     delayedExecute(t);
  10.     return t;
  11. }

以上代碼會(huì)初始化一個(gè) RunnableScheduledFuture 類型的任務(wù) t, 并交給 delayedExecute 方法。 delayedExecute(t) 方法實(shí)現(xiàn)如下:

 
 
 
 
  1.     private void delayedExecute(Runnable command) {
  2.         if (isShutdown()) {
  3.             reject(command);
  4.             return;
  5.         }
  6.         if (getPoolSize() < getCorePoolSize())
  7.             prestartCoreThread();
  8.         super.getQueue().add(command);
  9. }

如果當(dāng)前線程池大小 poolSize 小于 CorePoolSize ,則創(chuàng)建一個(gè)新的線程,注意這里創(chuàng)建的線程是空的,不會(huì)把任務(wù)直接交給線程來(lái)做,而是把線程任務(wù)放到隊(duì)列里。因?yàn)槿蝿?wù)是要定時(shí)觸發(fā)的,所以不能直接交給線程去執(zhí)行。

問(wèn)題: 那如何做到定時(shí)觸發(fā)呢?

關(guān)鍵在于DelayedWorkQueue,它代理了 DelayQueue ??梢哉J(rèn)為 DelayQueue 是這樣一個(gè)隊(duì)列(具體可以去看下源碼,不詳細(xì)分析):

1. 隊(duì)列里的元素按照任務(wù)的 delay 時(shí)間長(zhǎng)短升序排序, delay 時(shí)間短的在隊(duì)頭, delay 時(shí)間長(zhǎng)的在隊(duì)尾。

2. 從 DelayQueue 里 FIFO 的獲取一個(gè)元素的時(shí)候,不會(huì)直接返回 head 。可能會(huì)阻塞,等到 head 節(jié)點(diǎn)到達(dá) delay 時(shí)間后才能被獲取??梢钥聪?DelayQueue 的 take 方法實(shí)現(xiàn):

 
 
 
 
  1. public E take() throws InterruptedException {
  2.     final ReentrantLock lock = this.lock;
  3.     lock.lockInterruptibly();
  4.     try {
  5.         for (;;) {
  6.             E first = q.peek();
  7.             if (first == null) {
  8.                 available.await();
  9.             } else {
  10.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);
  11.                 if (delay > 0) {
  12.                     long tl = available.awaitNanos(delay);//等待delay時(shí)間
  13.                 } else {
  14.                     E x = q.poll();
  15.                     assert x != null;
  16.                     if (q.size() != 0)
  17.                         available.signalAll(); // wake up other takers
  18.                     return x;
  19.                 }
  20.             }
  21.         }
  22.     } finally {
  23.         lock.unlock();
  24.     }
  25. }

4.線程池使用策略

通過(guò)以上的詳解基本上能夠定制出自己需要的策略了,下面簡(jiǎn)單介紹下Executors里面提供的一些常見(jiàn)線程池策略:

1.FixedThreadPool

 
 
 
 
  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2.     return new ThreadPoolExecutor(nThreads, nThreads,
  3.                                   0L, TimeUnit.MILLISECONDS,
  4.                                   new LinkedBlockingQueue());
  5. }

實(shí)際上就是個(gè)不支持keepalivetime,且corePoolSize和maximumPoolSize相等的線程池。

2.SingleThreadExecutor

 
 
 
 
  1. public static ExecutorService newSingleThreadExecutor() {
  2.     return new FinalizableDelegatedExecutorService
  3.         (new ThreadPoolExecutor(1, 1,
  4.                                 0L, TimeUnit.MILLISECONDS,
  5.                                 new LinkedBlockingQueue()));
  6. }

實(shí)際上就是個(gè)不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的線程池。

3.CachedThreadPool

 
 
 
 
  1. public static ExecutorService newCachedThreadPool() {
  2.     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3.                                   60L, TimeUnit.SECONDS,
  4.                                   new SynchronousQueue());
  5. }

實(shí)際上就是個(gè)支持keepalivetime時(shí)間是60秒(線程空閑存活時(shí)間),且corePoolSize為0,maximumPoolSize無(wú)窮大的線程池。

4.SingleThreadScheduledExecutor

 
 
 
 
  1. public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  2.     return new DelegatedScheduledExecutorService
  3.         (new ScheduledThreadPoolExecutor(1, threadFactory));
  4. }

實(shí)際上是個(gè)corePoolSize為1的ScheduledExecutor。上文說(shuō)過(guò)ScheduledExecutor采用無(wú)界等待隊(duì)列,所以maximumPoolSize沒(méi)有作用。

5.ScheduledThreadPool

 
 
 
 
  1. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  2.     return new ScheduledThreadPoolExecutor(corePoolSize);
  3. }

實(shí)際上是corePoolSize課設(shè)定的ScheduledExecutor。上文說(shuō)過(guò)ScheduledExecutor采用無(wú)界等待隊(duì)列,所以maximumPoolSize沒(méi)有作用。

以上還不一定滿足你的需要,完全可以根據(jù)自己需要去定制。


分享題目:Java并發(fā):jucExecutor框架詳解
鏈接地址:http://www.dlmjj.cn/article/djiochj.html