导语
某交易平台因并发集合误用导致千万级数据错乱!本文通过线程安全失效、性能反降、内存泄漏三大真实案例,揭秘ConcurrentHashMap、CopyOnWriteArrayList等容器的隐藏陷阱。文末附并发安全自检清单。
一、ConcurrentHashMap的线程安全幻觉
灾难现场:
账户系统出现余额为负,排查发现复合操作未加锁:
ConcurrentMap<String, Integer> accounts = new ConcurrentHashMap<>();
// 错误:非原子操作
public void transfer(String from, String to, int amount) {
if (accounts.get(from) >= amount) { // 步骤1
accounts.put(from, accounts.get(from) - amount); // 步骤2
accounts.put(to, accounts.get(to) + amount);
}
}
// 高并发时步骤1和步骤2之间状态可能被修改
解决方案:
// 1. 使用compute原子方法(JDK8+)
accounts.compute(from, (k, v) -> v >= amount ? v - amount : v);
accounts.compute(to, (k, v) -> v + amount);
// 2. 分段锁替代全局锁
private final Striped<Lock> locks = Striped.lock(32); // Guava
public void safeTransfer(String from, String to, int amount) {
Lock fromLock = locks.get(from);
Lock toLock = locks.get(to);
try {
fromLock.lock();
toLock.lock();
// 业务逻辑
} finally {
toLock.unlock();
fromLock.unlock();
}
}
二、CopyOnWriteArrayList的性能反噬
反直觉案例:
使用CopyOnWriteArrayList后系统吞吐量下降40%
压测数据(10万次写操作):
集合类型 | 耗时 | GC暂停 |
Vector | 320ms | 15ms |
Collections.synchronizedList | 280ms | 10ms |
CopyOnWriteArrayList | 4200ms | 650ms |
适用场景铁律:
- 读操作 >> 写操作(建议100:1以上)
- 集合规模 < 1000
- 数据强一致性要求不高
生产级替代方案:
// 1. 读写分离终极方案
public class ConcurrentList<T> {
private volatile ImmutableList<T> snapshot = ImmutableList.of(); // Guava
private final List<T> writeList = new ArrayList<>();
public void add(T item) {
synchronized (writeList) {
writeList.add(item);
if (writeList.size() > 1000) { // 批量更新
snapshot = ImmutableList.copyOf(writeList);
}
}
}
public List<T> getItems() {
return snapshot; // 无锁读取
}
}
// 2. JDK21虚拟线程优化
private final List<T> safeList = new ArrayList<>();
public void addSafe(T item) {
synchronized (VirtualThread.class) { // 虚拟线程轻量锁
safeList.add(item);
}
}
三、BlockingQueue导致的内存泄漏风暴
线上事故:
消息队列积压50GB未消费数据,JVM内存溢出
错误代码:
// 无界队列 + 消费者阻塞
BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); // 默认Integer.MAX_VALUE
// 生产者疯狂写入
public void onMessage(Message msg) {
queue.put(msg); // 不受控写入
}
// 消费者因bug阻塞
public void consume() throws Exception {
while (true) {
Message msg = queue.take();
if (msg == POISON_PILL) break; // 永远收不到毒丸
process(msg); // 处理逻辑异常导致阻塞
}
}
救命方案:
// 1. 强制容量限制+拒绝策略
BlockingQueue<Message> safeQueue = new LinkedBlockingQueue<>(10000);
ExecutorService executor = new ThreadPoolExecutor(
4, 4, 0, TimeUnit.SECONDS,
safeQueue,
new ThreadPoolExecutor.AbortPolicy() // 队列满时拒绝
);
// 2. 死亡线程检测+自动重启
private final ScheduledExecutorService watcher = Executors.newScheduledThreadPool(1);
public void startConsumer() {
Future<?> future = executor.submit(this::consume);
watcher.scheduleAtFixedRate(() -> {
if (future.isDone()) {
// 记录异常并重启
restartConsumer();
}
}, 30, 30, TimeUnit.SECONDS);
}