Java并发容器实现原理
哪吒 2023/6/15
# Java并发容器实现原理
# 1. 并发容器概述
# 1.1 为什么需要并发容器
在多线程环境下,传统的集合类(如ArrayList、HashMap等)不是线程安全的,直接使用会导致数据不一致,甚至死循环(例如JDK 1.7中HashMap并发扩容时链表成环)等问题。虽然可以使用Collections.synchronizedXxx()方法包装,但所有操作都竞争同一把锁,高并发下性能较差。
// Legacy approach: synchronized wrappers (rated as slower by this article)
List<String> syncList = Collections.synchronizedList(new ArrayList<>());
Map<String, String> syncMap = Collections.synchronizedMap(new HashMap<>());
// Concurrent containers: the high-performance alternative this article recommends
List<String> concurrentList = new CopyOnWriteArrayList<>();
Map<String, String> concurrentMap = new ConcurrentHashMap<>();
# 1.2 并发容器的分类
- 并发Map:ConcurrentHashMap、ConcurrentSkipListMap
- 并发List:CopyOnWriteArrayList
- 并发Set:CopyOnWriteArraySet、ConcurrentSkipListSet
- 阻塞队列:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等
- 非阻塞队列:ConcurrentLinkedQueue
# 2. ConcurrentHashMap实现原理
# 2.1 JDK 1.7 vs JDK 1.8的区别
JDK 1.7:分段锁(Segment)
// JDK 1.7 layout sketch (illustration only, not the complete class)
public class ConcurrentHashMap<K,V> {
// Array of segments; each Segment holds its own HashEntry array
final Segment<K,V>[] segments;
// A Segment extends ReentrantLock, so every segment is itself a lock
static final class Segment<K,V> extends ReentrantLock {
// Per-segment table of entries
transient volatile HashEntry<K,V>[] table;
// NOTE(review): presumably the number of entries in this segment - confirm against the JDK source
transient int count;
}
}
JDK 1.8:CAS + synchronized
// JDK 1.8 layout sketch (illustration only, not the complete class)
public class ConcurrentHashMap<K,V> {
// Single array of Node bins; there is no Segment layer in this layout
transient volatile Node<K,V>[] table;
// One entry of a bin: hash and key are final, val and next are volatile
static class Node<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
}
# 2.2 JDK 1.8 ConcurrentHashMap详解
# 2.2.1 put操作实现
/**
 * Demonstrates concurrent {@code put} calls against one shared
 * {@link ConcurrentHashMap}.
 */
public class ConcurrentHashMapDemo {

    /**
     * Starts ten threads named {@code Thread-0} .. {@code Thread-9}; each one
     * stores a single entry and reports it on standard output. The threads are
     * not joined, so this method may return before they finish.
     */
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        int writerCount = 10;
        for (int i = 0; i < writerCount; i++) {
            Thread writer = new Thread(putTask(map, i), "Thread-" + i);
            writer.start();
        }
    }

    /** Builds the task that stores {@code key<index> -> value<index>} and logs it. */
    private static Runnable putTask(ConcurrentHashMap<String, String> map, int index) {
        return () -> {
            String key = "key" + index;
            map.put(key, "value" + index);
            System.out.println(Thread.currentThread().getName() + " put " + key);
        };
    }
}
# 2.2.2 核心实现机制
// Core logic of the put operation (simplified excerpt).
// Inserts or updates the mapping for key.
// Throws NullPointerException when key or value is null.
// When onlyIfAbsent is true, an existing value is left untouched.
// Returns the existing value when the key was already present with a non-null
// value, otherwise null.
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// Retry loop: every pass either completes (break / return) or refreshes tab and tries again
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // initialize the table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// The bin is empty: insert with CAS, no lock taken; a failed CAS falls through to another loop pass
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // help with the ongoing resize
else {
V oldVal = null;
synchronized (f) { // lock the head node of the bin
// Linked-list or red-black-tree handling
// Re-check that f is still the head of bin i after acquiring the lock
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// Linked-list bin
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// Key already present: remember the old value, overwrite unless onlyIfAbsent
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// Reached the end of the list: append a new node after pred
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// Red-black-tree bin
Node<K,V> p;
binCount = 2;
// A non-null result is the node that already holds this key
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount stays 0 when the head re-check failed; in that case loop again
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // convert the bin to a red-black tree
if (oldVal != null)
return oldVal;
break;
}
}
}
// Reached only when a new mapping was added
addCount(1L, binCount);
return null;
}
# 2.3 扩容机制
/**
 * Prints the size and the backing-table length of a {@link ConcurrentHashMap}
 * after each of twenty insertions so that table growth can be observed.
 */
public class ConcurrentHashMapResizeDemo {

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>(4);
        int key = 0;
        while (key < 20) {
            map.put(key, "value" + key);
            String line = "Size: " + map.size() + ", Capacity: " + getCapacity(map);
            System.out.println(line);
            key++;
        }
    }

    /**
     * Reads the length of the private {@code table} field via reflection
     * (demonstration only).
     *
     * @return the table length, {@code 0} if the table is still {@code null},
     *         or {@code -1} if the reflective access fails for any reason
     */
    // NOTE(review): on recent JDKs reflective access to java.base internals is
    // likely rejected unless the package is opened explicitly, in which case
    // every line reports -1 - confirm on the target runtime.
    private static int getCapacity(ConcurrentHashMap<?, ?> map) {
        Object[] table;
        try {
            Field tableField = ConcurrentHashMap.class.getDeclaredField("table");
            tableField.setAccessible(true);
            table = (Object[]) tableField.get(map);
        } catch (Exception e) {
            return -1;
        }
        if (table == null) {
            return 0;
        }
        return table.length;
    }
}
# 3. CopyOnWriteArrayList实现原理
# 3.1 写时复制机制
CopyOnWriteArrayList采用写时复制(Copy-On-Write)策略:读操作不加锁,直接访问当前数组;写操作先加锁,再复制整个数组,在副本上修改后用新数组替换旧数组。因此读线程可能读到修改之前的旧快照(弱一致性)。
/**
 * Runs one reader and one writer thread against a shared
 * {@link CopyOnWriteArrayList} and prints what each of them does.
 */
public class CopyOnWriteArrayListDemo {

    public static void main(String[] args) throws InterruptedException {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        // Seed data: item1 .. item3
        for (int n = 1; n <= 3; n++) {
            list.add("item" + n);
        }

        // Reader: prints the list ten times without any explicit locking
        Thread readerThread = new Thread(() -> {
            int remaining = 10;
            while (remaining-- > 0) {
                System.out.println("Reader: " + list);
                pause(100);
            }
        });

        // Writer: appends five elements, one every 200 ms
        Thread writerThread = new Thread(() -> {
            int next = 0;
            while (next < 5) {
                String added = "newItem" + next;
                list.add(added);
                System.out.println("Writer added: " + added);
                pause(200);
                next++;
            }
        });

        readerThread.start();
        writerThread.start();
        readerThread.join();
        writerThread.join();
    }

    /** Sleeps for the given time; an interrupt only restores the interrupt flag. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
# 3.2 核心实现源码分析
// Implementation of CopyOnWriteArrayList.add (excerpt).
// Appends e by building a new array that is one element longer and installing
// it with setArray. Always returns true.
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // writes take the lock
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // copy the array
newElements[len] = e; // place the new element in the extra slot
setArray(newElements); // install the new array
return true;
} finally {
// Release the lock even if the copy throws
lock.unlock();
}
}
// Reads take no lock: the element is read from whatever array getArray() returns
public E get(int index) {
return get(getArray(), index);
}
// Returns a[index] cast to E; the cast is unchecked because the backing array is an Object[]
private E get(Object[] a, int index) {
return (E) a[index];
}
# 3.3 适用场景分析
/**
 * Read-mostly usage example: a small listener list that is registered once
 * and then iterated for every one of 1000 events.
 */
public class CopyOnWriteScenarioDemo {

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> eventListeners = new CopyOnWriteArrayList<>();

        // Registration: the only writes in this scenario
        eventListeners.add("EmailListener");
        eventListeners.add("SMSListener");
        eventListeners.add("LogListener");

        // Event dispatch: every event is delivered to every listener
        int round = 0;
        while (round < 1000) {
            String event = "Event-" + round;
            eventListeners.forEach(listener -> processEvent(listener, event));
            round++;
        }
    }

    /** Simulates handling of {@code event} by {@code listener} by printing a line. */
    private static void processEvent(String listener, String event) {
        System.out.println(listener + " processing " + event);
    }
}
# 4. 阻塞队列实现原理
# 4.1 ArrayBlockingQueue
基于数组的有界阻塞队列,使用ReentrantLock和Condition实现阻塞。
/**
 * Producer/consumer demo on an {@link ArrayBlockingQueue} with capacity 3:
 * a fast producer and a slow consumer each handle ten items.
 */
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        Thread producer = new Thread(() -> {
            try {
                produce(queue);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                consume(queue);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }

    /** Puts Item-0 .. Item-9, pausing 100 ms after each; {@code put} blocks while the queue is full. */
    private static void produce(ArrayBlockingQueue<String> queue) throws InterruptedException {
        int n = 0;
        while (n < 10) {
            String item = "Item-" + n;
            queue.put(item);
            System.out.println("Produced: " + item + ", Queue size: " + queue.size());
            Thread.sleep(100);
            n++;
        }
    }

    /** Takes ten items, pausing 300 ms after each; {@code take} blocks while the queue is empty. */
    private static void consume(ArrayBlockingQueue<String> queue) throws InterruptedException {
        int taken = 0;
        while (taken < 10) {
            String item = queue.take();
            System.out.println("Consumed: " + item + ", Queue size: " + queue.size());
            Thread.sleep(300); // the consumer is deliberately slower than the producer
            taken++;
        }
    }
}
# 4.2 LinkedBlockingQueue
基于链表的可选有界阻塞队列,入队和出队分别使用两把不同的锁(putLock和takeLock),生产者和消费者可以并行操作,因此高并发下吞吐量通常优于ArrayBlockingQueue。
/**
 * Simple worker-pool demo: three worker threads take {@link Task}s from a
 * shared {@link LinkedBlockingQueue} until they are interrupted.
 */
public class LinkedBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // "Unbounded" queue (in fact bounded, with a capacity of Integer.MAX_VALUE)
        LinkedBlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();

        // Worker pool
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Thread worker = new Thread(new Worker(taskQueue), "Worker-" + i);
            workers.add(worker);
            worker.start();
        }

        // Submit the tasks
        for (int i = 0; i < 10; i++) {
            taskQueue.offer(new Task("Task-" + i));
        }

        // Let the workers run for a while, then ask them to stop
        Thread.sleep(5000);
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    /** A named unit of work whose execution is simulated with a one-second sleep. */
    static class Task {
        private final String name;

        public Task(String name) {
            this.name = name;
        }

        /**
         * Prints which thread runs the task and sleeps for one second. An
         * interrupt ends the sleep early and is recorded by restoring the
         * thread's interrupt flag so the caller can observe it.
         */
        public void execute() {
            System.out.println(Thread.currentThread().getName() +
                    " executing " + name);
            try {
                Thread.sleep(1000); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Takes tasks from the queue and executes them until the thread is interrupted. */
    static class Worker implements Runnable {
        private final LinkedBlockingQueue<Task> taskQueue;

        public Worker(LinkedBlockingQueue<Task> taskQueue) {
            this.taskQueue = taskQueue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Task task = taskQueue.take(); // blocks until a task is available
                    task.execute();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting in take(): restore the flag and fall through
                Thread.currentThread().interrupt();
            }
            // Report shutdown on both exit paths. Previously the message was printed
            // only when the interrupt arrived inside take(); an interrupt delivered
            // while a task was executing ended the loop without any message.
            System.out.println(Thread.currentThread().getName() + " stopped");
        }
    }
}
# 4.3 PriorityBlockingQueue
基于优先级堆(二叉堆)的无界阻塞队列,元素按优先级顺序出队;由于队列无界,put不会阻塞,只有队列为空时take才会阻塞。
/**
 * Shows that a {@link PriorityBlockingQueue} hands out elements in priority
 * order regardless of insertion order.
 */
public class PriorityBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<PriorityTask> queue = new PriorityBlockingQueue<>();

        // Tasks are offered in a deliberately unsorted order
        PriorityTask[] submitted = {
            new PriorityTask("Low Priority Task", 1),
            new PriorityTask("High Priority Task", 10),
            new PriorityTask("Medium Priority Task", 5),
            new PriorityTask("Urgent Task", 20),
        };
        for (PriorityTask task : submitted) {
            queue.offer(task);
        }

        // Drain the queue; take() yields the highest priority first
        while (!queue.isEmpty()) {
            PriorityTask next = queue.take();
            System.out.println("Processing: " + next);
            Thread.sleep(500);
        }
    }

    /** A named task ordered so that a larger {@code priority} sorts first. */
    static class PriorityTask implements Comparable<PriorityTask> {
        private final String name;
        private final int priority;

        public PriorityTask(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        /** Descending by priority: returns -1 when this task has the higher priority. */
        @Override
        public int compareTo(PriorityTask other) {
            if (this.priority == other.priority) {
                return 0;
            }
            return this.priority > other.priority ? -1 : 1;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(name);
            text.append(" (Priority: ").append(priority).append(")");
            return text.toString();
        }
    }
}
# 5. 非阻塞队列 - ConcurrentLinkedQueue
# 5.1 无锁实现原理
ConcurrentLinkedQueue使用CAS操作实现无锁的并发队列。
/**
 * Three producers and two consumers share one {@link ConcurrentLinkedQueue}.
 * Producers offer 15 items in total; each consumer polls until it has received
 * 7 items, so one item is expected to remain at the end.
 */
public class ConcurrentLinkedQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        // Producers
        List<Thread> producers = new ArrayList<>();
        for (int id = 0; id < 3; id++) {
            Thread producer = new Thread(producerTask(queue, id));
            producers.add(producer);
            producer.start();
        }

        // Consumers
        List<Thread> consumers = new ArrayList<>();
        for (int id = 0; id < 2; id++) {
            Thread consumer = new Thread(consumerTask(queue, id));
            consumers.add(consumer);
            consumer.start();
        }

        // Wait for every thread to finish
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
        System.out.println("Remaining items in queue: " + queue.size());
    }

    /** Offers five items tagged with the producer id, pausing 100 ms after each. */
    private static Runnable producerTask(ConcurrentLinkedQueue<String> queue, int producerId) {
        return () -> {
            for (int j = 0; j < 5; j++) {
                String item = "Producer-" + producerId + "-Item-" + j;
                queue.offer(item);
                System.out.println("Offered: " + item);
                pause(100);
            }
        };
    }

    /**
     * Polls until seven items have been received; an empty poll is reported
     * and does not count. Pauses 150 ms after every attempt.
     */
    private static Runnable consumerTask(ConcurrentLinkedQueue<String> queue, int consumerId) {
        return () -> {
            int received = 0;
            while (received < 7) {
                String item = queue.poll();
                if (item == null) {
                    System.out.println("Consumer-" + consumerId + " found empty queue");
                } else {
                    System.out.println("Consumer-" + consumerId + " polled: " + item);
                    received++;
                }
                pause(150);
            }
        };
    }

    /** Sleeps for the given time; an interrupt only restores the interrupt flag. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
# 5.2 CAS操作示例
// Core logic of ConcurrentLinkedQueue.offer (simplified excerpt).
// Links a new node holding e after the last node using CAS in a retry loop;
// returns true once the node has been linked.
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
// t is the tail as last read, p is the node currently being examined
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is the last node: try to link the new node with CAS
if (p.casNext(null, newNode)) {
// Linked successfully; tail is updated only when p was not the tail that was read
if (p != t)
casTail(t, newNode);
return true;
}
// CAS failed: another node was linked first, so loop and re-read p.next
}
else if (p == q) {
// p.next points back at p (sentinel / self-linked node): restart from
// the fresh tail if it changed, otherwise from head
p = (t != (t = tail)) ? t : head;
}
else {
// Advance: jump to the fresh tail if it changed after moving past t, else step to q
p = (p != t && t != (t = tail)) ? t : q;
}
}
}
# 6. ConcurrentSkipListMap实现原理
# 6.1 跳表数据结构
ConcurrentSkipListMap基于跳表(Skip List)实现,提供有序的并发Map。
/**
 * Five threads insert disjoint key ranges into one
 * {@link ConcurrentSkipListMap}; afterwards the map is printed in key order
 * and a range view is queried.
 */
public class ConcurrentSkipListMapDemo {

    public static void main(String[] args) throws InterruptedException {
        ConcurrentSkipListMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();

        // Concurrent inserts: thread k writes keys 10k .. 10k+9
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int firstKey = i * 10;
            Thread thread = new Thread(() -> {
                for (int key = firstKey; key < firstKey + 10; key++) {
                    String value = "Value-" + key;
                    skipListMap.put(key, value);
                    System.out.println(Thread.currentThread().getName() +
                            " put: " + key + " -> " + value);
                }
            }, "Thread-" + i);
            threads.add(thread);
            thread.start();
        }

        // Wait for all writers
        for (Thread thread : threads) {
            thread.join();
        }

        // Entries come back sorted by key
        System.out.println("\nOrdered entries:");
        skipListMap.forEach((key, value) -> System.out.println(key + " -> " + value));

        // Range view: keys 15 (inclusive) to 26 (exclusive)
        System.out.println("\nRange query (15-25):");
        skipListMap.subMap(15, 26)
                .forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
# 6.2 跳表的优势
/**
 * Shows the ordered-map operations of {@link ConcurrentSkipListMap}: sorted
 * iteration, first/last key and a range view.
 */
public class SkipListAdvantageDemo {

    public static void main(String[] args) {
        ConcurrentSkipListMap<String, Integer> scoreMap = new ConcurrentSkipListMap<>();

        // Student scores
        String[] students = {"Alice", "Bob", "Charlie", "David", "Eve"};
        int[] scores = {95, 87, 92, 78, 89};
        for (int i = 0; i < students.length; i++) {
            scoreMap.put(students[i], scores[i]);
        }

        // Iteration follows the natural (alphabetical) key order
        System.out.println("Students in alphabetical order:");
        scoreMap.forEach((student, score) -> System.out.println(student + ": " + score));

        // Smallest and largest key
        String first = scoreMap.firstKey();
        String last = scoreMap.lastKey();
        System.out.println("\nFirst student: " + first);
        System.out.println("Last student: " + last);

        // Range view: keys from "B" (inclusive) up to "E" (exclusive)
        System.out.println("\nStudents from 'B' to 'D':");
        scoreMap.subMap("B", "E")
                .forEach((student, score) -> System.out.println(student + ": " + score));
    }
}
# 7. 性能比较分析
# 7.1 并发容器性能测试
/**
 * Rough wall-clock comparison of concurrent containers against their
 * synchronized counterparts.
 *
 * <p>This is a demonstration, not a rigorous benchmark: there is no warm-up
 * and each workload runs once. Elapsed time is measured with
 * {@link System#nanoTime()} (monotonic) and reported in whole milliseconds.
 */
public class ConcurrentContainerPerformanceTest {
    private static final int THREAD_COUNT = 10;
    private static final int OPERATIONS_PER_THREAD = 10000;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 并发容器性能测试 ===");
        testConcurrentHashMap();
        testCopyOnWriteArrayList();
        testBlockingQueues();
    }

    /** Compares put+get throughput of ConcurrentHashMap and a synchronized HashMap. */
    private static void testConcurrentHashMap() throws InterruptedException {
        System.out.println("\n--- ConcurrentHashMap vs Synchronized HashMap ---");

        long concurrentNanos = timeMapWorkload(new ConcurrentHashMap<>());
        System.out.println("ConcurrentHashMap time: " + toMillis(concurrentNanos) + "ms");

        long syncNanos = timeMapWorkload(Collections.synchronizedMap(new HashMap<>()));
        System.out.println("Synchronized HashMap time: " + toMillis(syncNanos) + "ms");

        System.out.println("Performance improvement: " +
                speedup(syncNanos, concurrentNanos) + "x");
    }

    /** Compares a 90% read / 10% write workload on both list implementations. */
    private static void testCopyOnWriteArrayList() throws InterruptedException {
        System.out.println("\n--- CopyOnWriteArrayList vs Synchronized ArrayList ---");

        CopyOnWriteArrayList<String> cowList = new CopyOnWriteArrayList<>();
        List<String> syncList = Collections.synchronizedList(new ArrayList<>());
        // Pre-fill both lists with the same data
        for (int i = 0; i < 1000; i++) {
            cowList.add("item" + i);
            syncList.add("item" + i);
        }

        // The copy-on-write list is read without an external lock; this is safe
        // here because the workload only ever appends, so size() never shrinks.
        long cowNanos = timeListWorkload(cowList, false);
        System.out.println("CopyOnWriteArrayList time: " + toMillis(cowNanos) + "ms");

        // The synchronized list needs the isEmpty/size/get sequence under one lock.
        long syncNanos = timeListWorkload(syncList, true);
        System.out.println("Synchronized ArrayList time: " + toMillis(syncNanos) + "ms");

        System.out.println("Performance improvement: " +
                speedup(syncNanos, cowNanos) + "x");
    }

    /** Times a single-producer / single-consumer hand-off on both blocking queues. */
    private static void testBlockingQueues() throws InterruptedException {
        System.out.println("\n--- ArrayBlockingQueue vs LinkedBlockingQueue ---");

        long arrayNanos = timeQueueTransfer(new ArrayBlockingQueue<>(1000));
        System.out.println("ArrayBlockingQueue time: " + toMillis(arrayNanos) + "ms");

        long linkedNanos = timeQueueTransfer(new LinkedBlockingQueue<>());
        System.out.println("LinkedBlockingQueue time: " + toMillis(linkedNanos) + "ms");
    }

    /** Runs THREAD_COUNT threads that each put and get a disjoint key range; returns elapsed nanoseconds. */
    private static long timeMapWorkload(Map<Integer, String> map) throws InterruptedException {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            tasks.add(() -> {
                for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                    int key = threadId * OPERATIONS_PER_THREAD + j;
                    map.put(key, "value" + key);
                    map.get(key);
                }
            });
        }
        return runAndTime(tasks);
    }

    /**
     * Runs THREAD_COUNT threads doing one write per nine reads; returns elapsed nanoseconds.
     *
     * @param lockReads whether each read sequence is wrapped in {@code synchronized (list)}
     */
    private static long timeListWorkload(List<String> list, boolean lockReads)
            throws InterruptedException {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < THREAD_COUNT; i++) {
            tasks.add(() -> {
                for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                    if (j % 10 == 0) {
                        list.add("newItem" + j);
                    } else if (lockReads) {
                        synchronized (list) {
                            readOne(list, j);
                        }
                    } else {
                        readOne(list, j);
                    }
                }
            });
        }
        return runAndTime(tasks);
    }

    /** Reads the element at {@code index % size} when the list is not empty. */
    private static void readOne(List<String> list, int index) {
        if (!list.isEmpty()) {
            list.get(index % list.size());
        }
    }

    /** Moves OPERATIONS_PER_THREAD items from one producer to one consumer; returns elapsed nanoseconds. */
    private static long timeQueueTransfer(java.util.concurrent.BlockingQueue<String> queue)
            throws InterruptedException {
        Runnable producer = () -> {
            try {
                for (int i = 0; i < OPERATIONS_PER_THREAD; i++) {
                    queue.put("item" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < OPERATIONS_PER_THREAD; i++) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Runnable> tasks = new ArrayList<>();
        tasks.add(producer);
        tasks.add(consumer);
        return runAndTime(tasks);
    }

    /** Starts one thread per task, joins them all and returns the elapsed nanoseconds. */
    private static long runAndTime(List<Runnable> tasks) throws InterruptedException {
        long start = System.nanoTime();
        List<Thread> threads = new ArrayList<>(tasks.size());
        for (Runnable task : tasks) {
            Thread thread = new Thread(task);
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return System.nanoTime() - start;
    }

    private static long toMillis(long nanos) {
        return nanos / NANOS_PER_MILLI;
    }

    /**
     * Formats {@code baselineNanos / candidateNanos} with two decimals. The
     * divisor is clamped to at least 1 ns so a very fast run can never produce
     * "Infinity" or "NaN" in the report.
     */
    private static String speedup(long baselineNanos, long candidateNanos) {
        double ratio = (double) baselineNanos / Math.max(1L, candidateNanos);
        return String.format("%.2f", ratio);
    }
}
# 8. 最佳实践与选择指南
# 8.1 容器选择决策树
public class ContainerSelectionGuide {
public static void main(String[] args) {
System.out.println("=== 并发容器选择指南 ===");
// Map类型选择
System.out.println("\n--- Map类型选择 ---");
System.out.println("1. 需要排序 -> ConcurrentSkipListMap");
System.out.println("2. 高并发读写 -> ConcurrentHashMap");
System.out.println("3. 简单同步 -> Collections.synchronizedMap()");
// List类型选择
System.out.println("\n--- List类型选择 ---");
System.out.println("1. 读多写少 -> CopyOnWriteArrayList");
System.out.println("2. 频繁修改 -> Collections.synchronizedList()");
System.out.println("3. 单线程 -> ArrayList");
// Queue类型选择
System.out.println("\n--- Queue类型选择 ---");
System.out.println("1. 生产者-消费者模式 -> BlockingQueue");
System.out.println(" - 有界队列 -> ArrayBlockingQueue");
System.out.println(" - 无界队列 -> LinkedBlockingQueue");
System.out.println(" - 优先级队列 -> PriorityBlockingQueue");
System.out.println("2. 高性能无锁 -> ConcurrentLinkedQueue");
demonstrateSelectionCriteria();
}
private static void demonstrateSelectionCriteria() {
System.out.println("\n=== 实际应用场景示例 ===");
// 场景1:缓存系统
System.out.println("\n场景1:缓存系统");
ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
cache.put("user:123", new User("Alice"));
System.out.println("推荐:ConcurrentHashMap - 高并发读写性能");
// 场景2:事件监听器
System.out.println("\n场景2:事件监听器");
CopyOnWriteArrayList<EventListener> listeners = new CopyOnWriteArrayList<>();
listeners.add(new EmailListener());
listeners.add(new LogListener());
System.out.println("推荐:CopyOnWriteArrayList - 读多写少,遍历安全");
// 场景3:任务队列
System.out.println("\n场景3:任务队列");
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
taskQueue.offer(() -> System.out.println("执行任务"));
System.out.println("推荐:LinkedBlockingQueue - 生产者消费者模式");
// 场景4:排行榜
System.out.println("\n场景4:排行榜");
ConcurrentSkipListMap<Integer, String> leaderboard = new ConcurrentSkipListMap<>(
Collections.reverseOrder());
leaderboard.put(100, "Player1");
leaderboard.put(95, "Player2");
System.out.println("推荐:ConcurrentSkipListMap - 需要排序的并发Map");
}
static class User {
private String name;
public User(String name) { this.name = name; }
@Override
public String toString() { return "User{name='" + name + "'}"; }
}
interface EventListener {
void onEvent(String event);
}
static class EmailListener implements EventListener {
@Override
public void onEvent(String event) {
System.out.println("Email notification: " + event);
}
}
static class LogListener implements EventListener {
@Override
public void onEvent(String event) {
System.out.println("Log event: " + event);
}
}
}
# 8.2 性能优化技巧
/**
 * Three small illustrations of tuning advice for concurrent containers:
 * initial capacity, atomic counters and bulk queue operations.
 */
public class PerformanceOptimizationTips {

    public static void main(String[] args) {
        System.out.println("=== 并发容器性能优化技巧 ===");
        optimizeInitialCapacity(); // tip 1: choose a sensible initial capacity
        reduceLockContention();    // tip 2: reduce lock contention
        batchOperations();         // tip 3: use bulk operations
    }

    /** Contrasts a default-sized map with one created for an expected size. */
    private static void optimizeInitialCapacity() {
        System.out.println("\n--- 技巧1:合理设置初始容量 ---");

        // Discouraged: default capacity
        ConcurrentHashMap<String, String> map1 = new ConcurrentHashMap<>();

        // Preferred: pass the expected size up front
        int expectedSize = 10000;
        float loadFactor = 0.75f;
        int concurrencyLevel = Runtime.getRuntime().availableProcessors();
        ConcurrentHashMap<String, String> map2 =
                new ConcurrentHashMap<>(expectedSize, loadFactor, concurrencyLevel);

        System.out.println("预估容量可以减少扩容操作,提高性能");
    }

    /**
     * Ten threads increment one shared {@link AtomicLong} stored in a map.
     * The threads are started but not joined.
     */
    private static void reduceLockContention() {
        System.out.println("\n--- 技巧2:减少锁竞争 ---");

        ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
        String key = "counter";
        counters.putIfAbsent(key, new AtomicLong(0));

        // Every thread looks the counter up once and then increments it 1000 times
        Runnable incrementer = () -> {
            AtomicLong counter = counters.get(key);
            int remaining = 1000;
            while (remaining-- > 0) {
                counter.incrementAndGet();
            }
        };
        for (int i = 0; i < 10; i++) {
            new Thread(incrementer).start();
        }

        System.out.println("使用原子类可以减少锁竞争");
    }

    /** Adds three items in one call and drains up to ten items in one call. */
    private static void batchOperations() {
        System.out.println("\n--- 技巧3:批量操作 ---");

        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Bulk add
        List<String> batch = Arrays.asList("item1", "item2", "item3");
        queue.addAll(batch);

        // Bulk remove
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained, 10);

        System.out.println("批量操作可以减少锁获取次数:" + drained);
    }
}
# 9. 总结
# 9.1 核心要点
- ConcurrentHashMap:JDK 1.8使用CAS + synchronized,性能优异
- CopyOnWriteArrayList:写时复制,适合读多写少场景
- 阻塞队列:生产者-消费者模式的首选,支持阻塞操作
- 非阻塞队列:高性能无锁实现,适合高并发场景
- ConcurrentSkipListMap:有序的并发Map,基于跳表实现
# 9.2 选择建议
场景 | 推荐容器 | 原因 |
---|---|---|
高并发缓存 | ConcurrentHashMap | 读写性能优异 |
事件监听器 | CopyOnWriteArrayList | 读多写少,遍历安全 |
任务队列 | LinkedBlockingQueue | 生产者消费者模式 |
排行榜 | ConcurrentSkipListMap | 需要排序 |
高性能队列 | ConcurrentLinkedQueue | 无锁实现 |
# 9.3 性能考虑
- 内存开销:CopyOnWrite容器内存开销较大
- 写性能:CopyOnWrite容器写性能较差
- 读性能:ConcurrentHashMap和CopyOnWrite容器的读操作都不加锁,读性能优异
- 扩容成本:预设合理初始容量
- 锁竞争:JDK 1.7可通过并发级别(concurrencyLevel)调整Segment数量;JDK 1.8中该参数仅作为初始容量的参考,锁粒度为单个桶的头节点
通过合理选择和使用并发容器,可以显著提高多线程应用的性能和稳定性。关键是要根据具体的使用场景和性能要求来选择最适合的容器类型。