Appearance
如果不允许线程池丢弃任务,应该选择哪个拒绝策略?
大家应该都知道, 线程池的核心线程满了, 就会放在阻塞队列, 阻塞队列满了会创建临时线程, 如果超过最大线程数, 就会触发拒绝策略。 那很多人都是用直接拒绝的策略,这样任务就直接丢了。
线程池的拒绝策略是指当线程池无法继续接收新任务时(例如线程池已满且任务队列已满),如何处理新提交的任务。Java的ThreadPoolExecutor提供了4种内置的拒绝策略,每种策略都有其适用场景和特点。
1. AbortPolicy(默认策略)
- 行为:当任务无法被线程池接收时,抛出
RejectedExecutionException异常。 - 适用场景:适用于对任务执行非常关键的场景,异常可以提示调用者任务无法执行。
- 缺点:直接抛出异常,可能会导致调用方无法处理任务。
2. CallerRunsPolicy
- 行为:由提交任务的线程(调用线程)来执行该任务。这会降低线程池的负载,但可能会导致调用线程阻塞。
- 适用场景:适用于任务提交方可以接受在当前线程中执行任务的场景,有助于缓解线程池的压力。
- 优点:可以避免任务丢失,确保任务最终被执行。
- 缺点:可能会导致调用线程阻塞,影响调用方的性能。
3. DiscardPolicy
- 行为:直接丢弃任务,不抛出异常。
- 适用场景:适用于任务可以被丢弃的场景,例如日志记录等非关键任务。
- 优点:简单直接,不会影响调用方。
- 缺点:任务丢失,调用方无法感知任务未执行。
4. DiscardOldestPolicy
- 行为:丢弃队列中最早的任务,尝试执行当前任务。
- 适用场景:适用于队列中的任务可以被丢弃,且希望尽可能执行最新任务的场景。
- 优点:可以尝试执行最新任务,减少任务积压。
- 缺点:可能会导致一些任务被丢弃,且调用方无法感知任务丢失。
如何选择拒绝策略?
选择合适的拒绝策略需要根据实际业务需求来决定:
- 如果任务非常关键,不能丢失,可以使用
AbortPolicy或CallerRunsPolicy。 - 如果任务可以被丢弃,且不想影响调用方,可以选择
DiscardPolicy。 - 如果希望尽可能执行最新任务,可以选择
DiscardOldestPolicy。
示例代码
以下是一个使用CallerRunsPolicy的线程池示例:
java复制
java
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(2), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);在这个示例中,当线程池和任务队列都满了时,新提交的任务会在调用线程中执行。
通过合理选择拒绝策略,可以更好地控制线程池的行为,确保系统在高负载下仍然能够稳定运行。
哈喽大家好我是徐庶,需要开放场具题、项目解决方案、线上问题解决等高频面试题的可以在评论区扣666.
但是如果任务很重要,不允许丢弃你会怎么做呢?
那大家可以把工作中用到的方案发在评论区, 我给你提供几种:
- 用CallerRunsPolicy ,他会采用主线程执行任务, 但是如果任务非常耗时会阻塞主线程,在高并发场景慎用!
- 将任务进行持久化,可以采用mysql、redis、或者mq异步 等方案持久化, 后续再对任务进行补偿执行。
- 也可以参考netty: 他会再创建新的一个异步线程处理任务。
java
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
//创建一个临时线程处理任务
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}- 也参考ActiveMQ: 再次插入阻塞队列, 会加入等待时间, 尽可能的保证执行。
java
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
//限时阻塞等待,实现尽可能交付
//如果队列未满,则立即插入任务并返回 true。
//如果队列已满,则会等待指定的时间(这里是 60 秒)。
//如果在 60 秒内队列仍然无法腾出空间(比如没有任务被消费),则插入失败,返回 false。
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}当然如果系统资源运行的情况下我们还可以调整最大线程数和阻塞队列的长度, 这样可以减少拒绝策略的触发。
如果对你有帮助可以三连支持, 感谢支持!
更新: 2025-02-25 19:53:18
原文: https://www.yuque.com/tulingzhouyu/db22bv/fnu830wnweolv1rp