Java并发工具类详解
哪吒 2024/1/15
# Java并发工具类详解
点击勘误issues (opens new window),哪吒感谢大家的阅读
# 1. 概述
Java并发包(java.util.concurrent)提供了丰富的并发工具类,这些工具类可以帮助开发者更容易地编写高效、安全的多线程程序。
# 2. 原子类(Atomic Classes)
# 2.1 AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDemo {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子性的自增操作
}
public void decrement() {
count.decrementAndGet(); // 原子性的自减操作
}
public int get() {
return count.get();
}
public boolean compareAndSet(int expect, int update) {
return count.compareAndSet(expect, update);
}
public static void main(String[] args) throws InterruptedException {
AtomicIntegerDemo demo = new AtomicIntegerDemo();
// 创建多个线程进行并发操作
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
demo.increment();
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终计数: " + demo.get()); // 10000
}
}
# 2.2 AtomicReference
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceDemo {
private static class Node {
String data;
Node next;
Node(String data) {
this.data = data;
}
@Override
public String toString() {
return data;
}
}
private AtomicReference<Node> head = new AtomicReference<>();
public void addFirst(String data) {
Node newNode = new Node(data);
Node currentHead;
do {
currentHead = head.get();
newNode.next = currentHead;
} while (!head.compareAndSet(currentHead, newNode));
}
public Node removeFirst() {
Node currentHead;
Node newHead;
do {
currentHead = head.get();
if (currentHead == null) {
return null;
}
newHead = currentHead.next;
} while (!head.compareAndSet(currentHead, newHead));
return currentHead;
}
public static void main(String[] args) {
AtomicReferenceDemo stack = new AtomicReferenceDemo();
// 并发添加元素
for (int i = 0; i < 5; i++) {
final int value = i;
new Thread(() -> {
stack.addFirst("Item-" + value);
System.out.println("添加: Item-" + value);
}).start();
}
// 等待一段时间后移除元素
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Node removed = stack.removeFirst();
if (removed != null) {
System.out.println("移除: " + removed);
}
}).start();
}
}
}
# 2.3 AtomicStampedReference
import java.util.concurrent.atomic.AtomicStampedReference;
public class AtomicStampedReferenceDemo {
private AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(100, 0);
public void demonstrateABA() {
Thread thread1 = new Thread(() -> {
int stamp = atomicStampedRef.getStamp();
System.out.println("Thread1 初始stamp: " + stamp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 尝试CAS操作
boolean success = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println("Thread1 CAS结果: " + success);
});
Thread thread2 = new Thread(() -> {
int stamp = atomicStampedRef.getStamp();
System.out.println("Thread2 初始stamp: " + stamp);
// 修改值
atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println("Thread2 第一次修改: 100 -> 101");
// 再次修改回原值
stamp = atomicStampedRef.getStamp();
atomicStampedRef.compareAndSet(101, 100, stamp, stamp + 1);
System.out.println("Thread2 第二次修改: 101 -> 100");
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终值: " + atomicStampedRef.getReference());
System.out.println("最终stamp: " + atomicStampedRef.getStamp());
}
public static void main(String[] args) {
new AtomicStampedReferenceDemo().demonstrateABA();
}
}
# 3. 同步工具类
# 3.1 CountDownLatch
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void raceExample() {
final int runnerCount = 5;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(runnerCount);
// 创建跑步者线程
for (int i = 0; i < runnerCount; i++) {
final int runnerId = i + 1;
new Thread(() -> {
try {
System.out.println("跑步者" + runnerId + "准备就绪");
startSignal.await(); // 等待起跑信号
// 模拟跑步时间
int runTime = (int) (Math.random() * 3000) + 1000;
Thread.sleep(runTime);
System.out.println("跑步者" + runnerId + "完成比赛,用时" + runTime + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneSignal.countDown(); // 完成一个
}
}).start();
}
try {
Thread.sleep(1000);
System.out.println("裁判发出起跑信号!");
startSignal.countDown(); // 发出起跑信号
doneSignal.await(); // 等待所有跑步者完成
System.out.println("所有跑步者都完成了比赛!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void serviceStartupExample() {
final int serviceCount = 3;
CountDownLatch latch = new CountDownLatch(serviceCount);
// 启动多个服务
new Thread(() -> {
try {
System.out.println("数据库服务启动中...");
Thread.sleep(2000);
System.out.println("数据库服务启动完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("缓存服务启动中...");
Thread.sleep(1500);
System.out.println("缓存服务启动完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("消息队列服务启动中...");
Thread.sleep(1000);
System.out.println("消息队列服务启动完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
try {
System.out.println("等待所有服务启动...");
latch.await(5, TimeUnit.SECONDS); // 最多等待5秒
System.out.println("所有服务启动完成,应用程序可以开始工作了!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
System.out.println("=== 赛跑示例 ===");
raceExample();
System.out.println("\n=== 服务启动示例 ===");
serviceStartupExample();
}
}
# 3.2 CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void teamWorkExample() {
final int teamSize = 4;
CyclicBarrier barrier = new CyclicBarrier(teamSize, () -> {
System.out.println("所有队员都到达集合点,开始下一阶段任务!");
});
for (int i = 0; i < teamSize; i++) {
final int memberId = i + 1;
new Thread(() -> {
try {
// 第一阶段任务
int workTime = (int) (Math.random() * 2000) + 1000;
System.out.println("队员" + memberId + "开始第一阶段任务");
Thread.sleep(workTime);
System.out.println("队员" + memberId + "完成第一阶段任务,等待其他队员");
barrier.await(); // 等待所有队员完成第一阶段
// 第二阶段任务
workTime = (int) (Math.random() * 2000) + 1000;
System.out.println("队员" + memberId + "开始第二阶段任务");
Thread.sleep(workTime);
System.out.println("队员" + memberId + "完成第二阶段任务,等待其他队员");
barrier.await(); // 等待所有队员完成第二阶段
System.out.println("队员" + memberId + ":所有任务完成!");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
public static void matrixCalculationExample() {
final int rows = 4;
final int cols = 4;
final int[][] matrix = new int[rows][cols];
// 初始化矩阵
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
matrix[i][j] = (int) (Math.random() * 10);
}
}
CyclicBarrier barrier = new CyclicBarrier(rows, () -> {
System.out.println("所有行计算完成,开始汇总结果");
// 打印矩阵
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
System.out.print(matrix[i][j] + " ");
}
System.out.println();
}
});
// 每个线程处理一行
for (int i = 0; i < rows; i++) {
final int row = i;
new Thread(() -> {
try {
System.out.println("线程" + row + "开始处理第" + row + "行");
// 模拟计算过程
for (int j = 0; j < cols; j++) {
matrix[row][j] *= 2; // 简单的计算:乘以2
Thread.sleep(100); // 模拟计算耗时
}
System.out.println("线程" + row + "完成第" + row + "行处理");
barrier.await(); // 等待其他线程完成
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
public static void main(String[] args) {
System.out.println("=== 团队协作示例 ===");
teamWorkExample();
try {
Thread.sleep(8000); // 等待第一个示例完成
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n=== 矩阵计算示例 ===");
matrixCalculationExample();
}
}
# 3.3 Semaphore
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
// 停车场示例
public static class ParkingLot {
private final Semaphore semaphore;
private final int capacity;
public ParkingLot(int capacity) {
this.capacity = capacity;
this.semaphore = new Semaphore(capacity);
}
public boolean park(String carId) {
try {
if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
System.out.println(carId + " 成功停车,剩余车位: " + semaphore.availablePermits());
return true;
} else {
System.out.println(carId + " 停车失败,车位已满");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
public void leave(String carId) {
semaphore.release();
System.out.println(carId + " 离开停车场,剩余车位: " + semaphore.availablePermits());
}
}
public static void parkingLotExample() {
ParkingLot parkingLot = new ParkingLot(3); // 3个车位
// 模拟5辆车尝试停车
for (int i = 1; i <= 5; i++) {
final String carId = "Car-" + i;
new Thread(() -> {
if (parkingLot.park(carId)) {
try {
// 停车一段时间
Thread.sleep((int) (Math.random() * 3000) + 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
parkingLot.leave(carId);
}
}
}).start();
}
}
// 数据库连接池示例
public static class DatabaseConnectionPool {
private final Semaphore semaphore;
private final String[] connections;
private final boolean[] used;
public DatabaseConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize);
this.connections = new String[poolSize];
this.used = new boolean[poolSize];
// 初始化连接
for (int i = 0; i < poolSize; i++) {
connections[i] = "Connection-" + (i + 1);
}
}
public String getConnection() throws InterruptedException {
semaphore.acquire();
synchronized (this) {
for (int i = 0; i < connections.length; i++) {
if (!used[i]) {
used[i] = true;
System.out.println(Thread.currentThread().getName() + " 获取连接: " + connections[i]);
return connections[i];
}
}
}
return null;
}
public void releaseConnection(String connection) {
synchronized (this) {
for (int i = 0; i < connections.length; i++) {
if (connections[i].equals(connection) && used[i]) {
used[i] = false;
System.out.println(Thread.currentThread().getName() + " 释放连接: " + connection);
semaphore.release();
break;
}
}
}
}
}
public static void connectionPoolExample() {
DatabaseConnectionPool pool = new DatabaseConnectionPool(2); // 2个连接
// 模拟5个线程使用连接池
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
String connection = pool.getConnection();
if (connection != null) {
// 使用连接执行数据库操作
Thread.sleep((int) (Math.random() * 2000) + 1000);
pool.releaseConnection(connection);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Thread-" + i).start();
}
}
public static void main(String[] args) {
System.out.println("=== 停车场示例 ===");
parkingLotExample();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n=== 数据库连接池示例 ===");
connectionPoolExample();
}
}
# 3.4 Exchanger
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void dataExchangeExample() {
Exchanger<String> exchanger = new Exchanger<>();
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 3; i++) {
String data = "Data-" + i;
System.out.println("生产者生产: " + data);
// 与消费者交换数据
String received = exchanger.exchange(data);
System.out.println("生产者收到确认: " + received);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 3; i++) {
// 与生产者交换数据
String received = exchanger.exchange("ACK-" + i);
System.out.println("消费者接收: " + received);
// 处理数据
Thread.sleep(500);
System.out.println("消费者处理完成: " + received);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void bufferExchangeExample() {
Exchanger<StringBuilder> exchanger = new Exchanger<>();
// 写入线程
Thread writer = new Thread(() -> {
StringBuilder buffer = new StringBuilder();
try {
for (int i = 1; i <= 5; i++) {
// 写入数据到缓冲区
buffer.append("Message-").append(i).append("\n");
System.out.println("写入: Message-" + i);
if (i % 2 == 0) { // 每写入2条消息就交换缓冲区
System.out.println("交换缓冲区...");
buffer = exchanger.exchange(buffer);
buffer.setLength(0); // 清空新缓冲区
}
Thread.sleep(500);
}
// 交换最后的缓冲区
if (buffer.length() > 0) {
exchanger.exchange(buffer);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 读取线程
Thread reader = new Thread(() -> {
StringBuilder buffer = new StringBuilder();
try {
for (int i = 0; i < 3; i++) { // 预期交换3次
buffer = exchanger.exchange(buffer);
if (buffer.length() > 0) {
System.out.println("读取到数据:\n" + buffer.toString());
buffer.setLength(0); // 清空缓冲区
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
writer.start();
reader.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.println("=== 数据交换示例 ===");
dataExchangeExample();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n=== 缓冲区交换示例 ===");
bufferExchangeExample();
}
}
# 4. 并发集合
# 4.1 ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentHashMapDemo {
public static void basicOperations() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 基本操作
map.put("key1", 1);
map.put("key2", 2);
map.put("key3", 3);
// 原子操作
map.putIfAbsent("key4", 4); // 如果不存在则放入
map.replace("key1", 1, 10); // 如果当前值为1,则替换为10
// 计算操作
map.compute("key2", (key, value) -> value == null ? 1 : value * 2);
map.computeIfAbsent("key5", key -> key.length());
map.computeIfPresent("key3", (key, value) -> value + 100);
// 合并操作
map.merge("key1", 5, Integer::sum); // 如果存在则相加,否则使用新值
System.out.println("Map内容: " + map);
}
public static void concurrentAccess() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(10);
// 并发写入
for (int i = 0; i < 100; i++) {
final int value = i;
executor.submit(() -> {
map.put("key" + value, value);
// 原子性增加计数
map.compute("counter", (key, val) -> val == null ? 1 : val + 1);
});
}
// 并发读取
for (int i = 0; i < 50; i++) {
executor.submit(() -> {
map.forEach((key, value) -> {
if (key.startsWith("key") && value % 10 == 0) {
System.out.println("Found: " + key + " = " + value);
}
});
});
}
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终大小: " + map.size());
System.out.println("计数器值: " + map.get("counter"));
}
public static void main(String[] args) {
System.out.println("=== 基本操作示例 ===");
basicOperations();
System.out.println("\n=== 并发访问示例 ===");
concurrentAccess();
}
}
# 4.2 BlockingQueue
import java.util.concurrent.*;
public class BlockingQueueDemo {
public static void arrayBlockingQueueExample() {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = "Item-" + i;
queue.put(item); // 阻塞式放入
System.out.println("生产: " + item + ", 队列大小: " + queue.size());
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
Thread.sleep(2000); // 延迟启动
for (int i = 1; i <= 5; i++) {
String item = queue.take(); // 阻塞式取出
System.out.println("消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(1500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void priorityBlockingQueueExample() {
BlockingQueue<Task> queue = new PriorityBlockingQueue<>();
// 任务类
class Task implements Comparable<Task> {
private String name;
private int priority;
public Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(Task other) {
return Integer.compare(other.priority, this.priority); // 高优先级优先
}
@Override
public String toString() {
return name + "(优先级:" + priority + ")";
}
}
// 添加任务
queue.offer(new Task("任务A", 3));
queue.offer(new Task("任务B", 1));
queue.offer(new Task("任务C", 5));
queue.offer(new Task("任务D", 2));
queue.offer(new Task("任务E", 4));
// 按优先级处理任务
while (!queue.isEmpty()) {
try {
Task task = queue.take();
System.out.println("处理: " + task);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
public static void delayQueueExample() {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// 延迟任务类
class DelayedTask implements Delayed {
private String name;
private long executeTime;
public DelayedTask(String name, long delayInMillis) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayInMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = executeTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}
@Override
public String toString() {
return name;
}
}
// 添加延迟任务
queue.offer(new DelayedTask("任务1", 3000)); // 3秒后执行
queue.offer(new DelayedTask("任务2", 1000)); // 1秒后执行
queue.offer(new DelayedTask("任务3", 2000)); // 2秒后执行
System.out.println("开始时间: " + System.currentTimeMillis());
// 处理延迟任务
while (!queue.isEmpty()) {
try {
DelayedTask task = queue.take(); // 会等待到任务可执行时间
System.out.println("执行: " + task + ", 时间: " + System.currentTimeMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
public static void main(String[] args) {
System.out.println("=== ArrayBlockingQueue示例 ===");
arrayBlockingQueueExample();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n=== PriorityBlockingQueue示例 ===");
priorityBlockingQueueExample();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\n=== DelayQueue示例 ===");
delayQueueExample();
}
}
# 5. 总结
本文详细介绍了Java并发工具类,包括:
- 原子类:AtomicInteger、AtomicReference、AtomicStampedReference等
- 同步工具类:CountDownLatch、CyclicBarrier、Semaphore、Exchanger
- 并发集合:ConcurrentHashMap、BlockingQueue等
这些工具类为并发编程提供了强大的支持,能够帮助开发者编写更安全、更高效的多线程程序。在实际开发中,应该根据具体需求选择合适的工具类。