面试官问: 线程池拒绝策略怎么选,才不会丢任务?本文将从原理、策略选型到高阶方案,助你从容应对。
面试官问:线程池拒绝策略怎么选,才不会丢任务?
当面试官追问“线程池满了如何处理?哪种策略能不丢任务?”时,仅回答“用CallerRunsPolicy”往往不够。面试官真正考察的是结合业务场景选型和规避任务丢失风险的能力。本文将从原理、策略选型到高阶方案,助你从容应对。
一、拒绝策略触发条件剖析
拒绝策略生效的本质是任务提交速率超出线程池处理能力,需同时满足:
- 核心线程满载:所有
corePoolSize线程均处于忙碌状态。 - 队列饱和:任务队列(如
LinkedBlockingQueue)容量耗尽。 - 线程数达上限:线程总数已达
maximumPoolSize,无法创建新线程。
场景示例
电商秒杀场景,线程池配置corePoolSize=5,maximumPoolSize=10, 队列容量capacity=20。若瞬时涌入 50 个下单任务:
- 5 个核心线程处理任务。
- 20 个任务进入队列。
- 创建 5 个非核心线程处理新任务。
- 剩余 10 个任务触发拒绝策略。策略选择不当,轻则任务丢失(下单失败),重则系统崩溃。
二、JDK 原生拒绝策略详解
JDK 提供 4 种策略(实现RejectedExecutionHandler),其任务保障性各异:
1. AbortPolicy (默认策略):抛异常,任务必丢
- 逻辑:直接抛出
RejectedExecutionException,任务不执行。
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), new ThreadPoolExecutor.AbortPolicy() );- 适用:需立即感知任务丢失的场景(如金融转账)。任务丢失即业务异常。
- 风险:未捕获异常导致提交线程崩溃;捕获不处理仍丢任务。
2. DiscardPolicy:静默丢弃,隐患最大
- 逻辑:直接丢弃新任务,无任何通知。
- 代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), new ThreadPoolExecutor.DiscardPolicy() );- 适用:几乎无推荐场景!仅适用于日志记录等非核心且可容忍丢失的任务。
- 风险:任务“凭空消失”,排查困难。
3. DiscardOldestPolicy:弃旧保新
- 逻辑:丢弃队列头部(最老)任务,尝试将新任务入队。
- 代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), new ThreadPoolExecutor.DiscardOldestPolicy() );- 适用:新任务优先级高于旧任务的场景(如实时数据统计)。
- 风险:丢弃的是已入队的任务,若为关键业务(如订单创建),将导致异常。
4. CallerRunsPolicy:提交线程执行,不丢任务!
- 逻辑:由提交任务的线程(如 Tomcat 工作线程)直接执行被提任务。
- 代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy() );- 适用:核心且不容丢失的任务(秒杀下单、用户注册)。
- 优势:
- 零丢失:提交线程不崩溃,任务必执行。
- 天然限流:提交线程忙于执行任务,减缓新任务提交速度。
- 风险:若提交线程是关键路径(如主线程),执行任务会阻塞其本职工作(如处理其他请求)。
三、高阶方案:自定义策略解决痛点
原生策略存在丢任务或阻塞风险,实践中常采用自定义策略:
1. 方案一:MQ 异步重试(绝对保任务)
- 逻辑:拒绝时,将任务序列化后发送至消息队列(RabbitMQ/RocketMQ/Kafka)。消费者异步拉取并重试提交,直至成功或记录失败。
- 代码示例:
// 自定义拒绝策略:MQ 重试 RejectedExecutionHandler mqRejectionHandler = (Runnable runnable, Executor executor) -> { try { String taskJson = JSON.toJSONString(runnable); // 序列化任务 rabbitTemplate.convertAndSend(「thread-pool-retry-queue」, taskJson); log.info(「任务入 MQ 重试:{}」, taskJson); } catch (Exception e) { // MQ 失败降级:CallerRunsPolicy new ThreadPoolExecutor.CallerRunsPolicy().rejectedExecution(runnable, executor); log.error(「任务入 MQ 失败,降级处理」, e); } }; // 配置线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20), mqRejectionHandler // 使用自定义策略 ); // MQ 消费者(示例) @RabbitListener(queues = 「thread-pool-retry-queue」) public void handleRetryTask(String taskJson) { Runnable task = JSON.parseObject(taskJson, Runnable.class); for (int i = 0; i < 3; i++) { // 重试 3 次 if (executor.getQueue().remainingCapacity() > 0) { executor.submit(task); log.info(「任务重试成功:{}」, taskJson); return; } try { TimeUnit.SECONDS.sleep(1); // 间隔 1 秒 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } log.error(「任务重试失败,记录到 DB:{}」, taskJson); // 记录失败 dbService.saveFailedTask(taskJson); }- 适用:绝对不容丢失的核心业务(支付、转账、库存扣减)。
- 优势:零丢失风险,不阻塞提交线程,大厂核心业务首选。
2. 方案二:动态扩容(减少拒绝触发)
- 逻辑:拒绝前尝试动态扩容队列或最大线程数(在预设上限内)。扩容失败则降级(如
CallerRunsPolicy)。 - 代码示例(可扩容队列):
class ResizableBlockingQueue<E> extends LinkedBlockingQueue<E> { private int maxCapacity; // 最大扩容上限 public ResizableBlockingQueue(int initialCapacity, int maxCapacity) { super(initialCapacity); this.maxCapacity = maxCapacity; } public boolean expandCapacity(int newCapacity) { if (newCapacity > maxCapacity) return false; // ... 实现扩容逻辑 (需考虑线程安全) return true; } @Override public boolean offer(E e) { if (super.remainingCapacity() == 0) { // 队列满 int newCapacity = (int) (size() * 1.2); // 尝试扩容 20% if (expandCapacity(newCapacity)) { log.info(「队列扩容至:{}」, newCapacity); } } return super.offer(e); } } // 自定义拒绝策略:先扩容,后降级 RejectedExecutionHandler expandRejectionHandler = (runnable, executor) -> { ThreadPoolExecutor pool = (ThreadPoolExecutor) executor; ResizableBlockingQueue<?> queue = (ResizableBlockingQueue<?>) pool.getQueue(); // 1. 尝试扩容队列 if (queue.expandCapacity((int) (queue.size() * 1.2))) { pool.submit(runnable); return; } // 2. 尝试扩容最大线程数 (上限假设为 20) if (pool.getMaximumPoolSize() < 20) { pool.setMaximumPoolSize(pool.getMaximumPoolSize() + 2); pool.submit(runnable); return; } // 3. 扩容失败,降级 CallerRunsPolicy new ThreadPoolExecutor.CallerRunsPolicy().rejectedExecution(runnable, executor); }; // 配置线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new ResizableBlockingQueue<>(20, 100), // 初始容量 20,最大扩容至 100 expandRejectionHandler // 使用自定义策略 );- 适用:流量波动较大的场景(如大促预热到峰值过渡期)。
- 优势:减少拒绝触发频率,平衡性能与稳定性。
四、面试高频考点与应对
CallerRunsPolicy阻塞提交线程如何规避?
判断提交线程性质(如是否为 Tomcat 工作线程),是则改用 MQ 策略。
为任务执行添加超时控制(Future.get(timeout)),避免长期阻塞。- MQ 重试如何避免任务重复执行?
幂等设计是关键:任务携带唯一标识(如订单 ID)。
执行前检查标识状态(查 DB/Redis),已执行则跳过。
示例:下单任务用订单 ID 查 Redis 键order:123:executed。 - 除拒绝策略外,如何避免丢任务?
参数调优:corePoolSize= 峰值 QPS / 单线程 QPS;queueCapacity= 峰值时长 * 单线程 QPS。
预热核心线程:executor.prestartAllCoreThreads()。
提交前检查:若executor.getQueue().remainingCapacity() < 阈值,提前返回“系统繁忙”。
五、总结:选型与避坑指南
- 选型:
- 核心业务保任务 →MQ 重试策略。
- 非核心实时场景保新 →DiscardOldestPolicy。
- 简单场景不阻塞核心线程 →CallerRunsPolicy。
- 避免使用
AbortPolicy/DiscardPolicy。
- 避坑:
CallerRunsPolicy→ 警惕阻塞核心线程。- MQ 重试 → 必须实现幂等。
- 动态扩容 → 设定资源上限。
- 核心原则:拒绝策略是兜底,优化参数(核心线程数、队列容量)和源头控流(提交前检查)才是根本。