Java线程池深度解析

# Java线程池深度解析

点击勘误issues (opens new window),哪吒感谢大家的阅读

# 1. 线程池概述

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。

# 1.1 为什么使用线程池

  1. 降低资源消耗:重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性

# 2. ThreadPoolExecutor详解

# 2.1 构造函数参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:核心线程数量
  • maximumPoolSize:最大线程数量
  • keepAliveTime:非核心线程空闲时的存活时间
  • unit:时间单位
  • workQueue:任务队列
  • threadFactory:线程工厂
  • handler:拒绝策略

# 2.2 线程池执行流程

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolExecutorDemo {
    
    public static void demonstrateExecutionFlow() {
        // 自定义线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "CustomThread-" + threadNumber.getAndIncrement());
                thread.setDaemon(false);
                return thread;
            }
        };
        
        // 自定义拒绝策略
        RejectedExecutionHandler rejectedHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("任务被拒绝: " + r.toString() + 
                    ", 当前线程数: " + executor.getPoolSize() + 
                    ", 队列大小: " + executor.getQueue().size());
            }
        };
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                              // 核心线程数
            4,                              // 最大线程数
            60L,                            // 空闲时间
            TimeUnit.SECONDS,               // 时间单位
            new ArrayBlockingQueue<>(2),    // 队列容量为2
            threadFactory,                  // 线程工厂
            rejectedHandler                 // 拒绝策略
        );
        
        // 提交8个任务,观察执行流程
        for (int i = 1; i <= 8; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(3000); // 模拟任务执行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
            
            // 打印当前状态
            System.out.println("提交任务" + taskId + "后 - 核心线程数: " + executor.getCorePoolSize() + 
                ", 当前线程数: " + executor.getPoolSize() + 
                ", 队列大小: " + executor.getQueue().size());
        }
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    public static void main(String[] args) {
        demonstrateExecutionFlow();
    }
}

# 2.3 线程池状态

import java.util.concurrent.*;

public class ThreadPoolStateDemo {
    
    public static void demonstrateStates() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );
        
        System.out.println("初始状态: " + getPoolState(executor));
        
        // 提交任务
        for (int i = 0; i < 3; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        System.out.println("提交任务后: " + getPoolState(executor));
        
        // 关闭线程池
        executor.shutdown();
        System.out.println("调用shutdown后: " + getPoolState(executor));
        
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("最终状态: " + getPoolState(executor));
    }
    
    private static String getPoolState(ThreadPoolExecutor executor) {
        if (executor.isTerminated()) {
            return "TERMINATED";
        } else if (executor.isTerminating()) {
            return "TIDYING";
        } else if (executor.isShutdown()) {
            return "SHUTDOWN";
        } else {
            return "RUNNING";
        }
    }
    
    public static void main(String[] args) {
        demonstrateStates();
    }
}

# 3. 常用线程池类型

# 3.1 FixedThreadPool

import java.util.concurrent.*;

public class FixedThreadPoolDemo {
    
    public static void basicUsage() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交10个任务
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(15, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    // 批量处理任务示例
    public static void batchProcessing() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // 模拟批量数据处理
        String[] data = {"数据1", "数据2", "数据3", "数据4", "数据5", "数据6", "数据7", "数据8"};
        
        CompletableFuture<Void>[] futures = new CompletableFuture[data.length];
        
        for (int i = 0; i < data.length; i++) {
            final String item = data[i];
            futures[i] = CompletableFuture.runAsync(() -> {
                System.out.println("处理" + item + ",线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟处理时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println(item + "处理完成");
            }, executor);
        }
        
        // 等待所有任务完成
        CompletableFuture.allOf(futures).join();
        System.out.println("所有数据处理完成");
        
        executor.shutdown();
    }
    
    public static void main(String[] args) {
        System.out.println("=== 基本用法 ===");
        basicUsage();
        
        System.out.println("\n=== 批量处理 ===");
        batchProcessing();
    }
}

# 3.2 CachedThreadPool

import java.util.concurrent.*;

public class CachedThreadPoolDemo {
    
    public static void dynamicThreadCreation() {
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 第一批任务 - 快速提交
        System.out.println("提交第一批任务...");
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
        
        try {
            Thread.sleep(3000); // 等待第一批任务完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // 第二批任务 - 延迟提交,观察线程复用
        System.out.println("\n提交第二批任务...");
        for (int i = 6; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    // 突发任务处理示例
    public static void burstTaskHandling() {
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 模拟突发的大量短任务
        System.out.println("模拟突发任务处理...");
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("突发任务" + taskId + ",线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500); // 短任务
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            
            // 快速提交任务
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    public static void main(String[] args) {
        System.out.println("=== 动态线程创建 ===");
        dynamicThreadCreation();
        
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("\n=== 突发任务处理 ===");
        burstTaskHandling();
    }
}

# 3.3 SingleThreadExecutor

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleThreadExecutorDemo {
    
    public static void sequentialExecution() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        // 提交多个任务,观察顺序执行
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "执行完成");
            });
        }
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    // 日志记录示例
    public static void logRecording() {
        ExecutorService logExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger logCounter = new AtomicInteger(0);
        
        // 模拟多个线程产生日志
        for (int i = 1; i <= 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                for (int j = 1; j <= 5; j++) {
                    final int logId = j;
                    // 将日志写入任务提交到单线程执行器
                    logExecutor.submit(() -> {
                        int logNumber = logCounter.incrementAndGet();
                        System.out.println("[LOG-" + logNumber + "] 线程" + threadId + "的日志" + logId + 
                            " - 时间: " + System.currentTimeMillis());
                        try {
                            Thread.sleep(100); // 模拟写入时间
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                }
            }).start();
        }
        
        // 等待所有日志写入完成
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        logExecutor.shutdown();
        try {
            if (!logExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                logExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            logExecutor.shutdownNow();
        }
    }
    
    // 状态机示例
    public static void stateMachine() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        class StateMachine {
            private String state = "INIT";
            
            public void transition(String newState) {
                executor.submit(() -> {
                    System.out.println("状态转换: " + state + " -> " + newState);
                    this.state = newState;
                    try {
                        Thread.sleep(500); // 模拟状态转换处理时间
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("当前状态: " + this.state);
                });
            }
        }
        
        StateMachine machine = new StateMachine();
        
        // 多个线程尝试改变状态
        machine.transition("STARTING");
        machine.transition("RUNNING");
        machine.transition("PAUSED");
        machine.transition("RUNNING");
        machine.transition("STOPPED");
        
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
    
    public static void main(String[] args) {
        System.out.println("=== 顺序执行 ===");
        sequentialExecution();
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("\n=== 日志记录 ===");
        logRecording();
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("\n=== 状态机 ===");
        stateMachine();
    }
}

# 3.4 ScheduledThreadPool

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledThreadPoolDemo {
    
    public static void basicScheduling() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        
        // 延迟执行
        scheduler.schedule(() -> {
            System.out.println("延迟3秒执行的任务,时间: " + System.currentTimeMillis());
        }, 3, TimeUnit.SECONDS);
        
        // 固定频率执行
        AtomicInteger counter1 = new AtomicInteger(0);
        ScheduledFuture<?> fixedRateTask = scheduler.scheduleAtFixedRate(() -> {
            int count = counter1.incrementAndGet();
            System.out.println("固定频率任务第" + count + "次执行,时间: " + System.currentTimeMillis());
            try {
                Thread.sleep(1500); // 任务执行时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 1, 2, TimeUnit.SECONDS); // 1秒后开始,每2秒执行一次
        
        // 固定延迟执行
        AtomicInteger counter2 = new AtomicInteger(0);
        ScheduledFuture<?> fixedDelayTask = scheduler.scheduleWithFixedDelay(() -> {
            int count = counter2.incrementAndGet();
            System.out.println("固定延迟任务第" + count + "次执行,时间: " + System.currentTimeMillis());
            try {
                Thread.sleep(1000); // 任务执行时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 2, 2, TimeUnit.SECONDS); // 2秒后开始,每次执行完成后延迟2秒再执行
        
        // 运行10秒后停止
        scheduler.schedule(() -> {
            fixedRateTask.cancel(false);
            fixedDelayTask.cancel(false);
            System.out.println("取消定时任务");
        }, 10, TimeUnit.SECONDS);
        
        // 等待一段时间后关闭
        try {
            Thread.sleep(12000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        scheduler.shutdown();
    }
    
    // 心跳检测示例
    public static void heartbeatMonitoring() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        class HeartbeatMonitor {
            private volatile boolean isHealthy = true;
            private AtomicInteger heartbeatCount = new AtomicInteger(0);
            
            public void startMonitoring() {
                scheduler.scheduleAtFixedRate(() -> {
                    int count = heartbeatCount.incrementAndGet();
                    
                    // 模拟健康检查
                    boolean currentHealth = Math.random() > 0.2; // 80%概率健康
                    
                    if (currentHealth != isHealthy) {
                        isHealthy = currentHealth;
                        System.out.println("[心跳" + count + "] 状态变化: " + (isHealthy ? "健康" : "异常"));
                    } else {
                        System.out.println("[心跳" + count + "] 状态: " + (isHealthy ? "健康" : "异常"));
                    }
                    
                    if (!isHealthy) {
                        System.out.println("[心跳" + count + "] 触发告警!");
                    }
                }, 0, 2, TimeUnit.SECONDS);
            }
        }
        
        HeartbeatMonitor monitor = new HeartbeatMonitor();
        monitor.startMonitoring();
        
        // 运行10秒
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        scheduler.shutdown();
    }
    
    // 缓存清理示例
    public static void cacheCleanup() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        class Cache {
            private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
            private final ConcurrentHashMap<String, Long> accessTime = new ConcurrentHashMap<>();
            
            public void put(String key, String value) {
                cache.put(key, value);
                accessTime.put(key, System.currentTimeMillis());
                System.out.println("缓存添加: " + key + " = " + value);
            }
            
            public String get(String key) {
                accessTime.put(key, System.currentTimeMillis());
                return cache.get(key);
            }
            
            public void cleanup() {
                long currentTime = System.currentTimeMillis();
                long expireTime = 5000; // 5秒过期
                
                accessTime.entrySet().removeIf(entry -> {
                    if (currentTime - entry.getValue() > expireTime) {
                        String key = entry.getKey();
                        cache.remove(key);
                        System.out.println("清理过期缓存: " + key);
                        return true;
                    }
                    return false;
                });
                
                System.out.println("缓存清理完成,当前大小: " + cache.size());
            }
        }
        
        Cache cache = new Cache();
        
        // 定期清理缓存
        scheduler.scheduleAtFixedRate(cache::cleanup, 3, 3, TimeUnit.SECONDS);
        
        // 模拟缓存使用
        cache.put("key1", "value1");
        cache.put("key2", "value2");
        
        try {
            Thread.sleep(2000);
            cache.put("key3", "value3");
            
            Thread.sleep(4000);
            cache.get("key2"); // 更新访问时间
            cache.put("key4", "value4");
            
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        scheduler.shutdown();
    }
    
    public static void main(String[] args) {
        System.out.println("=== 基本调度 ===");
        basicScheduling();
        
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("\n=== 心跳监控 ===");
        heartbeatMonitoring();
        
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("\n=== 缓存清理 ===");
        cacheCleanup();
    }
}

# 4. 拒绝策略

# 4.1 内置拒绝策略

import java.util.concurrent.*;

public class RejectionPolicyDemo {
    
    public static void abortPolicy() {
        System.out.println("=== AbortPolicy 示例 ===");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.AbortPolicy() // 默认策略
        );
        
        try {
            // 提交3个任务,第3个会被拒绝
            for (int i = 1; i <= 3; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    System.out.println("任务" + taskId + "完成");
                } catch (InterruptedException e) {
                    System.out.println("任务" + taskId + "被中断");
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        System.out.println("任务已提交,程序运行中...(可以通过Ctrl+C测试关闭钩子)");
        
        // 模拟程序运行
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // 正常关闭
        gracefulShutdown(executor, 3, TimeUnit.SECONDS);
    }
    
    public static void main(String[] args) {
        System.out.println("=== 优雅关闭示例 ===");
        demonstrateGracefulShutdown();
        
        System.out.println("\n=== 关闭钩子示例 ===");
        shutdownWithHook();
    }
}

# 7. 线程池异常处理

# 7.1 异常处理策略

import java.util.concurrent.*;

public class ThreadPoolExceptionHandlingDemo {
    
    // 自定义线程工厂,设置异常处理器
    static class CustomThreadFactory implements ThreadFactory {
        private int counter = 0;
        private final Thread.UncaughtExceptionHandler exceptionHandler;
        
        public CustomThreadFactory(Thread.UncaughtExceptionHandler exceptionHandler) {
            this.exceptionHandler = exceptionHandler;
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "CustomThread-" + (++counter));
            t.setDaemon(false);
            if (exceptionHandler != null) {
                t.setUncaughtExceptionHandler(exceptionHandler);
            }
            return t;
        }
    }
    
    public static void demonstrateExceptionHandling() {
        // 自定义异常处理器
        Thread.UncaughtExceptionHandler exceptionHandler = (thread, exception) -> {
            System.err.println("线程 " + thread.getName() + " 发生未捕获异常: " + exception.getMessage());
            exception.printStackTrace();
            
            // 可以在这里添加日志记录、告警等逻辑
            System.out.println("异常已记录,线程将被终止");
        };
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            new CustomThreadFactory(exceptionHandler)
        );
        
        // 提交会抛出异常的任务
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行");
                
                if (taskId % 2 == 0) {
                    // 偶数任务抛出异常
                    throw new RuntimeException("任务" + taskId + "模拟异常");
                }
                
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                System.out.println("任务" + taskId + "正常完成");
            });
        }
        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
    
    // 使用Future处理异常
    public static void handleExceptionWithFuture() {
        System.out.println("\n=== 使用Future处理异常 ===");
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交任务并获取Future
        Future<?>[] futures = new Future[5];
        for (int i = 0; i < 5; i++) {
            final int taskId = i + 1;
            futures[i] = executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行");
                
                if (taskId == 3) {
                    throw new RuntimeException("任务" + taskId + "发生异常");
                }
                
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("任务被中断", e);
                }
                
                System.out.println("任务" + taskId + "完成");
                return "任务" + taskId + "结果";
            });
        }
        
        // 检查任务执行结果
        for (int i = 0; i < futures.length; i++) {
            try {
                Object result = futures[i].get(3, TimeUnit.SECONDS);
                System.out.println("任务" + (i + 1) + "成功,结果: " + result);
            } catch (ExecutionException e) {
                System.err.println("任务" + (i + 1) + "执行异常: " + e.getCause().getMessage());
            } catch (TimeoutException e) {
                System.err.println("任务" + (i + 1) + "执行超时");
                futures[i].cancel(true);
            } catch (InterruptedException e) {
                System.err.println("等待任务" + (i + 1) + "被中断");
                Thread.currentThread().interrupt();
            }
        }
        
        executor.shutdown();
    }
    
    public static void main(String[] args) {
        demonstrateExceptionHandling();
        handleExceptionWithFuture();
    }
}

# 8. 性能调优

# 8.1 线程池参数调优

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadPoolTuningDemo {
    
    // 性能测试工具类
    static class PerformanceTest {
        private final AtomicLong completedTasks = new AtomicLong(0);
        private final AtomicLong totalExecutionTime = new AtomicLong(0);
        private volatile long startTime;
        private volatile long endTime;
        
        public void start() {
            startTime = System.currentTimeMillis();
            completedTasks.set(0);
            totalExecutionTime.set(0);
        }
        
        public void taskCompleted(long executionTime) {
            completedTasks.incrementAndGet();
            totalExecutionTime.addAndGet(executionTime);
        }
        
        public void end() {
            endTime = System.currentTimeMillis();
        }
        
        public void printResults(String testName) {
            long totalTime = endTime - startTime;
            long completed = completedTasks.get();
            long avgExecutionTime = completed > 0 ? totalExecutionTime.get() / completed : 0;
            
            System.out.println("=== " + testName + " 性能测试结果 ===");
            System.out.println("总耗时: " + totalTime + "ms");
            System.out.println("完成任务数: " + completed);
            System.out.println("吞吐量: " + (completed * 1000.0 / totalTime) + " 任务/秒");
            System.out.println("平均任务执行时间: " + avgExecutionTime + "ms");
            System.out.println();
        }
    }
    
    public static void compareThreadPoolConfigurations() {
        int taskCount = 100;
        int taskDuration = 100; // 每个任务100ms
        
        // 配置1:小线程池
        testConfiguration("小线程池(2-4)", 2, 4, 10, taskCount, taskDuration);
        
        // 配置2:中等线程池
        testConfiguration("中等线程池(4-8)", 4, 8, 20, taskCount, taskDuration);
        
        // 配置3:大线程池
        testConfiguration("大线程池(8-16)", 8, 16, 50, taskCount, taskDuration);
        
        // 配置4:固定线程池
        testFixedThreadPool("固定线程池(8)", 8, taskCount, taskDuration);
    }
    
    private static void testConfiguration(String name, int coreSize, int maxSize, 
                                        int queueSize, int taskCount, int taskDuration) {
        PerformanceTest test = new PerformanceTest();
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            coreSize, maxSize, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueSize),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        test.start();
        
        CountDownLatch latch = new CountDownLatch(taskCount);
        
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                long start = System.currentTimeMillis();
                try {
                    Thread.sleep(taskDuration);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                long end = System.currentTimeMillis();
                test.taskCompleted(end - start);
                latch.countDown();
            });
        }
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        test.end();
        test.printResults(name);
        
        executor.shutdown();
    }
    
    private static void testFixedThreadPool(String name, int poolSize, 
                                          int taskCount, int taskDuration) {
        PerformanceTest test = new PerformanceTest();
        
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        
        test.start();
        
        CountDownLatch latch = new CountDownLatch(taskCount);
        
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                long start = System.currentTimeMillis();
                try {
                    Thread.sleep(taskDuration);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                long end = System.currentTimeMillis();
                test.taskCompleted(end - start);
                latch.countDown();
            });
        }
        
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        test.end();
        test.printResults(name);
        
        executor.shutdown();
    }
    
    public static void main(String[] args) {
        compareThreadPoolConfigurations();
    }
}

# 9. 总结

本文深入分析了Java线程池的各个方面:

# 9.1 核心要点

  1. 线程池参数:合理配置核心线程数、最大线程数、队列大小等参数
  2. 任务类型:根据CPU密集型、IO密集型选择不同的线程池配置
  3. 拒绝策略:选择合适的拒绝策略,必要时自定义策略
  4. 监控和调优:实时监控线程池状态,根据性能指标调优参数
  5. 异常处理:正确处理任务执行中的异常
  6. 优雅关闭:确保线程池能够优雅地关闭

# 9.2 最佳实践

  1. 避免使用Executors:推荐手动创建ThreadPoolExecutor,明确各项参数
  2. 合理设置队列大小:避免无界队列导致内存溢出
  3. 监控线程池状态:定期检查线程池的各项指标
  4. 设置有意义的线程名:便于问题排查和监控
  5. 正确处理异常:使用Future.get()或自定义异常处理器
  6. 及时关闭线程池:避免资源泄露

# 9.3 常见问题

  1. 线程池大小设置不当:导致资源浪费或性能瓶颈
  2. 队列选择错误:无界队列可能导致OOM
  3. 拒绝策略不合适:可能导致任务丢失或系统阻塞
  4. 异常处理不当:异常被吞没,难以排查问题
  5. 未正确关闭:导致程序无法正常退出

通过合理使用线程池,可以显著提高应用程序的性能和稳定性。在实际开发中,应该根据具体的业务场景和性能要求来选择和配置线程池。

# 参考资料

" + taskId + "执行中...");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("任务" + taskId + "完成");
                });
                System.out.println("提交任务" + taskId);
            }
        } catch (RejectedExecutionException e) {
            System.out.println("任务被拒绝: " + e.getMessage());
        }
        
        executor.shutdown();
    }
    
    public static void callerRunsPolicy() {
        System.out.println("\n=== CallerRunsPolicy 示例 ===");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        for (int i = 1; i <= 3; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "执行中,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
            System.out.println("提交任务" + taskId + ",当前线程: " + Thread.currentThread().getName());
        }
        
        executor.shutdown();
    }
    
    public static void discardPolicy() {
        System.out.println("\n=== DiscardPolicy 示例 ===");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.DiscardPolicy()
        );
        
        for (int i = 1; i <= 4; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "执行中...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
            System.out.println("提交任务" + taskId);
        }
        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
    
    public static void discardOldestPolicy() {
        System.out.println("\n=== DiscardOldestPolicy 示例 ===");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(2),
            new ThreadPoolExecutor.DiscardOldestPolicy()
        );
        
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "执行中...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
            System.out.println("提交任务" + taskId + ",队列大小: " + executor.getQueue().size());
        }
        
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
    
    public static void main(String[] args) {
        abortPolicy();
        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        callerRunsPolicy();
        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        discardPolicy();
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        discardOldestPolicy();
    }
}

# 4.2 自定义拒绝策略

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomRejectionPolicyDemo {
    
    // 记录拒绝次数的策略
    static class LoggingRejectedExecutionHandler implements RejectedExecutionHandler {
        private final AtomicInteger rejectedCount = new AtomicInteger(0);
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            int count = rejectedCount.incrementAndGet();
            System.out.println("[拒绝策略] 任务被拒绝 #" + count + 
                ", 当前线程池状态 - 核心线程数: " + executor.getCorePoolSize() + 
                ", 当前线程数: " + executor.getPoolSize() + 
                ", 队列大小: " + executor.getQueue().size());
        }
        
        public int getRejectedCount() {
            return rejectedCount.get();
        }
    }
    
    // 重试策略
    static class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
        private final int maxRetries;
        private final long retryDelayMs;
        
        public RetryRejectedExecutionHandler(int maxRetries, long retryDelayMs) {
            this.maxRetries = maxRetries;
            this.retryDelayMs = retryDelayMs;
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                System.out.println("[重试策略] 线程池已关闭,任务被丢弃");
                return;
            }
            
            // 在新线程中进行重试
            new Thread(() -> {
                for (int i = 0; i < maxRetries; i++) {
                    try {
                        Thread.sleep(retryDelayMs);
                        executor.execute(r);
                        System.out.println("[重试策略] 任务重试成功,重试次数: " + (i + 1));
                        return;
                    } catch (RejectedExecutionException | InterruptedException e) {
                        System.out.println("[重试策略] 第" + (i + 1) + "次重试失败");
                    }
                }
                System.out.println("[重试策略] 达到最大重试次数,任务被丢弃");
            }).start();
        }
    }
    
    // 降级策略
    static class FallbackRejectedExecutionHandler implements RejectedExecutionHandler {
        private final ExecutorService fallbackExecutor;
        
        public FallbackRejectedExecutionHandler() {
            this.fallbackExecutor = Executors.newSingleThreadExecutor();
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("[降级策略] 任务被降级到备用线程池执行");
            fallbackExecutor.execute(() -> {
                System.out.println("[降级策略] 在备用线程池中执行任务");
                r.run();
            });
        }
        
        public void shutdown() {
            fallbackExecutor.shutdown();
        }
    }
    
    public static void testLoggingPolicy() {
        System.out.println("=== 记录拒绝策略测试 ===");
        LoggingRejectedExecutionHandler handler = new LoggingRejectedExecutionHandler();
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(2),
            handler
        );
        
        // 提交5个任务,后面的会被拒绝
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            try {
                executor.submit(() -> {
                    System.out.println("任务" + taskId + "执行中...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("任务" + taskId + "完成");
                });
            } catch (RejectedExecutionException e) {
                // 这里不会抛出异常,因为我们的策略只是记录
            }
        }
        
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("总拒绝次数: " + handler.getRejectedCount());
        executor.shutdown();
    }
    
    public static void testRetryPolicy() {
        System.out.println("\n=== 重试策略测试 ===");
        RetryRejectedExecutionHandler handler = new RetryRejectedExecutionHandler(3, 1000);
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            handler
        );
        
        // 提交3个任务
        for (int i = 1; i <= 3; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "执行中...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
        }
        
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
    
    public static void testFallbackPolicy() {
        System.out.println("\n=== 降级策略测试 ===");
        FallbackRejectedExecutionHandler handler = new FallbackRejectedExecutionHandler();
        
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            handler
        );
        
        // 提交4个任务
        for (int i = 1; i <= 4; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "在主线程池执行,线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
        }
        
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
        handler.shutdown();
    }
    
    public static void main(String[] args) {
        testLoggingPolicy();
        testRetryPolicy();
        testFallbackPolicy();
    }
}

# 5. 线程池监控

# 5.1 基本监控

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadPoolMonitoringDemo {
    
    public static void basicMonitoring() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10)
        );
        
        // 监控线程
        ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("=== 线程池状态监控 ===");
            System.out.println("核心线程数: " + executor.getCorePoolSize());
            System.out.println("最大线程数: " + executor.getMaximumPoolSize());
            System.out.println("当前线程数: " + executor.getPoolSize());
            System.out.println("活跃线程数: " + executor.getActiveCount());
            System.out.println("队列大小: " + executor.getQueue().size());
            System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
            System.out.println("总任务数: " + executor.getTaskCount());
            System.out.println("是否关闭: " + executor.isShutdown());
            System.out.println("是否终止: " + executor.isTerminated());
            System.out.println();
        }, 0, 2, TimeUnit.SECONDS);
        
        // 提交任务
        for (int i = 1; i <= 15; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行");
                try {
                    Thread.sleep((int) (Math.random() * 3000) + 1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + "完成");
            });
            
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        // 等待任务完成
        try {
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        monitor.shutdown();
        executor.shutdown();
    }
    
    // 自定义监控线程池
    static class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
        private final AtomicLong totalExecutionTime = new AtomicLong(0);
        private final AtomicLong taskCount = new AtomicLong(0);
        private final ThreadLocal<Long> startTime = new ThreadLocal<>();
        
        public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                         long keepAliveTime, TimeUnit unit,
                                         BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTime.set(System.currentTimeMillis());
            System.out.println("[监控] 任务开始执行,线程: " + t.getName());
        }
        
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                long endTime = System.currentTimeMillis();
                long executionTime = endTime - startTime.get();
                totalExecutionTime.addAndGet(executionTime);
                long count = taskCount.incrementAndGet();
                
                System.out.println("[监控] 任务执行完成,耗时: " + executionTime + "ms");
                
                if (t != null) {
                    System.out.println("[监控] 任务执行异常: " + t.getMessage());
                }
                
                // 每10个任务输出一次统计
                if (count % 10 == 0) {
                    System.out.println("[统计] 已完成任务数: " + count + 
                        ", 平均执行时间: " + (totalExecutionTime.get() / count) + "ms");
                }
            } finally {
                super.afterExecute(r, t);
                startTime.remove();
            }
        }
        
        @Override
        protected void terminated() {
            super.terminated();
            System.out.println("[监控] 线程池已终止");
            System.out.println("[统计] 总任务数: " + taskCount.get());
            System.out.println("[统计] 总执行时间: " + totalExecutionTime.get() + "ms");
            if (taskCount.get() > 0) {
                System.out.println("[统计] 平均执行时间: " + 
                    (totalExecutionTime.get() / taskCount.get()) + "ms");
            }
        }
        
        public void printStatistics() {
            long count = taskCount.get();
            long totalTime = totalExecutionTime.get();
            System.out.println("[实时统计] 已完成: " + count + 
                ", 总耗时: " + totalTime + "ms" + 
                (count > 0 ? ", 平均: " + (totalTime / count) + "ms" : ""));
        }
    }
    
    public static void customMonitoring() {
        System.out.println("\n=== 自定义监控示例 ===");
        
        MonitoredThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(5)
        );
        
        // 定期输出统计信息
        ScheduledExecutorService statsScheduler = Executors.newScheduledThreadPool(1);
        statsScheduler.scheduleAtFixedRate(executor::printStatistics, 3, 3, TimeUnit.SECONDS);
        
        // 提交任务
        for (int i = 1; i <= 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    // 模拟不同的执行时间
                    int sleepTime = (int) (Math.random() * 2000) + 500;
                    Thread.sleep(sleepTime);
                    
                    // 模拟偶尔的异常
                    if (Math.random() < 0.1) {
                        throw new RuntimeException("模拟任务异常");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    throw e; // 重新抛出以便监控
                }
            });
        }
        
        // 等待任务完成
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
        
        statsScheduler.shutdown();
    }
    
    public static void main(String[] args) {
        basicMonitoring();
        customMonitoring();
    }
}

# 6. 最佳实践

# 6.1 线程池配置建议

import java.util.concurrent.*;

public class ThreadPoolBestPractices {
    
    // CPU密集型任务线程池
    public static ExecutorService createCpuIntensivePool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCount,                           // 核心线程数 = CPU核心数
            cpuCount,                           // 最大线程数 = CPU核心数
            0L, TimeUnit.MILLISECONDS,          // 无需保持空闲线程
            new LinkedBlockingQueue<>(100),     // 有界队列
            new ThreadFactory() {
                private int counter = 0;
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "CPU-Worker-" + (++counter));
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
        );
    }
    
    // IO密集型任务线程池
    public static ExecutorService createIoIntensivePool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCount * 2,                       // 核心线程数 = CPU核心数 * 2
            cpuCount * 4,                       // 最大线程数 = CPU核心数 * 4
            60L, TimeUnit.SECONDS,              // 空闲线程保持60秒
            new LinkedBlockingQueue<>(200),     // 较大的队列
            new ThreadFactory() {
                private int counter = 0;
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "IO-Worker-" + (++counter));
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    // 混合型任务线程池
    public static ExecutorService createMixedPool() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            cpuCount + 1,                       // 核心线程数
            cpuCount * 2 + 1,                   // 最大线程数
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(150),
            new ThreadFactory() {
                private int counter = 0;
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "Mixed-Worker-" + (++counter));
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
    
    public static void demonstratePoolTypes() {
        System.out.println("CPU核心数: " + Runtime.getRuntime().availableProcessors());
        
        // CPU密集型任务测试
        ExecutorService cpuPool = createCpuIntensivePool();
        System.out.println("\n=== CPU密集型任务测试 ===");
        for (int i = 0; i < 8; i++) {
            cpuPool.submit(() -> {
                // 模拟CPU密集型计算
                long start = System.currentTimeMillis();
                long sum = 0;
                for (int j = 0; j < 100000000; j++) {
                    sum += j;
                }
                long end = System.currentTimeMillis();
                System.out.println("CPU任务完成,耗时: " + (end - start) + "ms, 线程: " + 
                    Thread.currentThread().getName());
            });
        }
        
        // IO密集型任务测试
        ExecutorService ioPool = createIoIntensivePool();
        System.out.println("\n=== IO密集型任务测试 ===");
        for (int i = 0; i < 12; i++) {
            ioPool.submit(() -> {
                try {
                    // 模拟IO操作
                    long start = System.currentTimeMillis();
                    Thread.sleep(1000); // 模拟IO等待
                    long end = System.currentTimeMillis();
                    System.out.println("IO任务完成,耗时: " + (end - start) + "ms, 线程: " + 
                        Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 关闭线程池
        cpuPool.shutdown();
        ioPool.shutdown();
        
        try {
            cpuPool.awaitTermination(10, TimeUnit.SECONDS);
            ioPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        demonstratePoolTypes();
    }
}

# 6.2 优雅关闭

import java.util.concurrent.*;
import java.util.List;

public class GracefulShutdownDemo {
    
    public static void demonstrateGracefulShutdown() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10)
        );
        
        // 提交一些长时间运行的任务
        for (int i = 1; i <= 8; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + "开始执行");
                try {
                    for (int j = 0; j < 10; j++) {
                        Thread.sleep(500);
                        System.out.println("任务" + taskId + "进度: " + ((j + 1) * 10) + "%");
                        
                        // 检查中断状态
                        if (Thread.currentThread().isInterrupted()) {
                            System.out.println("任务" + taskId + "被中断");
                            return;
                        }
                    }
                    System.out.println("任务" + taskId + "正常完成");
                } catch (InterruptedException e) {
                    System.out.println("任务" + taskId + "被中断异常");
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 等待一段时间后开始关闭
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("\n开始优雅关闭线程池...");
        gracefulShutdown(executor, 5, TimeUnit.SECONDS);
    }
    
    public static void gracefulShutdown(ExecutorService executor, long timeout, TimeUnit unit) {
        // 第一步:停止接收新任务
        executor.shutdown();
        System.out.println("已调用shutdown(),不再接收新任务");
        
        try {
            // 第二步:等待已提交的任务完成
            if (!executor.awaitTermination(timeout, unit)) {
                System.out.println("等待超时,强制关闭线程池");
                
                // 第三步:取消正在执行的任务
                List<Runnable> pendingTasks = executor.shutdownNow();
                System.out.println("强制关闭,未执行的任务数: " + pendingTasks.size());
                
                // 第四步:再次等待一段时间
                if (!executor.awaitTermination(timeout, unit)) {
                    System.out.println("线程池无法正常终止");
                } else {
                    System.out.println("线程池已强制终止");
                }
            } else {
                System.out.println("线程池已优雅关闭");
            }
        } catch (InterruptedException e) {
            // 当前线程被中断,强制关闭线程池
            System.out.println("关闭过程被中断,强制关闭线程池");
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }