ti外包网站建设wordpress 插件 支付
news/
2025/9/24 4:29:53/
文章来源:
ti外包网站建设,wordpress 插件 支付,青海城乡建设部网站,网站建设功能选择表背景
最近在研究tomcat调优的问题#xff0c;开发人员做过的最多的tomcat调优想必就是线程池调优了#xff0c;但是tomcat并没有使用jdk自己的线程池实现#xff0c;而是自定了了线程池#xff0c;自己实现了ThreadPoolExecutor类位于org.apache.tomcat.util.threads包下 …背景
最近在研究tomcat调优的问题开发人员做过的最多的tomcat调优想必就是线程池调优了但是tomcat并没有使用jdk自己的线程池实现而是自定了了线程池自己实现了ThreadPoolExecutor类位于org.apache.tomcat.util.threads包下
jdk线程池
首先回顾一下jdk的线程池实现 提交一个任务时 1 如果此时线程池中的数量小于corePoolSize无论线程池中的线程是否处于空闲状态都会创建新的线程来处理被添加的任务。
2 如果此时线程池中的数量大于等于于corePoolSize但是缓冲队列workQueue未满那么任务被放入缓冲队列。
3 如果此时线程池中的数量大于等于corePoolSize且缓冲队列workQueue满但是线程池中的数量小于maximumPoolSize则建新线程来处理被添加的任务。
4 如果此时线程池中的数量大于corePoolSize且缓冲队列workQueue满并且线程池中的数量等于maximumPoolSize那么就需要通过handler所指定的策略来处理此这个任务。
5 线程池中的线程数量大于 corePoolSize时如果某线程处理完任务后进入空闲状态空闲时间超过keepAliveTime该线程将被终止。这样线程池可以动态的调整池中的线程数到corePoolSize。
重点已经标红处理了在线程达到corePoolSize个数时超过的任务是先放在队列里面的
问题当任务很多又很耗时比如http请求IO密集型队列长度怎么设置过长容易造成任务堆积甚至OOM最大线程数的设置也将变的没有意义过短又容易将丢弃任务tomcat至少要保证请求尽可能交给业务系统去处理
tomcat线程池
在AbstractEndpoint中调用createExecutor方法自定义线程池
public void createExecutor() {internalExecutor true;TaskQueue taskqueue new TaskQueue();TaskThreadFactory tf new TaskThreadFactory(getName() -exec-, daemon, getThreadPriority());executor new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);taskqueue.setParent( (ThreadPoolExecutor) executor);}那么首先看ThreadPoolExecutor的excute方法
public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException(sm.getString(threadPoolExecutor.queueFull));}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}tomcat中的线程池有一个额外的属性submittedCount你可以简单的理解为就是一个计数器每当有一个任务执行在执行时submittedCount就会加1当任务执行完成后submittedCount就会减1具体记录这个数字有什么作用继续往下看就知道了。
核心还在是父类的的excute方法
public void execute(Runnable command) {if (command null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldnt, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c ctl.get();if (workerCountOf(c) corePoolSize) {if (addWorker(command, true))return;c ctl.get();}if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();if (! isRunning(recheck) remove(command))reject(command);else if (workerCountOf(recheck) 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}如果当前worker的数量小于核心线程池调用addWorker如果任务能成功入队并且需要增加一个线程线程池未关闭并且没有工作线程调用addWorker如果任务入队失败那么尝试添加一个新线程。如果失败了拒绝这项任务。
什么是ctl
我们看到线程池ThreadPoolExecutor内部是通过AtomicInteger类型的 ctl变量来控制运行状态和线程个数在通过AtomicInteger源码知道器内部就是维护了一个int类型的value值如下
private volatile int value;那么通过一个值如何维护状态和个数两个值呢那么接下来通过底层源码来看大神Doug Lea是如何设计的。
private static final int COUNT_BITS Integer.SIZE - 3;private static final int CAPACITY (1 COUNT_BITS) - 1;// 取高三位表示以下运行时的状态// runState is stored in the high-order bitsprivate static final int RUNNING -1 COUNT_BITS;private static final int SHUTDOWN 0 COUNT_BITS;private static final int STOP 1 COUNT_BITS;private static final int TIDYING 2 COUNT_BITS;private static final int TERMINATED 3 COUNT_BITS;/*** 各个值的二进制表示** 1111 1111 1111 1111 1111 1111 1111 1111 (-1) * 0000 0000 0000 0000 0000 0000 0000 0000 (0) * 0000 0000 0000 0000 0000 0000 0000 0001 (1) * 0000 0000 0000 0000 0000 0000 0000 0010 (2) * 0000 0000 0000 0000 0000 0000 0000 0011 (3)** 【分析】* 初始容量值高三位全是0低29位全是1后续操作会以此为基础进行操作* CAPACITY 000 1 1111 1111 1111 1111 1111 1111 1111** ---------------3位-1位 -28位---* 【前三位表明状态位后29位表明线程个数即 2^29 - 1 基本够用了】** RUNNING(-536870912) 111 0 0000 0000 0000 0000 0000 0000 0000* SHUTDOWN(0) 000 0 0000 0000 0000 0000 0000 0000 0000* STOP(536870912) 001 0 0000 0000 0000 0000 0000 0000 0000* TIDYING(1073741824) 010 0 0000 0000 0000 0000 0000 0000 0000* TERMINATED(1610612736) 011 0 0000 0000 0000 0000 0000 0000 0000* */ 结论就是前三位表明状态位后29位表明线程个数即 2^29 - 1 基本够用了
TaskQueue的offer()方法
既然tomcat用了自己的队列接下来看一下自定义的TaskQueue类
上述源码已经知道是否调用addWorker方法取决于TaskQueue的offer()方法要返回false结果暂不考虑入队成功但是worker随后被销毁的情况
LinkedBlockingQueue的实现
TaskQueue继承于LinkedBlockingQueue那么先看LinkedBlockingQueue的offer实现
public boolean offer(E e) {if (e null) throw new NullPointerException();final AtomicInteger count this.count;if (count.get() capacity)return false;final int c;final NodeE node new NodeE(e);final ReentrantLock putLock this.putLock;putLock.lock();try {if (count.get() capacity)return false;enqueue(node);c count.getAndIncrement();if (c 1 capacity)notFull.signal();} finally {putLock.unlock();}if (c 0)signalNotEmpty();return true;}很简单队列中元素个数达到容量上限则返回false 这也能解释为什么jdk线程池是队列满了才会继续新增线程至最大线程数
tomcat实现TaskQueue
首先TaskQueue是继承LinkedBlockingQueue的
public class TaskQueue extends LinkedBlockingQueueRunnable 再看offer方法
public boolean offer(Runnable o) {//we cant do any checksif (parentnull) return super.offer(o);//we are maxed out on threads, simply queue the objectif (parent.getPoolSize() parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queueif (parent.getSubmittedCount()(parent.getPoolSize())) return super.offer(o);//if we have less threads than maximum force creation of a new threadif (parent.getPoolSize()parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queuereturn super.offer(o);}注释解释的很明显了
如果当前活跃的线程数等于最大线程数那么就不能创建线程了因此直接放入队列中如果当前提交的任务数小于等于当前活跃的线程数表示还有空闲线程直接添加到队列让线程去执行即可。此处也终于看到了parent.getSubmittedCount方法用来获取当前提交的任务数每当有一个任务执行在执行时submittedCount就会加1当任务执行完成后submittedCount就会减1再校验下当前活跃线程数是否小于最大线程数如果小于此时就可以创建新的线程了。如果以上都不符合那就代表既没有空闲线程又达到了最大线程数也只能放队列了但是不需要新建线程 因此Tomcat的线程池策略是如果没有空闲线程且线程数大于核心线程配置时继续增加线程至最大核心数为止。
两者对比
tomcat的线程池实现相比jdk实现有以下几点不同
队列无限长高qps时基本不会丢弃任务但是会有OOM的风险但是一般单台服务器qps基本不会超过两千达到最大线程数之后才会放入队列低qps时可以快速请求响应不用排队但是若qps不稳定会频繁创建销毁线程对cpu不够友好
行文至此tomcat为什么要自定义线程池已经讲完了接下来会补充一下worker的源码分析
worker原理
简介
Worker是ThreadPoolExecutor中的内部类线程池中的线程都会被封装成一个Worker类对象ThreadPoolExecutor维护的其实就是一组Worker对象其中用集合workers存储这些Worker对象
Worker类中有两个属性一个是firstTask用来保存传入线程池中的任务一个是thread是在构造Worker对象的时候利用ThreadFactory来创建的线程用来处理线程池队列中的任务
Worker继承AQS使用AQS实现独占锁并且是不可重入的构造Worker对象的时候会把锁资源状态设置成-1因为新增的线程还没有处理过任务是不允许被中断的
代码如下
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID 6138294804551838833L;/** 这个就是worker持有的线程也就是线程池中的线程 */ final Thread thread;/** 这个就是提交给线程池的任务 */ Runnable firstTask;/** 每一个线程执行的任务数量的计数器 */ volatile long completedTasks;/*** 我们在调用addWorker方法的时候就会调用这个构造方法有可能是创建新线程并执行任务那么firstTask就是传给线程池要执行的任务如果只是了* 单纯的想创建一个线程只需要传入null就可以*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;// 这个是通过线程工厂类创建一个线程也就是给线程池创建一个线程this.thread getThreadFactory().newThread(this); }/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}}
worker其实就是一个Runable其也是需要构造成一个Thread对象然后调用Thread.start()方法运行的。只不过在worker的run方法中是定义了一个runWoker的方法。这个方法的主要内容从 for 循环不停的从task队列中获取对应的runable的task然后同步调用这个task的run()方法。其实就是在某个线程中不停的拿队列中的任务进行执行。
runWorker
上文已经知道线程池添加一个线程是通过调用addWorker方法在调用addWorker成功后则会执行Worker对象的run方法进入runWorker方法逻辑
final void runWorker(ThreadPoolExecutor.Worker w) {// 获取当前线程其实这个当前线程就是worker对象持有的线程从线程池中拿到的任务就是由这个线程执行的Thread wt Thread.currentThread();// 在构造Worker对象的时候会把一个任务添加进Worker对象// 因此需要把其作为新增线程的第一个任务来执行Runnable task w.firstTask;// 上面已经将该任务拿出来准备进行执行了将firstTask取出赋值给task则需要将该worker对象即线程池中的线程对象持有的任务清空w.firstTask null;// 将AQS锁资源的状态由-1变成0运行该线程进行中断 因为在创建的时候将state设为-1了现在开始执行任务了也就需要加锁了所以要把state再重新变为0这样在后面执行任务的时候才能用来加锁保证任务在执行过程中不会出现并发异常// 解锁w.unlock();// 用来判断执行任务的过程中是否出现了异常boolean completedAbruptly true;try {// 线程池中的线程循环处理线程池中的任务直到线程池中的所有任务都被处理完后则跳出循环while (task ! null || (task getTask()) ! null) { // 这一步的getTask()就说明Worker一直在轮询的从队列中获取任务getTask()方法将从队列获取到的任务返回赋值给task// 给该worker加锁一个线程只处理一个任务。注意加锁是给worker线程加锁不是给任务线程加锁因为worker线程之前一直在轮询地在队列中取任务但是当执行任务的时候为了避免执行任务出现异常就对其加锁w.lock();// 线程池是否是STOP状态// 如果是则确保当前线程是中断状态// 如果不是则确保当前线程不是中断状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())// 注意这里中断的是当前线程也就是worker对象持有的线程wt.interrupt();try {// 扩展使用在执行任务的run方法之前执行beforeExecute(wt, task);// 记录执行任务过程中出现的异常Throwable thrown null;try {// 执行任务的run方法 当前线程环境就是worker对象持有的线程所以本质就是woker对象在执行task任务的run()方法task.run();} catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {// 扩展使用在执行任务的run方法之后执行afterExecute(task, thrown);}} finally {// 执行完任务后就将任务对象清空task null;w.completedTasks; // 该worker已经完成的任务数1w.unlock(); // 将worker线程地锁释放}}// 正常执行完任务completedAbruptly false;} finally {// 线程池中所有的任务都处理完后或者执行任务的过程中出现了异常就会执行该方法processWorkerExit(w, completedAbruptly);}
}这个方法主要做几件事
如果 task 不为空则开始执行 task如果 task 为空则通过 getTask()再去取任务并赋值给 task如果取到的 Runnable 不为空则执行该任务执行完毕后通过 while 循环继续 getTask()取任务如果 getTask()取到的任务依然是空那么整个 runWorker()方法执行完毕
这个方法比较简单如果忽略状态检测和锁的内容本质就是如果有第一个任务就先执行之之后再从任务队列中取任务来执行获取任务是通过getTask()来进行的。
到这里也就明白了线程池是怎么复用有限的线程数来执行大量的任务 那么worker是如何获取任务的呢?
核心方法 getTask()
这个方法用来向队列中轮询地尝试获取任务。该方法也是ThreadPoolExecutor中的方法。
这里重要的地方是第二个 if 判断目的是控制线程池的有效线程数量。
由上文中的分析可以知道在执行 execute 方法时如果当前线程池的线程数量超过了 corePoolSize 且小于maximumPoolSize并且 workQueue 已满时则可以增加工作线程但这时如果超时没有获取到任务也就是 timedOut 为 true 的情况说明 workQueue 已经为空了也就说明了当前线程池中不需要那么多线程来执行任务了可以把多于 corePoolSize 数量的线程销毁掉保持线程数量在 corePoolSize 即可。
// 返回任务Runnable
private Runnable getTask() {// timedOut表示 记录上一次从队列中获取任务是否超时boolean timedOut false; // Did the last poll() time out?// 自旋for (;;) {// 这一部分是判断线程池状态// 获取线程池的状态和线程池中线程数量组成的整形字段32位// 高3位代表线程池的状态低29位代表线程池中线程的数量int c ctl.get();// 获取高3位的值即线程池的状态int rs runStateOf(c);// 如果线程池状态不是Running状态并且 线程也不是SHUTDOWN状态 或者任务队列已空if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {// 则将线程池中的线程数量减1 就是说该线程已经不是运行状态了所以要这个worker线程也没有用了直接将该worker去掉。这个是原子操作decrementWorkerCount();//返回一个空任务因为// 1如果任务队列已空则想返回任务也没有// 2:如果线程池处于STOP或者之上的状态则线程池不允许再处理任务return null;}// 这一部分是判断线程池有效线程数量// 获取低29位的值即线程池中线程的数量int wc workerCountOf(c);// timed是否需要进行超时控制// allowCoreThreadTimeOut默认false// 当线程池中线程的数量没有达到核心线程数量时获取任务的时候允许超时 如果将allowCoreThreadTimeOut设为true那也不允许超时// 当线程池中线程的数量超过核心线程数量时获取任务的时候不允许超时 boolean timed allowCoreThreadTimeOut || wc corePoolSize; // 这个很好理解// wc maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法// timed timedOut 如果为true表示当前操作需要进行超时控制并且上次从阻塞队列中获取任务发生了超时// 接下来判断如果有效线程数量大于1或者阻塞队列是空的那么尝试将workerCount减1// 如果减1失败则continue返回重试// 如果wc 1时也就说明当前线程是线程池中唯一的一个线程了。if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}// 如果上面都没问题就可以获取任务了try {// 获取任务// 如果timed true 说明需要做超时控制则根据keepAliveTime设置的时间内阻塞等待从队列中获取任务// 如果timed false说明不需要做超时控制则阻塞直到从队列中获取到任务为止Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();// 如果获取到任务则把任务返回if (r ! null)return r;// 执行到这里说明在允许的时间内没有获取到任务timedOut true;} catch (InterruptedException retry) {// 获取任务没有超时但是出现异常了将timedOut设置为falsetimedOut false;}}
}
注意这里取任务会根据工作线程的数量判断是使用BlockingQueue的poll(timeout, unit)方法还是take()方法。
poll(timeout, unit)方法会在超时时返回null如果timeout0队列为空时直接返回null。
take()方法会一直阻塞直到取到任务或抛出中断异常。
所以如果keepAliveTime设置为0当任务队列为空时非核心线程取不出来任务会立即结束其生命周期。
当允许超时控制时workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 会在keepAliveTime后返回null返回null的结果是退出完成循环销毁当前线程这就说明了keepAliveTime参数的含义线程最大空闲时间了
提出一个问题这个keepAliveTime参数是否对核心线程也生效呢文末会有答案
那么接下来worker是如何销毁的呢
核心方法 processWorkerExit()
runWorker 的 while 循环执行完毕以后在 finally 中会调用 processWorkerExit()方法来销毁工作线程。该方法就是判断当前线程是需要将其删除还是继续执行任务。该方法也是ThreadPoolExecutor中的方法。
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {// 如果 completedAbruptly true 则线程执行任务的时候出现了异常需要从线程池中减少一个线程// 如果 completedAbruptly false则执行getTask方法的时候已经减1这里无需在进行减1操作if (completedAbruptly)decrementWorkerCount();// 获取线程池的锁因为后面是线程池的操作为了并发安全需要对线程池加锁final ReentrantLock mainLock this.mainLock;// 线程池加锁mainLock.lock();try {// 统计该线程池完成的任务数completedTaskCount w.completedTasks;// 从线程池中移除一个工作线程 works是线程池持有的一个集合 workers.remove(w); // 将没用的worker去掉也就是当前传入的worker} finally {// 线程池解锁mainLock.unlock();}// 根据线程池的状态决定是否结束该线程池tryTerminate(); // 钩子方法// 判断线程池是否需要增加线程// 获取线程池的状态int c ctl.get();// -当线程池是RUNNING或SHUTDOWN状态时// --如果worker是异常结束即completedAbruptly为false那么会直接addWorker// ---如果allowCoreThreadTimeOut true并且等待队列有任务至少保留一个worker// ---如果allowCoreThreadTimeOut false活跃线程数不少于corePoolSizeif (runStateLessThan(c, STOP)) { // 线程池状态小于STOP就说明当前线程池是RUNNING或SHUTDOWN状态// 如果worker是异常结束的不进入下面的分支直接去addWorkerif (!completedAbruptly) {// 根据allowCoreThreadTimeOut的值来设置线程池中最少的活跃线程数是0还是corePoolSizeint min allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果等待队列中有任务要至少保留一个workerif (min 0 ! workQueue.isEmpty())// 至少保留一个workermin 1;// 如果活跃线程数大于等于min直接返回不需要再调用addWorker来增加线程池中的线程了if (workerCountOf(c) min)return; // replacement not needed}// 增加线程池中的workeraddWorker(null, false);}
}
1.执行decrementWorkerCount方法将线程池中的线程数量减1 因为当前worker已经取出去任务了
如果 completedAbruptly true 则代表线程执行任务的时候出现了异常需要执行如果 completedAbruptly false则执行getTask方法的时候调用过decrementWorkerCount方法将线程池中的线程数量减1这里无需在进行减1操作
2.将worker从wokers集合中移除 3.根据线程池的状态决定是否结束该线程池 4.判断是否再调用addWorker方法
其中 int min allowCoreThreadTimeOut ? 0 : corePoolSize;根据allowCoreThreadTimeOut的值来设置线程池中最少的活跃线程数是0还是corePoolSize
如果worker是异常结束即completedAbruptly为false那么会直接addWorker如果allowCoreThreadTimeOut true并且等待队列有任务至少保留一个worker如果allowCoreThreadTimeOut false活跃线程数不少于corePoolSize
说明
如果核心线程出了异常也是会被销毁的只不过销毁后还会调用addWorker方法增加一个线程allowCoreThreadTimeOut为true时min为0则表明核心线程在allowCoreThreadTimeOut为true时也是会随着worker的销毁没有任务可取既空闲了keepAliveTime时间而销毁并且不会调用addWorker来增加一个线程
参考 https://blog.csdn.net/cy973071263/article/details/131484384 https://blog.csdn.net/u014307520/article/details/117787133 https://blog.csdn.net/kkkkk0826/article/details/103405813
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/914843.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!