Java并发编程——线程池

本文最后更新于:3 年前

引言

在高并发环境下,线程创建与销毁的频繁切换会带来额外的开销。线程池通过对线程的复用、可控的队列和多种灵活的拒绝策略,让并发编程在性能、可维护性和扩展性上达到平衡。本文系统介绍了 Java 并发编程中的线程池概念和关键实现方式:从 ThreadPoolExecutor 核心原理到常见线程池类型的应用场景,再到 Fork/Join 框架、线程池管理与调优策略,全方位助力开发者编写更高效的并发程序。

简介

线程池(Thread Pool)是一种多线程处理技术,通过预先创建并维护一定数量的线程来执行并发任务。其核心思想包括:

  • 资源复用:重复使用固定数量的线程处理多个任务,避免频繁创建和销毁线程带来的开销。
  • 任务管理:通过队列管理提交的任务,控制任务的执行顺序与优先级。
  • 状态维护:线程池负责监控线程状态,包括空闲、忙碌和终止等。

在 Java 中,java.util.concurrent 包提供了丰富的线程池实现,如 ThreadPoolExecutorExecutors 工具类等,方便开发者根据需求创建和管理线程池。

作用

  • 提高性能
    • 减少创建和销毁开销:线程池通过复用线程,避免每次任务执行都创建新线程,从而降低系统开销和响应时间。
    • 资源优化:通过控制线程数量,防止过多线程导致系统资源耗尽(如 CPU 过载、内存不足等)。
  • 任务管理
    • 统一调度:线程池提供任务队列和调度机制,有序地管理大量并发任务,提升任务处理的可预测性和稳定性。
    • 灵活配置:支持自定义线程数量、队列类型、拒绝策略等,满足不同应用场景的需求。
  • 简化编程模型
    • 开发者无需手动管理线程生命周期,降低并发编程复杂度,更专注于业务逻辑实现。

特点

优点

  • 资源复用与性能提升:通过复用线程减少频繁创建和销毁线程的开销,提高系统响应速度和吞吐量。
  • 统一管理与控制:集中管理线程,便于监控、调优和维护,避免资源泄露和线程过多导致的性能问题。
  • 可配置性强:提供多种配置参数(如核心线程数、最大线程数、队列类型、拒绝策略等),满足不同场景需求。
  • 提高稳定性:通过合理设置线程池参数和拒绝策略,可以防止系统因资源竞争或过载而崩溃,提高系统稳定性。

缺点

  • 配置复杂:参数设置不当可能导致性能瓶颈或资源浪费。需要深入理解线程池工作原理,根据应用特点进行调优。
  • 有限的灵活性:固定线程池大小可能无法应对突然增加的任务负载;动态调整线程数虽然灵活,但增加了管理复杂性。
  • 潜在的死锁与资源竞争:如果任务间存在依赖关系或不当的锁机制设计,线程池内的线程可能发生死锁或频繁竞争资源,影响系统正常运行。

关键组件

ThreadPoolExecutor

ThreadPoolExecutor 是 Java 中实现线程池功能的核心类,位于 java.util.concurrent 包中。其主要构造器如下:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
  • corePoolSize:核心线程数,始终保持活动的线程数。
  • maximumPoolSize:池中允许的最大线程数。
  • keepAliveTimeunit:非核心线程的空闲存活时间及其单位,当空闲时间超过该值时被回收。
  • workQueue:用于存放等待执行任务的工作队列。
  • threadFactory:用于创建新线程的线程工厂。
  • handler:任务拒绝策略,当线程池和队列都满时的处理方式。

示例:创建了一个核心 5 线程,最大 10 线程的线程池,并采用默认线程工厂和拒绝策略。

1
2
3
4
5
6
7
8
9
10
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
executor.execute(() -> {
// 执行任务代码
});
executor.shutdown();

核心和最大线程数

核心线程数

  • 线程池在正常情况下维护的最小线程数。即使没有任务需要执行,这些核心线程也会一直存在(除非设置了允许核心线程超时回收)。
  • 当提交新任务时,如果当前运行线程数少于核心数,则立即创建新线程处理任务,即使其他线程处于空闲状态。

最大线程数

  • 线程池能够容纳的最大线程数。它决定了线程池在面对任务洪峰时能够扩展到的线程数上限。
  • 当任务队列已满且当前线程数达到核心数时,会继续创建非核心线程,直到达到最大线程数。
  • 超过最大线程数后,若有新任务提交,则根据拒绝策略处理。

两者关系与调度策略

  • 任务提交顺序:

    1. 如果当前线程数 < corePoolSize,新任务会创建新的线程执行。
    2. 如果当前线程数 ≥ corePoolSize,将任务放入工作队列等待执行。
    3. 若队列已满且当前线程数 < maximumPoolSize,则创建新线程处理任务。
    4. 若线程数达到 maximumPoolSize 且队列已满,则任务由拒绝策略处理。
  • 调优考虑:

    选择合适的核心和最大线程数取决于应用需求、任务类型和系统资源,避免过多线程导致资源竞争或过少线程造成性能瓶颈。

工作队列

定义与作用

  • 工作队列(BlockingQueue<Runnable>)用于存放等待执行的任务。
  • 它的类型和容量会直接影响线程池的行为和性能。

常见队列类型

  • 无界队列(如 LinkedBlockingQueue 默认无界):允许无限制的任务积压,但可能导致线程池无法扩展到最大线程数。
  • 有界队列(如 ArrayBlockingQueue):限制队列容量,有助于控制资源使用和拒绝压力,但可能因队列满而触发拒绝策略。
  • 直接交付队列(如 SynchronousQueue):不存储任务,直接将任务移交给线程处理,通常与无限制的最大线程配合使用。

对线程池影响

  • 队列为空时,线程池创建新线程处理任务;队列非空时,新任务入队等待空闲线程处理。
  • 队列类型和容量决定了在什么情况下线程池会扩展线程数量或触发拒绝策略。

线程工厂

作用

  • 线程工厂(ThreadFactory)用于创建新线程,提供了一种定制化线程创建方式。
  • 通过实现 ThreadFactory 接口,可以设置线程的名称、优先级、守护状态等属性,便于调试和管理。

使用示例

1
2
3
4
5
6
7
8
9
ThreadFactory customFactory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("MyPoolThread-" + counter.getAndIncrement());
thread.setDaemon(false);
return thread;
}
};

将此工厂传递给 ThreadPoolExecutor 构造器,使线程池使用自定义的线程创建规则。

拒绝策略

概述

  • 当线程池中的线程数达到 maximumPoolSize 且工作队列已满时,新提交的任务无法被接受,这时会触发拒绝策略。
  • RejectedExecutionHandler 接口定义了如何处理被拒绝的任务。

常见拒绝策略

  1. AbortPolicy(默认):抛出 RejectedExecutionException,任务被拒绝并终止执行。
  2. CallerRunsPolicy:由调用者线程执行该任务,如果线程池已关闭则丢弃任务。
  3. DiscardPolicy:直接丢弃新任务,不抛出异常。
  4. DiscardOldestPolicy:丢弃最旧的未处理任务,将新任务提交到队列尾部。

定制拒绝策略

可以实现 RejectedExecutionHandler 接口,定义自定义的拒绝行为。例如记录日志、存储到外部系统等。

1
2
3
4
5
6
7
8
@Slf4j
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info("任务被拒绝,执行自定义处理逻辑");
// 可以将任务保存到外部系统、日志记录或重试机制
}
}

线程池状态

生命周期与状态

ThreadPoolExecutor 通过内部状态管理线程池的生命周期,主要状态包括:

  1. RUNNING:初始状态,接受新任务和处理队列中的任务。
  2. SHUTDOWN:调用 shutdown() 后进入该状态,不再接收新任务,但继续处理队列中已提交的任务。
  3. STOP:调用 shutdownNow() 或遇到严重故障时进入状态,不再接收新任务,尝试停止正在执行的任务并清空队列。
  4. TIDYING:所有任务已终止,工作线程空闲后进入此状态,准备进行资源清理。
  5. TERMINATED:完全终止状态,线程池生命周期结束。

状态转换

  • RUNNINGSHUTDOWN:调用 shutdown() 方法。
  • RUNNINGSHUTDOWNSTOP:调用 shutdownNow() 方法。
  • 当所有任务完成且线程清理结束后,进入 TIDYING,再到 TERMINATED

监控与管理

  • 可以通过线程池提供的方法(如 isShutdown(), isTerminated() 等)查询状态。
  • 状态有助于合理地关闭线程池、释放资源及保证程序正确终止。

线程池类型

固定数量线程池(FixedThreadPool)

概述

固定数量线程池通过 Executors.newFixedThreadPool(int nThreads) 创建,其核心线程数和最大线程数均设置为 nThreads。线程池始终维护固定数量的线程。

特点与行为

  • 线程数固定:始终保持 nThreads 个活动线程,无论任务负载如何变化。
  • 任务队列:使用无界任务队列(如 LinkedBlockingQueue),新任务会被放入队列中等待空闲线程处理。
  • 适用场景:适用于可预知任务量、需要控制并发线程数以避免资源耗尽的场景,如服务器固定数量的处理线程。

优缺点

  • 优点:线程数固定,易于预测系统资源使用。
  • 缺点:队列无界可能导致内存增长;无法动态调整线程数应对突发负载。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小为5的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);

// 提交若干任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.execute(() -> {
log.info("FixedThreadPool - 执行任务 " + taskId + " 由线程 " + Thread.currentThread().getName());
});
}

// 关闭线程池
fixedPool.shutdown();
}
}

可缓存线程池(CachedThreadPool)

概述

可缓存线程池通过 Executors.newCachedThreadPool() 创建,其核心线程数为 0,最大线程数为无限大,使用 SynchronousQueue 作为任务队列。

特点与行为

  • 动态线程创建:没有核心线程限制,新任务提交时若无闲置线程则创建新线程。
  • 线程回收:空闲线程超过 60 秒未使用将被回收,减少资源占用。
  • 任务队列:使用无容量的同步队列(SynchronousQueue),任务必须有线程立即处理,否则创建新线程。
  • 适用场景:适用于执行大量短期异步任务,任务量波动较大,对响应速度要求高的场景。

优缺点

  • 优点:灵活应对任务峰谷变化,线程可复用提高性能。
  • 缺点:线程数无限制增长可能导致资源耗尽,需要谨慎使用。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
public class CachedThreadPoolExample {
public static void main(String[] args) {
// 创建一个可缓存的线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 提交若干任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
cachedPool.execute(() -> {
log.info("CachedThreadPool - 执行任务 " + taskId + " 由线程 " + Thread.currentThread().getName());
});
}

// 关闭线程池
cachedPool.shutdown();
}
}

单线程池(SingleThreadExecutor)

概述

单线程池通过 Executors.newSingleThreadExecutor() 创建,内部实际是固定大小为 1 的线程池。

特点与行为

  • 单一线程执行:始终只有一个工作线程顺序执行提交的任务,保证任务按提交顺序依次完成。
  • 任务队列:任务被放入一个无界队列等待执行。
  • 适用场景:需要串行化任务执行并确保任务顺序性,如日志记录、顺序处理事件等。

优缺点

  • 优点:简单易用,保证任务执行顺序。
  • 缺点:单线程可能成为性能瓶颈,不适合并行处理任务。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
public class SingleThreadExecutorExample {
public static void main(String[] args) {
// 创建一个单线程池
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

// 提交若干任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
singleThreadPool.execute(() -> {
log.info("SingleThreadExecutor - 执行任务 " + taskId + " 由线程 " + Thread.currentThread().getName());
});
}

// 关闭线程池
singleThreadPool.shutdown();
}
}

计划线程池(ScheduledThreadPool)

概述

计划线程池通过 Executors.newScheduledThreadPool(int corePoolSize) 创建,它支持任务延迟执行或周期执行。

特点与行为

  • 定时与周期任务:提供方法 schedule()scheduleAtFixedRate()scheduleWithFixedDelay() 实现延迟执行和周期性任务。
  • 核心线程数固定:保持指定数量的核心线程,处理定时任务。
  • 任务队列:内部使用延时队列(DelayedWorkQueue)管理等待执行的任务。
  • 适用场景:适用于需要在未来某个时刻执行或周期执行的任务,如定时数据备份、定期报告生成等。

优缺点

  • 优点:专门设计用于定时任务,调度精准可靠。
  • 缺点:线程池大小固定,不会动态扩展,且仅用于调度场景,不适合一般的高并发任务处理。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class ScheduledTaskExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);

log.info("主线程:{}", Thread.currentThread().getName());

// 延迟2秒执行一次任务
scheduledPool.schedule(() -> log.info("延迟任务执行,线程:{}", Thread.currentThread().getName()), 2, TimeUnit.SECONDS);

// 每隔3秒周期性执行一次任务,初始延迟1秒
scheduledPool.scheduleAtFixedRate(() -> log.info("周期任务执行,线程:{}", Thread.currentThread().getName()), 1, 3, TimeUnit.SECONDS);

// 为演示效果暂停一段时间后关闭
scheduledPool.schedule(() -> {
scheduledPool.shutdown();
log.info("关闭线程池");
}, 10, TimeUnit.SECONDS);
}
}

工作窃取线程池(WorkStealingPool)

概述

工作窃取线程池通过 Executors.newWorkStealingPool() 创建,基于 ForkJoinPool 实现,适用于大规模并行任务的分治处理。

特点与行为

  • 工作窃取算法:每个线程维护自己的任务队列,当完成本地任务后,可从其他线程队列窃取任务,提高资源利用率。
  • 动态线程管理:线程数通常等于可用处理器核心数,能根据任务负载动态调节。
  • 任务类型:适合大量小任务的并行处理,通过递归分解任务和结果合并实现。
  • 适用场景:计算密集型任务、分治算法、递归任务等需要并行执行的复杂计算场景。

优缺点

  • 优点:高效利用多核 CPU,减小线程闲置时间,提高并行度。
  • 缺点:主要针对 CPU 密集型任务设计,对 I/O 密集型任务可能不适用;调试和监控相对复杂。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
public class WorkStealingPoolExample {
public static void main(String[] args) {
// 创建一个工作窃取线程池,默认线程数等于可用处理器核心数
ExecutorService workStealingPool = Executors.newWorkStealingPool();

// 提交若干任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
workStealingPool.submit(() -> {
log.info("WorkStealingPool - 执行任务 " + taskId + " 由线程 " + Thread.currentThread().getName());
});
}

// 注意:WorkStealingPool使用的是守护线程,主线程需要等待任务完成才能看到输出结果
// 这里简单地让主线程睡眠一段时间
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// WorkStealingPool不需要显示关闭,它会在没有更多任务时自动退出
}
}

任务执行与处理

提交任务

execute(Runnable task)

  • 定义在 Executor 接口中,用于提交不需要返回值的任务。
  • 接受一个实现了 Runnable 接口的任务,并将其提交到线程池执行。
  • 如果任务在执行过程中抛出异常,异常会被线程池捕获并传递到未捕获异常处理器,通常会在控制台输出堆栈跟踪,但不会影响其他任务。

submit(Callable task)

  • 定义在 ExecutorService 接口中,支持提交实现了 RunnableCallable 的任务。
  • 返回一个 Future 对象,用于获取任务的执行结果或检查任务状态:
    • 提交 Runnable 时返回 Future<?>,结果为 null(除非使用 submit(Runnable, T result) 提供默认结果)。
    • 提交 Callable<V> 时返回 Future<V>,可通过 Future.get() 获取结果或抛出异常。
  • 异常处理不同:
    • 对于通过 submit() 提交的任务,异常会被封装在 Future 中,并在调用 Future.get() 时抛出 ExecutionException

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
public class TaskSubmissionExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

// 使用 execute() 提交任务,无返回值
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.execute(() -> {
log.info("使用 execute() 执行任务 " + taskId + ",线程:{}", Thread.currentThread().getName());
});
}

// 使用 submit() 提交任务,有返回值
Future<Integer> future = executor.submit(() -> {
log.info("使用 submit() 执行任务,线程:{}", Thread.currentThread().getName());
// 模拟异常抛出
return 100; // 若无异常,返回一个整数
});

try {
// 尝试获取结果,会抛出 ExecutionException
Integer result = future.get();
log.info("任务执行结果:{}", result);
} catch (ExecutionException ee) {
log.error("任务执行异常:", ee.getCause());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}

executor.shutdown();
}
}

运行程序,控制台输出如下:

1
2
3
4
5
02:00:50.903 [pool-1-thread-2] INFO  s.y.threadpool.TaskSubmissionExample - 使用 execute() 执行任务 1,线程:pool-1-thread-2
02:00:50.903 [pool-1-thread-1] INFO s.y.threadpool.TaskSubmissionExample - 使用 execute() 执行任务 0,线程:pool-1-thread-1
02:00:50.907 [pool-1-thread-1] INFO s.y.threadpool.TaskSubmissionExample - 使用 submit() 执行任务,线程:pool-1-thread-1
02:00:50.907 [pool-1-thread-2] INFO s.y.threadpool.TaskSubmissionExample - 使用 execute() 执行任务 2,线程:pool-1-thread-2
02:00:50.908 [main] INFO s.y.threadpool.TaskSubmissionExample - 任务执行结果:100

由运行结果可知,execute 执行的任务没有返回值,在线程池内部的线程执行;而 submit 执行的任务可以在调用线程获取到返回值以及异常。

任务调度

概念与方式

  • 任务调度指的是线程池如何安排和执行提交的任务。ThreadPoolExecutor 内部通过任务队列和工作线程协同完成任务分派与执行。
  • 调度过程通常包括任务入队、取出、执行及线程复用等步骤:
    1. 任务入队:当提交任务时,根据线程池当前线程数与核心线程数、队列容量等条件决定是创建新线程还是将任务放入队列等待。
    2. 任务取出与执行:空闲线程从队列中取出任务并执行;执行完成后回归空闲状态以处理后续任务。
    3. 动态调整:根据任务负载,线程池可能扩展或回收线程(非核心线程超过空闲时间)。

定时与周期调度

对于计划线程池(ScheduledThreadPoolExecutor),任务调度更为灵活:

  • 提供 schedule()scheduleAtFixedRate()scheduleWithFixedDelay() 方法,实现延迟执行、固定速率周期执行和固定延迟周期执行。
  • 内部通过延时队列管理任务,确保任务按照预定时间准确执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class ScheduledTaskExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);

log.info("主线程:{}", Thread.currentThread().getName());

// 延迟2秒执行一次任务
scheduledPool.schedule(() -> log.info("延迟任务执行,线程:{}", Thread.currentThread().getName()), 2, TimeUnit.SECONDS);

// 每隔3秒周期性执行一次任务,初始延迟1秒
scheduledPool.scheduleAtFixedRate(() -> log.info("周期任务执行,线程:{}", Thread.currentThread().getName()), 1, 3, TimeUnit.SECONDS);

// 为演示效果暂停一段时间后关闭
scheduledPool.schedule(() -> {
scheduledPool.shutdown();
log.info("关闭线程池");
}, 10, TimeUnit.SECONDS);
}
}

运行程序,控制台输出如下:

1
2
3
4
5
6
7
01:55:42.551 [main] INFO  s.y.threadpool.ScheduledTaskExample - 主线程:main
01:55:43.594 [pool-1-thread-1] INFO s.y.threadpool.ScheduledTaskExample - 周期任务执行,线程:pool-1-thread-1
01:55:44.593 [pool-1-thread-1] INFO s.y.threadpool.ScheduledTaskExample - 延迟任务执行,线程:pool-1-thread-1
01:55:46.592 [pool-1-thread-1] INFO s.y.threadpool.ScheduledTaskExample - 周期任务执行,线程:pool-1-thread-1
01:55:49.601 [pool-1-thread-1] INFO s.y.threadpool.ScheduledTaskExample - 周期任务执行,线程:pool-1-thread-1
01:55:52.594 [pool-1-thread-1] INFO s.y.threadpool.ScheduledTaskExample - 周期任务执行,线程:pool-1-thread-1
01:55:52.594 [pool-1-thread-1] INFO s.y.threadpool.ScheduledTaskExample - 关闭线程池

由运行结果可知,程序已实现定时调度。

异常处理

异常处理方式

  • **execute(Runnable task)**:
    • 如果任务内部抛出未捕获的异常,该异常将由线程池的工作线程捕获,并根据线程的未捕获异常处理器(UncaughtExceptionHandler)处理。
    • 异常不会传播回调用线程,任务会终止但不影响线程池其他任务执行。
  • **submit(Callable task)**:
    • 异常会被封装在返回的 Future 对象内部。调用 Future.get() 时会抛出 ExecutionException,其原因即为任务执行过程中抛出的异常。
    • 通过 Future 可以捕获并处理这些异常,而不会影响其他正在执行的任务。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class TaskSubmissionExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

// 使用 execute() 提交任务,无返回值
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.execute(() -> {
log.info("使用 execute() 执行任务 " + taskId + ",线程:{}", Thread.currentThread().getName());
if (taskId == 1) throw new RuntimeException("execute异常示例");
});
}

// 使用 submit() 提交任务,有返回值
Future<Integer> future = executor.submit(() -> {
log.info("使用 submit() 执行任务,线程:{}", Thread.currentThread().getName());
// 模拟异常抛出
if (true) throw new RuntimeException("submit异常示例");
return 100; // 若无异常,返回一个整数
});

try {
// 尝试获取结果,会抛出 ExecutionException
Integer result = future.get();
log.info("任务执行结果:{}", result);
} catch (ExecutionException ee) {
log.error("任务执行异常:", ee.getCause());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}

executor.shutdown();
}
}

运行程序,控制台输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
02:07:21.590 [pool-1-thread-1] INFO  s.y.threadpool.TaskSubmissionExample - 使用 execute() 执行任务 0,线程:pool-1-thread-1
02:07:21.590 [pool-1-thread-2] INFO s.y.threadpool.TaskSubmissionExample - 使用 execute() 执行任务 1,线程:pool-1-thread-2
02:07:21.596 [pool-1-thread-1] INFO s.y.threadpool.TaskSubmissionExample - 使用 execute() 执行任务 2,线程:pool-1-thread-1
02:07:21.596 [pool-1-thread-1] INFO s.y.threadpool.TaskSubmissionExample - 使用 submit() 执行任务,线程:pool-1-thread-1
02:07:21.598 [main] ERROR s.y.threadpool.TaskSubmissionExample - 任务执行异常:
java.lang.RuntimeException: submit异常示例
at space.yangtao.threadpool.TaskSubmissionExample.lambda$main$1(TaskSubmissionExample.java:32)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" java.lang.RuntimeException: execute异常示例
at space.yangtao.threadpool.TaskSubmissionExample.lambda$main$0(TaskSubmissionExample.java:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

由运行结果可知,execute 执行时出现的异常会在线程中被抛出,而 submit 执行时出现的异常会在调用 Future 对象的 get 方法时被抛出。

自定义异常处理

  • 可在线程池外层包装任务,或使用自定义的 ThreadFactory 设置未捕获异常处理器,以捕获并处理异常。注意:异常处理器只能对通过 execute 提交的任务进行异常捕获,通过 submit 提交的任务不会捕获,应通过 返回 Feature 对象的 get 方法获取。
  • 对于 Callable 任务,可在 call() 方法内自行捕获异常并返回特定结果或状态。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class CustomThreadFactory implements ThreadFactory {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

@Override
public Thread newThread(Runnable r) {
Thread thread = defaultFactory.newThread(r);
// 设置未捕获异常处理器
thread.setUncaughtExceptionHandler((t, e) -> {
log.error("线程 {} 捕获到异常: ", t.getName(), e);
});
return thread;
}
}

@Slf4j
class CustomThreadFactoryExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 使用自定义工厂创建线程池
ExecutorService executor = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new CustomThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
executor.execute(() -> {
log.info("execute线程:{}", Thread.currentThread().getName());
throw new RuntimeException("自定义工厂异常示例");
});
Future<Object> future = executor.submit(() -> {
log.info("submit线程:{}", Thread.currentThread().getName());
throw new RuntimeException("自定义工厂异常示例");
});
try {
log.info("submit任务执行结果: {}", future.get());
} catch (ExecutionException e) {
log.error("submit任务执行异常: ", e);
}
executor.shutdown();
}
}

执行程序,运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
02:09:27.135 [pool-1-thread-1] INFO  s.y.t.CustomThreadFactoryExample - execute线程:pool-1-thread-1
02:09:27.135 [pool-1-thread-2] INFO s.y.t.CustomThreadFactoryExample - submit线程:pool-1-thread-2
02:09:27.141 [pool-1-thread-1] ERROR s.y.threadpool.CustomThreadFactory - 线程 pool-1-thread-1 捕获到异常:
java.lang.RuntimeException: 自定义工厂异常示例
at space.yangtao.threadpool.CustomThreadFactoryExample.lambda$main$0(CustomThreadFactory.java:41)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
02:09:27.141 [main] ERROR s.y.t.CustomThreadFactoryExample - submit任务执行异常:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 自定义工厂异常示例
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at space.yangtao.threadpool.CustomThreadFactoryExample.main(CustomThreadFactory.java:48)
Caused by: java.lang.RuntimeException: 自定义工厂异常示例
at space.yangtao.threadpool.CustomThreadFactoryExample.lambda$main$1(CustomThreadFactory.java:45)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

由运行结果可知,异常处理器只能对通过 execute 提交的任务进行异常捕获,通过 submit 提交的任务则时通过返回 Feature 对象的 get 方法获取。

任务处理和监控

任务结果处理

通过 submit() 返回的 Future 对象。

  • 使用 future.isDone() 检查任务是否完成。
  • 调用 future.get() 获取任务执行结果,此方法会阻塞直到任务完成。
  • 使用超时参数的 future.get(timeout, unit) 在指定时间内等待结果,超时则抛出异常。

任务监控

线程池本身的监控方法。

  • getActiveCount():获取当前正在执行任务的线程数量。
  • getCompletedTaskCount():获取已完成的任务数量。
  • getTaskCount():获取线程池已提交任务的总数。
  • getQueue():获取当前任务队列,可用于监控等待执行的任务数等。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService executor = Executors.newSingleThreadExecutor();

Future<String> future = executor.submit(() -> {
Thread.sleep(500); // 模拟任务执行
return "任务完成";
});

// 检查任务是否完成,并获取结果
while (!future.isDone()) {
log.info("任务未完成,等待中...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
1
2
3
4
5
6
7
8
9
10
// 监控线程池状态
while (threadPool.getActiveCount() > 0) {
// 监控线程池状态
log.info("----------------------");
log.info("活跃线程数: " + threadPool.getActiveCount());
log.info("已完成任务数: " + threadPool.getCompletedTaskCount());
log.info("队列等待任务数: " + threadPool.getQueue().size());
log.info("----------------------");
Thread.sleep(100);
}

Fork/Join 框架

Fork/Join 框架是 Java 7 引入的一种用于并行任务处理的高级并发框架,旨在简化利用多核处理器进行大规模并行计算的编程模型。

核心概念与结构

基本思想

分而治之:

  • 将一个大任务拆分成若干较小的子任务,并递归地对子任务继续拆分。
  • 当子任务足够小或满足某个条件时直接计算。
  • 最后将子任务的计算结果合并为最终结果。

主要组件

  • ForkJoinPool:框架的核心执行器,相当于传统线程池的升级版,专门设计用于执行由 ForkJoinTask 分解的小任务。
  • **ForkJoinTask<V>**:所有可提交给 ForkJoinPool 执行的任务的抽象类,主要子类有:
    • RecursiveAction:不返回结果的任务。
    • **RecursiveTask<V>**:返回结果的任务,适用于需要合并计算结果的场景。

工作原理

工作窃取算法

ForkJoinPool 采用工作窃取(work-stealing)算法来提高处理器利用率:

  • 每个工作线程维护一个双端队列存储任务。
  • 线程从自己的队列末尾获取任务执行,确保局部性并减少竞争。
  • 当某个线程队列为空时,它会尝试从其他线程队列的头部“窃取”任务执行,从而平衡各个线程的负载。

任务分解与合并流程

  1. 划分任务(fork):大任务递归地调用 fork() 方法,将自己拆分成多个子任务,并将子任务提交给工作队列。
  2. 执行任务:工作线程不断从其队列取任务执行,若队列空闲则尝试窃取其他线程的任务。
  3. 合并结果(join):子任务执行完毕后,父任务通过调用 join() 方法等待并收集子任务结果,进行合并计算。

代码示例

创建自定义任务

通常通过扩展 RecursiveTask<V>RecursiveAction 类来定义可拆分的任务。

**RecursiveTask<V>**:SumTask 类通过递归的方式将数组求和任务拆分成更小的子任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 拆分阈值
private final long[] data;
private final int start;
private final int end;

public SumTask(long[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += data[i];
}
return sum;
} else {
// 拆分任务
int mid = start + length / 2;
SumTask leftTask = new SumTask(data, start, mid);
SumTask rightTask = new SumTask(data, mid, end);
leftTask.fork(); // 异步执行左边任务
long rightResult = rightTask.compute(); // 同步计算右边任务
long leftResult = leftTask.join(); // 等待左边任务结果
return leftResult + rightResult;
}
}
}

提交任务到 ForkJoinPool

使用 ForkJoinPool 来执行自定义任务,自动管理工作线程和任务调度,利用工作窃取算法提升并行处理性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class ForkJoinExample {
public static void main(String[] args) {
// 构造测试数据
long[] data = new long[1000000];
for (int i = 0; i < data.length; i++) {
data[i] = i;
}

// 创建 ForkJoinPool 实例
ForkJoinPool pool = new ForkJoinPool();

// 创建任务
SumTask task = new SumTask(data, 0, data.length);

// 提交任务并获取结果
long result = pool.invoke(task);
log.info("数组总和为: " + result);

pool.shutdown();
}
}

运行程序,控制台输出如下:

1
02:31:07.646 [main] INFO  s.yangtao.forkjoin.ForkJoinExample - 数组总和为: 499999500000

优势

  • 高效的并行处理:利用多核 CPU,通过任务拆分和工作窃取算法最大化并行性。
  • 简化并行编程:相比手动创建和管理线程,Fork/Join 框架抽象出任务分解与合并的细节,使并行编程更易实现。
  • 弹性伸缩ForkJoinPool 动态调整线程数、平衡负载,适应不同的计算密集型任务需求。

适用场景

  • 计算密集型任务:如大规模数组处理、递归算法、图像处理等,可以利用多核并行提升性能。
  • 可分解的大任务:任务能够拆分为相互独立的小子任务,并在子任务完成后合并结果。

注意事项

  • 合理设置拆分阈值:拆分太细会导致任务管理开销大,拆分太粗则无法充分并行。根据任务特点调整 THRESHOLD 值以平衡开销与并行度。
  • 避免共享可变状态Fork/Join 任务应尽量避免竞争和锁定,以充分发挥并行计算的优势。
  • 监控与调优:使用 JVM 提供的监控工具观察 ForkJoinPool 的线程利用情况和任务队列长度,适时调整池大小或任务拆分策略。
  • I/O 密集型任务:如果在任务拆分完成后还有大量 I/O 等操作,实际并不能获得理想的并行收益,甚至可能在超线程机下出现线程倾斜的现象,需要真实测试与调优。

性能调优与最佳实践

如何选择适合的线程池类型

任务性质分析

  • CPU 密集型任务:适合使用固定大小线程池(FixedThreadPool)或工作窃取线程池(WorkStealingPool),保证线程数与可用 CPU 核心数匹配,避免线程上下文切换开销。
  • I/O密集型任务:由于等待 I/O 操作占用线程时间较长,可使用可缓存线程池(CachedThreadPool)或增加固定线程池的线程数,以提高并发处理能力。
  • 周期性或延时任务:采用计划线程池(ScheduledThreadPool),专用于定时调度任务,确保任务按计划执行。
  • 任务数量大且短小:使用工作窃取线程池(WorkStealingPool)和 ForkJoin 框架,将任务拆分为更小的子任务并行处理,提高吞吐量。

业务需求匹配

  • 如果任务执行顺序有要求,可选择单线程池(SingleThreadExecutor)。
  • 对于动态变化的任务负载,使用可缓存线程池能快速扩展和回收线程以适应变化。

结合任务特点和系统资源,选择最优的线程池类型,避免盲目使用默认配置带来的性能瓶颈。

线程池大小的确定

合理设置核心线程数与最大线程数至关重要:

计算型任务

  • 一般设置线程数等于 CPU 核心数(或略大于核心数),以充分利用 CPU 资源,避免线程过多导致的上下文切换。

  • 常用经验公式:

    1
    最佳线程数 = CPU 核心数 * (1 + 等待时间/计算时间)

    对于CPU密集型任务,等待时间 ≈ 0,即最佳线程数 ≈ 核心数。

I/O 密集型任务

  • 由于线程多数时间等待 I/O 响应,可设置线程数大于 CPU 核心数,以弥补 I/O 等待期间的资源浪费。
  • 根据等待与计算比率调整线程数,参考上述公式合理增加线程数。

动态调整与测试

  • 通过压力测试和性能分析工具,模拟实际工作负载,不断调整核心线程数和最大线程数,以找到最佳配置点。
  • 考虑任务执行时长、波动性及系统硬件条件,采用分步调优方式逐步优化配置。

监控线程池性能

有效的监控有助于及时发现性能瓶颈和异常状况:

线程池内置方法

使用 ThreadPoolExecutor 提供的监控方法定期记录上述指标变化、分析线程池负载和任务积压情况。

JMX和可视化监控

  • 将线程池状态通过 JMX 暴露,结合监控工具(如 VisualVMJConsolePrometheus + Grafana 等)实时观察线程池运行状态。
  • 设置报警阈值,如队列长度超过预设范围或活动线程数长期接近最大值,触发告警以便及时调整。

日志记录与分析

在关键任务调度和异常处理处记录日志,分析异常堆栈和任务执行时长,优化任务处理逻辑。

优化线程使用和队列配置

根据实际业务需求和监控数据,对线程和队列进行优化:

合适的工作队列选择

  • 无界队列(如 LinkedBlockingQueue):适用于任务提交速率恒定、内存充足的场景,但可能导致任务堆积过多。
  • 有界队列(如 ArrayBlockingQueue):控制任务积压量,结合拒绝策略处理过载情况,防止资源耗尽。
  • 同步队列(SynchronousQueue):适用于对响应时间要求高且希望动态调整线程数的场景。

调整线程池参数

  • 合理设置 keepAliveTime,让非核心线程在空闲时被回收,降低资源消耗。
  • 配置合适的拒绝策略(RejectedExecutionHandler),如 CallerRunsPolicy 将任务退回调用线程,保证任务不丢失但牺牲部分响应能力。

线程使用优化

  • 避免在任务内部进行长时间阻塞操作或过多的同步等待,尽量使用异步 I/O 或分解任务逻辑。
  • 使用自定义 ThreadFactory 设置线程优先级、命名等信息,便于问题排查和线程调度优化。

队列与任务匹配

  • 根据任务特点选择队列类型。例如,短小任务可使用无界队列提高吞吐量;而大任务积压可能适合有界队列限制数量,避免资源耗尽。
  • 调整队列容量与线程池大小的平衡,确保既能高效利用线程,又不致于因队列过长导致延迟增加。

线程池的管理与维护

安全地关闭线程池

shutdown()

shutdown() 方法会启动线程池的有序关闭过程:

  1. 停止接收新任务:线程池不再接受新提交的任务。
  2. 继续执行已提交任务:包括已在队列中的任务和正在执行的任务。
  3. 等待所有任务完成:直到所有任务执行完毕,线程池才会完全关闭。

shutdownNow()

shutdownNow() 方法会尝试立即停止所有正在执行的任务,并返回尚未开始执行的任务列表:

  1. 停止接收新任务:与 shutdown() 方法相同。
  2. 尝试中断正在执行的任务:通过中断线程来尝试停止任务执行。
  3. 返回未执行的任务:返回一个 List<Runnable>,包含所有尚未执行的任务。

awaitTermination()

为了确保线程池能够有序地关闭,同时处理可能的异常或超时,可以结合使用 shutdown()awaitTermination() 方法。以下是一个实现优雅关闭的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Slf4j
public class GracefulShutdownExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

// 提交任务
executor.execute(() -> {
try {
Thread.sleep(3000);
log.info("任务1完成");
} catch (InterruptedException e) {
log.error("任务1被中断");
Thread.currentThread().interrupt();
}
});

executor.execute(() -> {
try {
Thread.sleep(4000);
log.info("任务2完成");
} catch (InterruptedException e) {
log.error("任务2被中断");
Thread.currentThread().interrupt();
}
});

// 优雅关闭
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
log.info("任务未完成,尝试立即关闭");
executor.shutdownNow();
}
} catch (InterruptedException e) {
log.error("主线程被中断,立即关闭线程池");
executor.shutdownNow();
Thread.currentThread().interrupt();
}

log.info("线程池关闭完成");
}
}

运行程序,控制台输出如下:

1
2
3
02:42:00.772 [pool-1-thread-1] INFO  s.y.t.GracefulShutdownExample - 任务1完成
02:42:01.772 [pool-1-thread-2] INFO s.y.t.GracefulShutdownExample - 任务2完成
02:42:01.772 [main] INFO s.y.t.GracefulShutdownExample - 线程池关闭完成

说明:

  • shutdown() 调用后,线程池不再接受新任务,但会继续执行已提交的任务。
  • awaitTermination(long timeout, TimeUnit unit) 方法会阻塞当前线程,直到线程池关闭或者达到指定的超时时间。
  • 如果在指定时间内线程池未关闭,则调用 shutdownNow() 强制关闭线程池。
  • 通过捕获 InterruptedException,确保在主线程被中断时能够及时关闭线程池并恢复中断状态。

钩子函数

在某些情况下,应用程序可能需要在 JVM 关闭时自动关闭线程池,可以使用钩子线程(Shutdown Hook)来实现。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
public class ShutdownHookExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);

// 提交任务
executor.execute(() -> {
try {
Thread.sleep(4000);
log.info("任务1完成");
} catch (InterruptedException e) {
log.error("任务1被中断");
Thread.currentThread().interrupt();
}
});

// 添加钩子线程
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("JVM 关闭,开始关闭线程池");
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("线程池已关闭");
}));

// 模拟应用程序运行
log.info("应用程序运行中,按 Ctrl+C 终止...");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

运行程序、等待一段时间后,手动关闭程序,控制台输出如下:

1
2
3
4
02:44:57.332 [main] INFO  s.y.threadpool.ShutdownHookExample - 应用程序运行中,按 Ctrl+C 终止...
02:44:01.336 [pool-1-thread-1] INFO s.y.threadpool.ShutdownHookExample - 任务1完成
02:44:06.765 [Thread-1] INFO s.y.threadpool.ShutdownHookExample - JVM 关闭,开始关闭线程池
02:44:06.766 [Thread-1] INFO s.y.threadpool.ShutdownHookExample - 线程池已关闭

由运行结果可知,当 JVM 接收到关闭信号,钩子线程会自动执行,确保线程池得到适当关闭。

线程池资源的回收

线程池的关闭与资源回收

如前述,正确关闭线程池是释放其资源的前提。调用 shutdown()shutdownNow() 后,线程池会逐步释放其内部资源,包括:

  • 线程对象:线程池中的工作线程会终止并释放其占用的内存。
  • 任务队列:任务队列中的任务会被清空(在 shutdownNow() 的情况下)。
  • 其他内部资源:如与JVM交互的本地资源句柄等。

避免线程池的资源泄露

资源泄露通常是由于线程池未正确关闭,导致工作线程持续存在,进而阻塞JVM的正常关闭。以下是避免资源泄露的最佳实践:

  • 确保线程池被关闭:在应用程序结束前,确保所有线程池都已经调用 shutdown()shutdownNow() 方法。
  • 使用 Try-Finally 块管理线程池:确保在异常情况下线程池也能被正确关闭。
  • 使用钩子线程:如前述,确保在JVM关闭时线程池得到适当关闭。

动态调整线程池参数

在实际应用中,线程池的负载和任务特性可能会随着时间变化。动态调整线程池的参数(如核心线程数、最大线程数、任务队列等),可以更好地适应负载变化,提高系统性能和资源利用率。Java的 ThreadPoolExecutor 提供了多种方法来动态调整线程池参数。

  • **void setCorePoolSize(int corePoolSize)**:设置线程的核心数量。
  • **void setMaximumPoolSize(int maximumPoolSize)**:设置允许的最大线程数。
  • **void setRejectedExecutionHandler(RejectedExecutionHandler handler)**:为不可执行的任务设置新的处理程序。

合理利用线程池钩子函数

通过继承 ThreadPoolExecutor 并覆盖其钩子方法(如 beforeExecute(), afterExecute(), terminated()),实现任务执行前后的自定义逻辑。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Slf4j
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.info("任务开始执行,由线程:" + t.getName());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
log.info("任务执行完成");
if (t != null) {
log.info("任务执行异常: " + t.getMessage());
}
}

@Override
protected void terminated() {
super.terminated();
log.info("线程池已终止");
}
}

@Slf4j
class CustomThreadPoolExecutorExample {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new CustomThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2)
);

// 提交任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.execute(() -> {
try {
Thread.sleep(1000);
log.info("任务 " + taskId + " 执行中");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}

运行程序,控制台输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
02:55:37.139 [pool-1-thread-3] INFO  s.y.t.CustomThreadPoolExecutor - 任务开始执行,由线程:pool-1-thread-3
02:55:37.139 [pool-1-thread-1] INFO s.y.t.CustomThreadPoolExecutor - 任务开始执行,由线程:pool-1-thread-1
02:55:37.139 [pool-1-thread-2] INFO s.y.t.CustomThreadPoolExecutor - 任务开始执行,由线程:pool-1-thread-2
02:55:38.158 [pool-1-thread-3] INFO s.y.t.CustomThreadPoolExecutorExample - 任务 4 执行中
02:55:38.158 [pool-1-thread-2] INFO s.y.t.CustomThreadPoolExecutorExample - 任务 1 执行中
02:55:38.158 [pool-1-thread-1] INFO s.y.t.CustomThreadPoolExecutorExample - 任务 0 执行中
02:55:38.158 [pool-1-thread-3] INFO s.y.t.CustomThreadPoolExecutor - 任务执行完成
02:55:38.158 [pool-1-thread-2] INFO s.y.t.CustomThreadPoolExecutor - 任务执行完成
02:55:38.158 [pool-1-thread-1] INFO s.y.t.CustomThreadPoolExecutor - 任务执行完成
02:55:38.158 [pool-1-thread-1] INFO s.y.t.CustomThreadPoolExecutor - 任务开始执行,由线程:pool-1-thread-1
02:55:38.158 [pool-1-thread-3] INFO s.y.t.CustomThreadPoolExecutor - 任务开始执行,由线程:pool-1-thread-3
02:55:39.168 [pool-1-thread-1] INFO s.y.t.CustomThreadPoolExecutorExample - 任务 2 执行中
02:55:39.168 [pool-1-thread-3] INFO s.y.t.CustomThreadPoolExecutorExample - 任务 3 执行中
02:55:39.168 [pool-1-thread-1] INFO s.y.t.CustomThreadPoolExecutor - 任务执行完成
02:55:39.168 [pool-1-thread-3] INFO s.y.t.CustomThreadPoolExecutor - 任务执行完成
02:55:39.168 [pool-1-thread-2] INFO s.y.t.CustomThreadPoolExecutor - 线程池已终止

由运行结果可知,自定义 ThreadPoolExecutor 中的钩子函数已实现了逻辑定制。

最佳实践总结

在 Java 线程池的管理与维护过程中,以下最佳实践能够帮助开发者有效地管理线程池,优化系统性能:

  1. 始终关闭线程池:确保在应用程序结束前,所有线程池都得到适当的关闭,防止资源泄露和阻塞JVM关闭。
  2. 选择合适的线程池类型:根据任务特性(CPU 密集型、I/O 密集型等)和业务需求,选择最适合的线程池类型,避免资源浪费或性能瓶颈。
  3. 合理设置线程池参数:通过计算和测试确定核心线程数、最大线程数和任务队列容量,确保线程池能够高效处理任务负载。
  4. 动态调整线程池:结合监控数据,动态调整线程池参数以适应负载变化,提升系统弹性和资源利用率。
  5. 选择合适的队列:使用无限队列(如 LinkedBlockingQueue 默认情况下的无界队列)可能导致任务堆积过多,消耗大量内存,甚至导致系统崩溃。最佳实践为使用有界队列(如 ArrayBlockingQueue),合理设置队列容量。
  6. 实现自定义拒绝策略和钩子方法:根据具体业务需求,定制任务被拒绝时的处理逻辑和任务执行前后的行为,增强线程池的可控性和可维护性。
  7. 监控线程池状态:定期监控线程池的运行状态和资源使用情况,及时发现和解决潜在的问题,确保系统稳定运行。
  8. 异常处理:在异步任务中捕获和处理异常,避免异常传播导致线程池线程终止;使用 @Async 方法返回 FutureCompletableFuture,通过 Future 捕获异常;配置自定义的 ThreadFactory,设置 UncaughtExceptionHandler 处理未捕获异常。
  9. 避免共享可变状态:在并发任务中,尽量避免共享可变数据,减少同步需求和潜在的死锁风险。
  10. 使用高质量的任务:确保提交到线程池的任务执行时间合理,避免长时间阻塞或频繁抛出异常,提升线程池的整体效率。

通过遵循这些管理与维护策略,可以充分发挥Java线程池的优势,提升应用程序的并发处理能力和系统稳定性。

总结

Java 线程池为并发编程提供了完善的“任务提交—执行—管理”框架,极大减轻了开发者在多线程环境下手动管理线程的负担。

  • 通过 ThreadPoolExecutor,我们可以灵活地配置核心/最大线程数、任务队列、拒绝策略等参数,实现对 CPU 和内存资源的合理调度;
  • Fork/Join 框架则让分治算法和大规模并行计算变得简单;
  • 结合 shutdownawaitTermination、钩子线程等手段,能实现安全且可控地关闭线程池、避免资源泄漏;
  • 通过对性能监控与调优技术(如自定义线程工厂、合理队列容量、定制化拒绝策略、合理拆分阈值)进行持续完善,线程池可以在复杂的生产环境中保持高吞吐、低延迟和良好的稳定性。

合理运用这些线程池机制能显著提高 Java 应用在多核时代的并发能力,为业务提供可靠和高效的支持。