日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
一個公式看懂:為什么Dubbo線程池會打滿

0 文章概述

大家可能都遇到過DUBBO線程池打滿這個問題,剛開始遇到這個問題可能會比較慌,常見方案可能就是重啟服務(wù),但也不知道重啟是否可以解決。我認(rèn)為重啟不僅不能解決問題,甚至有可能加劇問題,這是為什么呢?本文我們就一起分析DUBBO線程池打滿這個問題。

成都創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供封丘網(wǎng)站建設(shè)、封丘做網(wǎng)站、封丘網(wǎng)站設(shè)計、封丘網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、封丘企業(yè)網(wǎng)站模板建站服務(wù),十多年封丘做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。

1 基礎(chǔ)知識

1.1 DUBBO線程模型

1.1.1 基本概念

DUBBO底層網(wǎng)絡(luò)通信采用Netty框架,我們編寫一個Netty服務(wù)端進(jìn)行觀察:

public class NettyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(7777).sync();
System.out.println("服務(wù)端準(zhǔn)備就緒");
channelFuture.channel().closeFuture().sync();
} catch (Exception ex) {
System.out.println(ex.getMessage());
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

BossGroup線程組只有一個線程處理客戶端連接請求,連接完成后將完成三次握手的SocketChannel連接分發(fā)給WorkerGroup處理讀寫請求,這兩個線程組被稱為「IO線程」。

我們再引出「業(yè)務(wù)線程」這個概念。服務(wù)生產(chǎn)者接收到請求后,如果處理邏輯可以快速處理完成,那么可以直接放在IO線程處理,從而減少線程池調(diào)度與上下文切換。但是如果處理邏輯非常耗時,或者會發(fā)起新IO請求例如查詢數(shù)據(jù)庫,那么必須派發(fā)到業(yè)務(wù)線程池處理。

DUBBO提供了多種線程模型,選擇線程模型需要在配置文件指定dispatcher屬性:






不同線程模型在選擇是使用IO線程還是業(yè)務(wù)線程,DUBBO官網(wǎng)文檔說明:

all
所有消息都派發(fā)到業(yè)務(wù)線程池,包括請求,響應(yīng),連接事件,斷開事件,心跳

direct
所有消息都不派發(fā)到業(yè)務(wù)線程池,全部在IO線程直接執(zhí)行

message
只有請求響應(yīng)消息派發(fā)到業(yè)務(wù)線程池,其它連接斷開事件,心跳等消息直接在IO線程執(zhí)行

execution
只有請求消息派發(fā)到業(yè)務(wù)線程池,響應(yīng)和其它連接斷開事件,心跳等消息直接在IO線程執(zhí)行

connection
在IO線程上將連接斷開事件放入隊列,有序逐個執(zhí)行,其它消息派發(fā)到業(yè)務(wù)線程池

all所有消息都派發(fā)到業(yè)務(wù)線程池,包括請求,響應(yīng),連接事件,斷開事件,心跳direct所有消息都不派發(fā)到業(yè)務(wù)線程池,全部在IO線程直接執(zhí)行message只有請求響應(yīng)消息派發(fā)到業(yè)務(wù)線程池,其它連接斷開事件,心跳等消息直接在IO線程執(zhí)行execution只有請求消息派發(fā)到業(yè)務(wù)線程池,響應(yīng)和其它連接斷開事件,心跳等消息直接在IO線程執(zhí)行connection在IO線程上將連接斷開事件放入隊列,有序逐個執(zhí)行,其它消息派發(fā)到業(yè)務(wù)線程池

1.1.2 確定時機(jī)

生產(chǎn)者和消費者在初始化時確定線程模型:

// 生產(chǎn)者
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}

// 消費者
public class NettyClient extends AbstractClient {
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
}

生產(chǎn)者和消費者默認(rèn)線程模型都會使用AllDispatcher,ChannelHandlers.wrap方法可以獲取Dispatch自適應(yīng)擴(kuò)展點。如果我們在配置文件中指定dispatcher,擴(kuò)展點加載器會從URL獲取屬性值加載對應(yīng)線程模型。本文以生產(chǎn)者為例進(jìn)行分析:

public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// ChannelHandlers.wrap確定線程策略
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}

public class ChannelHandlers {
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
}

@SPI(AllDispatcher.NAME)
public interface Dispatcher {
@Adaptive({Constants.DISPATCHER_KEY, "channel.handler"})
ChannelHandler dispatch(ChannelHandler handler, URL url);
}

1.1.3 源碼分析

我們分析其中兩個線程模型源碼,其它線程模型請閱讀DUBBO源碼。AllDispatcher模型所有消息都派發(fā)到業(yè)務(wù)線程池,包括請求,響應(yīng),連接事件,斷開事件,心跳:

public class AllDispatcher implements Dispatcher {

// 線程模型名稱
public static final String NAME = "all";

// 具體實現(xiàn)策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}


public class AllChannelHandler extends WrappedChannelHandler {

@Override
public void connected(Channel channel) throws RemotingException {
// 連接完成事件交給業(yè)務(wù)線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
} catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass() + " error when process connected event", t);
}
}

@Override
public void disconnected(Channel channel) throws RemotingException {
// 斷開連接事件交給業(yè)務(wù)線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
} catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event", t);
}
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
// 請求響應(yīng)事件交給業(yè)務(wù)線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
if(message instanceof Request && t instanceof RejectedExecutionException) {
Request request = (Request)message;
if(request.isTwoWay()) {
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event", t);
}
}

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
// 異常事件交給業(yè)務(wù)線程池
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
} catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass() + " error when process caught event", t);
}
}
}

DirectDispatcher策略所有消息都不派發(fā)到業(yè)務(wù)線程池,全部在IO線程直接執(zhí)行:

public class DirectDispatcher implements Dispatcher {

// 線程模型名稱
public static final String NAME = "direct";

// 具體實現(xiàn)策略
@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
// 直接返回handler表示所有事件都交給IO線程處理
return handler;
}
}

1.2 DUBBO線程池策略

1.2.1 基本概念

上個章節(jié)分析了線程模型,我們知道不同的線程模型會選擇使用還是IO線程還是業(yè)務(wù)線程。如果使用業(yè)務(wù)線程池,那么使用什么線程池策略是本章節(jié)需要回答的問題。DUBBO官網(wǎng)線程派發(fā)模型圖展示了線程模型和線程池策略的關(guān)系:

DUBBO提供了多種線程池策略,選擇線程池策略需要在配置文件指定threadpool屬性:





不同線程池策略會創(chuàng)建不同特性的線程池:

fixed
包含固定個數(shù)線程

cached
線程空閑一分鐘會被回收,當(dāng)新請求到來時會創(chuàng)建新線程

limited
線程個數(shù)隨著任務(wù)增加而增加,但不會超過最大閾值??臻e線程不會被回收

eager
當(dāng)所有核心線程數(shù)都處于忙碌狀態(tài)時,優(yōu)先創(chuàng)建新線程執(zhí)行任務(wù),而不是立即放入隊列

fixed包含固定個數(shù)線程cached線程空閑一分鐘會被回收,當(dāng)新請求到來時會創(chuàng)建新線程limited線程個數(shù)隨著任務(wù)增加而增加,但不會超過最大閾值。空閑線程不會被回收eager當(dāng)所有核心線程數(shù)都處于忙碌狀態(tài)時,優(yōu)先創(chuàng)建新線程執(zhí)行任務(wù),而不是立即放入隊列

1.2.2 確定時機(jī)

本文我們以AllDispatcher為例分析線程池策略在什么時候確定:

public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";

@Override
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}

public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
}

在WrappedChannelHandler構(gòu)造函數(shù)中如果配置指定了threadpool屬性,擴(kuò)展點加載器會從URL獲取屬性值加載對應(yīng)線程池策略,默認(rèn)策略為fixed:

public class WrappedChannelHandler implements ChannelHandlerDelegate {

public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 獲取線程池自適應(yīng)擴(kuò)展點
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
}

@SPI("fixed")
public interface ThreadPool {
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}

1.2.3 源碼分析

(1) FixedThreadPool
public class FixedThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 線程名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 線程個數(shù)默認(rèn)200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

// 隊列容量默認(rèn)0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 隊列容量等于0使用阻塞隊列SynchronousQueue
// 隊列容量小于0使用無界阻塞隊列LinkedBlockingQueue
// 隊列容量大于0使用有界阻塞隊列LinkedBlockingQueue
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue()
: (queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
(2) CachedThreadPool
public class CachedThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 獲取線程名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 核心線程數(shù)默認(rèn)0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

// 最大線程數(shù)默認(rèn)Int最大值
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

// 隊列容量默認(rèn)0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 線程空閑多少時間被回收默認(rèn)1分鐘
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

// 隊列容量等于0使用阻塞隊列SynchronousQueue
// 隊列容量小于0使用無界阻塞隊列LinkedBlockingQueue
// 隊列容量大于0使用有界阻塞隊列LinkedBlockingQueue
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue()
: (queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
(3) LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 獲取線程名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 核心線程數(shù)默認(rèn)0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

// 最大線程數(shù)默認(rèn)200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);

// 隊列容量默認(rèn)0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 隊列容量等于0使用阻塞隊列SynchronousQueue
// 隊列容量小于0使用無界阻塞隊列LinkedBlockingQueue
// 隊列容量大于0使用有界阻塞隊列LinkedBlockingQueue
// keepalive時間設(shè)置Long.MAX_VALUE表示不回收空閑線程
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue()
: (queues < 0 ? new LinkedBlockingQueue()
: new LinkedBlockingQueue(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
(4) EagerThreadPool

我們知道ThreadPoolExecutor是普通線程執(zhí)行器。當(dāng)線程池核心線程達(dá)到閾值時新任務(wù)放入隊列,當(dāng)隊列已滿開啟新線程處理,當(dāng)前線程數(shù)達(dá)到最大線程數(shù)時執(zhí)行拒絕策略。

但是EagerThreadPool自定義線程執(zhí)行策略,當(dāng)線程池核心線程達(dá)到閾值時,新任務(wù)不會放入隊列而是開啟新線程進(jìn)行處理(要求當(dāng)前線程數(shù)沒有超過最大線程數(shù))。當(dāng)前線程數(shù)達(dá)到最大線程數(shù)時任務(wù)放入隊列。

public class EagerThreadPool implements ThreadPool {

@Override
public Executor getExecutor(URL url) {

// 線程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);

// 核心線程數(shù)默認(rèn)0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);

// 最大線程數(shù)默認(rèn)Int最大值
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);

// 隊列容量默認(rèn)0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

// 線程空閑多少時間被回收默認(rèn)1分鐘
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

// 初始化自定義線程池和隊列重寫相關(guān)方法
TaskQueue taskQueue = new TaskQueue(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}

1.3 一個公式

現(xiàn)在我們知道DUBBO會選擇線程池策略進(jìn)行業(yè)務(wù)處理,那么應(yīng)該如何估算可
文章標(biāo)題:一個公式看懂:為什么Dubbo線程池會打滿
標(biāo)題來源:http://www.dlmjj.cn/article/dhojcss.html