Java线程池深度解析
# Java线程池深度解析
点击勘误issues (opens new window),哪吒感谢大家的阅读
# 1. 线程池概述
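线程池是一种线程复用机制:任务被提交后,要么直接交给池中的工作线程执行,要么先进入任务队列,等待空闲线程取走执行,从而避免为每个任务单独创建和销毁线程。注意:Java 中 Executors 默认线程工厂创建的是非守护线程(并非后台线程),因此线程池使用完毕后必须显式调用 shutdown(),否则 JVM 无法正常退出。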
# 1.1 为什么使用线程池
- 降低资源消耗:重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性
# 2. ThreadPoolExecutor详解
# 2.1 构造函数参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数量
- maximumPoolSize:最大线程数量
- keepAliveTime:超出核心线程数的空闲线程在被回收前的最长存活时间(调用 allowCoreThreadTimeOut(true) 后对核心线程同样生效)
- unit:时间单位
- workQueue:任务队列
- threadFactory:线程工厂
- handler:拒绝策略
# 2.2 线程池执行流程
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Shows how a {@link ThreadPoolExecutor} with a bounded queue behaves when more tasks are
 * submitted than it can accept at once.
 */
public class ThreadPoolExecutorDemo {

    /**
     * Submits 8 three-second tasks to a pool configured with 2 core threads, 4 maximum
     * threads and an {@code ArrayBlockingQueue} of capacity 2, printing the core size, the
     * current pool size and the queue length after every submission. Tasks the pool cannot
     * accept are reported by the custom rejection handler and otherwise dropped.
     *
     * <p>The pool is then shut down. If it does not terminate within 10 seconds, or the
     * calling thread is interrupted while waiting, the remaining tasks are cancelled; in the
     * interrupted case the caller's interrupt status is restored.
     */
    public static void demonstrateExecutionFlow() {
        // Custom thread factory: gives threads a recognisable name and keeps them non-daemon.
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "CustomThread-" + threadNumber.getAndIncrement());
                thread.setDaemon(false);
                return thread;
            }
        };

        // Custom rejection policy: only logs the rejected task together with the pool state.
        RejectedExecutionHandler rejectedHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝: " + r.toString() +
                        ", 当前线程数: " + executor.getPoolSize() +
                        ", 队列大小: " + executor.getQueue().size());
            }
        };

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                           // core pool size
                4,                           // maximum pool size
                60L,                         // idle keep-alive time
                TimeUnit.SECONDS,            // keep-alive unit
                new ArrayBlockingQueue<>(2), // bounded queue, capacity 2
                threadFactory,
                rejectedHandler
        );

        // Submit 8 tasks and print the pool state after each one.
        for (int i = 1; i <= 8; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(3000); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
            System.out.println("提交任务" + taskId + "后 - 核心线程数: " + executor.getCorePoolSize() +
                    ", 当前线程数: " + executor.getPoolSize() +
                    ", 队列大小: " + executor.getQueue().size());
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt for whoever called us.
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        demonstrateExecutionFlow();
    }
}
# 2.3 线程池状态
import java.util.concurrent.*;
/**
 * Prints the lifecycle state of a {@link ThreadPoolExecutor} before and after submitting
 * tasks, after {@code shutdown()}, and after waiting for termination.
 */
public class ThreadPoolStateDemo {

    /**
     * Creates a small pool, submits three two-second tasks, shuts the pool down and waits up
     * to five seconds for termination, printing the observed state at each step.
     */
    public static void demonstrateStates() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10)
        );
        System.out.println("初始状态: " + getPoolState(executor));

        // Submit some work so the pool is busy when it is shut down.
        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        System.out.println("提交任务后: " + getPoolState(executor));

        // Orderly shutdown: already submitted tasks still run.
        executor.shutdown();
        System.out.println("调用shutdown后: " + getPoolState(executor));
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of just logging it.
            Thread.currentThread().interrupt();
        }
        System.out.println("最终状态: " + getPoolState(executor));
    }

    /**
     * Maps the executor's public status flags to a state name.
     *
     * <p>{@code isTerminating()} is true for every pool that has been shut down but has not
     * yet terminated, so it cannot be used to single out the internal TIDYING state. The
     * public API only distinguishes three phases, which is what this method reports; the
     * internal STOP and TIDYING states therefore also show up as {@code "SHUTDOWN"}.
     *
     * @param executor the pool to inspect
     * @return {@code "TERMINATED"}, {@code "SHUTDOWN"} or {@code "RUNNING"}
     */
    private static String getPoolState(ThreadPoolExecutor executor) {
        if (executor.isTerminated()) {
            return "TERMINATED";
        }
        if (executor.isShutdown()) {
            return "SHUTDOWN";
        }
        return "RUNNING";
    }

    public static void main(String[] args) {
        demonstrateStates();
    }
}
# 3. 常用线程池类型
# 3.1 FixedThreadPool
import java.util.concurrent.*;
/**
 * Examples for {@code Executors.newFixedThreadPool}: plain task submission and batch
 * processing with {@link CompletableFuture}.
 */
public class FixedThreadPoolDemo {

    /**
     * Submits 10 two-second tasks to a pool of 3 threads, then shuts the pool down and waits
     * up to 15 seconds for it to finish. Remaining tasks are cancelled on timeout or when the
     * calling thread is interrupted (its interrupt status is restored).
     */
    public static void basicUsage() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // Submit 10 tasks.
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(15, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Batch processing example: runs one asynchronous job per data item on a pool of 4
     * threads and blocks until all of them have completed.
     *
     * <p>The pool is shut down in a {@code finally} block so its non-daemon threads do not
     * outlive the method if waiting for the batch fails.
     */
    public static void batchProcessing() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Simulated batch of input data.
            String[] data = {"数据1", "数据2", "数据3", "数据4", "数据5", "数据6", "数据7", "数据8"};
            // Wildcard element type avoids the unchecked raw-array assignment.
            CompletableFuture<?>[] futures = new CompletableFuture<?>[data.length];
            for (int i = 0; i < data.length; i++) {
                final String item = data[i];
                futures[i] = CompletableFuture.runAsync(() -> {
                    System.out.println("处理" + item + ",线程: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000); // simulated processing time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println(item + "处理完成");
                }, executor);
            }
            // Wait for the whole batch.
            CompletableFuture.allOf(futures).join();
            System.out.println("所有数据处理完成");
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("=== 基本用法 ===");
        basicUsage();
        System.out.println("\n=== 批量处理 ===");
        batchProcessing();
    }
}
# 3.2 CachedThreadPool
import java.util.concurrent.*;
/**
 * Examples for {@code Executors.newCachedThreadPool}: thread reuse between two batches of
 * tasks and handling of a burst of short tasks.
 */
public class CachedThreadPoolDemo {

    /**
     * Submits a first batch of five one-second tasks, waits three seconds, then submits a
     * second batch so the thread names in the output show whether threads were reused.
     * Finally shuts the pool down and waits up to 10 seconds.
     */
    public static void dynamicThreadCreation() {
        ExecutorService executor = Executors.newCachedThreadPool();
        // First batch: submitted back to back.
        System.out.println("提交第一批任务...");
        submitBatch(executor, 1, 5);

        // Give the first batch time to finish.
        pause(3000);

        // Second batch: submitted later to observe thread reuse.
        System.out.println("\n提交第二批任务...");
        submitBatch(executor, 6, 10);

        shutdownAndAwait(executor, 10);
    }

    /**
     * Burst example: submits 20 short (500 ms) tasks at 50 ms intervals, then shuts the pool
     * down and waits up to 10 seconds. Submission stops early if the calling thread is
     * interrupted.
     */
    public static void burstTaskHandling() {
        ExecutorService executor = Executors.newCachedThreadPool();
        System.out.println("模拟突发任务处理...");
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("突发任务" + taskId + ",线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500); // short task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Short gap between submissions.
            if (!pause(50)) {
                break;
            }
        }
        shutdownAndAwait(executor, 10);
    }

    public static void main(String[] args) {
        System.out.println("=== 动态线程创建 ===");
        dynamicThreadCreation();
        if (!pause(2000)) {
            return;
        }
        System.out.println("\n=== 突发任务处理 ===");
        burstTaskHandling();
    }

    /** Submits one-second tasks numbered {@code firstId..lastId} (inclusive). */
    private static void submitBatch(ExecutorService executor, int firstId, int lastId) {
        for (int i = firstId; i <= lastId; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
    }

    /** Shuts the pool down, cancelling outstanding tasks on timeout or interruption. */
    private static void shutdownAndAwait(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt status is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
# 3.3 SingleThreadExecutor
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Examples for {@code Executors.newSingleThreadExecutor}: sequential execution, funnelling
 * log writes from several threads through one writer, and a serialised state machine.
 */
public class SingleThreadExecutorDemo {

    /**
     * Submits five one-second tasks to a single-thread executor so the output shows them
     * running one after another, then shuts down and waits up to 10 seconds.
     */
    public static void sequentialExecution() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Submit several tasks and watch them run in order.
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
        shutdownAndAwait(executor, 10);
    }

    /**
     * Logging example: three producer threads each hand five log entries to a single-thread
     * executor, which numbers and prints them.
     *
     * <p>The producers are joined before the executor is shut down. A fixed sleep would not
     * guarantee that every producer has finished submitting, and a late submission after
     * {@code shutdown()} would be rejected.
     */
    public static void logRecording() {
        ExecutorService logExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger logCounter = new AtomicInteger(0);

        // Several threads produce log entries concurrently.
        Thread[] producers = new Thread[3];
        for (int i = 1; i <= producers.length; i++) {
            final int threadId = i;
            Thread producer = new Thread(() -> {
                for (int j = 1; j <= 5; j++) {
                    final int logId = j;
                    // Each write is handed to the single writer thread.
                    logExecutor.submit(() -> {
                        int logNumber = logCounter.incrementAndGet();
                        System.out.println("[LOG-" + logNumber + "] 线程" + threadId + "的日志" + logId +
                                " - 时间: " + System.currentTimeMillis());
                        try {
                            Thread.sleep(100); // simulated write time
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
            });
            producers[i - 1] = producer;
            producer.start();
        }

        // Wait until every producer has submitted all of its entries.
        try {
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        shutdownAndAwait(logExecutor, 5);
    }

    /**
     * State machine example: every transition is executed on the single executor thread, so
     * the {@code state} field is only ever touched by that one thread.
     */
    public static void stateMachine() {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        class StateMachine {
            private String state = "INIT";

            public void transition(String newState) {
                executor.submit(() -> {
                    System.out.println("状态转换: " + state + " -> " + newState);
                    this.state = newState;
                    try {
                        Thread.sleep(500); // simulated transition work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("当前状态: " + this.state);
                });
            }
        }

        StateMachine machine = new StateMachine();
        // Queue a series of transitions; they are applied strictly in submission order.
        machine.transition("STARTING");
        machine.transition("RUNNING");
        machine.transition("PAUSED");
        machine.transition("RUNNING");
        machine.transition("STOPPED");
        shutdownAndAwait(executor, 5);
    }

    public static void main(String[] args) {
        System.out.println("=== 顺序执行 ===");
        sequentialExecution();
        if (!pause(1000)) {
            return;
        }
        System.out.println("\n=== 日志记录 ===");
        logRecording();
        if (!pause(1000)) {
            return;
        }
        System.out.println("\n=== 状态机 ===");
        stateMachine();
    }

    /** Shuts the executor down, cancelling outstanding tasks on timeout or interruption. */
    private static void shutdownAndAwait(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt status is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
# 3.4 ScheduledThreadPool
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Examples for {@code Executors.newScheduledThreadPool}: delayed, fixed-rate and fixed-delay
 * scheduling, a heartbeat monitor, and periodic cache cleanup.
 */
public class ScheduledThreadPoolDemo {

    /**
     * Schedules a one-shot delayed task, a fixed-rate task and a fixed-delay task, cancels
     * the two periodic tasks after 10 seconds and shuts the scheduler down after 12 seconds.
     */
    public static void basicScheduling() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            // One-shot task after a delay.
            scheduler.schedule(() -> {
                System.out.println("延迟3秒执行的任务,时间: " + System.currentTimeMillis());
            }, 3, TimeUnit.SECONDS);

            // Fixed rate: first run after 1 second, then every 2 seconds.
            AtomicInteger counter1 = new AtomicInteger(0);
            ScheduledFuture<?> fixedRateTask = scheduler.scheduleAtFixedRate(() -> {
                int count = counter1.incrementAndGet();
                System.out.println("固定频率任务第" + count + "次执行,时间: " + System.currentTimeMillis());
                try {
                    Thread.sleep(1500); // task run time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 1, 2, TimeUnit.SECONDS);

            // Fixed delay: first run after 2 seconds, then 2 seconds after each run completes.
            AtomicInteger counter2 = new AtomicInteger(0);
            ScheduledFuture<?> fixedDelayTask = scheduler.scheduleWithFixedDelay(() -> {
                int count = counter2.incrementAndGet();
                System.out.println("固定延迟任务第" + count + "次执行,时间: " + System.currentTimeMillis());
                try {
                    Thread.sleep(1000); // task run time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 2, 2, TimeUnit.SECONDS);

            // Stop both periodic tasks after 10 seconds.
            scheduler.schedule(() -> {
                fixedRateTask.cancel(false);
                fixedDelayTask.cancel(false);
                System.out.println("取消定时任务");
            }, 10, TimeUnit.SECONDS);

            // Let the demo run before shutting down.
            pause(12000);
        } finally {
            scheduler.shutdown();
        }
    }

    /**
     * Heartbeat example: every 2 seconds a simulated health check runs (healthy with 80%
     * probability), prints the state or state change, and raises an alert line when
     * unhealthy. Runs for 10 seconds.
     */
    public static void heartbeatMonitoring() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        class HeartbeatMonitor {
            private volatile boolean isHealthy = true;
            private final AtomicInteger heartbeatCount = new AtomicInteger(0);

            public void startMonitoring() {
                scheduler.scheduleAtFixedRate(() -> {
                    int count = heartbeatCount.incrementAndGet();
                    // Simulated health check: healthy with 80% probability.
                    boolean currentHealth = Math.random() > 0.2;
                    if (currentHealth != isHealthy) {
                        isHealthy = currentHealth;
                        System.out.println("[心跳" + count + "] 状态变化: " + (isHealthy ? "健康" : "异常"));
                    } else {
                        System.out.println("[心跳" + count + "] 状态: " + (isHealthy ? "健康" : "异常"));
                    }
                    if (!isHealthy) {
                        System.out.println("[心跳" + count + "] 触发告警!");
                    }
                }, 0, 2, TimeUnit.SECONDS);
            }
        }

        try {
            HeartbeatMonitor monitor = new HeartbeatMonitor();
            monitor.startMonitoring();
            // Run for 10 seconds.
            pause(10000);
        } finally {
            scheduler.shutdown();
        }
    }

    /**
     * Cache cleanup example: a small cache tracks the last access time per key and a
     * scheduled job removes entries that have not been accessed for 5 seconds.
     */
    public static void cacheCleanup() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        class Cache {
            private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
            private final ConcurrentHashMap<String, Long> accessTime = new ConcurrentHashMap<>();

            public void put(String key, String value) {
                cache.put(key, value);
                accessTime.put(key, System.currentTimeMillis());
                System.out.println("缓存添加: " + key + " = " + value);
            }

            public String get(String key) {
                String value = cache.get(key);
                // Only refresh the access time of keys that are actually cached; otherwise a
                // miss would create a phantom access entry that cleanup later reports as an
                // expired cache entry.
                if (value != null) {
                    accessTime.put(key, System.currentTimeMillis());
                }
                return value;
            }

            public void cleanup() {
                long currentTime = System.currentTimeMillis();
                long expireTime = 5000; // entries expire after 5 seconds without access
                accessTime.entrySet().removeIf(entry -> {
                    if (currentTime - entry.getValue() > expireTime) {
                        String key = entry.getKey();
                        cache.remove(key);
                        System.out.println("清理过期缓存: " + key);
                        return true;
                    }
                    return false;
                });
                System.out.println("缓存清理完成,当前大小: " + cache.size());
            }
        }

        try {
            Cache cache = new Cache();
            // Periodic cleanup every 3 seconds.
            scheduler.scheduleAtFixedRate(cache::cleanup, 3, 3, TimeUnit.SECONDS);

            // Simulated cache usage.
            cache.put("key1", "value1");
            cache.put("key2", "value2");
            if (!pause(2000)) {
                return;
            }
            cache.put("key3", "value3");
            if (!pause(4000)) {
                return;
            }
            cache.get("key2"); // refreshes the access time if key2 is still cached
            cache.put("key4", "value4");
            pause(6000);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("=== 基本调度 ===");
        basicScheduling();
        if (!pause(2000)) {
            return;
        }
        System.out.println("\n=== 心跳监控 ===");
        heartbeatMonitoring();
        if (!pause(2000)) {
            return;
        }
        System.out.println("\n=== 缓存清理 ===");
        cacheCleanup();
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if the sleep was interrupted (the interrupt status is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
# 4. 拒绝策略
# 4.1 内置拒绝策略
import java.util.concurrent.*;
public class RejectionPolicyDemo {
public static void abortPolicy() {
System.out.println("=== AbortPolicy 示例 ===");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1),
new ThreadPoolExecutor.AbortPolicy() // 默认策略
);
try {
// 提交3个任务,第3个会被拒绝
for (int i = 1; i <= 3; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "完成");
} catch (InterruptedException e) {
System.out.println("任务" + taskId + "被中断");
Thread.currentThread().interrupt();
}
});
}
System.out.println("任务已提交,程序运行中...(可以通过Ctrl+C测试关闭钩子)");
// 模拟程序运行
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 正常关闭
gracefulShutdown(executor, 3, TimeUnit.SECONDS);
}
public static void main(String[] args) {
System.out.println("=== 优雅关闭示例 ===");
demonstrateGracefulShutdown();
System.out.println("\n=== 关闭钩子示例 ===");
shutdownWithHook();
}
}
# 7. 线程池异常处理
# 7.1 异常处理策略
import java.util.concurrent.*;
public class ThreadPoolExceptionHandlingDemo {
// 自定义线程工厂,设置异常处理器
static class CustomThreadFactory implements ThreadFactory {
private int counter = 0;
private final Thread.UncaughtExceptionHandler exceptionHandler;
public CustomThreadFactory(Thread.UncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "CustomThread-" + (++counter));
t.setDaemon(false);
if (exceptionHandler != null) {
t.setUncaughtExceptionHandler(exceptionHandler);
}
return t;
}
}
public static void demonstrateExceptionHandling() {
// 自定义异常处理器
Thread.UncaughtExceptionHandler exceptionHandler = (thread, exception) -> {
System.err.println("线程 " + thread.getName() + " 发生未捕获异常: " + exception.getMessage());
exception.printStackTrace();
// 可以在这里添加日志记录、告警等逻辑
System.out.println("异常已记录,线程将被终止");
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new CustomThreadFactory(exceptionHandler)
);
// 提交会抛出异常的任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "开始执行");
if (taskId % 2 == 0) {
// 偶数任务抛出异常
throw new RuntimeException("任务" + taskId + "模拟异常");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + "正常完成");
});
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
// 使用Future处理异常
public static void handleExceptionWithFuture() {
System.out.println("\n=== 使用Future处理异常 ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务并获取Future
Future<?>[] futures = new Future[5];
for (int i = 0; i < 5; i++) {
final int taskId = i + 1;
futures[i] = executor.submit(() -> {
System.out.println("任务" + taskId + "开始执行");
if (taskId == 3) {
throw new RuntimeException("任务" + taskId + "发生异常");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断", e);
}
System.out.println("任务" + taskId + "完成");
return "任务" + taskId + "结果";
});
}
// 检查任务执行结果
for (int i = 0; i < futures.length; i++) {
try {
Object result = futures[i].get(3, TimeUnit.SECONDS);
System.out.println("任务" + (i + 1) + "成功,结果: " + result);
} catch (ExecutionException e) {
System.err.println("任务" + (i + 1) + "执行异常: " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.err.println("任务" + (i + 1) + "执行超时");
futures[i].cancel(true);
} catch (InterruptedException e) {
System.err.println("等待任务" + (i + 1) + "被中断");
Thread.currentThread().interrupt();
}
}
executor.shutdown();
}
public static void main(String[] args) {
demonstrateExceptionHandling();
handleExceptionWithFuture();
}
}
# 8. 性能调优
# 8.1 线程池参数调优
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Compares the throughput of several thread pool configurations on an identical workload of
 * sleep-based tasks.
 */
public class ThreadPoolTuningDemo {

    /** Collects task counts and timings for one benchmark run. */
    static class PerformanceTest {
        private final AtomicLong completedTasks = new AtomicLong(0);
        private final AtomicLong totalExecutionTime = new AtomicLong(0);
        // Elapsed time is measured with the monotonic nanoTime clock rather than wall time.
        private volatile long startNanos;
        private volatile long endNanos;

        /** Resets all counters and marks the start of the measured interval. */
        public void start() {
            completedTasks.set(0);
            totalExecutionTime.set(0);
            startNanos = System.nanoTime();
            endNanos = startNanos;
        }

        /**
         * Records one finished task.
         *
         * @param executionTime the task's own run time in milliseconds
         */
        public void taskCompleted(long executionTime) {
            completedTasks.incrementAndGet();
            totalExecutionTime.addAndGet(executionTime);
        }

        /** Marks the end of the measured interval. */
        public void end() {
            endNanos = System.nanoTime();
        }

        /**
         * Prints total elapsed time, completed task count, throughput and mean task time.
         * Throughput is reported as 0 when no measurable time has elapsed, instead of
         * dividing by zero.
         *
         * @param testName label printed in the result header
         */
        public void printResults(String testName) {
            long elapsedNanos = Math.max(0L, endNanos - startNanos);
            long totalTime = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
            long completed = completedTasks.get();
            long avgExecutionTime = completed > 0 ? totalExecutionTime.get() / completed : 0;
            double throughput = elapsedNanos > 0 ? completed * 1_000_000_000.0 / elapsedNanos : 0.0;
            System.out.println("=== " + testName + " 性能测试结果 ===");
            System.out.println("总耗时: " + totalTime + "ms");
            System.out.println("完成任务数: " + completed);
            System.out.println("吞吐量: " + throughput + " 任务/秒");
            System.out.println("平均任务执行时间: " + avgExecutionTime + "ms");
            System.out.println();
        }
    }

    /** Runs 100 tasks of 100 ms each against four different pool configurations. */
    public static void compareThreadPoolConfigurations() {
        int taskCount = 100;
        int taskDuration = 100; // each task sleeps 100 ms
        // Configuration 1: small pool.
        testConfiguration("小线程池(2-4)", 2, 4, 10, taskCount, taskDuration);
        // Configuration 2: medium pool.
        testConfiguration("中等线程池(4-8)", 4, 8, 20, taskCount, taskDuration);
        // Configuration 3: large pool.
        testConfiguration("大线程池(8-16)", 8, 16, 50, taskCount, taskDuration);
        // Configuration 4: fixed-size pool.
        testFixedThreadPool("固定线程池(8)", 8, taskCount, taskDuration);
    }

    /**
     * Benchmarks a bounded-queue pool that falls back to running tasks on the submitting
     * thread ({@code CallerRunsPolicy}) when it is saturated.
     */
    private static void testConfiguration(String name, int coreSize, int maxSize,
                                          int queueSize, int taskCount, int taskDuration) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                coreSize, maxSize, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        runBenchmark(name, executor, taskCount, taskDuration);
    }

    /** Benchmarks a pool created by {@code Executors.newFixedThreadPool}. */
    private static void testFixedThreadPool(String name, int poolSize,
                                            int taskCount, int taskDuration) {
        runBenchmark(name, Executors.newFixedThreadPool(poolSize), taskCount, taskDuration);
    }

    /**
     * Submits {@code taskCount} sleeping tasks, waits for all of them, prints the results and
     * always shuts the executor down.
     */
    private static void runBenchmark(String name, ExecutorService executor,
                                     int taskCount, int taskDuration) {
        PerformanceTest test = new PerformanceTest();
        try {
            test.start();
            CountDownLatch latch = new CountDownLatch(taskCount);
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    long start = System.currentTimeMillis();
                    try {
                        Thread.sleep(taskDuration);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        test.taskCompleted(System.currentTimeMillis() - start);
                        latch.countDown();
                    }
                });
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            test.end();
            test.printResults(name);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        compareThreadPoolConfigurations();
    }
}
# 9. 总结
本文深入分析了Java线程池的各个方面:
# 9.1 核心要点
- 线程池参数:合理配置核心线程数、最大线程数、队列大小等参数
- 任务类型:根据CPU密集型、IO密集型选择不同的线程池配置
- 拒绝策略:选择合适的拒绝策略,必要时自定义策略
- 监控和调优:实时监控线程池状态,根据性能指标调优参数
- 异常处理:正确处理任务执行中的异常
- 优雅关闭:确保线程池能够优雅地关闭
# 9.2 最佳实践
- 避免使用Executors:推荐手动创建ThreadPoolExecutor,明确各项参数
- 合理设置队列大小:避免无界队列导致内存溢出
- 监控线程池状态:定期检查线程池的各项指标
- 设置有意义的线程名:便于问题排查和监控
- 正确处理异常:通过 submit() 提交的任务,其异常会被封装在 Future 中,需要调用 Future.get() 才能获取;自定义 UncaughtExceptionHandler 只对通过 execute() 提交的任务生效
- 及时关闭线程池:避免资源泄露
# 9.3 常见问题
- 线程池大小设置不当:导致资源浪费或性能瓶颈
- 队列选择错误:无界队列可能导致OOM
- 拒绝策略不合适:可能导致任务丢失或系统阻塞
- 异常处理不当:异常被吞没,难以排查问题
- 未正确关闭:导致程序无法正常退出
通过合理使用线程池,可以显著提高应用程序的性能和稳定性。在实际开发中,应该根据具体的业务场景和性能要求来选择和配置线程池。
# 参考资料
- Java并发编程实战
- Java官方文档 - ThreadPoolExecutor
- 深入理解Java虚拟机
务" + taskId + "执行中...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + "完成");
});
System.out.println("提交任务" + taskId);
}
} catch (RejectedExecutionException e) {
System.out.println("任务被拒绝: " + e.getMessage());
}
executor.shutdown();
}
/**
 * CallerRunsPolicy example: three one-second tasks are handed to a pool with a single
 * thread and a queue of capacity 1. Each task prints the name of the thread it runs on, and
 * the submitting thread's name is printed after every submission, so the output shows where
 * each task was executed. The pool is shut down afterwards without waiting.
 */
public static void callerRunsPolicy() {
    System.out.println("\n=== CallerRunsPolicy 示例 ===");
    BlockingQueue<Runnable> backlog = new ArrayBlockingQueue<>(1);
    RejectedExecutionHandler runInCaller = new ThreadPoolExecutor.CallerRunsPolicy();
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, backlog, runInCaller);

    int submitted = 0;
    while (submitted < 3) {
        submitted++;
        final int taskId = submitted;
        Runnable job = () -> {
            String worker = Thread.currentThread().getName();
            System.out.println("任务" + taskId + "执行中,线程: " + worker);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            System.out.println("任务" + taskId + "完成");
        };
        pool.submit(job);
        String submitter = Thread.currentThread().getName();
        System.out.println("提交任务" + taskId + ",当前线程: " + submitter);
    }
    pool.shutdown();
}
/**
 * DiscardPolicy example: four one-second tasks are submitted to a pool with a single thread
 * and a queue of capacity 1; tasks the pool cannot accept are dropped by
 * {@code DiscardPolicy} without any notification. The method then waits three seconds and
 * shuts the pool down.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt status is restored;
 * the pool is shut down in every case.
 */
public static void discardPolicy() {
    System.out.println("\n=== DiscardPolicy 示例 ===");
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.DiscardPolicy()
    );
    try {
        for (int i = 1; i <= 4; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "执行中...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
            System.out.println("提交任务" + taskId);
        }
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of only printing a stack trace.
        Thread.currentThread().interrupt();
    } finally {
        executor.shutdown();
    }
}
/**
 * DiscardOldestPolicy example: five one-second tasks are submitted to a pool with a single
 * thread and a queue of capacity 2, printing the queue size after each submission. When the
 * pool is saturated, {@code DiscardOldestPolicy} drops the oldest queued task to make room
 * for the new one. The method then waits six seconds and shuts the pool down.
 *
 * <p>If the calling thread is interrupted while waiting, its interrupt status is restored;
 * the pool is shut down in every case.
 */
public static void discardOldestPolicy() {
    System.out.println("\n=== DiscardOldestPolicy 示例 ===");
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(2),
            new ThreadPoolExecutor.DiscardOldestPolicy()
    );
    try {
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "执行中...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
            System.out.println("提交任务" + taskId + ",队列大小: " + executor.getQueue().size());
        }
        Thread.sleep(6000);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of only printing a stack trace.
        Thread.currentThread().interrupt();
    } finally {
        executor.shutdown();
    }
}
/**
 * Runs the four built-in rejection policy demos in order, pausing 3 s, 3 s and 1 s between
 * them so their output does not interleave.
 *
 * <p>If the main thread is interrupted during a pause, the remaining demos are skipped and
 * the interrupt status is restored.
 */
public static void main(String[] args) {
    abortPolicy();
    try {
        Thread.sleep(3000);
        callerRunsPolicy();
        Thread.sleep(3000);
        discardPolicy();
        Thread.sleep(1000);
        discardOldestPolicy();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
}
# 4.2 自定义拒绝策略
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Three custom {@link RejectedExecutionHandler} implementations (logging, retry, fallback)
 * together with a small demo for each.
 */
public class CustomRejectionPolicyDemo {

    /** Rejection policy that counts and logs every rejected task, then drops it. */
    static class LoggingRejectedExecutionHandler implements RejectedExecutionHandler {
        private final AtomicInteger rejectedCount = new AtomicInteger(0);

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            int count = rejectedCount.incrementAndGet();
            System.out.println("[拒绝策略] 任务被拒绝 #" + count +
                    ", 当前线程池状态 - 核心线程数: " + executor.getCorePoolSize() +
                    ", 当前线程数: " + executor.getPoolSize() +
                    ", 队列大小: " + executor.getQueue().size());
        }

        /** @return the number of tasks rejected so far */
        public int getRejectedCount() {
            return rejectedCount.get();
        }
    }

    /**
     * Rejection policy that retries a rejected task a bounded number of times from a helper
     * thread before giving up.
     *
     * <p>Retries offer the task directly to the pool's work queue. Calling
     * {@code executor.execute(r)} again would re-enter this same handler on every failure
     * instead of throwing, so the retry limit would never be reached and each failure would
     * spawn one more helper thread.
     *
     * <p>Because the queue is used directly, a pool that currently has no worker threads is
     * only woken up via {@code prestartCoreThread()}; with a core size of 0 a re-queued task
     * waits until the next regular submission starts a worker.
     */
    static class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
        private final int maxRetries;
        private final long retryDelayMs;

        /**
         * @param maxRetries   maximum number of retry attempts per rejected task
         * @param retryDelayMs delay before each attempt, in milliseconds
         */
        public RetryRejectedExecutionHandler(int maxRetries, long retryDelayMs) {
            this.maxRetries = maxRetries;
            this.retryDelayMs = retryDelayMs;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                System.out.println("[重试策略] 线程池已关闭,任务被丢弃");
                return;
            }
            // Retry on a separate thread so the submitter is not blocked.
            new Thread(() -> retry(r, executor)).start();
        }

        /** Performs up to {@code maxRetries} delayed attempts to re-queue the task. */
        private void retry(Runnable r, ThreadPoolExecutor executor) {
            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("[重试策略] 重试被中断,任务被丢弃");
                    return;
                }
                if (executor.isShutdown()) {
                    System.out.println("[重试策略] 线程池已关闭,任务被丢弃");
                    return;
                }
                if (executor.getQueue().offer(r)) {
                    // Re-check: the pool may have been shut down while we were enqueuing.
                    if (executor.isShutdown() && executor.remove(r)) {
                        System.out.println("[重试策略] 线程池已关闭,任务被丢弃");
                        return;
                    }
                    // Make sure at least one worker exists to pick the task up.
                    executor.prestartCoreThread();
                    System.out.println("[重试策略] 任务重试成功,重试次数: " + attempt);
                    return;
                }
                System.out.println("[重试策略] 第" + attempt + "次重试失败");
            }
            System.out.println("[重试策略] 达到最大重试次数,任务被丢弃");
        }
    }

    /** Rejection policy that runs rejected tasks on a separate single-thread executor. */
    static class FallbackRejectedExecutionHandler implements RejectedExecutionHandler {
        private final ExecutorService fallbackExecutor;

        public FallbackRejectedExecutionHandler() {
            this.fallbackExecutor = Executors.newSingleThreadExecutor();
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("[降级策略] 任务被降级到备用线程池执行");
            fallbackExecutor.execute(() -> {
                System.out.println("[降级策略] 在备用线程池中执行任务");
                r.run();
            });
        }

        /** Shuts down the fallback executor; call once the main pool is no longer used. */
        public void shutdown() {
            fallbackExecutor.shutdown();
        }
    }

    /**
     * Submits five one-second tasks to a pool with one thread and a queue of capacity 2 and
     * prints how many of them the logging policy rejected.
     */
    public static void testLoggingPolicy() {
        System.out.println("=== 记录拒绝策略测试 ===");
        LoggingRejectedExecutionHandler handler = new LoggingRejectedExecutionHandler();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2),
                handler
        );
        try {
            // Submit 5 tasks; the logging policy never throws, so no try/catch per submit.
            for (int i = 1; i <= 5; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    System.out.println("任务" + taskId + "执行中...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("任务" + taskId + "完成");
                });
            }
            pause(4000);
            System.out.println("总拒绝次数: " + handler.getRejectedCount());
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Submits three two-second tasks to a pool with one thread and a queue of capacity 1,
     * using the retry policy (3 attempts, 1 second apart) for rejected tasks.
     */
    public static void testRetryPolicy() {
        System.out.println("\n=== 重试策略测试 ===");
        RetryRejectedExecutionHandler handler = new RetryRejectedExecutionHandler(3, 1000);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                handler
        );
        try {
            // Submit 3 tasks.
            for (int i = 1; i <= 3; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    System.out.println("任务" + taskId + "执行中...");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("任务" + taskId + "完成");
                });
            }
            pause(8000);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Submits four one-second tasks to a pool with one thread and a queue of capacity 1;
     * rejected tasks are executed by the fallback policy's own executor.
     */
    public static void testFallbackPolicy() {
        System.out.println("\n=== 降级策略测试 ===");
        FallbackRejectedExecutionHandler handler = new FallbackRejectedExecutionHandler();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                handler
        );
        try {
            // Submit 4 tasks.
            for (int i = 1; i <= 4; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    System.out.println("任务" + taskId + "在主线程池执行,线程: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("任务" + taskId + "完成");
                });
            }
            pause(6000);
        } finally {
            executor.shutdown();
            handler.shutdown();
        }
    }

    public static void main(String[] args) {
        testLoggingPolicy();
        testRetryPolicy();
        testFallbackPolicy();
    }

    /** Sleeps for the given time; restores the interrupt status if interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
# 5. 线程池监控
# 5.1 基本监控
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class ThreadPoolMonitoringDemo {
public static void basicMonitoring() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10)
);
// 监控线程
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== 线程池状态监控 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println("是否关闭: " + executor.isShutdown());
System.out.println("是否终止: " + executor.isTerminated());
System.out.println();
}, 0, 2, TimeUnit.SECONDS);
// 提交任务
for (int i = 1; i <= 15; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "开始执行");
try {
Thread.sleep((int) (Math.random() * 3000) + 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + "完成");
});
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 等待任务完成
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
monitor.shutdown();
executor.shutdown();
}
// 自定义监控线程池
/**
 * A {@link ThreadPoolExecutor} that times every task through the
 * {@code beforeExecute}/{@code afterExecute} hooks and prints per-task,
 * periodic (every 10th task) and final statistics to standard output.
 *
 * <p>The counters are atomic, so {@link #printStatistics()} may be called from
 * any thread while workers are still updating them.
 */
static class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
    /** Sum of all measured task durations, in milliseconds. */
    private final AtomicLong totalExecutionTime = new AtomicLong(0);
    /** Number of tasks for which {@code afterExecute} has run. */
    private final AtomicLong taskCount = new AtomicLong(0);
    /** {@code System.nanoTime()} reading taken when the current thread's task started. */
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    /**
     * Creates the pool with the default thread factory and rejection handler.
     *
     * @param corePoolSize    number of threads kept in the pool
     * @param maximumPoolSize upper bound on the number of threads
     * @param keepAliveTime   idle time after which excess threads are reclaimed
     * @param unit            unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks that are waiting for a thread
     */
    public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                       long keepAliveTime, TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Records the start timestamp on the worker thread that is about to run {@code r}. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // nanoTime is monotonic, so the measured duration cannot be skewed by
        // wall-clock adjustments the way currentTimeMillis differences can.
        startTime.set(System.nanoTime());
        System.out.println("[监控] 任务开始执行,线程: " + t.getName());
    }

    /**
     * Accumulates the task's duration, reports a failure if there was one and
     * prints a summary after every 10th task.
     *
     * @param r the task that finished
     * @param t the exception that terminated the task, or {@code null}; always
     *          {@code null} for tasks that went through {@code submit()}
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long elapsedNanos = System.nanoTime() - startTime.get();
            long executionTime = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
            totalExecutionTime.addAndGet(executionTime);
            long count = taskCount.incrementAndGet();
            System.out.println("[监控] 任务执行完成,耗时: " + executionTime + "ms");

            // submit() wraps the task in a Future that captures its exception,
            // so t alone would miss every failure of a submitted task.
            Throwable failure = (t != null) ? t : failureOf(r);
            if (failure != null) {
                System.out.println("[监控] 任务执行异常: " + failure.getMessage());
            }

            // Print a running summary every 10 tasks.
            if (count % 10 == 0) {
                System.out.println("[统计] 已完成任务数: " + count +
                        ", 平均执行时间: " + (totalExecutionTime.get() / count) + "ms");
            }
        } finally {
            super.afterExecute(r, t);
            // Drop the timestamp so a reused worker thread never sees a stale value.
            startTime.remove();
        }
    }

    /**
     * Extracts the exception captured by a completed {@code Future} task.
     * Types are fully qualified so this block does not depend on the file's
     * import list.
     *
     * @param r the task handed to {@code afterExecute}
     * @return the task's failure cause, or {@code null} if {@code r} is not a
     *         completed {@code Future}, completed normally, or was cancelled
     */
    private static Throwable failureOf(Runnable r) {
        if (!(r instanceof java.util.concurrent.Future<?>)) {
            return null;
        }
        java.util.concurrent.Future<?> future = (java.util.concurrent.Future<?>) r;
        if (!future.isDone()) {
            return null;
        }
        try {
            future.get(); // does not block: the future is already done
            return null;
        } catch (java.util.concurrent.CancellationException cancelled) {
            // Cancellation is a decision of the caller, not a task failure.
            return null;
        } catch (java.util.concurrent.ExecutionException wrapped) {
            return wrapped.getCause();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Prints the final totals once the pool has fully terminated. */
    @Override
    protected void terminated() {
        super.terminated();
        long count = taskCount.get();
        long totalTime = totalExecutionTime.get();
        System.out.println("[监控] 线程池已终止");
        System.out.println("[统计] 总任务数: " + count);
        System.out.println("[统计] 总执行时间: " + totalTime + "ms");
        if (count > 0) {
            System.out.println("[统计] 平均执行时间: " +
                    (totalTime / count) + "ms");
        }
    }

    /** Prints a snapshot of the counters; the average is omitted until a task has finished. */
    public void printStatistics() {
        long count = taskCount.get();
        long totalTime = totalExecutionTime.get();
        System.out.println("[实时统计] 已完成: " + count +
                ", 总耗时: " + totalTime + "ms" +
                (count > 0 ? ", 平均: " + (totalTime / count) + "ms" : ""));
    }
}
/**
 * Demo of {@code MonitoredThreadPoolExecutor}: submits 20 randomly-timed tasks
 * (about one in ten throws) while a scheduler prints live statistics every
 * three seconds, then shuts both executors down.
 */
public static void customMonitoring() {
    System.out.println("\n=== 自定义监控示例 ===");
    MonitoredThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5)
    );
    // The pool holds at most 4 running + 5 queued tasks, but 20 are submitted
    // back-to-back. With the default AbortPolicy the 10th submit() throws and
    // the demo dies before any shutdown. Block the submitter until the queue
    // has room instead; CallerRunsPolicy is not used because tasks run by the
    // caller bypass the beforeExecute/afterExecute hooks.
    executor.setRejectedExecutionHandler((task, pool) -> {
        if (pool.isShutdown()) {
            throw new java.util.concurrent.RejectedExecutionException("Executor has been shut down");
        }
        try {
            pool.getQueue().put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new java.util.concurrent.RejectedExecutionException(
                    "Interrupted while waiting for queue space", e);
        }
    });

    // Periodically print the live statistics.
    ScheduledExecutorService statsScheduler = Executors.newScheduledThreadPool(1);
    try {
        statsScheduler.scheduleAtFixedRate(executor::printStatistics, 3, 3, TimeUnit.SECONDS);

        // Submit the tasks.
        for (int i = 1; i <= 20; i++) {
            executor.submit(() -> {
                try {
                    // Simulate varying execution times.
                    int sleepTime = (int) (Math.random() * 2000) + 500;
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Simulate an occasional failure. Because the task goes through
                // submit(), this exception is captured by the returned Future;
                // afterExecute only sees it if the executor inspects the Future.
                if (Math.random() < 0.1) {
                    throw new RuntimeException("模拟任务异常");
                }
            });
        }

        // Wait for the tasks to finish; force a stop after 30 seconds.
        executor.shutdown();
        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    } finally {
        // Reached on every path, so neither pool's non-daemon threads can
        // keep the JVM alive after a failure above.
        if (!executor.isShutdown()) {
            executor.shutdownNow();
        }
        statsScheduler.shutdown();
    }
}
/**
 * Entry point: runs the basic monitoring demo, then the custom monitoring demo.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
basicMonitoring();
customMonitoring();
}
}
# 6. 最佳实践
# 6.1 线程池配置建议
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Factory methods for thread pools sized for different workload profiles,
 * plus a small demo that exercises the CPU-bound and IO-bound variants.
 */
public class ThreadPoolBestPractices {

    /**
     * Creates a pool for CPU-bound work: a fixed number of threads equal to the
     * number of available processors, a bounded queue of 100 tasks, and
     * caller-runs back-pressure when both are saturated.
     *
     * @return a new pool; the caller is responsible for shutting it down
     */
    public static ExecutorService createCpuIntensivePool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpuCount,                                   // core threads = CPU count
                cpuCount,                                   // max threads = CPU count
                0L, TimeUnit.MILLISECONDS,                  // no extra threads, so no keep-alive needed
                new LinkedBlockingQueue<>(100),             // bounded queue
                namedThreadFactory("CPU-Worker-"),
                new ThreadPoolExecutor.CallerRunsPolicy()   // submitter runs the task when saturated
        );
    }

    /**
     * Creates a pool for IO-bound work: twice as many core threads as
     * processors, growing to four times as many, with a queue of 200 tasks.
     * Threads beyond the core size are only created once the queue is full.
     *
     * @return a new pool; the caller is responsible for shutting it down
     */
    public static ExecutorService createIoIntensivePool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpuCount * 2,                               // core threads = CPU count * 2
                cpuCount * 4,                               // max threads = CPU count * 4
                60L, TimeUnit.SECONDS,                      // idle extra threads live for 60 s
                new LinkedBlockingQueue<>(200),             // larger queue
                namedThreadFactory("IO-Worker-"),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    /**
     * Creates a pool for mixed workloads: {@code cpu + 1} core threads, up to
     * {@code 2 * cpu + 1} threads, a queue of 150 tasks, and rejection by
     * exception ({@code AbortPolicy}) when saturated.
     *
     * @return a new pool; the caller is responsible for shutting it down
     */
    public static ExecutorService createMixedPool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpuCount + 1,                               // core threads
                cpuCount * 2 + 1,                           // max threads
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(150),
                namedThreadFactory("Mixed-Worker-"),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }

    /**
     * Builds a factory producing non-daemon threads named {@code prefix + n},
     * with {@code n} starting at 1.
     *
     * <p>The sequence is atomic because a pool may call {@code newThread} from
     * several submitting threads at once; a plain {@code int} counter could
     * hand out the same name twice.
     */
    private static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger sequence = new AtomicInteger();
        return task -> {
            Thread worker = new Thread(task, prefix + sequence.incrementAndGet());
            worker.setDaemon(false);
            return worker;
        };
    }

    /**
     * Runs 8 CPU-bound tasks on the CPU pool and 12 simulated IO tasks on the
     * IO pool, printing each task's duration and worker thread, then shuts
     * both pools down and waits up to 10 seconds for each.
     */
    public static void demonstratePoolTypes() {
        System.out.println("CPU核心数: " + Runtime.getRuntime().availableProcessors());

        // CPU-bound workload.
        ExecutorService cpuPool = createCpuIntensivePool();
        System.out.println("\n=== CPU密集型任务测试 ===");
        for (int i = 0; i < 8; i++) {
            cpuPool.submit(() -> {
                // Simulate a CPU-heavy computation.
                long start = System.currentTimeMillis();
                long sum = 0;
                for (int j = 0; j < 100000000; j++) {
                    sum += j;
                }
                long end = System.currentTimeMillis();
                System.out.println("CPU任务完成,耗时: " + (end - start) + "ms, 线程: " +
                        Thread.currentThread().getName());
            });
        }

        // IO-bound workload.
        ExecutorService ioPool = createIoIntensivePool();
        System.out.println("\n=== IO密集型任务测试 ===");
        for (int i = 0; i < 12; i++) {
            ioPool.submit(() -> {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(1000); // simulated IO wait
                    long end = System.currentTimeMillis();
                    System.out.println("IO任务完成,耗时: " + (end - start) + "ms, 线程: " +
                            Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Shut both pools down and wait for the submitted work to drain.
        cpuPool.shutdown();
        ioPool.shutdown();
        try {
            cpuPool.awaitTermination(10, TimeUnit.SECONDS);
            ioPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // The caller wants out: cancel the remaining work and keep the
            // interrupt visible instead of swallowing it.
            cpuPool.shutdownNow();
            ioPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Entry point: runs the pool-type demo.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        demonstratePoolTypes();
    }
}
# 6.2 优雅关闭
import java.util.concurrent.*;
import java.util.List;
public class GracefulShutdownDemo {
/**
 * Demo of a two-phase shutdown: submits eight tasks of roughly five seconds
 * each to a 2-core pool, waits three seconds, then calls
 * {@code gracefulShutdown} with a five-second timeout.
 *
 * <p>All eight tasks fit into 2 core threads plus the 10-slot queue, so the
 * pool never grows beyond two threads and most tasks are still pending or
 * in progress when the shutdown starts.
 */
public static void demonstrateGracefulShutdown() {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10)
    );

    // Submit some long-running tasks.
    for (int i = 1; i <= 8; i++) {
        final int taskId = i;
        executor.submit(() -> {
            System.out.println("任务" + taskId + "开始执行");
            try {
                for (int j = 0; j < 10; j++) {
                    Thread.sleep(500);
                    System.out.println("任务" + taskId + "进度: " + ((j + 1) * 10) + "%");
                    // Cooperative cancellation check between work steps.
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("任务" + taskId + "被中断");
                        return;
                    }
                }
                System.out.println("任务" + taskId + "正常完成");
            } catch (InterruptedException e) {
                System.out.println("任务" + taskId + "被中断异常");
                Thread.currentThread().interrupt();
            }
        });
    }

    // Let the tasks make some progress before shutting down.
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Keep the interrupt visible rather than swallowing it: the shutdown
        // below then takes its "interrupted" path and stops the pool at once.
        Thread.currentThread().interrupt();
    }

    System.out.println("\n开始优雅关闭线程池...");
    gracefulShutdown(executor, 5, TimeUnit.SECONDS);
}
/**
 * Shuts an executor down in two phases: an orderly {@code shutdown()}
 * followed, if the timeout elapses, by {@code shutdownNow()} and a second
 * wait of the same length. Progress is reported on standard output.
 *
 * <p>If the calling thread is interrupted while waiting, the executor is
 * stopped immediately and the caller's interrupt flag is restored.
 *
 * @param executor the executor to stop
 * @param timeout  how long to wait in each phase
 * @param unit     unit of {@code timeout}
 */
public static void gracefulShutdown(ExecutorService executor, long timeout, TimeUnit unit) {
    // Phase 1: refuse new work, let submitted work continue.
    executor.shutdown();
    System.out.println("已调用shutdown(),不再接收新任务");

    try {
        // Phase 2: give already-submitted tasks a chance to finish.
        boolean drained = executor.awaitTermination(timeout, unit);
        if (drained) {
            System.out.println("线程池已优雅关闭");
            return;
        }

        // Phase 3: out of patience, interrupt running tasks and drop queued ones.
        System.out.println("等待超时,强制关闭线程池");
        List<Runnable> neverStarted = executor.shutdownNow();
        System.out.println("强制关闭,未执行的任务数: " + neverStarted.size());

        // Phase 4: wait once more for the interrupted tasks to wind down.
        boolean stopped = executor.awaitTermination(timeout, unit);
        if (stopped) {
            System.out.println("线程池已强制终止");
        } else {
            System.out.println("线程池无法正常终止");
        }
    } catch (InterruptedException interrupted) {
        // The waiting thread itself was interrupted: stop the pool right away
        // and re-assert the interrupt for the caller.
        System.out.println("关闭过程被中断,强制关闭线程池");
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}