新聞中心
本文轉(zhuǎn)載自微信公眾號(hào)「Shooter茶杯」,作者Shooter 。轉(zhuǎn)載本文請(qǐng)聯(lián)系Shooter茶杯公眾號(hào)。

成都創(chuàng)新互聯(lián)公司是一家專注于成都網(wǎng)站制作、成都做網(wǎng)站與策劃設(shè)計(jì),麻山網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)十余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:麻山等地區(qū)。麻山做網(wǎng)站價(jià)格咨詢:028-86922220
本文主要是介紹線程池的一些進(jìn)階玩法 。
面包超人鎮(zhèn)樓
1、線程池簡(jiǎn)簡(jiǎn)單單 4 連發(fā)
- 1、線程池的核心線程數(shù)怎么設(shè)置?
- 2、8C16G 的機(jī)器需要幾臺(tái)可以抗起 3W 的qps?
- 3、如何動(dòng)態(tài)的修改線程池參數(shù)?
- 4、線程池可以先啟動(dòng)最大線程數(shù)再將任務(wù)放到阻塞隊(duì)列里么?
后面的舉例的機(jī)器配置統(tǒng)一是 8核16G !
2、線程池的核心線程數(shù)到底怎么設(shè)置?首先說個(gè)不太正確的答案:
IO 密集型的設(shè)置為 2n, 計(jì)算密集型設(shè)置為 n+1
為什么不對(duì)?因?yàn)楹诵木€程數(shù)設(shè)置多少要具體情況具體分析,大家使用線程池的業(yè)務(wù)場(chǎng)景不同,解決方案自然是不一樣的,下面我舉個(gè)例子做詳細(xì)的分析,然后總結(jié)出一個(gè)方法論就可以適用各個(gè)不同的場(chǎng)景了!!!
舉例:
- 1、假設(shè)現(xiàn)在要給 100w 用戶發(fā)放優(yōu)惠券,通過線程池異步發(fā)送
- 2、假設(shè)某線程池執(zhí)行發(fā)優(yōu)惠券的任務(wù)共耗時(shí) 50ms,其中 45ms 在io, 5ms 在進(jìn)行計(jì)算
(真正的 io 耗時(shí) 計(jì)算耗時(shí)可以通過 記錄log 判斷時(shí)間差值計(jì)算出來(lái) 取平均值即可 )
3、如何設(shè)置線程池的參數(shù)快速的將這 100w 張券發(fā)完?
先拋出答案公式,再論證這個(gè)公式的正確性:
核心線程數(shù) = CPU核數(shù) * ((Io耗時(shí) / 計(jì)算耗時(shí)) + 1)
核心線程數(shù) = 8C * ((45ms / 5ms) +1 ) = 80個(gè)
45ms / 5ms 是什么意思?
CPU 在等待 IO 返回時(shí)完全可以將 CPU 時(shí)間片拿出來(lái)去做其他的計(jì)算,45ms 可以多處理 9 個(gè)計(jì)算任務(wù),再加上原本就有一個(gè) 5ms 在計(jì)算,也就是說: 一個(gè)CPU 核在執(zhí)行這個(gè) 50ms 發(fā)券任務(wù)時(shí),可以并發(fā)的起10個(gè)線程去處理任務(wù)!那8C CPU 最多同時(shí)可以有 8個(gè)核心并行的處理任務(wù), 8 * 10 = 80
一秒鐘一個(gè)線程可以處理 1000ms / 50ms = 20個(gè)任務(wù)
可以算出線程池執(zhí)行任務(wù)的峰值 qps = 20 * 80 = 1600
發(fā)完100w 張券所需時(shí)間: 100w / 1600 = 625S,也就是說大概 10分鐘左右就能發(fā)完 100w 張券。
不太正確的結(jié)論: 核心線程數(shù)在處理這個(gè)任務(wù)的情況下可以設(shè)置為 80 用來(lái)極限的壓榨機(jī)器CPU 的性能。
what?為什么算出 80 又不正確了?
因?yàn)閷⒑诵木€程數(shù)設(shè)置為 80,這幾乎吃完了所有的 CPU 時(shí)間片, CPU 的負(fù)載將會(huì)達(dá)到 100% ; 試想一下生產(chǎn)環(huán)境如果你的機(jī)器 CPU 負(fù)載是 100% , 慌不慌?(CPU 負(fù)載打滿機(jī)器不會(huì)宕機(jī), 但沒有 CPU 資源來(lái)處理用戶的請(qǐng)求,表現(xiàn)為服務(wù)假死/機(jī)器請(qǐng)求半天無(wú)反應(yīng))
設(shè)置線程池核心線程數(shù)要考慮 CPU 的使用要素
- 1、每臺(tái)機(jī)器操作系統(tǒng)需要消耗一些 CPU 資源; 假設(shè)用了 2% 的CPU 資源;
- 2、如果是面向用戶的服務(wù),處理用戶的請(qǐng)求也是要消耗CPU 資源的,可以通過一些監(jiān)控系統(tǒng),看看平時(shí) CPU 在繁忙時(shí)間段的負(fù)載是多少; 假設(shè)用了 10% 的資源;
- 3、如果除了發(fā)券任務(wù)的線程池還有其他線程池在運(yùn)行,就得把其他線程池消耗的CPU資源也算上,假設(shè)用了 13% 的資源;
- 4、實(shí)際情況一些中間件框架也會(huì)用線程池,也會(huì)吃一些CPU 資源, 這里暫不做考慮。
在我的實(shí)際項(xiàng)目里有一個(gè)專門跑定時(shí)任務(wù)和消費(fèi) MQ 消息的服務(wù):
我需要考慮的點(diǎn):
- 1、操作系統(tǒng)的CPU 資源, 算占用 2% 的CPU資源
- 2、MQ 消費(fèi)消息 算占用 5% 的CPU 資源
- 3、有其他的定時(shí)任務(wù)也在用線程池跑任務(wù) 算占用 13% 的CPU 資源
- 4、機(jī)器的 CPU 在無(wú)人監(jiān)控的非必要時(shí)段不能超過 60%。
60% - 2% - 5% - 13% = 40%
發(fā) 100w 張優(yōu)惠券的線程池就只能消耗 40%的資源于是核心線程數(shù)最多可以設(shè)置為:
核心線程數(shù): 80個(gè) * 40% = 32個(gè);
CPU 100% 時(shí)可以設(shè)置 80個(gè)線程去跑任務(wù) CPU 40% 時(shí)可以設(shè)置 32個(gè)線程去跑任務(wù) 那這樣設(shè)置系統(tǒng)正常運(yùn)行CPU大概是 60% 左右, 就算偶爾飆高到 70%-80% 也不用太慌~
補(bǔ)充: 為什么用線程池沒考慮上下文的切換?
1ms = 1000us, 一次上下文的切換大概是 1us, 上下文切換的時(shí)間跟執(zhí)行任務(wù)的時(shí)間比起來(lái)可以忽略不計(jì)。
結(jié)論 : CPU核數(shù) * ((Io耗時(shí) / 計(jì)算耗時(shí)) + 1)
這是機(jī)器 CPU 負(fù)載 100% 時(shí)極限的值, 乘以期望的 CPU 負(fù)載百分比即可算出實(shí)際情況最佳的線程數(shù);
PS: 萬(wàn)一設(shè)置錯(cuò)了核心線程數(shù)又不想改代碼重新發(fā)布,可以繼續(xù)看第三個(gè)問題如何動(dòng)態(tài)修改線程池參數(shù)!
2、8C16G 的機(jī)器需要幾臺(tái)可以抗起 3W 的qps?
首先算出單臺(tái)機(jī)器的 QPS, 3w 除以單臺(tái)機(jī)器的 qps 即可算出所需的機(jī)器數(shù)。
想知道單臺(tái)機(jī)器某個(gè)接口的 QPS 很簡(jiǎn)單, 壓測(cè)即可。
不過顯然面試的時(shí)候如果被問這個(gè)問題是壓測(cè)不了的。
實(shí)際上是面試官在考察你對(duì)線程池的理解,接著往下看~
假設(shè)一個(gè) 用戶領(lǐng)券系統(tǒng)的 qps 在3w左右
大部分服務(wù)通常的部署在 Tomcat 上, Tomcat 內(nèi)部也是通過線程來(lái)處理用戶的請(qǐng)求,Tomcat 也是通過線程池來(lái)管理線程, 實(shí)際上算出 Tomcat 實(shí)際的并發(fā)和理想狀態(tài)能支持的的并發(fā)就好了。
上個(gè)問題分析出來(lái)發(fā)券接口 50ms 耗時(shí), 8C 的CPU 占用 100%, 不考慮內(nèi)存 磁盤 網(wǎng)絡(luò)等其他開銷, 線程池極限的QPS 是1600, 這里也不考慮有沒有其他線程池或者七七八八的東西消耗 CPU 資源了。假設(shè) CPU 只能維持在 70% 左右的負(fù)載;
單臺(tái)機(jī)器的 qps 就只能有 1600 * 70% = 1120,就算 1100
3w / 1100 = 27.27 向上取整 大概需要 28 臺(tái)機(jī)器。
作為一個(gè)有經(jīng)驗(yàn)的開發(fā)人員實(shí)際部署的時(shí)候絕對(duì)要多擴(kuò)容幾臺(tái)服務(wù)器來(lái)兜底, 推薦部署 32 - 36 臺(tái)機(jī)器分兩個(gè)集群部署。
3、如何動(dòng)態(tài)的修改線程池參數(shù)?為什么需要?jiǎng)討B(tài)的修改線程池參數(shù)呢?
比如第一個(gè)發(fā)券任務(wù)發(fā) 100w 張券需要 10 分鐘, 假設(shè)今天突然要發(fā) 200w 張券了, 多了100w 的發(fā)券任務(wù),也不想用其他手段來(lái)解決了, 且機(jī)器的 CPU 負(fù)載很低只有 1% ; (為了強(qiáng)行舉例修改線程池參數(shù)費(fèi)盡苦心)
看到第一個(gè)和第二個(gè)問題,想必你也收獲了如下信息:
使用 8C16G 的機(jī)器發(fā)放 100w 張優(yōu)惠券, 處理每個(gè)優(yōu)惠券任務(wù)耗時(shí) 50ms , 其中 45ms在IO , 5ms 在計(jì)算, 核心線程數(shù)設(shè)置為 32, CPU 負(fù)載到 40% 左右, 10分鐘可以把優(yōu)惠券發(fā)完。
如果想發(fā) 200w 張券, 最快的方法是將 核心線程數(shù) 32 設(shè)置為 64, CPU 負(fù)載在 80% 左右。
如何動(dòng)態(tài)的修改線程池參數(shù)呢?
JDK 的 ThreadPoolExecutor 提供了修改線程池參數(shù)的 API
- ThreadPoolExecutor.setCorePoolSize // 修改核心線程數(shù)
- ThreadPoolExecutor.setMaximumPoolSize // 修改最大線程數(shù)
- ThreadPoolExecutor.setKeepAliveTime // 修改空閑線程存活時(shí)間
- ThreadPoolExecutor.setRejectedExecutionHandler // 修改拒絕策略
- ThreadPoolExecutor.setThreadFactory // 修改線程工廠
(不可直接修改阻塞隊(duì)列大小,想達(dá)到修改阻塞隊(duì)列的效果對(duì)線程池做一些封裝即可)
- 1、首先將線程池定義為一個(gè) Bean 對(duì)象;
- @Bean("refreshLowPriceExecutor")
- public ThreadPoolExecutor refreshLowPriceExecutor() {
- final BlockingQueue
queue = new LinkedBlockingDeque<>(1000000); - final int corePoolSize = 20;
- final int maximumPoolSize = 100;
- final int keepAliveTime = 200;
- ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);
- return executorService;
- }
- 2、可以通過分布式配置 or controller接口 or 數(shù)據(jù)庫(kù)觸發(fā)修改線程的各個(gè)參數(shù), 推薦使用分布式配置(各種用法大同小異):
- private Map
config; - @QMapConfig("config.properties")
- private void getValueChange(Map
config) { - refreshLowPriceExecutor.setCorePoolSize(Integer.valueOf(config.get("core_size")));
- refreshLowPriceExecutor.setMaximumPoolSize(Integer.valueOf(config.get("max_size")));
- System.out.println("當(dāng)前核心線程數(shù)為 :" + refreshLowPriceExecutor.getCorePoolSize());
- System.out.println("當(dāng)前最大線程數(shù)為 :" + refreshLowPriceExecutor.getMaximumPoolSize());
- this.config = config;
- }
- 3、改了核心線程數(shù),線程池是如何讓線程數(shù)立即生效的?
- public void execute(Runnable command) {
- // 省略注釋/非核心代碼
- int c = ctl.get();
- // 線程池執(zhí)行任務(wù)的處理邏輯主要分三步
- // 第一步 : 當(dāng)前線程數(shù)小于核心線程數(shù)則繼續(xù)添加worker創(chuàng)建線程
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 第二步 : 當(dāng)前線程數(shù)達(dá)到了核心線程數(shù)后,將任務(wù)放進(jìn)阻塞隊(duì)列
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 第三步 : 隊(duì)列滿了就將啟動(dòng)最大線程數(shù)限制的線程, 失敗就將任務(wù)交給拒絕策略去處理
- else if (!addWorker(command, false))
- reject(command);
- }
在線程池的核心線程數(shù)被修改后,只要有任務(wù)繼續(xù)添加進(jìn)線程池,execute 方法就會(huì)繼續(xù)創(chuàng)建新線程去處理任務(wù),這樣核心線程數(shù)就生效了。
- 4、使用 ScheduledThreadPoolExecutor 監(jiān)控線程池內(nèi)部狀況
- // 封裝成一個(gè)任務(wù)
- Runnable runnable = () -> monitorThreadPool();
- public void monitorThreadPool(){
- log.info("核心線程數(shù)" + refreshLowPriceExecutor.getCorePoolSize());
- log.info("活躍線程數(shù)" + refreshLowPriceExecutor.getActiveCount());
- log.info("最大線程數(shù)" + refreshLowPriceExecutor.getMaximumPoolSize());
- log.info("任務(wù)數(shù)" + refreshLowPriceExecutor.getTaskCount());
- log.info("線程池里的線程數(shù)" + refreshLowPriceExecutor.getPoolSize());
- log.info("獲取隊(duì)列再獲取隊(duì)列任務(wù)數(shù)" + refreshLowPriceExecutor.getQueue().size());
- }
- // 將任務(wù)交給延時(shí)線程池
- executor.scheduleAtFixedRate(runnable, initialDelay,period, TimeUnit);
4、線程池可以先啟動(dòng)最大線程數(shù)再將任務(wù)放到阻塞隊(duì)列里么?
答案是當(dāng)然可以!
繼續(xù)分析線程池三步走的后兩步邏輯
- public void execute(Runnable command) {
- // 省略注釋/非必要代碼
- // 第二步 : 當(dāng)前線程池正在運(yùn)行且 阻塞隊(duì)列的 offer 方法返回 true
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 第三步 : 啟動(dòng)大于核心線程數(shù)但小于最大線程數(shù)個(gè)線程, 添加worker失敗就將任務(wù)交給拒絕策略去處理
- else if (!addWorker(command, false))
- reject(command);
- }
啟動(dòng)最大線程數(shù)再將任務(wù)放到阻塞隊(duì)列的訣竅就在 workQueue 的 offer 方法;
我們可以用自己實(shí)現(xiàn)的阻塞隊(duì)列在重寫 offer 方法; 在 offer 方法中判斷 當(dāng)前線程數(shù)是否大于等于最大線程數(shù),如果不大于就返回 false, 這樣就跳過了 execute 方法的第二步, 來(lái)到了第三步的創(chuàng)建最大線程數(shù)的邏輯。
看看 dubbo 是怎么做的 , 直接將代碼 copy(白嫖) 過來(lái)即可 地址
https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
- @Override
- public boolean offer(Runnable runnable) {
- if (executor == null) {
- throw new RejectedExecutionException("The task queue does not have executor!");
- }
- int currentPoolThreadSize = executor.getPoolSize();
- // 主要是這個(gè)邏輯 當(dāng)前線程數(shù)是否小于最大線程數(shù),如果小于返回 false
- // 這樣就可以跳過 execute 方法的第二步, 來(lái)到了第三步的創(chuàng)建最大線程數(shù)的邏輯。
- // return false to let executor create new worker.
- if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
- return false;
- }
- // currentPoolThreadSize >= max
- return super.offer(runnable);
- }
本文轉(zhuǎn)載自微信公眾號(hào)「Shooter茶杯」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Shooter茶杯公眾號(hào)。
當(dāng)前題目:看看面包超人的'招牌線程池'用得可還行?
標(biāo)題路徑:http://www.dlmjj.cn/article/dhshoje.html


咨詢
建站咨詢
